Commit 1e25ccd5 authored by J08nY's avatar J08nY

Save the complete workflow stack, not only last step.

parent 057bc070
Pipeline #11314652 passed with stage
in 18 minutes and 28 seconds
......@@ -153,7 +153,7 @@ class TestWorkflow(unittest.TestCase):
# Save the state of an old version of the workflow that would not have
# the cat attribute.
state_manager.save(
self._workflow.token, 'first',
self._workflow.token, '["first"]',
json.dumps({'ant': 1, 'bee': 2}))
# Restore in the current version that needs the cat attribute.
new_workflow = MyWorkflow()
......
......@@ -56,7 +56,7 @@ class Workflow:
self._next.append(step)
def _pop(self):
name = self._next.popleft()
name = self._next.pop()
step = getattr(self, '_step_{}'.format(name))
self._count += 1
if self.debug: # pragma: nocover
......@@ -119,20 +119,12 @@ class Workflow:
assert self.token, 'Workflow token must be set'
state_manager = getUtility(IWorkflowStateManager)
data = {attr: getattr(self, attr) for attr in self.SAVE_ATTRIBUTES}
# Note: only the next step is saved, not the whole stack. This is not
# an issue in practice, since there's never more than a single step in
# the queue anyway. If we want to support more than a single step in
# the queue *and* want to support state saving/restoring, change this
# method and the restore() method.
# Save the workflow stack.
if len(self._next) == 0:
step = None
elif len(self._next) == 1:
step = self._next[0]
steps = '[]'
else:
raise AssertionError(
"Can't save a workflow state with more than one step "
"in the queue")
state_manager.save(self.token, step, json.dumps(data))
steps = json.dumps(list(self._next))
state_manager.save(self.token, steps, json.dumps(data))
def restore(self):
state_manager = getUtility(IWorkflowStateManager)
......@@ -141,8 +133,8 @@ class Workflow:
# The token doesn't exist in the database.
raise LookupError(self.token)
self._next.clear()
if state.step:
self._next.append(state.step)
if state.steps:
self._next.extend(json.loads(state.steps))
data = json.loads(state.data)
for attr in self.SAVE_ATTRIBUTES:
try:
......
# Copyright (C) 2015-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Save whole workflow stack in workflowstate.
Revision ID: 7c5b39d1ecc4
Revises: 4bd95c99b2e7
Create Date: 2017-07-04 16:45:36.470746
"""
import json
import sqlalchemy as sa
from alembic import op
from mailman.database.types import SAUnicode
# revision identifiers, used by Alembic.
revision = '7c5b39d1ecc4'
down_revision = '4bd95c99b2e7'
old_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('step', SAUnicode),
sa.sql.column('data', SAUnicode)
)
new_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('steps', SAUnicode),
sa.sql.column('data', SAUnicode)
)
def upgrade():
# Rename the column
with op.batch_alter_table('workflowstate') as batch_op:
batch_op.alter_column('step', new_column_name='steps',
existing_type=SAUnicode)
connection = op.get_bind()
for token, step, data in connection.execute(
new_state_table.select()).fetchall():
# Wrap the single step in a JSON array.
if step is not None:
new_steps = json.dumps([step])
else:
new_steps = '[]'
connection.execute(
new_state_table.update().where(
new_state_table.c.token == token).values(
steps=new_steps)
)
def downgrade():
# Rename back the column
connection = op.get_bind()
with op.batch_alter_table('workflowstate') as batch_op:
batch_op.alter_column('steps', new_column_name='step',
existing_type=SAUnicode)
for token, steps, data in connection.execute(
old_state_table.select()).fetchall():
# Extract and save at least the last state.
step_stack = json.loads(steps)
if len(step_stack) == 0:
new_step = None
else:
new_step = step_stack.pop()
connection.execute(
old_state_table.update().where(
old_state_table.c.token == token).values(
step=new_step)
)
......@@ -18,6 +18,7 @@
"""Test database schema migrations with Alembic"""
import os
import json
import unittest
import sqlalchemy as sa
import alembic.command
......@@ -495,3 +496,101 @@ class TestMigrations(unittest.TestCase):
self.assertEqual(
len(list(config.db.store.execute(mlist_table.select()))),
0)
def test_7c5b39d1ecc4_workflow_steps_upgrade(self):
old_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('step', SAUnicode),
sa.sql.column('data', SAUnicode)
)
new_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('steps', SAUnicode),
sa.sql.column('data', SAUnicode)
)
with transaction():
# Start at the previous revision.
alembic.command.downgrade(alembic_cfg, '4bd95c99b2e7')
config.db.store.execute(old_state_table.insert().values([
dict(token='12345',
step='some_step',
data='whatever data'),
dict(token='6789',
step=None,
data='other data')
]))
# Now upgrade.
alembic.command.upgrade(alembic_cfg, '7c5b39d1ecc4')
token, steps, data = config.db.store.execute(
new_state_table.select().where(
new_state_table.c.token == '12345'
)).fetchone()
self.assertEqual(token, '12345')
self.assertEqual(steps, json.dumps(['some_step']))
self.assertEqual(data, 'whatever data')
token, steps, data = config.db.store.execute(
new_state_table.select().where(
new_state_table.c.token == '6789'
)).fetchone()
self.assertEqual(token, '6789')
self.assertEqual(steps, json.dumps([]))
self.assertEqual(data, 'other data')
def test_7c5b39d1ecc4_workflow_steps_downgrade(self):
old_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('step', SAUnicode),
sa.sql.column('data', SAUnicode)
)
new_state_table = sa.sql.table(
'workflowstate',
sa.sql.column('token', SAUnicode),
sa.sql.column('steps', SAUnicode),
sa.sql.column('data', SAUnicode)
)
with transaction():
# Start at the revision.
alembic.command.downgrade(alembic_cfg, '7c5b39d1ecc4')
config.db.store.execute(new_state_table.insert().values([
dict(token='12345',
steps=json.dumps(['next_step', 'some_step']),
data='whatever data'),
dict(token='6789',
steps=json.dumps(['only_step']),
data='other data'),
dict(token='abcde',
steps=json.dumps([]),
data='another data')
]))
# Now downgrade.
alembic.command.downgrade(alembic_cfg, '4bd95c99b2e7')
token, step, data = config.db.store.execute(
old_state_table.select().where(
old_state_table.c.token == '12345'
)).fetchone()
self.assertEqual(token, '12345')
self.assertEqual(step, 'some_step')
self.assertEqual(data, 'whatever data')
token, step, data = config.db.store.execute(
old_state_table.select().where(
old_state_table.c.token == '6789'
)).fetchone()
self.assertEqual(token, '6789')
self.assertEqual(step, 'only_step')
self.assertEqual(data, 'other data')
token, step, data = config.db.store.execute(
old_state_table.select().where(
old_state_table.c.token == 'abcde'
)).fetchone()
self.assertEqual(token, 'abcde')
self.assertEqual(step, None)
self.assertEqual(data, 'another data')
......@@ -27,7 +27,7 @@ class IWorkflowState(Interface):
token = Attribute('A unique key identifying the workflow instance.')
step = Attribute("This workflow's next step.")
steps = Attribute("This workflow's next steps.")
data = Attribute('Additional data (may be JSON-encoded).')
......@@ -36,13 +36,13 @@ class IWorkflowState(Interface):
class IWorkflowStateManager(Interface):
"""The workflow states manager."""
def save(token, step, data=None):
def save(token, steps, data=None):
"""Save the state of a workflow.
:param token: A unique token identifying this workflow instance.
:type token: str
:param step: The next step for this workflow.
:type step: str
:param steps: The next steps for this workflow.
:type steps: str
:param data: Additional data (workflow-specific).
:type data: str
"""
......
......@@ -33,12 +33,12 @@ class TestWorkflow(unittest.TestCase):
def test_save_restore_workflow(self):
# Save and restore a workflow.
token = 'bee'
step = 'cat'
steps = 'cat'
data = 'dog'
self._manager.save(token, step, data)
self._manager.save(token, steps, data)
state = self._manager.restore(token)
self.assertEqual(state.token, token)
self.assertEqual(state.step, step)
self.assertEqual(state.steps, steps)
self.assertEqual(state.data, data)
def test_save_restore_workflow_without_step(self):
......@@ -48,17 +48,17 @@ class TestWorkflow(unittest.TestCase):
self._manager.save(token, data=data)
state = self._manager.restore(token)
self.assertEqual(state.token, token)
self.assertIsNone(state.step)
self.assertIsNone(state.steps)
self.assertEqual(state.data, data)
def test_save_restore_workflow_without_data(self):
# Save and restore a workflow that contains no data.
token = 'bee'
step = 'cat'
self._manager.save(token, step)
steps = 'cat'
self._manager.save(token, steps)
state = self._manager.restore(token)
self.assertEqual(state.token, token)
self.assertEqual(state.step, step)
self.assertEqual(state.steps, steps)
self.assertIsNone(state.data)
def test_save_restore_workflow_without_step_or_data(self):
......@@ -67,7 +67,7 @@ class TestWorkflow(unittest.TestCase):
self._manager.save(token)
state = self._manager.restore(token)
self.assertEqual(state.token, token)
self.assertIsNone(state.step)
self.assertIsNone(state.steps)
self.assertIsNone(state.data)
def test_restore_workflow_with_no_matching_token(self):
......@@ -106,13 +106,13 @@ class TestWorkflow(unittest.TestCase):
self._manager.discard('token2')
self.assertEqual(self._manager.count, 3)
state = self._manager.restore('token1')
self.assertEqual(state.step, 'one')
self.assertEqual(state.steps, 'one')
state = self._manager.restore('token2')
self.assertIsNone(state)
state = self._manager.restore('token3')
self.assertEqual(state.step, 'three')
self.assertEqual(state.steps, 'three')
state = self._manager.restore('token4')
self.assertEqual(state.step, 'four')
self.assertEqual(state.steps, 'four')
def test_discard_missing_workflow(self):
self._manager.discard('bogus-token')
......
......@@ -34,7 +34,7 @@ class WorkflowState(Model):
__tablename__ = 'workflowstate'
token = Column(SAUnicode, primary_key=True)
step = Column(SAUnicode)
steps = Column(SAUnicode)
data = Column(SAUnicode)
......@@ -44,9 +44,9 @@ class WorkflowStateManager:
"""See `IWorkflowStateManager`."""
@dbconnection
def save(self, store, token, step=None, data=None):
def save(self, store, token, steps=None, data=None):
"""See `IWorkflowStateManager`."""
state = WorkflowState(token=token, step=step, data=data)
state = WorkflowState(token=token, steps=steps, data=data)
store.add(state)
@dbconnection
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment