Commit 5f917c8a authored by Barry Warsaw's avatar Barry Warsaw

Clean up pass through abompard's branch.

parent adb9e316
......@@ -40,20 +40,24 @@ log = logging.getLogger('mailman.error')
def make_link(header, pattern, chain=None):
"""Create a Link object.
The link action is always to defer, since at the end of all the header
checks, we'll jump to the chain defined in the configuration file, should
any of them have matched.
The link action is to defer by default, since at the end of all the
header checks, we'll jump to the chain defined in the configuration
file, should any of them have matched. However, it is possible to
create a link which jumps to a specific chain.
:param header: The email header name to check, e.g. X-Spam.
:type header: string
:param pattern: A regular expression for matching the header value.
:type pattern: string
:param chain: When given, this is the chain to jump to if the
pattern matches the header.
:type chain: string
:return: The link representing this rule check.
:rtype: `ILink`
"""
rule = HeaderMatchRule(header, pattern)
if chain is None:
return Link(rule, LinkAction.defer)
return Link(rule)
chain = config.chains[chain]
return Link(rule, LinkAction.jump, chain)
......@@ -135,15 +139,17 @@ class HeaderMatchChain(Chain):
parts = line.split(':', 1)
if len(parts) != 2:
log.error('Configuration error: [antispam]header_checks '
'contains bogus line: {0}'.format(line))
'contains bogus line: {}'.format(line))
continue
yield make_link(parts[0], parts[1].lstrip())
# Then return all the explicitly added links.
yield from self._extended_links
# If any of the above rules matched, jump to the chain
# defined in the configuration file. This takes precedence over
# list-specific matches for security considerations.
yield Link(config.rules['any'], LinkAction.jump,
# If any of the above rules matched, they will have deferred their
# action until now, so jump to the chain defined in the configuration
# file. For security considerations, this takes precedence over
# list-specific matches.
yield Link(config.rules['any'],
LinkAction.jump,
config.chains[config.antispam.jump_chain])
# Then return all the list-specific header matches.
for entry in mlist.header_matches:
......
......@@ -25,16 +25,16 @@ __all__ = [
import unittest
from mailman.app.lifecycle import create_list
from mailman.chains.headers import HeaderMatchRule
from mailman.chains.headers import HeaderMatchRule, make_link
from mailman.config import config
from mailman.core.chains import process
from mailman.email.message import Message
from mailman.interfaces.chain import LinkAction, HoldEvent
from mailman.interfaces.mailinglist import IHeaderMatchSet
from mailman.testing.layers import ConfigLayer
from mailman.testing.helpers import (
configuration, event_subscribers, get_queue_messages, LogFileMark,
LogFileMark, configuration, event_subscribers,
specialized_message_from_string as mfs)
from mailman.testing.layers import ConfigLayer
......@@ -46,6 +46,24 @@ class TestHeaderChain(unittest.TestCase):
def setUp(self):
self._mlist = create_list('[email protected]')
def test_make_link(self):
# Test that make_link() with no given chain creates a Link with a
# deferred link action.
link = make_link('Subject', '[tT]esting')
self.assertEqual(link.rule.header, 'Subject')
self.assertEqual(link.rule.pattern, '[tT]esting')
self.assertEqual(link.action, LinkAction.defer)
self.assertIsNone(link.chain)
def test_make_link_with_chain(self):
# Test that make_link() with a given chain creates a Link with a jump
# action to the chain.
link = make_link('Subject', '[tT]esting', 'accept')
self.assertEqual(link.rule.header, 'Subject')
self.assertEqual(link.rule.pattern, '[tT]esting')
self.assertEqual(link.action, LinkAction.jump)
self.assertEqual(link.chain, config.chains['accept'])
@configuration('antispam', header_checks="""
Foo: a+
Bar: bb?
......@@ -129,9 +147,9 @@ class TestHeaderChain(unittest.TestCase):
# mailing-list configuration.
chain = config.chains['header-match']
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Foo', 'a+', None)
header_matches.add('Foo', 'a+')
links = [link for link in chain.get_links(self._mlist, Message(), {})
if link.rule.name != 'any']
if link.rule.name != 'any']
self.assertEqual(len(links), 1)
self.assertEqual(links[0].action, LinkAction.defer)
self.assertEqual(links[0].rule.header, 'foo')
......@@ -146,11 +164,12 @@ class TestHeaderChain(unittest.TestCase):
header_matches.add('Bar', 'b+', 'discard')
header_matches.add('Baz', 'z+', 'accept')
links = [link for link in chain.get_links(self._mlist, Message(), {})
if link.rule.name != 'any']
if link.rule.name != 'any']
self.assertEqual(len(links), 3)
self.assertListEqual(
[(link.rule.header, link.rule.pattern, link.action, link.chain.name)
for link in links],
self.assertEqual([
(link.rule.header, link.rule.pattern, link.action, link.chain.name)
for link in links
],
[('foo', 'a+', LinkAction.jump, 'reject'),
('bar', 'b+', LinkAction.jump, 'discard'),
('baz', 'z+', LinkAction.jump, 'accept'),
......
......@@ -18,7 +18,8 @@ from mailman.database.helpers import is_sqlite, exists_in_db
def upgrade():
# Create the new table
header_match_table = op.create_table('headermatch',
header_match_table = op.create_table(
'headermatch',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('mailing_list_id', sa.Integer(), nullable=True),
sa.Column('header', sa.Unicode(), nullable=False),
......@@ -26,17 +27,17 @@ def upgrade():
sa.Column('chain', sa.Unicode(), nullable=True),
sa.ForeignKeyConstraint(['mailing_list_id'], ['mailinglist.id'], ),
sa.PrimaryKeyConstraint('id')
)
# Now migrate the data. It can't be offline because we need to read the
)
# Now migrate the data. It can't be offline because we need to read the
# pickles.
connection = op.get_bind()
# Don't import the table definition from the models, it may break this
# migration when the model is updated in the future (see the Alembic doc).
mlist_table = sa.sql.table('mailinglist',
mlist_table = sa.sql.table(
'mailinglist',
sa.sql.column('id', sa.Integer),
sa.sql.column('header_matches', sa.PickleType)
)
)
for mlist_id, old_matches in connection.execute(mlist_table.select()):
for old_match in old_matches:
connection.execute(header_match_table.insert().values(
......@@ -44,8 +45,7 @@ def upgrade():
header=old_match[0],
pattern=old_match[1],
chain=None
))
))
# Now that data is migrated, drop the old column (except on SQLite which
# does not support this)
if not is_sqlite(connection):
......@@ -56,23 +56,25 @@ def downgrade():
if not exists_in_db(op.get_bind(), 'mailinglist', 'header_matches'):
# SQLite will not have deleted the former column, since it does not
# support column deletion.
op.add_column('mailinglist', sa.Column(
'header_matches', sa.PickleType, nullable=True))
# Now migrate the data. It can't be offline because we need to read the
op.add_column(
'mailinglist',
sa.Column('header_matches', sa.PickleType, nullable=True))
# Now migrate the data. It can't be offline because we need to read the
# pickles.
connection = op.get_bind()
# Don't import the table definition from the models, it may break this
# migration when the model is updated in the future (see the Alembic doc).
mlist_table = sa.sql.table('mailinglist',
mlist_table = sa.sql.table(
'mailinglist',
sa.sql.column('id', sa.Integer),
sa.sql.column('header_matches', sa.PickleType)
)
header_match_table = sa.sql.table('headermatch',
)
header_match_table = sa.sql.table(
'headermatch',
sa.sql.column('mailing_list_id', sa.Integer),
sa.sql.column('header', sa.Unicode),
sa.sql.column('pattern', sa.Unicode),
)
)
for mlist_id, header, pattern in connection.execute(
header_match_table.select()).fetchall():
mlist = connection.execute(mlist_table.select().where(
......@@ -84,5 +86,4 @@ def downgrade():
connection.execute(mlist_table.update().where(
mlist_table.c.id == mlist_id).values(
header_matches=header_matches))
op.drop_table('headermatch')
......@@ -46,9 +46,9 @@ class TestMigrations(unittest.TestCase):
md = sa.MetaData(bind=config.db.engine)
md.reflect()
# We have circular dependencies between user and address, thus we can't
# use drop_all() without getting a warning. Setting use_alter to True
# use drop_all() without getting a warning. Setting use_alter to True
# on the foreign keys helps SQLAlchemy mark those loops as known.
for tablename in ("user", "address"):
for tablename in ('user', 'address'):
if tablename not in md.tables:
continue
for fk in md.tables[tablename].foreign_keys:
......@@ -70,17 +70,19 @@ class TestMigrations(unittest.TestCase):
('test-header-1', 'test-pattern-1'),
('test-header-2', 'test-pattern-2'),
('test-header-3', 'test-pattern-3'),
]
mlist_table = sa.sql.table('mailinglist',
]
mlist_table = sa.sql.table(
'mailinglist',
sa.sql.column('id', sa.Integer),
sa.sql.column('header_matches', sa.PickleType)
)
header_match_table = sa.sql.table('headermatch',
header_match_table = sa.sql.table(
'headermatch',
sa.sql.column('mailing_list_id', sa.Integer),
sa.sql.column('header', sa.Unicode),
sa.sql.column('pattern', sa.Unicode),
)
# Downgrading
)
# Downgrading.
config.db.store.execute(mlist_table.insert().values(id=1))
config.db.store.execute(header_match_table.insert().values(
[{'mailing_list_id': 1, 'header': hm[0], 'pattern': hm[1]}
......@@ -92,7 +94,7 @@ class TestMigrations(unittest.TestCase):
self.assertEqual(results[0].header_matches, test_header_matches)
self.assertFalse(exists_in_db(config.db.engine, 'headermatch'))
config.db.store.commit()
# Upgrading
# Upgrading.
alembic.command.upgrade(alembic_cfg, '42756496720')
results = config.db.store.execute(
header_match_table.select()).fetchall()
......
......@@ -20,6 +20,7 @@
__all__ = [
'IAcceptableAlias',
'IAcceptableAliasSet',
'IHeaderMatch',
'IListArchiver',
'IListArchiverSet',
'IMailingList',
......@@ -843,7 +844,7 @@ class IListArchiverSet(Interface):
class IHeaderMatch(Interface):
"""A header matching rule for mailinglist messages."""
"""A mailing list-specific message header matching rule."""
mailing_list = Attribute(
"""The mailing list for the header match.""")
......@@ -868,26 +869,32 @@ class IHeaderMatchSet(Interface):
def clear():
"""Clear the set of header matching rules."""
def add(header, pattern, chain):
"""Add the given header matching rule to this mailinglist's set.
def add(header, pattern, chain=None):
"""Add the given header matching rule to this mailing list's set.
:param header: The email header to filter on. It will be converted to
lowercase for easier removal.
:param header: The email header to filter on. It will be converted to
lower case for consistency.
:type header: string
:param pattern: The regular expression to use.
:type pattern: string
:param chain: The chain to jump to, or None to use the site-wide
configuration. Defaults to None.
configuration. Defaults to None.
:type chain: string or None
:raises ValueError: there can be only one couple of header and pattern
for a mailinglist.
:raises ValueError: if the header/pattern pair already exists for this
mailing list.
"""
def remove(header, pattern):
"""Remove the given header matching rule from this mailinglist's set.
"""Remove the given header matching rule from this mailing list's set.
:param header: The email header part of the rule to be removed.
:type header: string
:param pattern: The regular expression part of the rule to be removed.
:type pattern: string
"""
def __iter__():
"""An iterator over all the IHeaderMatches defined in this set.
:return: iterator over `IHeaderMatch`.
"""
......@@ -37,8 +37,8 @@ from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.domain import IDomainManager
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.mailinglist import (
IAcceptableAlias, IAcceptableAliasSet, IListArchiver, IListArchiverSet,
IHeaderMatch, IHeaderMatchSet, IMailingList, Personalization,
IAcceptableAlias, IAcceptableAliasSet, IHeaderMatch, IHeaderMatchSet,
IListArchiver, IListArchiverSet, IMailingList, Personalization,
ReplyToMunging, SubscriptionPolicy)
from mailman.interfaces.member import (
AlreadySubscribedError, MemberRole, MissingPreferredAddressError,
......@@ -684,3 +684,8 @@ class HeaderMatchSet:
raise ValueError('Pattern does not exist')
else:
self._mailing_list.header_matches.remove(existing)
@dbconnection
def __iter__(self, store):
yield from store.query(HeaderMatch).filter(
HeaderMatch.mailing_list == self._mailing_list)
......@@ -173,37 +173,48 @@ class TestHeaderMatch(unittest.TestCase):
self._mlist = create_list('[email protected]')
def test_lowercase_header(self):
with transaction():
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(self._mlist.header_matches[0].header, 'header')
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(self._mlist.header_matches[0].header, 'header')
def test_chain_defaults_to_none(self):
with transaction():
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(self._mlist.header_matches[0].chain, None)
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(self._mlist.header_matches[0].chain, None)
def test_duplicate(self):
with transaction():
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Header', 'pattern')
self.assertRaises(ValueError,
header_matches.add, 'Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Header', 'pattern')
self.assertRaises(
ValueError, header_matches.add, 'Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
def test_remove_non_existent(self):
with transaction():
header_matches = IHeaderMatchSet(self._mlist)
self.assertRaises(ValueError,
header_matches.remove, 'header', 'pattern')
header_matches = IHeaderMatchSet(self._mlist)
self.assertRaises(
ValueError, header_matches.remove, 'header', 'pattern')
def test_add_remove(self):
with transaction():
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
header_matches.remove('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 0)
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
header_matches.remove('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 0)
def test_iterator(self):
header_matches = IHeaderMatchSet(self._mlist)
header_matches.add('Header', 'pattern')
header_matches.add('Subject', 'patt.*')
header_matches.add('From', '.*@example.com', 'discard')
header_matches.add('From', '.*@example.org', 'accept')
matches = sorted((match.header, match.pattern, match.chain)
for match in IHeaderMatchSet(self._mlist))
self.assertEqual(
matches,
[('from', '.*@example.com', 'discard'),
('from', '.*@example.org', 'accept'),
('header', 'pattern', None),
('subject', 'patt.*', None),
])
......@@ -120,11 +120,11 @@ List-specific header matching
Each mailing list can also be configured with a set of header matching regular
expression rules. These can be used to impose list-specific header filtering
with the same semantics as the global `[antispam]` section, or to have a
with the same semantics as the global ``[antispam]`` section, or to have a
different action.
To follow the global antispam action, the header match rule must not specify a
`chain` to jump to. If the default antispam action is changed in the
``chain`` to jump to. If the default antispam action is changed in the
configuration file and Mailman is restarted, those rules will get the new jump
action.
......@@ -147,8 +147,8 @@ A message with a spam score of two pluses does not match.
x-spam-score: [+]{3,}
But a message with a spam score of three pluses does match. Because a message
with the previous Message-Id is already in the moderation queue, we need to
give this message a new Message-Id.
with the previous ``Message-Id`` is already in the moderation queue, we need
to give this message a new ``Message-Id``.
>>> msgdata = {}
>>> del msg['x-spam-score']
......@@ -174,8 +174,8 @@ As does a message with a spam score of four pluses.
x-spam-score: [+]{3,}
No rules missed
Now, the list administrator wants to match on three plus signs, but wants those
emails to be discarded instead of held.
Now, the list administrator wants to match on three plus signs, but wants
those emails to be discarded instead of held.
>>> header_matches.remove('x-spam-score', '[+]{3,}')
>>> header_matches.add('x-spam-score', '[+]{3,}', 'discard')
......@@ -187,10 +187,10 @@ will be discarded.
>>> del msg['x-spam-score']
>>> msg['X-Spam-Score'] = '+++'
>>> del msg['message-id']
>>> msg['Message-Id'] = '<dee>'
>>> msg['Message-Id'] = '<dog>'
>>> with event_subscribers(handler):
... process(mlist, msg, msgdata, 'header-match')
DiscardEvent discard <dee>
DiscardEvent discard <dog>
>>> hits_and_misses(msgdata)
Rule hits:
x-spam-score: [+]{3,}
......
......@@ -39,7 +39,6 @@ from mailman.interfaces.archiver import ArchivePolicy
from mailman.interfaces.autorespond import ResponseAction
from mailman.interfaces.bans import IBanManager
from mailman.interfaces.bounce import UnrecognizedBounceDisposition
from mailman.interfaces.chain import LinkAction
from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.mailinglist import IAcceptableAliasSet, IHeaderMatchSet
......@@ -132,18 +131,18 @@ def nonmember_action_mapping(value):
def action_to_chain(value):
# Converts an action number in Mailman 2.1 to the name of the corresponding
# chain in 3.x. The actions "approve", "subscribe" and "unsubscribe" are
# ignored. The defer action is converted to None, because it is not a jump
# to a terminal chain.
# chain in 3.x. The actions 'approve', 'subscribe' and 'unsubscribe' are
# ignored. The defer action is converted to None, because it is not
# a jump to a terminal chain.
return {
0: None,
#1: "approve",
2: "reject",
3: "discard",
#4: "subscribe",
#5: "unsubscribe",
6: "accept",
7: "hold",
#1: 'approve',
2: 'reject',
3: 'discard',
#4: 'subscribe',
#5: 'unsubscribe',
6: 'accept',
7: 'hold',
}[value]
......@@ -333,7 +332,7 @@ def import_config_pck(mlist, config_dict):
# When .add() rejects this, the line probably contains a regular
# expression. Make that explicit for MM3.
alias_set.add('^' + address)
# Handle header_filter_rules conversion to header_matches
# Handle header_filter_rules conversion to header_matches.
header_match_set = IHeaderMatchSet(mlist)
header_filter_rules = config_dict.get('header_filter_rules', [])
for line_patterns, action, _unused in header_filter_rules:
......@@ -343,26 +342,28 @@ def import_config_pck(mlist, config_dict):
log.warning('Unsupported header_filter_rules action: %r',
action)
continue
# now split the pattern in a header and a pattern
# Now split the line into a header and a pattern.
for line_pattern in line_patterns.splitlines():
if not line_pattern.strip():
if len(line_pattern.strip()) == 0:
continue
for sep in (': ', ':.', ':'):
header, sep, pattern = line_pattern.partition(sep)
if sep:
break # found it.
# We found it.
break
else:
# matches any header. Those are not supported. XXX
# Matches any header, which is not supported. XXX
log.warning('Unsupported header_filter_rules pattern: %r',
line_pattern)
continue
header = header.strip().lstrip("^").lower()
header = header.strip().lstrip('^').lower()
header = header.replace('\\', '')
if not header:
log.warning('Can\'t parse the header in header_filter_rule: %r',
line_pattern)
log.warning(
'Cannot parse the header in header_filter_rule: %r',
line_pattern)
continue
if not pattern:
if len(pattern) == 0:
# The line matched only the header, therefore the header can
# be anything.
pattern = '.*'
......
......@@ -44,7 +44,7 @@ from mailman.interfaces.bans import IBanManager
from mailman.interfaces.bounce import UnrecognizedBounceDisposition
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.mailinglist import (
IAcceptableAliasSet, IHeaderMatchSet, SubscriptionPolicy)
IAcceptableAliasSet, SubscriptionPolicy)
from mailman.interfaces.member import DeliveryMode, DeliveryStatus
from mailman.interfaces.nntp import NewsgroupModeration
from mailman.interfaces.templates import ITemplateLoader
......@@ -344,7 +344,8 @@ class TestBasicImport(unittest.TestCase):
('X-Git-Module: rhq.*git', 6, False),
('Approved: verysecretpassword', 6, False),
('^Subject: dev-\r\n^Subject: staging-', 3, False),
('from: .*[email protected]\r\nfrom: .*@jw-express.com', 2, False),
('from: .*[email protected]\r\nfrom: .*@jw-express.com',
2, False),
('^Received: from smtp-.*\\.fedoraproject\\.org\r\n'
'^Received: from mx.*\\.redhat.com\r\n'
'^Resent-date:\r\n'
......@@ -356,15 +357,17 @@ class TestBasicImport(unittest.TestCase):
('^Received: from fedorahosted\\.org.*by fedorahosted\\.org\r\n'
'^Received: from hosted.*\\.fedoraproject.org.*by '
'hosted.*\\.fedoraproject\\.org\r\n'
'^Received: from hosted.*\\.fedoraproject.org.*by fedoraproject\\.org\r\n'
'^Received: from hosted.*\\.fedoraproject.org.*by fedorahosted\\.org',
'^Received: from hosted.*\\.fedoraproject.org.*by '
'fedoraproject\\.org\r\n'
'^Received: from hosted.*\\.fedoraproject.org.*by '
'fedorahosted\\.org',
6, False),
]
error_log = LogFileMark('mailman.error')
self._import()
self.assertListEqual(
[ (hm.header, hm.pattern, hm.chain)
for hm in self._mlist.header_matches ], [
[(hm.header, hm.pattern, hm.chain)
for hm in self._mlist.header_matches ], [
('x-spam-status', 'Yes.*', 'discard'),
('x-spam-status', 'Yes', 'reject'),
('x-spam-level', '\\*\\*\\*.*$', 'discard'),
......@@ -386,19 +389,26 @@ class TestBasicImport(unittest.TestCase):
('resent-message-id', '.*', 'hold'),
('resent-to', '.*', 'hold'),
('subject', '[^mtv]', 'hold'),
('received', 'from fedorahosted\\.org.*by fedorahosted\\.org', 'accept'),
('received', 'from hosted.*\\.fedoraproject.org.*by hosted.*\\.fedoraproject\\.org', 'accept'),
('received', 'from hosted.*\\.fedoraproject.org.*by fedoraproject\\.org', 'accept'),
('received', 'from hosted.*\\.fedoraproject.org.*by fedorahosted\\.org', 'accept'),
('received', 'from fedorahosted\\.org.*by fedorahosted\\.org',
'accept'),
('received',
'from hosted.*\\.fedoraproject.org.*by '
'hosted.*\\.fedoraproject\\.org', 'accept'),
('received',
'from hosted.*\\.fedoraproject.org.*by '
'fedoraproject\\.org', 'accept'),
('received',
'from hosted.*\\.fedoraproject.org.*by '
'fedorahosted\\.org', 'accept'),
])
loglines = error_log.read().strip()
self.assertEqual(len(loglines), 0)
def test_header_matches_header_only(self):
# Check that an empty pattern is skipped
# Check that an empty pattern is skipped.
self._pckdict['header_filter_rules'] = [
('SomeHeaderName', 3, False),
]
]
error_log = LogFileMark('mailman.error')
self._import()
self.assertListEqual(self._mlist.header_matches, [])
......@@ -406,10 +416,10 @@ class TestBasicImport(unittest.TestCase):
error_log.readline())
def test_header_matches_anything(self):
# Check that an empty pattern is skipped
# Check that a wild card header pattern is skipped.
self._pckdict['header_filter_rules'] = [
('.*', 7, False),
]
]
error_log = LogFileMark('mailman.error')
self._import()
self.assertListEqual(self._mlist.header_matches, [])
......@@ -417,10 +427,10 @@ class TestBasicImport(unittest.TestCase):
error_log.readline())
def test_header_matches_invalid_re(self):
# Check that an empty pattern is skipped
# Check that an invalid regular expression pattern is skipped.
self._pckdict['header_filter_rules'] = [
('SomeHeaderName: *invalid-re', 3, False),
]
]
error_log = LogFileMark('mailman.error')
self._import()
self.assertListEqual(self._mlist.header_matches, [])
......@@ -431,20 +441,20 @@ class TestBasicImport(unittest.TestCase):
# Check that a defer action is properly converted.
self._pckdict['header_filter_rules'] = [
('^X-Spam-Status: Yes', 0, False),
]
]
self._import()
self.assertListEqual(
[ (hm.header, hm.pattern, hm.chain)
for hm in self._mlist.header_matches ],
[ ('x-spam-status', 'Yes', None) ]
)
[(hm.header, hm.pattern, hm.chain)
for hm in self._mlist.header_matches],
[('x-spam-status', 'Yes', None)]
)