Workers and Executors

Activity Worker

class botoflow.workers.activity_worker.ActivityWorker(session, aws_region, domain, task_list, *activity_definitions)

Bases: botoflow.workers.base_worker.BaseWorker

For implementing activity workers, you can use the ActivityWorker class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class runs a loop to poll for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided and calls the activity method to process the task. Unlike the WorkflowWorker, which calls the @execute decorated method (factory method) to create a new instance for every decision task, the ActivityWorker simply uses the object you provided.
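The lookup step described above can be pictured with a small toy dispatcher. This is only a conceptual sketch, not botoflow's implementation: `ToyActivityDispatcher`, `CounterActivities`, and the `"ClassName.method"` naming scheme are all hypothetical. What it illustrates is that the same object you supplied serves every task, so any state it holds persists between tasks.

```python
class CounterActivities(object):
    """Hypothetical activity implementation object that keeps state."""

    def __init__(self):
        self.calls = 0

    def increment(self, amount):
        self.calls += 1
        return amount + 1


class ToyActivityDispatcher(object):
    """Illustrative stand-in for the lookup step; not botoflow code."""

    def __init__(self, *activity_objects):
        # Map "ClassName.method" to the bound method of the supplied object.
        self._registry = {}
        for obj in activity_objects:
            for name in dir(obj):
                method = getattr(obj, name)
                if callable(method) and not name.startswith("_"):
                    key = "%s.%s" % (type(obj).__name__, name)
                    self._registry[key] = method

    def handle(self, activity_name, *args):
        # The same object instance serves every task that arrives.
        return self._registry[activity_name](*args)


activities = CounterActivities()
dispatcher = ToyActivityDispatcher(activities)
first = dispatcher.handle("CounterActivities.increment", 1)
second = dispatcher.handle("CounterActivities.increment", 10)
```

After both calls, `activities.calls` is 2, because no new instance was created between tasks. If your activity objects hold mutable state, it is worth keeping this reuse in mind.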

The ActivityWorker class uses the botoflow decorators to determine the registration and execution options.

Parameters
  • session (botocore.session.Session) -- botocore session object.
  • aws_region (str) -- aws region to connect to
  • domain (str) -- SWF domain to operate on.
  • task_list (str) -- default task list on which to put all the workflow requests.
  • activities (object) -- A single activity implementation object or several, passed as positional arguments (*activity_definitions in the signature).

Here's an example for setting up an ActivityWorker:

from botoflow.workers.activity_worker import ActivityWorker

# instantiate the activity objects we want to serve on this worker
activities = [SomeActivities(), OtherActivities()]

# create the worker object; `session` is a botocore session
activity_worker = ActivityWorker(session, "us-east-1", "SOMEDOMAIN",
                                 "MYTASKLIST", *activities)
activity_worker.run()

poll_for_activities()

Returns a closure function ready for execution

request_heartbeat(task, details=None)

Sends heartbeat of activity in SWF and returns response.

Parameters
  • details (str) -- If specified, contains details about the progress of the task.
  • task (awsflow.workers.activity_task.ActivityTask) -- The taskToken of the ActivityTask.
Returns

dict response of {'cancelRequested': <bool> }
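One way to act on the `cancelRequested` flag is to check it between units of work and stop early when it is set. The sketch below is an assumption-laden illustration: `send_heartbeat` is a hypothetical callable standing in for however your activity code reaches request_heartbeat(), and `make_fake_heartbeat` exists only so the example runs without SWF.

```python
def process_items(items, send_heartbeat):
    """Process items until done or until a cancel request is reported.

    send_heartbeat is a hypothetical callable; it must return a dict
    shaped like {'cancelRequested': <bool>}.
    """
    processed = []
    for index, item in enumerate(items):
        progress = "item %d of %d" % (index + 1, len(items))
        response = send_heartbeat(details=progress)
        if response.get("cancelRequested"):
            break  # stop early instead of finishing the remaining items
        processed.append(item.upper())
    return processed


def make_fake_heartbeat(cancel_after):
    """Build a fake heartbeat that reports cancellation after N calls."""
    state = {"calls": 0}

    def fake_heartbeat(details=None):
        state["calls"] += 1
        return {"cancelRequested": state["calls"] > cancel_after}

    return fake_heartbeat


result = process_items(["a", "b", "c", "d"],
                       make_fake_heartbeat(cancel_after=2))
```

Here the third heartbeat reports a cancel request, so only the first two items are processed.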

run()

Run this worker forever (or till SIGINT).

run_once()

Run this worker once (perform one decision loop).

Workflow Worker

class botoflow.workers.workflow_worker.GenericWorkflowWorker(session, aws_region, domain, task_list, get_workflow)

Bases: botoflow.workers.base_worker.BaseWorker

As the name suggests, this worker class is intended for use by the workflow implementation. It is configured with a workflow finding function.

The worker class runs a loop to poll for decision tasks in the specified task list. When a decision task is received, it first looks up the workflow definition class using get_workflow, then creates an instance of it and calls the @execute decorated method to process the task.

Parameters
  • session (botocore.session.Session) -- botocore session object.
  • aws_region (str) -- aws region to connect to
  • domain (str) -- SWF domain to operate on.
  • task_list (str) -- default task list on which to put all the workflow requests.
  • get_workflow (func) -- Callable that returns workflow information. This function takes (workflow_name, workflow_version) and returns a tuple of (workflow_definition, workflow_type_object, function_name). (see also get_workflow_entrypoint())
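A get_workflow callable could be as simple as a dictionary lookup. The following is a minimal sketch under stated assumptions: `ExampleWorkflow` is a plain stand-in class rather than a real WorkflowDefinition subclass, the `WORKFLOWS` registry is hypothetical, and the middle tuple element is a placeholder dict where real code would supply the workflow type object. Raising `LookupError` for unknown workflows is also an assumption; how the worker reacts to it is not covered here.

```python
class ExampleWorkflow(object):
    """Hypothetical stand-in for a WorkflowDefinition subclass."""

    def start(self):
        return "started"


# Hypothetical registry keyed by (workflow_name, workflow_version).
# The middle element is a placeholder for the workflow type object.
WORKFLOWS = {
    ("ExampleWorkflow", "1.0"): (ExampleWorkflow, {"version": "1.0"}, "start"),
}


def get_workflow(workflow_name, workflow_version):
    """Return (workflow_definition, workflow_type_object, function_name)."""
    try:
        return WORKFLOWS[(workflow_name, workflow_version)]
    except KeyError:
        raise LookupError("unknown workflow %s version %s"
                          % (workflow_name, workflow_version))


definition, workflow_type, func_name = get_workflow("ExampleWorkflow", "1.0")
# Mimic the described flow: create an instance, then call the entry point.
outcome = getattr(definition(), func_name)()
```

In real code the tuple would more likely come from get_workflow_entrypoint(), which is documented further down in this section.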

This worker also acts as a context manager for starting new workflow executions.

run()

Run this worker forever (or till SIGINT).

run_once()

Run this worker once (perform one decision loop).

class botoflow.workers.workflow_worker.WorkflowWorker(session, aws_region, domain, task_list, *workflow_definitions)

Bases: botoflow.workers.workflow_worker.GenericWorkflowWorker

As the name suggests, this worker class is intended for use by the workflow implementation. It is configured with a task list and the workflow implementation type. The worker class runs a loop to poll for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @execute decorated method to process the task.

Parameters
  • session (botocore.session.Session) -- botocore session object.
  • aws_region (str) -- aws region to connect to
  • domain (str) -- SWF domain to operate on.
  • task_list (str) -- default task list on which to put all the workflow requests.
  • workflow_definitions -- WorkflowDefinition subclass(es)

This worker also acts as a context manager for starting new workflow executions. The following example creates a workflow worker and runs it:

# create the workflow worker using a botocore session and register
# the ExampleWorkflow class
wf_worker = WorkflowWorker(session, "us-east-1", "SOMEDOMAIN", "MYTASKLIST",
                           ExampleWorkflow)
wf_worker.run()

botoflow.workers.workflow_worker.get_workflow_entrypoint(definition_class, workflow_name, workflow_version)

Get the entry point information from definition_class.

This function provides a convenient way to build the tuple that the get_workflow argument to GenericWorkflowWorker must return.

Parameters
  • definition_class (child class of botoflow.workflow_definition.WorkflowDefinition) -- Class which defines the workflow
  • workflow_name (str) -- The name of the workflow
  • workflow_version (str) -- The version of the workflow
Returns

Return a tuple of (definition_class, workflow_type, entrypoint_func_name)

Multiprocessing Activity Executor

class botoflow.workers.multiprocessing_activity_executor.MultiprocessingActivityExecutor(worker)

Bases: botoflow.workers.multiprocessing_executor.MultiprocessingExecutor

This is an executor for ActivityWorker that uses multiple processes to parallelize the activity work.

start(pollers=1, workers=1)

Start the worker. This method does not block.

Parameters
  • pollers (int) -- Count of poller processes to use. Must be less than or equal to workers.
  • workers (int) -- Count of worker processes to use.
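If the counts come from configuration, it may help to check the constraint before calling start(). The helper below is a hypothetical pre-check, not part of botoflow; the requirement that both counts be at least 1 is an assumption added here, while the pollers-versus-workers rule comes from the parameter description above.

```python
def check_counts(pollers, workers):
    """Return (pollers, workers) if valid, otherwise raise ValueError."""
    # Assumption: zero or negative counts are never meaningful.
    if pollers < 1 or workers < 1:
        raise ValueError("pollers and workers must both be at least 1")
    # Documented rule: pollers must not exceed workers.
    if pollers > workers:
        raise ValueError("pollers (%d) must not exceed workers (%d)"
                         % (pollers, workers))
    return pollers, workers


pollers, workers = check_counts(pollers=2, workers=8)
```

The validated values could then be passed along as `executor.start(pollers=pollers, workers=workers)`.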

Multiprocessing Workflow Executor

class botoflow.workers.multiprocessing_workflow_executor.MultiprocessingWorkflowExecutor(worker)

Bases: botoflow.workers.multiprocessing_executor.MultiprocessingExecutor

This is a multiprocessing workflow executor, suitable for handling lots of workflow decisions in parallel on CPython.

start(pollers=1)

Start the worker.

Parameters
  • pollers (int) -- Poller/worker count to start. Because the expected lifetime of the decider is short (should be seconds at most), we don't need a separate worker queue.

Example of starting and terminating the worker:

import time

worker.start(pollers=10)  # start ten pollers
time.sleep(360)           # let them run for six minutes
worker.stop()             # request shutdown
worker.join()  # will block

Threaded Activity Executor

class botoflow.workers.threaded_activity_executor.ThreadedActivityExecutor(worker)

Bases: botoflow.workers.threaded_executor.ThreadedExecutor

This is an executor for ActivityWorker that uses threads to parallelize the activity work.

Because of the GIL in CPython, it is recommended to use this executor only on Jython or IronPython.

start(pollers=1, workers=1)

Start the worker. This method does not block.

Parameters
  • pollers (int) -- Count of poller threads to use. Must be less than or equal to workers.
  • workers (int) -- Count of worker threads to use.

Threaded Workflow Executor

class botoflow.workers.threaded_workflow_executor.ThreadedWorkflowExecutor(worker)

Bases: botoflow.workers.threaded_executor.ThreadedExecutor

This is a threaded workflow executor.

It will execute a WorkflowWorker in multiple threads.

As with the ThreadedActivityExecutor, using it on CPython is not recommended because of the GIL, unless the poller/worker count is low (less than 5).

start(pollers=1)

Start the worker.

Parameters
  • pollers (int) -- Poller/worker count to start. Because the expected lifetime of the decider is short (should be seconds at most), we don't need a separate worker queue.

Example of starting and terminating the worker:

import time

worker.start(pollers=2)  # start two pollers
time.sleep(360)          # let them run for six minutes
worker.stop()            # request shutdown
worker.join()  # will block
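The start/stop/join sequence shown above can be wrapped so that shutdown still happens if the code in between raises. This is a minimal sketch, not a botoflow feature: `running` is a hypothetical helper that relies only on the start(), stop(), and join() methods used in the examples, and `RecordingExecutor` is a stand-in so the snippet runs without SWF.

```python
import contextlib


@contextlib.contextmanager
def running(executor, **start_kwargs):
    """Start an executor, then always stop and join it on exit."""
    executor.start(**start_kwargs)
    try:
        yield executor
    finally:
        executor.stop()
        executor.join()  # blocks until the executor has shut down


class RecordingExecutor(object):
    """Hypothetical stand-in exposing the same start/stop/join methods."""

    def __init__(self):
        self.calls = []

    def start(self, pollers=1):
        self.calls.append(("start", pollers))

    def stop(self):
        self.calls.append(("stop",))

    def join(self):
        self.calls.append(("join",))


fake = RecordingExecutor()
with running(fake, pollers=2):
    fake.calls.append(("work",))  # placeholder for sleeping or other work
```

With a real executor, the body of the `with` block would be where the main process waits, for example on `time.sleep()` or a signal.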