Workers and Executors

SWF Operation Callable

class botoflow.workers.swf_op_callable.SWFOp(endpoint, op)

Callable wrapper for SWF Operations that inspects the replies and raises appropriate botoflow.swf_exceptions.

Workers Activity Task

class botoflow.workers.activity_task.ActivityTask(task_dict)

Bases: object

INTERNAL Unit of work sent to an activity worker.

Used to construct the object from SWF task dictionary

Base Worker

class botoflow.workers.base_worker.BaseWorker(session, aws_region, domain, task_list)

Bases: object

Base for the Workflow and Activity workers

client

Returns the botocore SWF client :rtype: botocore.client.swf

domain

Returns the worker's domain

identity

Returns the worker's worker's identity

This value ends up stored in the identity field of the corresponding Start history event. Default is "pid":"host".

run()

Should be implemented by the worker

Raises
NotImplementedError
run_once()

Should be implemented by the worker

Raises
NotImplementedError
task_list

Returns the task list

unhandled_exception_handler

Returns the current unhandled exception handler.

Handler notified about poll request and other unexpected failures. The default implementation logs the failures using ERROR level.

Multiprocessing Executor

class botoflow.workers.multiprocessing_executor.MultiprocessingExecutor(worker)

Bases: object

A base for all multiprocessing executors

initializer

If set, the initializer function will be called after the subprocess is started with the worker object as the first argument.

You can use this to, for example, set the process name suffix, to distinguish between activity and workflow workers (when starting them from the same process):

from setproctitle import getproctitle, setproctitle

def set_worker_title(worker):
    name = getproctitle()
    if isinstance(worker, WorkflowWorker):
        setproctitle(name + ' (WorkflowWorker)')
    elif isinstance(worker, ActivityWorker):
        setproctitle(name + ' (ActivityWorker)')

worker.initializer = set_worker_title
is_running

Returns True if the worker is running

join()

Will wait till all the processes are terminated

start()

Start the worker. This method does not block.

stop()

Stops the worker processes. To wait for all the processes to terminate, call:

worker.start()
time.sleep(360)
worker.stop()
worker.join()  # will block

Threaded Executor

class botoflow.workers.threaded_executor.ThreadedExecutor(worker)

Bases: object

This will execute a worker using multiple threads.

initializer

If set, the initializer function will be called after the thread is started with the worker object as the first argument.

is_running

Returns True if the worker is running

join()

Will wait till all the threads are terminated

start()

Start the worker. This method does not block.

stop()

Stops the worker threads. To wait for all the threads to terminate, call:

worker.start()
time.sleep(360)
worker.stop()
worker.join()  # will block