Commit 1ebb7654 authored by Barry Warsaw's avatar Barry Warsaw

Optimize pendings.

Closes !68
parents 33a64ede ff84c2bb
Pipeline #550015 passed with stage
......@@ -176,6 +176,7 @@ class ProbeVERP(_BaseVERPParser):
@implementer(IPendable)
class _ProbePendable(dict):
"""The pendable dictionary for probe messages."""
PEND_TYPE = 'probe'
def send_probe(member, msg):
......
......@@ -168,6 +168,7 @@ however the message metadata indicates that the message has been approved.
_parsemsg : False
approved : True
moderator_approved: True
type : data
version : 3
......
......@@ -43,7 +43,7 @@ log = logging.getLogger('mailman.error')
@implementer(IPendable)
class PendableRegistration(dict):
PEND_KEY = 'registration'
PEND_TYPE = 'registration'
......
......@@ -68,7 +68,7 @@ class WhichSubscriber(Enum):
@implementer(IPendable)
class Pendable(dict):
pass
PEND_TYPE = 'subscription'
......
......@@ -207,9 +207,9 @@ Message-ID: <first>
# corresponds to a record in the pending database.
token = send_probe(self._member, self._msg)
pendable = getUtility(IPendings).confirm(token)
self.assertEqual(len(pendable.items()), 2)
self.assertEqual(len(pendable.items()), 3)
self.assertEqual(set(pendable.keys()),
set(['member_id', 'message_id']))
set(['member_id', 'message_id', 'type']))
# member_ids are pended as unicodes.
self.assertEqual(uuid.UUID(hex=pendable['member_id']),
self._member.member_id)
......
......@@ -55,7 +55,7 @@ log = logging.getLogger('mailman.vette')
@implementer(IPendable)
class HeldMessagePendable(dict):
PEND_KEY = 'held message'
PEND_TYPE = 'held message'
......@@ -149,8 +149,7 @@ class HoldChain(TerminalChainBase):
request_id = hold_message(mlist, msg, msgdata, None)
# Calculate a confirmation token to send to the author of the
# message.
pendable = HeldMessagePendable(type=HeldMessagePendable.PEND_KEY,
id=request_id)
pendable = HeldMessagePendable(id=request_id)
token = getUtility(IPendings).add(pendable)
# Get the language to send the response in. If the sender is a
# member, then send it in the member's language, otherwise send it in
......
"""Pendable indexes
Add indexes on Pendable fields that can be queried upon.
Revision ID: 47294d3a604
Revises: 33bc0099223
Create Date: 2015-12-02 11:46:47.295174
"""
# revision identifiers, used by Alembic.
revision = '47294d3a604'
down_revision = '3e09bb4a5dc'
import json
import sqlalchemy as sa
from alembic import op
TYPE_CLUES = {
'member_id': 'probe',
'token_owner': 'subscription',
'_mod_message_id': 'data',
}
pended_table = sa.sql.table(
'pended',
sa.sql.column('id', sa.Integer),
)
keyvalue_table = sa.sql.table(
'pendedkeyvalue',
sa.sql.column('id', sa.Integer),
sa.sql.column('key', sa.Unicode),
sa.sql.column('value', sa.Unicode),
sa.sql.column('pended_id', sa.Integer),
)
def upgrade():
op.create_index(
op.f('ix_pended_expiration_date'), 'pended', ['expiration_date'],
unique=False)
op.create_index(op.f('ix_pended_token'), 'pended', ['token'], unique=False)
op.create_index(
op.f('ix_pendedkeyvalue_key'), 'pendedkeyvalue', ['key'], unique=False)
op.create_index(
op.f('ix_pendedkeyvalue_value'), 'pendedkeyvalue', ['value'],
unique=False)
# Data migration.
connection = op.get_bind()
for pended_result in connection.execute(pended_table.select()).fetchall():
pended_id = pended_result['id']
keyvalues = connection.execute(keyvalue_table.select().where(
keyvalue_table.c.pended_id == pended_id
)).fetchall()
kv_type = [kv for kv in keyvalues if kv['key'] == 'type']
if kv_type:
# Convert existing type keys from JSON to plain text.
# The (pended_id, key) tuple is unique.
kv_type = kv_type[0]
try:
new_value = json.loads(kv_type['value'])
except ValueError:
# New-style entry (or already converted).
pass
else:
connection.execute(keyvalue_table.update().where(
keyvalue_table.c.id == kv_type['id']
).values(value=new_value))
else:
# Detect the type and add the corresponding type key.
keys = [kv['key'] for kv in keyvalues]
for clue_key, clue_type in TYPE_CLUES.items():
if clue_key not in keys:
continue
# We found the type, update the DB.
connection.execute(keyvalue_table.insert().values(
key='type', value=clue_type, pended_id=pended_id))
break
def downgrade():
# Data migration.
connection = op.get_bind()
# Remove the introduced type keys.
connection.execute(keyvalue_table.delete().where(sa.and_(
keyvalue_table.c.key == 'type',
keyvalue_table.c.value.in_(TYPE_CLUES.values())
)))
# Convert the other type keys to JSON.
keyvalues = connection.execute(keyvalue_table.select().where(
keyvalue_table.c.key == 'type')).fetchall()
for keyvalue in keyvalues:
connection.execute(keyvalue_table.update().where(
keyvalue_table.c.id == keyvalue['id']
).values(value=json.dumps(keyvalue['value'])))
# Remove indexes.
op.drop_index(op.f('ix_pendedkeyvalue_value'), table_name='pendedkeyvalue')
op.drop_index(op.f('ix_pendedkeyvalue_key'), table_name='pendedkeyvalue')
op.drop_index(op.f('ix_pended_token'), table_name='pended')
op.drop_index(op.f('ix_pended_expiration_date'), table_name='pended')
......@@ -30,6 +30,7 @@ from mailman.config import config
from mailman.database.alembic import alembic_cfg
from mailman.database.helpers import exists_in_db
from mailman.database.model import Model
from mailman.database.transaction import transaction
from mailman.testing.layers import ConfigLayer
......@@ -100,3 +101,71 @@ class TestMigrations(unittest.TestCase):
header_match_table.select()).fetchall()
self.assertEqual(results,
[(1, hm[0], hm[1]) for hm in test_header_matches])
def test_47294d3a604_pendable_keyvalues(self):
# We have 5 pended items:
# - one is a probe request
# - one is a subscription request
# - one is a moderation request
# - one is a held message
# - one is a registration request in the new format
#
# The first three used to have no 'type' key and must be properly
# typed, the held message used to have a type key, but in JSON, and
# must be converted.
pended_table = sa.sql.table(
'pended',
sa.sql.column('id', sa.Integer),
)
keyvalue_table = sa.sql.table(
'pendedkeyvalue',
sa.sql.column('id', sa.Integer),
sa.sql.column('key', sa.Unicode),
sa.sql.column('value', sa.Unicode),
sa.sql.column('pended_id', sa.Integer),
)
def get_from_db():
results = {}
for i in range(1, 6):
query = sa.sql.select(
[keyvalue_table.c.key, keyvalue_table.c.value]
).where(
keyvalue_table.c.pended_id == i
)
results[i] = dict([
(r['key'], r['value']) for r in
config.db.store.execute(query).fetchall()
])
return results
# Start at the previous revision
with transaction():
alembic.command.downgrade(alembic_cfg, '33bc0099223')
for i in range(1, 6):
config.db.store.execute(pended_table.insert().values(id=i))
config.db.store.execute(keyvalue_table.insert().values([
{'pended_id': 1, 'key': 'member_id', 'value': 'test-value'},
{'pended_id': 2, 'key': 'token_owner', 'value': 'test-value'},
{'pended_id': 3, 'key': '_mod_message_id',
'value': 'test-value'},
{'pended_id': 4, 'key': 'type', 'value': '"held message"'},
{'pended_id': 5, 'key': 'type', 'value': 'registration'},
]))
# Upgrading.
with transaction():
alembic.command.upgrade(alembic_cfg, '47294d3a604')
results = get_from_db()
for i in range(1, 5):
self.assertIn('type', results[i])
self.assertEqual(results[1]['type'], 'probe')
self.assertEqual(results[2]['type'], 'subscription')
self.assertEqual(results[3]['type'], 'data')
self.assertEqual(results[4]['type'], 'held message')
self.assertEqual(results[5]['type'], 'registration')
# Downgrading.
with transaction():
alembic.command.downgrade(alembic_cfg, '33bc0099223')
results = get_from_db()
for i in range(1, 4):
self.assertNotIn('type', results[i])
self.assertEqual(results[4]['type'], '"held message"')
self.assertEqual(results[5]['type'], '"registration"')
......@@ -37,6 +37,14 @@ from zope.interface import Interface, Attribute
class IPendable(Interface):
"""A pendable object."""
PEND_TYPE = Attribute(
"""The type of this pendable.
Subclasses must define this attribute, and it must be a unique string;
it's used to efficiently search for all pendables of the given type.
The PEND_TYPE "type" is reserved.
""")
def keys():
"""The keys of the pending event data, all of which are strings."""
......@@ -95,6 +103,16 @@ class IPendings(Interface):
def evict():
"""Remove all pended items whose lifetime has expired."""
def find(mlist=None, pend_type=None):
"""Search for the pendables matching the given criteria.
:param mlist: The MailingList object that the pendables must be
related to.
:param pend_type: The type of the pendables that are looked for, this
corresponds to the `PEND_TYPE` attribute.
:return: An iterator over 2-tuples of the form (token, dict).
"""
def __iter__():
"""An iterator over all pendables.
......
......@@ -26,10 +26,9 @@ token that can be used in urls and such.
>>> from mailman.interfaces.pending import IPendable
>>> @implementer(IPendable)
... class SimplePendable(dict):
... pass
... PEND_TYPE = 'subscription'
>>> subscription = SimplePendable(
... type='subscription',
... address='[email protected]',
... display_name='Anne Person',
... language='en',
......@@ -44,8 +43,14 @@ There's exactly one entry in the pendings database now.
1
You can *confirm* the pending, which means returning the `IPendable` structure
(as a dictionary) from the database that matches the token. If the token
isn't in the database, None is returned.
(as a dictionary) from the database that matches the token.
All `IPendable` classes have a `PEND_TYPE` attribute which must be a string.
It is used to identify and query pendables in the database, and will be
returned as the `type` key in the dictionary. Thus `type` is a reserved key
and pendables may not otherwise set it.
If the token isn't in the database, None is returned.
>>> pendable = pendingdb.confirm(b'missing')
>>> print(pendable)
......
......@@ -124,6 +124,7 @@ an additional key which holds the name of the request type.
_request_type: held_message
bar : no
foo : yes
type : data
Iterating over requests
......@@ -146,6 +147,7 @@ over by type.
_request_type: held_message
bar: no
foo: yes
type: data
Deleting requests
......
......@@ -35,8 +35,8 @@ from mailman.database.transaction import dbconnection
from mailman.interfaces.pending import (
IPendable, IPended, IPendedKeyValue, IPendings)
from mailman.utilities.datetime import now
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode
from sqlalchemy.orm import relationship
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode, and_
from sqlalchemy.orm import aliased, relationship
from zope.interface import implementer
from zope.interface.verify import verifyObject
......@@ -49,8 +49,8 @@ class PendedKeyValue(Model):
__tablename__ = 'pendedkeyvalue'
id = Column(Integer, primary_key=True)
key = Column(Unicode)
value = Column(Unicode)
key = Column(Unicode, index=True)
value = Column(Unicode, index=True)
pended_id = Column(Integer, ForeignKey('pended.id'), index=True)
def __init__(self, key, value):
......@@ -66,20 +66,15 @@ class Pended(Model):
__tablename__ = 'pended'
id = Column(Integer, primary_key=True)
token = Column(Unicode)
expiration_date = Column(DateTime)
key_values = relationship('PendedKeyValue')
def __init__(self, token, expiration_date):
super(Pended, self).__init__()
self.token = token
self.expiration_date = expiration_date
token = Column(Unicode, index=True)
expiration_date = Column(DateTime, index=True)
key_values = relationship('PendedKeyValue', cascade="all, delete-orphan")
@implementer(IPendable)
class UnpendedPendable(dict):
pass
PEND_TYPE = 'unpended'
......@@ -114,7 +109,13 @@ class Pendings:
pending = Pended(
token=token,
expiration_date=now() + lifetime)
pendable_type = pendable.get('type', pendable.PEND_TYPE)
pending.key_values.append(
PendedKeyValue(key='type', value=pendable_type))
for key, value in pendable.items():
# The type has been handled above.
if key == 'type':
continue
# Both keys and values must be strings.
if isinstance(key, bytes):
key = key.decode('utf-8')
......@@ -138,17 +139,18 @@ class Pendings:
'Unexpected token count: {0}'.format(pendings.count()))
pending = pendings[0]
pendable = UnpendedPendable()
# Find all PendedKeyValue entries that are associated with the pending
# object's ID. Watch out for type conversions.
entries = store.query(PendedKeyValue).filter(
PendedKeyValue.pended_id == pending.id)
for keyvalue in entries:
value = json.loads(keyvalue.value)
# Iterate on PendedKeyValue entries that are associated with the
# pending object's ID. Watch out for type conversions.
for keyvalue in pending.key_values:
# The `type` key is special and served. It is not JSONified. See
# the IPendable interface for details.
if keyvalue.key == 'type':
value = keyvalue.value
else:
value = json.loads(keyvalue.value)
if isinstance(value, dict) and '__encoding__' in value:
value = value['value'].encode(value['__encoding__'])
pendable[keyvalue.key] = value
if expunge:
store.delete(keyvalue)
if expunge:
store.delete(pending)
return pendable
......@@ -158,14 +160,26 @@ class Pendings:
right_now = now()
for pending in store.query(Pended).all():
if pending.expiration_date < right_now:
# Find all PendedKeyValue entries that are associated with the
# pending object's ID.
q = store.query(PendedKeyValue).filter(
PendedKeyValue.pended_id == pending.id)
for keyvalue in q:
store.delete(keyvalue)
store.delete(pending)
@dbconnection
def find(self, store, mlist=None, pend_type=None):
query = store.query(Pended)
if mlist is not None:
pkv_alias_mlist = aliased(PendedKeyValue)
query = query.join(pkv_alias_mlist).filter(and_(
pkv_alias_mlist.key == 'list_id',
pkv_alias_mlist.value == json.dumps(mlist.list_id)
))
if pend_type is not None:
pkv_alias_type = aliased(PendedKeyValue)
query = query.join(pkv_alias_type).filter(and_(
pkv_alias_type.key == 'type',
pkv_alias_type.value == pend_type
))
for pending in query:
yield pending.token, self.confirm(pending.token, expunge=False)
@dbconnection
def __iter__(self, store):
for pending in store.query(Pended).all():
......
......@@ -41,6 +41,8 @@ from zope.interface import implementer
class DataPendable(dict):
"""See `IPendable`."""
PEND_TYPE = 'data'
def update(self, mapping):
# Keys and values must be strings (unicodes, but bytes values are
# accepted for now). Any other types for keys are a programming
......
# Copyright (C) 2015 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Test pendings."""
__all__ = [
'TestPendings',
]
import unittest
from mailman.app.lifecycle import create_list
from mailman.config import config
from mailman.interfaces.pending import IPendable, IPendings
from mailman.model.pending import PendedKeyValue
from mailman.testing.layers import ConfigLayer
from zope.component import getUtility
from zope.interface import implementer
@implementer(IPendable)
class SimplePendable(dict):
PEND_TYPE = 'simple'
class TestPendings(unittest.TestCase):
"""Test pendings."""
layer = ConfigLayer
def test_delete_key_values(self):
# Deleting a pending should delete its key-values.
pendingdb = getUtility(IPendings)
subscription = SimplePendable(
type='subscription',
address='[email protected]',
display_name='Anne Person',
language='en',
password='xyz')
token = pendingdb.add(subscription)
self.assertEqual(pendingdb.count, 1)
pendingdb.confirm(token)
self.assertEqual(pendingdb.count, 0)
self.assertEqual(config.db.store.query(PendedKeyValue).count(), 0)
def test_find(self):
# Test getting pendables for a mailing-list.
mlist = create_list('list1[email protected]')
pendingdb = getUtility(IPendings)
subscription_1 = SimplePendable(
type='subscription',
list_id='list1.example.com')
subscription_2 = SimplePendable(
type='subscription',
list_id='list2.example.com')
subscription_3 = SimplePendable(
type='hold request',
list_id='list1.example.com')
subscription_4 = SimplePendable(
type='hold request',
list_id='list2.example.com')
token_1 = pendingdb.add(subscription_1)
pendingdb.add(subscription_2)
token_3 = pendingdb.add(subscription_3)
token_4 = pendingdb.add(subscription_4)
self.assertEqual(pendingdb.count, 4)
# Find the pending subscription in list1.
pendings = list(pendingdb.find(mlist=mlist, pend_type='subscription'))
self.assertEqual(len(pendings), 1)
self.assertEqual(pendings[0][0], token_1)
self.assertEqual(pendings[0][1]['list_id'], 'list1.example.com')
# Find all pending hold requests.
pendings = list(pendingdb.find(pend_type='hold request'))
self.assertEqual(len(pendings), 2)
self.assertSetEqual(
set((p[0], p[1]['list_id']) for p in pendings),
{(token_3, 'list1.example.com'), (token_4, 'list2.example.com')}
)
# Find all pendings for list1.
pendings = list(pendingdb.find(mlist=mlist))
self.assertEqual(len(pendings), 2)
self.assertSetEqual(
set((p[0], p[1]['list_id'], p[1]['type']) for p in pendings),
{(token_1, 'list1.example.com', 'subscription'),
(token_3, 'list1.example.com', 'hold request')}
)
......@@ -51,6 +51,7 @@ The subscription request can be viewed in the REST API.
list_id: ant.example.com
token: ...
token_owner: moderator
type: subscription
when: 2005-08-01T07:49:23
http_etag: "..."
start: 0
......@@ -71,6 +72,7 @@ You can view an individual membership change request by providing the token
list_id: ant.example.com
token: ...
token_owner: moderator
type: subscription
when: 2005-08-01T07:49:23
......
......@@ -122,22 +122,9 @@ class SubscriptionRequests(_ModerationBase, CollectionMixin):
self._mlist = mlist
def _get_collection(self, request):
# There's currently no better way to query the pendings database for
# all the entries that are associated with subscription holds on this
# mailing list. Brute force iterating over all the pendables.
collection = []
for token, pendable in getUtility(IPendings):
if 'token_owner' not in pendable:
# This isn't a subscription hold.
continue
list_id = pendable.get('list_id')
if list_id != self._mlist.list_id:
# Either there isn't a list_id field, in which case it can't
# be a subscription hold, or this is a hold for some other
# mailing list.
continue
collection.append(token)
return collection
pendings = getUtility(IPendings).find(
mlist=self._mlist, pend_type='subscription')
return [token for token, pendable in pendings]
def on_get(self, request, response):
"""/lists/listname/requests"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment