Workflow Definition

class botoflow.workflow_definition.WorkflowDefinition(workflow_execution)

Bases: object

Every workflow implementation must be a subclass of WorkflowDefinition.

There is usually no need to instantiate the class manually. Instead, the @execute decorated method is called to start the workflow (in a sense, you can think of these methods as factory class methods).

Here's an example workflow implementation that has an @execute decorated method and a @signal:

from botoflow import execute, return_, signal, WorkflowDefinition
from botoflow.constants import MINUTES

from my_activities import MyActivities


class MyWorkflow(WorkflowDefinition):

    @execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
    def start_my_workflow(self, some_input):
        # execute the activity and wait for its result
        result = yield MyActivities.activity1(some_input)

        # return the result from the workflow
        # in Python3 you can "return result" normally instead
        return_(result)

    @signal()  # has to have () parentheses
    def signal1(self, signal_input):
        self.signal_input = signal_input

Note

As with the @async decorated methods, returning values from the workflow is a little inconvenient on Python 2, because Python 2 does not allow a generator function to return a value (see PEP 0380). Instead of using the familiar return keyword, the return value is raised like this: raise Return("Value").
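The return-by-raising pattern described in the note can be pictured with a small, self-contained sketch. It does not use botoflow: the Return class and the run driver below are hypothetical stand-ins written for illustration, and the driver treats every yielded value as an already-available result instead of waiting on a future.

```python
class Return(Exception):
    """Hypothetical stand-in: carries a generator's result as an exception."""

    def __init__(self, value=None):
        super(Return, self).__init__(value)
        self.value = value


def run(gen):
    """Drive a generator to completion and hand back its result.

    Toy driver (an assumption, not botoflow's scheduler): each yielded
    value is sent straight back in as if it were the finished result.
    """
    result = None
    try:
        while True:
            result = gen.send(result)
    except Return as ret:
        # Python 2 style: the result was raised out of the generator.
        return ret.value
    except StopIteration as stop:
        # Python 3 style: "return value" ends up on StopIteration.value.
        return getattr(stop, "value", None)


def workflow_py2_style(some_input):
    doubled = yield some_input * 2  # comes back in through send()
    raise Return(doubled + 1)


def workflow_py3_style(some_input):
    doubled = yield some_input * 2
    return doubled + 1  # only valid syntax on Python 3


outcome_raised = run(workflow_py2_style(20))
outcome_returned = run(workflow_py3_style(20))
```

Both generators produce the same result through the driver. Only the first one parses on Python 2, which is why the raising form exists.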

Parameters
workflow_execution (WorkflowExecution) -- Associated workflow execution.
cancel(details='')

Cancels the workflow execution.

If the cancel is initiated from the context of the workflow itself, CancelledError is raised. This cascades the cancel request to all open activities and then cancels the workflow itself.

If the cancel is initiated from a different (external) context, a cancel request decision is sent to SWF so that the proper context can process the request.

Parameters
details (str) -- Details of the cancel request; recorded in the SWF history.
Returns
A cancel Future that is empty until the message has been successfully delivered to the target execution. It does not indicate whether the target execution accepted the request.
Return type
botoflow.core.future.Future
Raises
botoflow.core.exceptions.CancelledError -- if the workflow being cancelled belongs to the current context
cancellation_handler

Override this to take cleanup actions before workflow execution cancels.

workflow_execution

Contains the WorkflowExecution named tuple of the currently running workflow.

An example of the workflow_execution after starting a workflow:

# create the workflow using boto swf_client and ExampleWorkflow class
wf_worker = WorkflowWorker(swf_client, "SOMEDOMAIN", "MYTASKLIST",
                           ExampleWorkflow)

# start the workflow with a random workflow_id
with wf_worker:
    instance = ExampleWorkflow.execute(arg1=1, arg2=2)
    print(instance.workflow_execution)

Prints something like the following:

WorkflowExecution(workflow_id='73faf493fece67fefb1142739611c391a03bc23b',
                  run_id='12Eg0ETHpm17rSWssUZKqAvEZVd5Ap0RELs8kE7U6igB8=')
workflow_result

This property returns the future associated with the result of the workflow execution.

The main use case is when you have sub-workflows whose results you'd like to yield on while still being able to call signals on those sub-workflows.

Returns
botoflow.core.future.Future, or None if the workflow has not been started.
workflow_state

This property is used to retrieve the current workflow state. It is expected to perform read-only access to the workflow implementation object and is invoked synchronously, so it cannot use any asynchronous operations (such as calling it with yield).

The latest state reported by the workflow execution is returned through visibility calls to the Amazon SWF service and is shown in the Amazon SWF console.

To learn more about workflow execution state, please check out the official documentation on SWF Workflow History.

Example of setting the state between yields:

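from botoflow import execute, WorkflowDefinition
from botoflow.constants import MINUTES

# SomeActivity is assumed to be an activities class defined elsewhere


class MyWorkflow(WorkflowDefinition):

    @execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
    def start_workflow(self):
        self.workflow_state = "Workflow started"
        yield SomeActivity.method(1, 2)
        self.workflow_state = "Workflow completing"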
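To see why the state changes only at the points between yields, the following sketch steps a generator by hand and reads the state after each step. It is a plain-Python stand-in, not botoflow code: StateTrackingWorkflow is a hypothetical class, and the yielded string merely takes the place of an activity call.

```python
class StateTrackingWorkflow(object):
    """Hypothetical stand-in for a workflow object; not a botoflow class."""

    def __init__(self):
        self._state = None

    @property
    def workflow_state(self):
        # Synchronous, read-only access: just return the stored value.
        return self._state

    @workflow_state.setter
    def workflow_state(self, value):
        self._state = value

    def start_workflow(self):
        self.workflow_state = "Workflow started"
        yield "activity placeholder"  # stands in for an activity call
        self.workflow_state = "Workflow completing"


wf = StateTrackingWorkflow()
steps = wf.start_workflow()

observed = [wf.workflow_state]      # generator body has not run yet
next(steps)                         # run up to the first yield
observed.append(wf.workflow_state)
next(steps, None)                   # run to the end of the generator
observed.append(wf.workflow_state)
```

After these steps, observed holds None, then "Workflow started", then "Workflow completing": an observer reading the property sees whichever value was assigned before the most recent yield.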