==========
 Workflow
==========

.. currentmodule:: workflow

What is Workflow?
=================

The Workflow library provides a method of running Finite State Machines with
memory. It can be used to execute a set of methods, complete with conditions
and patterns.

Workflow allows a number of independent pieces of data to be processed by the
same logic, while allowing the entire process to be forwarded, backwarded,
paused, inspected, re-executed, modified and stored.

How do I use Workflow?
======================

In the following sections we will take a look at Workflow's features by
working on examples of increasing complexity. Please keep an eye out for
comments in the code as they provide crucial information.

Basic workflow use
------------------

Basic use is comprised of the following steps:

1. Instantiate a **workflow engine**. For this example we will use the
   simplest provided one, but you may also extend it to add custom behaviour.

   .. code-block:: python

      from workflow.engine import GenericWorkflowEngine
      my_engine = GenericWorkflowEngine()

2. Create **tasks**. These are purpose-built functions that the workflow
   engine can execute. The engine always passes **(current_token,
   current_engine)** as arguments to these functions, so they need to support
   them. Note that the **add_data** function needs to accept an extra
   argument; for this we use a closure.

   .. code-block:: python

      from functools import wraps

      def print_data(obj, eng):
          """Print the data found in the token."""
          print(obj.data)

      def add_data(number_to_add):
          """Add number_to_add to obj.data."""
          @wraps(add_data)
          def _add_data(obj, eng):
              obj.data += number_to_add
          return _add_data

3. Create a **workflow definition** (also known as **callbacks**). This is a
   (sometimes nested) list of **tasks** that we wish to run.

   .. code-block:: python

      my_workflow_definition = [
          add_data(1),
          print_data
      ]

4. Define **tokens**. This is the data that we wish to feed the workflow.
   Since the data we will deal with in this example is immutable, we need to
   place it in **token wrappers**. Another reason you may wish to wrap your
   data is to be able to store `metadata` in the object.

   .. code-block:: python

      class MyObject(object):
          def __init__(self, data):
              self.data = data

      my_object0 = MyObject(0)
      my_object1 = MyObject(1)

5. **Run** the engine on a list of such wrappers with our workflow
   definition. The engine passes the tokens that we give it one at a time
   through the workflow.

   .. code-block:: python

      my_engine.callbacks.replace(my_workflow_definition)

      my_engine.process([my_object0, my_object1])
      # The engine prints: "1\n2"
      my_object0.data == 1
      my_object1.data == 2

6. **Bonus**! Once the engine has run, it can be reused.

   .. code-block:: python

      my_engine.process([my_object0, my_object1])
      # The engine prints: "2\n3"
      my_object0.data == 2
      my_object1.data == 3
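For convenience, here are the pieces assembled into one runnable script. This
is a direct combination of the snippets above; nothing new is introduced:

.. code-block:: python

   from functools import wraps
   from workflow.engine import GenericWorkflowEngine

   class MyObject(object):
       def __init__(self, data):
           self.data = data

   def print_data(obj, eng):
       """Print the data found in the token."""
       print(obj.data)

   def add_data(number_to_add):
       """Return a task that adds number_to_add to obj.data."""
       @wraps(add_data)
       def _add_data(obj, eng):
           obj.data += number_to_add
       return _add_data

   my_engine = GenericWorkflowEngine()
   my_engine.callbacks.replace([add_data(1), print_data])
   my_engine.process([MyObject(0), MyObject(1)])  # prints 1, then 2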
Loops and interrupts
--------------------

Let's take a look at a slightly more advanced example. There are two things
to note here:

* How control flow is done. We provide, among others, **IF_ELSE** and
  **FOR** statements. They are simple functions - therefore you can make
  your own if you wish to. We will see an example of this in the Patterns
  section.

* Control flow can reach outside the engine via exceptions. We will raise
  the **HaltProcessing** exception to return control to our code before the
  workflow has even finished, and then even resume it.

In this example, we have a series of lists composed of 0 and 1 and we want
to:

1. Add [0, 1] at the end of the list.
2. Repeat step 1 until the list is at least [0, 1, 0, 1].
3. Add [1] when we are done.

Here are some example transformations that describe the above:

* [] --> [0, 1, 0, 1, 1]
* [0, 1] --> [0, 1, 0, 1, 1]
* [0, 1, 0, 1] --> [0, 1, 0, 1, 0, 1, 1]

Time for some code! Let's start with the imports. Pay close attention, as
their arguments are explained briefly here.

.. code-block:: python

    from workflow.engine import GenericWorkflowEngine
    from workflow.errors import HaltProcessing
    from workflow.patterns.controlflow import (
        FOR,      # Simple for-loop, a-la python. First argument is an
                  # iterable, second defines where to save the current value,
                  # and the third is the code that runs in the loop.

        HALT,     # Halts the engine. This brings it to a state where it can
                  # be inspected, resumed, restarted, or other.

        IF_ELSE,  # Simple `if-else` statement that accepts 3 arguments.
                  # (condition, tasks if true, tasks if false)

        CMP,      # Simple function to support python comparisons directly
                  # from a workflow engine.
    )

Now to define some functions of our own. Note that the first function
leverages `eng.extra_data`. This is a simple dictionary that the
`GenericWorkflowEngine` exposes and it acts as a shared storage that persists
during the execution of the engine. The two latter functions wrap engine
functionality that's already there, but add `print` statements for the
example.

.. code-block:: python

    def append_from(key):
        """Append data from a given `key` of the engine's `extra_data`."""
        def _append_from(obj, eng):
            obj.append(eng.extra_data[key])
            print("new data:", obj)
        return _append_from

    def interrupt_workflow(obj, eng):
        """Raise the `HaltProcessing` exception.

        This is not handled by the engine and bubbles up to our code.
        """
        print("Raising HaltProcessing")
        eng.halt("interrupting this workflow.")

    def restart_workflow(obj, eng):
        """Restart the engine with the current object, from the first task."""
        print("Restarting the engine")
        eng.restart('current', 'first')

We are now ready to create the workflow:

.. code-block:: python

    my_workflow = [
        FOR(range(2), "my_current_value",   # For-loop, from 0 to 1, that sets
                                            # the current value to
                                            # `eng.extra_data["my_current_value"]`
            [
                append_from("my_current_value"),  # Gets the value set above
                                                  # and appends it to our token
            ]
        ),
        # END FOR

        IF_ELSE(
            CMP((lambda o, e: o), [0, 1, 0, 1], "<"),  # Condition:
                                                       # "if obj < [0,1,0,1]:"
            [restart_workflow],                   # Tasks to run if condition
                                                  # is True:
                                                  # "return back to the FOR"
            [                                     # Tasks to run if condition
                                                  # is False:
                append_from("my_current_value"),  # "append 1 (note we still
                                                  # have access to it)
                interrupt_workflow                # and interrupt"
            ]
        )
        # END IF_ELSE
    ]
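To demystify control-flow tasks like **FOR**: they are ordinary factories
that return a task. As a rough, naive sketch only (the shipped `FOR`
cooperates with the engine's jump machinery instead of calling the body
directly - `SIMPLE_FOR` is a made-up name for illustration):

.. code-block:: python

    def SIMPLE_FOR(iterable, key, body_tasks):
        """Naive for-loop factory: run body_tasks once per value."""
        def _simple_for(obj, eng):
            for value in iterable:
                eng.extra_data[key] = value  # mimic FOR's "where to save" slot
                for task in body_tasks:
                    task(obj, eng)
        return _simple_for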
Because our workflow interrupts itself, we will wrap the calls to `process`
and `restart` in `try-except` statements.

.. code-block:: python

    # Create the engine as in the previous example
    my_engine = GenericWorkflowEngine()
    my_engine.callbacks.replace(my_workflow)

    try:
        # Note how we don't need to keep a reference to our tokens - the
        # engine allows us to access them via `my_engine.objects` later.
        my_engine.process([[], [0, 1], [0, 1, 0, 1]])
    except HaltProcessing:
        # Our engine was built to throw this exception every time an object
        # is completed. At this point we can inspect the object to decide
        # what to do next. In any case, we will ask it to move to the next
        # object, until it stops throwing the exception (which, in our case,
        # means it has finished with all objects).
        while True:
            try:
                # Restart the engine with the next object, starting from the
                # first task.
                my_engine.restart('next', 'first')
            except HaltProcessing:
                continue
            else:
                print("Done!", my_engine.objects)
                break

Here is what the execution prints::

    new data: [0]
    new data: [0, 1]
    Restarting the engine
    new data: [0, 1, 0]
    new data: [0, 1, 0, 1]
    new data: [0, 1, 0, 1, 1]
    Raising HaltProcessing
    new data: [0, 1, 0]
    new data: [0, 1, 0, 1]
    new data: [0, 1, 0, 1, 1]
    Raising HaltProcessing
    new data: [0, 1, 0, 1, 0]
    new data: [0, 1, 0, 1, 0, 1]
    new data: [0, 1, 0, 1, 0, 1, 1]
    Raising HaltProcessing
    Done! [[0, 1, 0, 1, 1], [0, 1, 0, 1, 1], [0, 1, 0, 1, 0, 1, 1]]

Celery support
--------------

Celery is a widely used distributed task queue. The independent nature of
workflows and their ability to be restarted and resumed makes them good
candidates for running in a task queue. Let's take a look at running a
workflow inside celery.

Assuming workflow is already installed, let's also install celery::

    $ pip install 'celery[redis]'

On to the code next:

.. code-block:: python

    from celery import Celery

    # `app` is required by the celery worker.
    app = Celery('workflow_sample', broker='redis://localhost:6379/0')

    # Define a couple of basic tasks.
    def add(obj, eng):
        obj["value"] += 2

    def print_res(obj, eng):
        print(obj.get("value"))

    # Create a workflow out of them.
    flow = [add, print_res]

    # Mark our execution process as a celery task with this decorator.
    @app.task
    def run_workflow(data):
        # Note that the imports that this function requires must be done
        # inside it since our code will not be running in the global context.
        from workflow.engine import GenericWorkflowEngine
        wfe = GenericWorkflowEngine()
        wfe.setWorkflow(flow)
        wfe.process(data)

    # Code that runs when we call this script directly. This way we can
    # start as many workflows as we wish and let celery handle how they are
    # distributed and when they run.
    if __name__ == "__main__":
        run_workflow.delay([{"value": 10}, {"value": 20}, {"value": 30}])

Time to bring celery up:

1. Save this file as `/some/path/workflow_sample.py`

2. Bring up a worker in one terminal::

    $ cd /some/path
    $ celery -A workflow_sample worker --loglevel=info

3. Use another terminal to request `run_workflow` to be run with the above
   arguments::

    $ cd /some/path
    $ python workflow_sample.py

You should see the worker working. Try running `python workflow_sample.py`
again.
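Since each `delay` call enqueues an independent engine run, nothing stops you
from queuing several workflows at once. A small sketch (the payload values
are made up for illustration):

.. code-block:: python

    if __name__ == "__main__":
        # Each call enqueues a separate run of the workflow; celery decides
        # when and on which worker each one executes.
        run_workflow.delay([{"value": 1}])
        run_workflow.delay([{"value": 2}, {"value": 3}])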
Storage-capable engine
----------------------

The Workflow library comes with an alternative engine which is built to work
with SQLAlchemy databases (`DbWorkflowEngine`). This means that one can store
the state of the engine and objects for later use. This opens up new
possibilities:

* A front-end can be attached to the engine with ease.
* Workflows can be stored to be resumed at a later time..
* ..or even shared between processing nodes.

In this example we will see a simple implementation of such a
database-stored, resumable workflow. The problem that we are solving will be
revealed later. For now, we can start by creating a couple of SQLAlchemy
models:

* one to attach to the workflow itself (a workflow will represent a student)
* and one for a single grade (a grade will be the result of a single test).

Note that the `Workflow` model below can store an element pointer. This
pointer (found at `engine.state.token_pos`) indicates the object that is
currently being processed, and saving it is crucial so that the engine can
resume from that point at a later time.

.. code-block:: python

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import (
        Column, Integer, String, create_engine, ForeignKey, Boolean,
    )
    from sqlalchemy.orm import sessionmaker, relationship

    # Create an engine and a session
    engine = create_engine('sqlite://')
    Base = declarative_base(bind=engine)
    DBSession = sessionmaker(bind=engine)
    session = DBSession()

    class Workflow(Base):
        __tablename__ = 'workflow'
        id = Column(Integer, primary_key=True)
        state_token_pos = Column(Integer, default=-1)
        grades = relationship('Grade', backref='workflow',
                              cascade="all, delete, delete-orphan")

        def save(self, token_pos):
            """Save object to persistent storage."""
            self.state_token_pos = token_pos
            session.begin(subtransactions=True)
            try:
                session.add(self)
                session.commit()
            except Exception:
                session.rollback()
                raise

    class Grade(Base):
        __tablename__ = 'grade'
        id = Column(Integer, primary_key=True)
        data = Column(Integer, nullable=False, default=0)
        user_id = Column(Integer, ForeignKey('workflow.id'))

        def __init__(self, grade):
            self.data = grade
            session.add(self)

    Base.metadata.create_all(engine)

Next, we have to tell `DbWorkflowEngine` how and when to use our storage. To
do that we need to know a bit about the engine's `processing_factory`
property, which is expected to provide this structure of methods and
properties:

* `before_processing`
* `after_processing`
* `before_object`
* `after_object`
* `action_mapper` (property)

  * `before_callbacks`
  * `after_callbacks`
  * `before_each_callback`
  * `after_each_callback`

* `transition_exception_mapper` (property)

  * `StopProcessing`
  * `HaltProcessing`
  * `ContinueNextToken`
  * `JumpToken`
  * `Exception`
  * ... (can be extended by adding any method that has the name of an
    expected exception)

The `transition_exception_mapper` can look confusing at first. It contains
not exceptions, but methods that are called when exceptions with the same
name are raised.

* Some exceptions are internal to the engine only and never bubble up
  (e.g. `JumpToken`, `ContinueNextToken`).
* Others are partly handled internally and then bubbled up to the user to
  take action (e.g. `Exception`).
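To illustrate that name-matching convention, here is a hypothetical sketch.
`MissingGrade` and `SafeTransitionActions` are made-up names, and the `save`
call assumes an engine that exposes a `save` method like the one we define
next; a handler is registered simply by adding a method named after the
exception:

.. code-block:: python

    from workflow.engine import TransitionActions

    class MissingGrade(Exception):
        """Hypothetical exception raised by one of our own tasks."""

    class SafeTransitionActions(TransitionActions):
        @staticmethod
        def MissingGrade(obj, eng, callbacks, e):
            # Called by the engine whenever a task raises MissingGrade.
            eng.save()
            raise e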
Let's use the above to ask our engine to:

1. Save the first objects that it is given.
2. Save to our database every time it finishes processing an object, as well
   as when there is an expected failure.

For now, all we need to know is that in our example, `HaltProcessing` is an
exception that we will intentionally raise, and we want to save the engine
when it occurs. Once again, follow the comments carefully to understand the
code.

.. code-block:: python

    from workflow.engine_db import DbWorkflowEngine
    from workflow.errors import HaltProcessing
    from workflow.engine import TransitionActions, ProcessingFactory
    from workflow.utils import staticproperty

    class MyDbWorkflowEngine(DbWorkflowEngine):

        def __init__(self, db_obj):
            """Load an old `token_pos` from the db into the engine."""
            # The reason we save token_pos _first_ is that calling `super`
            # will reset it.
            token_pos = db_obj.state_token_pos
            self.db_obj = db_obj
            super(DbWorkflowEngine, self).__init__()
            # And now we inject it back into the engine's `state`.
            if token_pos is not None:
                self.state.token_pos = token_pos
            self.save()

        # For this example we are interested in saving `token_pos` as
        # explained previously, so we override `save` to do that.
        def save(self, token_pos=None):
            """Save the state of the workflow."""
            if token_pos is not None:
                self.state.token_pos = token_pos
            self.db_obj.save(self.state.token_pos)

        # We want our own processing factory, so we tell the engine that we
        # have subclassed it below.
        @staticproperty
        def processing_factory():
            """Provide a processing factory."""
            return MyProcessingFactory

    class MyProcessingFactory(ProcessingFactory):
        """Processing factory for persistence requirements."""

        # We also have our own transition actions.
        @staticproperty
        def transition_exception_mapper():
            """Define our actions for handling transition exceptions."""
            return MyTransitionActions

        # Before any processing is done, we wish to save the `objects`
        # (tokens) that have been passed to the engine, if they aren't
        # already stored.
        @staticmethod
        def before_processing(eng, objects):
            """Make sure the engine has a relationship with its objects."""
            if not eng.db_obj.grades:
                for obj in objects:
                    eng.db_obj.grades.append(obj)

        # We wish to save on every successful completion of a token.
        @staticmethod
        def after_processing(eng, objects):
            """Save after we have processed all the objects successfully."""
            eng.save()

    class MyTransitionActions(TransitionActions):

        # But we also wish to save when `HaltProcessing` is raised, because
        # this is going to be an expected situation.
        @staticmethod
        def HaltProcessing(obj, eng, callbacks, e):
            """Save whenever HaltProcessing is raised, so that we don't
            lose the state."""
            eng.save()
            raise e

And now, for the problem itself. Imagine a fictional exam where a student has
to take 6 tests in one day. The tests are processed in a specific order by a
system. Whenever the system locates a failing grade, as punishment, the
student is asked to take the failed test again the next day. Then the
checking process is resumed until the next failing grade is located, and the
student must show up again the following day.

Assume that a student, Zack P. Hacker, has just finished taking all 6 tests.
A workflow that does this checking can now be implemented like so:

.. code-block:: python

    from workflow.patterns.controlflow import IF, HALT

    my_workflow_instance = Workflow()
    my_db_engine = MyDbWorkflowEngine(my_workflow_instance)

    def grade_is_not_passing(obj, eng):
        print('Testing grade #{0}, with data {1}'.format(obj.id, obj.data))
        return obj.data < 5

    callbacks = [
        IF(grade_is_not_passing,
           [
               HALT()
           ]),
    ]

    my_db_engine.callbacks.replace(callbacks)
    try:
        my_db_engine.process([
            Grade(6), Grade(5), Grade(4),
            Grade(5), Grade(2), Grade(6)
        ])
    except HaltProcessing:
        print('The student has failed this test!')
        # At this point, the engine has already saved its state in the
        # database, regardless of the outcome.

The above script prints::

    Testing grade #1, with data 6
    Testing grade #2, with data 5
    Testing grade #3, with data 4
    The student has failed this test!
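A quick aside on `restart`, since the rest of the example relies on it: its
first two arguments select which token and which task to resume from. A
summary of the calls used in this document (`my_tokens` is a placeholder; the
`objects` keyword, used below, supplies an explicit token list):

.. code-block:: python

    eng.restart('current', 'first')  # same token, back to the first task
    eng.restart('next', 'first')     # next token, from the first task
    eng.restart('current', 'first', objects=my_tokens)  # replace the tokens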
"Obviously this system is terrible and something must be done", thinks Zack,
who was just notified about his "4" and logs onto the system, armed with a
small python script:

.. code-block:: python

    def amend_grade(obj, eng):
        print('Amending this grade..')
        obj.data = 5

    evil_callbacks = [
        IF(grade_is_not_passing,
           [
               amend_grade
           ]),
    ]

    # Load yesterday's workflow and bring up an engine for it.
    revived_workflow = session.query(Workflow).one()
    my_db_engine = MyDbWorkflowEngine(revived_workflow)

    print('\nWhat Zack sees:',
          [grade.data for grade in revived_workflow.grades])

    # Let's fix that.
    my_db_engine.callbacks.replace(evil_callbacks)

    print('Note how the engine resumes from the last failing test:')
    my_db_engine.restart('current', 'first', objects=revived_workflow.grades)

These words are printed in Zack's terminal::

    What Zack sees: [6, 5, 4, 5, 2, 6]
    Note how the engine resumes from the last failing test:
    Testing grade #3, with data 4
    Amending this grade..
    Testing grade #4, with data 5
    Testing grade #5, with data 2
    Amending this grade..
    Testing grade #6, with data 6

When someone logs into the system to check how Zack did..

.. code-block:: python

    revived_workflow = session.query(Workflow).one()
    print('\nWhat the professor sees:',
          [grade.data for grade in revived_workflow.grades])

Everything looks good::

    What the professor sees: [6, 5, 5, 5, 5, 6]

The moral of this story is to keep off-site logs and back-ups. Also,
workflows are complex but powerful.

.. DbWorkflowEngine interface
.. ~~~~~~~~~~~~~~~~~~~~~~~~~~
.. `DbWorkflowEngine` provides a more extended interface out of the box.
.. It has a `save` method which by default calls `.save()` on `db_obj`,
.. allowing for an optional argument: `status`. `status` allows saving the
.. status (eg HALTED) of the engine to the database. For simplicity, we do
.. not support this in our database.

Signals support
===============

In addition to the exception- and override-based mechanisms, Workflow
supports a few signals out of the box, if the `blinker` package is
installed. The following signals are triggered by the
`GenericWorkflowEngine`:

============================ =================================================
Signal                       Called by
============================ =================================================
`workflow_started`           ProcessingFactory.before_processing
`workflow_finished`          ProcessingFactory.after_processing
`workflow_halted`            TransitionActions.HaltProcessing
============================ =================================================

Useful engine methods
=====================

Other than `eng.halt`, the GenericWorkflowEngine provides more convenience
methods out of the box.

=============================== ==============================
Sample call                     Description
=============================== ==============================
`eng.stop()`                    stop the workflow
`eng.halt("list exhausted")`    halt the workflow
`eng.continue_next_token()`     continue from the next token
`eng.jump_token(-2)`            jump `offset` tokens
`eng.jump_call(3)`              jump `offset` steps of a loop
`eng.break_current_loop()`      break out of the current loop
=============================== ==============================

By calling these, any **task** can influence the whole pipeline. You can
read more about the methods our engines provide at the end of this document.

Patterns
========

The workflow module also comes with many patterns that can be used directly
in the definition of the pipeline, such as **PARALLEL_SPLIT**.

Consider this example of a task:

.. code-block:: python

    def if_else(call):
        def inner_call(obj, eng):
            if call(obj, eng):
                # if True, continue processing..
                eng.jump_call(1)
            else:
                # ..else, skip the next step
                eng.jump_call(2)
        return inner_call

We can then write a **workflow definition** like this:

.. code-block:: python

    [
        if_else(stage_submission),
        [
            [
                if_else(fulltext_available),
                [extract_metadata, populate_empty_fields],
                []
            ],
            [
                if_else(check_for_duplicates),
                [stop_processing],
                [synchronize_fields, replace_values]
            ],
            check_mandatory_fields,
        ],
        [
            check_mandatory_fields,
            check_preferred_values,
            save_record
        ]
    ]
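The same conditional shape is also available ready-made. Here is a minimal,
self-contained sketch using the shipped `IF_ELSE` from
`workflow.patterns.controlflow`, as used earlier in this document (the
`Token`, `is_even`, `halve` and `triple_plus_one` names are made up for
illustration):

.. code-block:: python

    from workflow.engine import GenericWorkflowEngine
    from workflow.patterns.controlflow import IF_ELSE

    class Token(object):
        def __init__(self, data):
            self.data = data

    def is_even(obj, eng):
        return obj.data % 2 == 0

    def halve(obj, eng):
        obj.data //= 2

    def triple_plus_one(obj, eng):
        obj.data = 3 * obj.data + 1

    eng = GenericWorkflowEngine()
    eng.callbacks.replace([IF_ELSE(is_even, [halve], [triple_plus_one])])
    eng.process([Token(4), Token(3)])  # 4 -> 2, 3 -> 10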
Example: Parallel split
-----------------------

This pattern is called Parallel split (as tasks B, C, D are all started in
parallel after task A). It could be implemented like this:

.. code-block:: python

    import _thread as thread  # the `thread` module on Python 2

    def PARALLEL_SPLIT(*args):
        """
        Tasks B, C, D... are all started in parallel.

        @attention: tasks B, C, D... are not addressable; you can't
            jump to them (they are invisible to the workflow engine),
            though you can jump inside the branches.
        @attention: tasks B, C, D... will be running on their own once you
            have started them, and we are not waiting for them to finish.
            Workflow will continue executing other tasks while B, C, D...
            might still be running.
        @attention: a new engine is spawned for each branch of code; all
            operations work as expected, but mind that the branches only
            know about themselves - they don't see the tasks outside. They
            are passed the object, but not the old workflow engine object.
        @postcondition: eng object will contain a lock (to be used by
            threads)
        """
        def _parallel_split(obj, eng, calls):
            lock = thread.allocate_lock()
            eng.store['lock'] = lock
            for func in calls:
                new_eng = eng.duplicate()
                new_eng.setWorkflow(
                    [lambda o, e: e.store.update({'lock': lock}), func])
                thread.start_new_thread(new_eng.process, ([obj], ))
        return lambda o, e: _parallel_split(o, e, args)

Subsequently, we can use PARALLEL_SPLIT like this:

.. code-block:: python

    from workflow.patterns import PARALLEL_SPLIT
    from my_module_x import task_a, task_b, task_c, task_d

    [
        task_a,
        PARALLEL_SPLIT(task_b, task_c, task_d)
    ]

Note that PARALLEL_SPLIT is already provided in
`workflow.patterns.PARALLEL_SPLIT`.
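Since the spawned branches share the same token, any task that mutates it
should guard the mutation with the lock that `PARALLEL_SPLIT` leaves in the
engine's `store` (per its postcondition above). A minimal sketch, assuming a
dict-style token with a made-up `visits` field:

.. code-block:: python

    def count_visit(obj, eng):
        # Branches run concurrently, so serialize writes to the shared token.
        with eng.store['lock']:
            obj['visits'] = obj.get('visits', 0) + 1

If you need to wait for all branches to finish before continuing, that is
the Synchronisation pattern, shown next.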
Example: Synchronisation
------------------------

After the execution of task B, task C, and task D, task E can be executed (I
will present the threaded version, as the sequential version would be dead
simple).

.. code-block:: python

    def SYNCHRONIZE(*args, **kwargs):
        """
        After the execution of task B, task C, and task D, task E can be
        executed.

        @var *args: args can be a mix of callables and lists of callables.
            The simplest situation comes when you pass a list of callables -
            they will simply be executed in parallel. But if you pass a list
            of callables (a branch of callables), which is potentially a new
            workflow, we will first create a workflow engine with the
            workflows, and execute the branch in it.
        @attention: you should never jump out of the synchronized branches
        """
        # MAX_TIMEOUT, MyTimeoutQueue, MySpecialThread and
        # duplicate_engine_instance are illustrative helpers assumed to
        # exist.
        timeout = MAX_TIMEOUT
        if 'timeout' in kwargs:
            timeout = kwargs['timeout']

        if len(args) < 2:
            raise Exception('You must pass at least two callables')

        def _synchronize(obj, eng):
            queue = MyTimeoutQueue()
            # spawn a pool of threads, and pass them the queue instance
            for i in range(len(args) - 1):
                t = MySpecialThread(queue)
                t.setDaemon(True)
                t.start()

            for func in args[0:-1]:
                if isinstance(func, (list, tuple)):
                    new_eng = duplicate_engine_instance(eng)
                    new_eng.setWorkflow(func)
                    # bind via a default argument so each closure keeps its
                    # own branch engine
                    queue.put(lambda e=new_eng: e.process([obj]))
                else:
                    queue.put(lambda f=func: f(obj, eng))

            # wait on the queue until everything has been processed
            queue.join_with_timeout(timeout)

            # run the last func
            args[-1](obj, eng)

        _synchronize.__name__ = 'SYNCHRONIZE'
        return _synchronize

Configuration (i.e. what would admins write):

.. code-block:: python

    from workflow.patterns import SYNCHRONIZE
    from my_module_x import task_a, task_b, task_c, task_d

    [
        SYNCHRONIZE(task_b, task_c, task_d, task_a)
    ]

.. .. automodule:: workflow
..    :members:

GenericWorkflowEngine API
=========================

This documentation is automatically generated from Workflow's source code.

.. autoclass:: workflow.engine.GenericWorkflowEngine
   :members:

.. autoclass:: workflow.engine.MachineState
   :members:

.. autoclass:: workflow.engine.Callbacks
   :members:

.. autoclass:: workflow.engine._Signal
   :members:

DbWorkflowEngine API
====================

.. autoclass:: workflow.engine_db.DbWorkflowEngine
   :members:

.. autoclass:: workflow.engine_db.WorkflowStatus
   :members:

.. autoclass:: workflow.engine_db.ObjectStatus
   :members:

.. include:: ../CHANGES.rst

.. include:: ../CONTRIBUTING.rst

License
=======

.. include:: ../LICENSE

.. include:: ../AUTHORS.rst