class botoflow.workflow_definition.WorkflowDefinition(workflow_execution)
Bases: object
Every workflow implementation must be a subclass of WorkflowDefinition.
There is usually no need to instantiate the class manually. Instead, the @execute decorated method is called to start the workflow (in a sense, you can think of these methods as factory class methods).
Here's an example workflow implementation that has an @execute decorated method and a @signal:
    from botoflow import execute, return_, signal, WorkflowDefinition
    from botoflow.constants import MINUTES
    from my_activities import MyActivities

    class MyWorkflow(WorkflowDefinition):

        @execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
        def start_my_workflow(self, some_input):
            # execute the activity and wait for its result
            result = yield MyActivities.activity1(some_input)
            # return the result from the workflow
            # in Python3 you can "return result" normally instead
            return_(result)

        @signal()  # has to have () parentheses
        def signal1(self, signal_input):
            self.signal_input = signal_input
Note
As with the @async decorated methods, returning values from the workflow is a little inconvenient on Python 2, which does not allow returning a value from a generator function (see PEP 0380). Instead of using the familiar return keyword, the return value is raised like this: raise Return("Value").
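The mechanism behind the note can be sketched in plain Python. This is a minimal illustration, not botoflow's implementation: the `Return` class, the `return_` helper, and the `run` driver below are hypothetical stand-ins defined only for this example, and the driver assumes every yielded value is immediately available as its own result.

```python
class Return(Exception):
    """Hypothetical stand-in: carries a generator's result to its caller."""

    def __init__(self, value=None):
        super().__init__(value)
        self.value = value


def return_(value=None):
    """Hypothetical helper: raise Return so a generator can hand back a value."""
    raise Return(value)


def run(generator):
    """Drive a generator to completion and collect its result."""
    sent = None
    try:
        while True:
            # Assumption: each yielded value is treated as already resolved
            # and is sent straight back into the generator.
            sent = generator.send(sent)
    except Return as exc:
        # Python 2 style: the result was raised explicitly.
        return exc.value
    except StopIteration as exc:
        # Python 3 style: a plain "return result" inside the generator.
        return getattr(exc, "value", None)


def py2_style(x):
    doubled = yield x * 2
    return_(doubled + 1)


def py3_style(x):
    doubled = yield x * 2
    return doubled + 1


result_py2 = run(py2_style(5))
result_py3 = run(py3_style(5))
```

Both generators hand the same value back to the driver; only the way the value leaves the generator differs.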
cancel(details='')
Cancels the workflow execution.
If the cancel is initiated from the context of the workflow itself, CancelledError is raised. This cascades the cancel request to all open activities and then cancels the workflow itself.
If the cancel is initiated from a different (external) context, a cancel request decision is sent to SWF so that the proper context can process the request.
Returns a Future that is empty until the message has been successfully delivered to the target execution. It does not indicate whether the target execution accepted the request.
cancellation_handler
Override this to take cleanup actions before the workflow execution cancels.
workflow_execution
Will contain the WorkflowExecution named tuple of the currently running workflow.
An example of the workflow_execution after starting a workflow:
    # create the workflow worker using the boto swf_client and the ExampleWorkflow class
    wf_worker = WorkflowWorker(swf_client, "SOMEDOMAIN", "MYTASKLIST",
                               ExampleWorkflow)
    # start the workflow with a random workflow_id
    with wf_worker:
        instance = ExampleWorkflow.execute(arg1=1, arg2=2)
        print(instance.workflow_execution)
Prints something like the following:
    WorkflowExecution(workflow_id='73faf493fece67fefb1142739611c391a03bc23b',
                      run_id='12Eg0ETHpm17rSWssUZKqAvEZVd5Ap0RELs8kE7U6igB8=')
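As a rough illustration of what working with such a named tuple looks like, the sketch below builds a stand-in with `collections.namedtuple`. It assumes only the two fields visible in the printed output above (`workflow_id` and `run_id`); it is not botoflow's own class definition.

```python
from collections import namedtuple

# Stand-in with the two fields shown in the printed output above.
WorkflowExecution = namedtuple("WorkflowExecution", ["workflow_id", "run_id"])

execution = WorkflowExecution(
    workflow_id="73faf493fece67fefb1142739611c391a03bc23b",
    run_id="12Eg0ETHpm17rSWssUZKqAvEZVd5Ap0RELs8kE7U6igB8=",
)

# Fields can be read by name or by position, and the value unpacks like a tuple.
workflow_id, run_id = execution
same_run_id = execution[1]
```

Reading the fields by name keeps calling code readable when the identifiers are passed on to other SWF calls or written to logs.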
workflow_result
This property returns the future associated with the result of the workflow execution.
The main use case is sub-workflows whose results you'd like to yield on while still being able to call signals on that sub-workflow.
workflow_state
This property is used to retrieve the current workflow state.
The property is expected to perform read-only access to the workflow implementation object. It is invoked synchronously, which rules out any asynchronous operations (such as calling it with yield).
The latest state reported by the workflow execution is returned to you through visibility calls to the Amazon SWF service and in the Amazon SWF console.
To learn more about workflow execution state, please check out the official documentation on SWF Workflow History.
Example of setting the state between yields:
    class MyWorkflow(WorkflowDefinition):

        @execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
        def start_workflow(self):
            self.workflow_state = "Workflow started"
            yield SomeActivity.method(1, 2)
            self.workflow_state = "Workflow completing"
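To make the "read synchronously between yields" behavior concrete, here is a small plain-Python sketch. The `StatefulWorkflow` class is a hypothetical stand-in written for this example, not a botoflow class: it models the state as an ordinary property and uses a simple loop in place of the real decider.

```python
class StatefulWorkflow:
    """Hypothetical stand-in for a workflow exposing a workflow_state property."""

    def __init__(self):
        self._state = None

    @property
    def workflow_state(self):
        # Read-only, synchronous access: no yield and no I/O here.
        return self._state

    @workflow_state.setter
    def workflow_state(self, value):
        self._state = value

    def start_workflow(self):
        self.workflow_state = "Workflow started"
        yield "activity scheduled"  # placeholder for a real activity call
        self.workflow_state = "Workflow completing"


workflow = StatefulWorkflow()
observed = []

# Each time the generator pauses at a yield, the driver reads the state.
for _ in workflow.start_workflow():
    observed.append(workflow.workflow_state)

# Read once more after the generator has finished.
observed.append(workflow.workflow_state)
```

In this sketch the state is only observed at the points where the workflow code has paused or finished, which is why the getter has to be cheap and free of side effects.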