Decorators

botoflow.decorators

botoflow.decorators.workflow(name)

Use this decorator to override the workflow name, which otherwise defaults to the class name.

This can be very useful when you're writing a newer version of the same workflow, but you want to write it in a different class.

class ExampleWorkflow(WorkflowDefinition):

    @execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
    def example_start(self, a, b):
        pass

@workflow(name='ExampleWorkflow')
class ExampleWorkflowV2(ExampleWorkflow):

    @execute(version='2.0', execution_start_to_close_timeout=1*MINUTES)
    def example_start_v2(self, a, b, c):
        pass

In the example above, two classes handle ExampleWorkflow: one for version 1.0 and one for version 2.0. You start either version by calling the corresponding class method, example_start() or example_start_v2().

Parameters
name (str) -- Specifies the name of the workflow type. If not set, the default is to use the workflow definition class name.
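The effect of the name override can be pictured with a small stand-alone sketch. It does not import botoflow: the `workflow` function, the `_workflow_name` attribute, and the `workflow_name` helper below are hypothetical stand-ins written for illustration, and the real decorator may record the name differently.

```python
def workflow(name):
    """Hypothetical stand-in for a name-overriding class decorator."""
    def decorate(cls):
        # Record the override on the decorated class (assumed attribute name).
        cls._workflow_name = name
        return cls
    return decorate


def workflow_name(cls):
    """Return the override if the class itself set one, else the class name."""
    # Only the class's own __dict__ is consulted; this is a choice made
    # for the sketch, not a statement about botoflow's behavior.
    return cls.__dict__.get("_workflow_name", cls.__name__)


class ExampleWorkflow:
    pass


@workflow(name="ExampleWorkflow")
class ExampleWorkflowV2(ExampleWorkflow):
    pass


print(workflow_name(ExampleWorkflow))
print(workflow_name(ExampleWorkflowV2))
```

Both calls print ExampleWorkflow, which is the point of the decorator: two classes, one workflow type name, distinguished only by version.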
botoflow.decorators.activities(task_list='USE_WORKER_TASK_LIST', task_priority=None, activity_name_prefix='', heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, data_converter=None)

This decorator can be used to declare a set of activity types. Each method on the class annotated with this decorator represents an activity type. An interface cannot have both @workflow and @activities decorators.

Parameters
  • task_list (str) -- Specifies the default task list to be registered with Amazon SWF for this set of activities. The default can be overridden using botoflow.options_overrides.activity_options() when calling the activity. Set to USE_WORKER_TASK_LIST by default, a special value indicating that the task list of the worker performing the registration should be used.
  • task_priority (int) -- The default priority for this set of activities. If unset, Amazon SWF defaults this to 0.
  • activity_name_prefix (str) -- Specifies the prefix of the name of the activity types declared in the interface. If set to empty string (which is the default), the name of the class followed by '.' is used as the prefix.
  • heartbeat_timeout (int or None) -- Specifies the defaultTaskHeartbeatTimeout registered with Amazon SWF for this activity type. Activity workers must provide heartbeat within this duration; otherwise, the task will be timed out. Set to None by default, which is a special value that indicates this timeout should be disabled. See Amazon SWF API reference for more details.
  • schedule_to_start_timeout (int or None) -- Specifies the defaultTaskScheduleToStartTimeout registered with Amazon SWF for this activity type. This is the maximum time a task of this activity type is allowed to wait before it is assigned to a worker. It is required either here or in activity().
  • start_to_close_timeout (int or None) -- Specifies the defaultTaskStartToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the maximum time a worker can take to process an activity task of this type.
  • schedule_to_close_timeout (int or None) -- Specifies the defaultScheduleToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the total duration that the task can stay in open state. Set to None by default, which indicates this timeout should be disabled.
  • data_converter (AbstractDataConverter) -- Specifies the type of the DataConverter to use for serializing/deserializing data when creating tasks of this activity type and its results. Set to None by default, which indicates that the JsonDataConverter should be used.
botoflow.decorators.execute(version, execution_start_to_close_timeout, task_list='USE_WORKER_TASK_LIST', task_priority=None, task_start_to_close_timeout=30, child_policy='TERMINATE', data_converter=None, description=None, skip_registration=False)

This decorator indicates the entry point of the workflow.

The entry point of the workflow can be invoked remotely by your application using botoflow.workflow_starter.workflow_starter, or through a direct API call from boto3 or the AWS CLI.

Parameters
  • version (str) -- Required version of the workflow type. Maximum length is 64 characters.
  • execution_start_to_close_timeout (int) -- Specifies the defaultExecutionStartToCloseTimeout registered with Amazon SWF for the workflow type. This specifies the total time a workflow execution of this type may take to complete. See the Amazon SWF API reference for more details.
  • task_list (str) -- The default task list for the decision tasks for executions of this workflow type. The default can be overridden using botoflow.options_overrides.workflow_options() when starting a workflow execution. Set to USE_WORKER_TASK_LIST by default, a special value indicating that the task list of the worker performing the registration should be used.
  • task_priority (int) -- The default priority for decision tasks for executions of this workflow type. The default can be overridden using botoflow.options_overrides.workflow_options() when starting a workflow execution. If unset, Amazon SWF defaults this to 0.
  • task_start_to_close_timeout (int) -- Specifies the defaultTaskStartToCloseTimeout registered with Amazon SWF for the workflow type. This specifies the time a single decision task for a workflow execution of this type may take to complete. See the Amazon SWF API reference for more details. The default is 30 seconds.
  • child_policy (str) -- Specifies the policy to use for the child workflows if an execution of this type is terminated. The default value is CHILD_TERMINATE.
  • data_converter (AbstractDataConverter) -- Specifies the type of the DataConverter to use for serializing/deserializing data when sending requests to and receiving results from workflow executions of this workflow type. Set to None by default, which indicates that the JsonDataConverter should be used.
  • description (str or None) -- Textual description of the workflow definition. By default will use the docstring of the workflow definition class if available. The maximum length is 1024 characters, so a long docstring will be truncated to that length.
  • skip_registration (bool) -- Indicates that the workflow type should not be registered with Amazon SWF.
botoflow.decorators.activity(version, name=None, task_list='USE_WORKER_TASK_LIST', task_priority=None, heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, description=None, skip_registration=False, manual=False)

Indicates an activity type.

Parameters
  • version (str) -- Specifies the version of the activity type.
  • name (str) -- Specifies the name of the activity type. The default is None, which indicates that the default prefix and the activity method name should be used to determine the name of the activity type (which is of the form {prefix}{name}). Note that when you specify a name in an activity(), the framework will not automatically prepend a prefix to it. You are free to use your own naming scheme.
  • task_list (str) -- Specifies the default task list to be registered with Amazon SWF for this activity type. The default can be overridden using botoflow.options_overrides.activity_options() when calling the activity. Set to USE_WORKER_TASK_LIST by default, a special value indicating that the task list of the worker performing the registration should be used.
  • task_priority (int) -- The default priority for this activity type. If unset, Amazon SWF defaults this to 0.
  • heartbeat_timeout (int or None) -- Specifies the defaultTaskHeartbeatTimeout registered with Amazon SWF for this activity type. Activity workers must provide heartbeat within this duration; otherwise, the task will be timed out. Set to None by default, which is a special value that indicates this timeout should be disabled. See Amazon SWF API reference for more details.
  • schedule_to_start_timeout (int or None) -- Specifies the defaultTaskScheduleToStartTimeout registered with Amazon SWF for this activity type. This is the maximum time a task of this activity type is allowed to wait before it is assigned to a worker. It is required either here or in activities().
  • start_to_close_timeout (int or None) -- Specifies the defaultTaskStartToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the maximum time a worker can take to process an activity task of this type.
  • schedule_to_close_timeout (int or None) -- Specifies the defaultScheduleToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the total duration that the task can stay in open state. Set to None by default, which indicates this timeout should be disabled.
  • description (str or None) -- Textual description of the activity type. By default will use the docstring of the activity if available. The maximum length is 1024 characters, so a long docstring will be truncated to that length.
  • skip_registration (bool) -- Indicates that the activity type should not be registered with Amazon SWF.
  • manual (bool) -- Indicates that this is a manual activity, if set to true.
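The naming rules described for name here and for activity_name_prefix in activities() can be summarized in a short sketch. The helper `activity_type_name` and its arguments are hypothetical and exist only to restate the documented rules; this is not botoflow's own code.

```python
def activity_type_name(class_name, method_name, prefix="", name=None):
    """Resolve an activity type name from the documented rules (sketch)."""
    if name is not None:
        # An explicit name is used verbatim; no prefix is prepended.
        return name
    # An empty prefix falls back to the class name followed by '.'.
    effective_prefix = prefix if prefix else class_name + "."
    # Otherwise the name has the form {prefix}{name}.
    return effective_prefix + method_name


print(activity_type_name("MyActivities", "resize"))
print(activity_type_name("MyActivities", "resize", prefix="img-"))
print(activity_type_name("MyActivities", "resize", prefix="img-", name="Resize"))
```

The three calls print MyActivities.resize, img-resize, and Resize, covering the default prefix, a custom prefix, and an explicit name that bypasses the prefix.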
botoflow.decorators.manual_activity(version, name=None, task_list='USE_WORKER_TASK_LIST', task_priority=None, heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, description=None, skip_registration=False)

Indicates a manual activity type.

Parameters
  • version (str) -- Specifies the version of the activity type.
  • name (str) -- Specifies the name of the activity type. The default is None, which indicates that the default prefix and the activity method name should be used to determine the name of the activity type (which is of the form {prefix}{name}). Note that when you specify a name in an activity(), the framework will not automatically prepend a prefix to it. You are free to use your own naming scheme.
  • task_list (str) -- Specifies the default task list to be registered with Amazon SWF for this activity type. The default can be overridden using botoflow.options_overrides.activity_options() when calling the activity. Set to USE_WORKER_TASK_LIST by default, a special value indicating that the task list of the worker performing the registration should be used.
  • task_priority (int) -- The default priority for this activity type. If unset, Amazon SWF defaults this to 0.
  • heartbeat_timeout (int or None) -- Specifies the defaultTaskHeartbeatTimeout registered with Amazon SWF for this activity type. Activity workers must provide heartbeat within this duration; otherwise, the task will be timed out. Set to None by default, which is a special value that indicates this timeout should be disabled. See Amazon SWF API reference for more details.
  • schedule_to_start_timeout (int or None) -- Specifies the defaultTaskScheduleToStartTimeout registered with Amazon SWF for this activity type. This is the maximum time a task of this activity type is allowed to wait before it is assigned to a worker. It is required either here or in activities().
  • start_to_close_timeout (int or None) -- Specifies the defaultTaskStartToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the maximum time a worker can take to process an activity task of this type.
  • schedule_to_close_timeout (int or None) -- Specifies the defaultScheduleToCloseTimeout registered with Amazon SWF for this activity type. This timeout determines the total duration that the task can stay in open state. Set to None by default, which indicates this timeout should be disabled.
  • description (str or None) -- Textual description of the activity type. By default will use the docstring of the activity if available. The maximum length is 1024 characters, so a long docstring will be truncated to that length.
  • skip_registration (bool) -- Indicates that the activity type should not be registered with Amazon SWF.
botoflow.decorators.signal(name=None)

When used on a method in the WorkflowDefinition subclass, identifies a signal that can be received by executions of the workflow. Use of this decorator is required to define a signal method.

Parameters
name (str) -- Specifies the name portion of the signal name. If not set, the name of the method is used.
botoflow.decorators.retry_activity(stop_max_attempt_number=None, stop_max_delay=None, wait_fixed=None, wait_random_min=None, wait_random_max=None, wait_incrementing_start=None, wait_incrementing_increment=None, wait_exponential_multiplier=None, wait_exponential_max=None, retry_on_exception=None, retry_on_result=None, wrap_exception=False, stop_func=None, wait_func=None)

Decorator that sets the retry policy for the activity.

Based on the retrying library, but wait times are given in seconds rather than milliseconds, both because SWF does not support subsecond granularity and for consistency with the rest of botoflow.

Examples:

Retry forever:
@retry_activity
@activity(version='1.0', start_to_close_timeout=float('inf'))
def never_give_up_never_surrender_activity():
    print("Retry forever ignoring Exceptions, don't "
          "wait between retries")
Order of the activity decorators does not matter:
@activity(version='1.0', start_to_close_timeout=float('inf'))
@retry_activity
def never_give_up_never_surrender_activity():
    print("Retry forever ignoring Exceptions, don't "
          "wait between retries and don't care about order")
Retry only a certain number of times:
@retry_activity(stop_max_attempt_number=7)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def stop_after_7_attempts_activity():
    print("Stopping after 7 attempts")
Stop retrying after 10 seconds:
@retry_activity(stop_max_delay=10*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def stop_after_10_s_activity():
    print("Stopping after 10 seconds")
Wait a random amount of time:
@retry_activity(wait_random_min=1*SECONDS,
                wait_random_max=2*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def wait_random_1_to_2_s_activity():
    print("Randomly wait 1 to 2 seconds between retries")
Wait an exponentially growing amount of time:
@retry_activity(wait_exponential_multiplier=1*SECONDS,
                wait_exponential_max=10*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def wait_exponential_1_activity():
    print("Wait 2^x * 1 second between each retry, up to 10 "
          "seconds, then 10 seconds afterwards")
Retry on exception:
@retry_activity(retry_on_exception=retry_on_exception(IOError, OSError))
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_io_os_error():
    print("Retry forever with no wait if an IOError or OSError "
          "occurs, raise any other errors")
Custom exception retryer:
def retry_if_io_error(exception):
    """Return True if we should retry (in this case when it's an IOError),
    False otherwise.
    """
    if isinstance(exception, ActivityTaskFailedError):
        return isinstance(exception.cause, IOError)
    # Any other exception type is not retried.
    return False

@retry_activity(retry_on_exception=retry_if_io_error)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_io_error():
    print("Retry forever with no wait if an IOError occurs, "
          "raise any other errors")
Custom retryer based on result:
def retry_if_result_none(result):
    """Return True if we should retry (in this case when result is None),
    False otherwise.
    """
    return result is None

@retry_activity(retry_on_result=retry_if_result_none)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_return_none():
    print("Retry forever ignoring Exceptions with no wait if "
          "return value is None")
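To make the role of the retry_on_result predicate and stop_max_attempt_number concrete, here is a simplified in-process loop. It is only a sketch: `call_with_retries` is a hypothetical helper, it runs locally rather than scheduling activity tasks through SWF, and it returns the last result when attempts run out, which may not match what botoflow does in that case.

```python
def call_with_retries(func, retry_on_result=None, stop_max_attempt_number=None):
    """Call func until its result is acceptable or attempts run out (sketch)."""
    attempt = 0
    while True:
        attempt += 1
        result = func()
        # The predicate returns True when the result calls for a retry.
        should_retry = retry_on_result is not None and retry_on_result(result)
        out_of_attempts = (stop_max_attempt_number is not None
                           and attempt >= stop_max_attempt_number)
        if not should_retry or out_of_attempts:
            return result, attempt


# A fake activity that returns None twice before producing a value.
outcomes = iter([None, None, "done"])
value, attempts = call_with_retries(
    lambda: next(outcomes),
    retry_on_result=lambda result: result is None,
)
print(value, attempts)
```

Here the call is repeated until the predicate returns False, so the loop stops on the third attempt with the value "done".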
Parameters
  • stop_max_attempt_number (int) -- Stop retrying after reaching this attempt number. Default is 3 attempts.
  • stop_max_delay (float) -- Retry for at most this given period of time in seconds. Default is 1 second.
  • wait_fixed (float) -- Wait a fixed amount of seconds before retrying. Default is 1 second.
  • wait_random_min (float) -- Random wait time minimum in seconds. Default is 0 seconds.
  • wait_random_max (float) -- Maximum random wait time in seconds. Default is 1 second.
  • wait_incrementing_start (float) -- Starting point for waiting an incremental amount of time. Default is 0.
  • wait_incrementing_increment (float) -- For each attempt, by how much we increment the waiting period. Default is 1 second.
  • wait_exponential_multiplier (float) -- Exponential wait time multiplier. Default is 1.
  • wait_exponential_max (float) -- Upper bound in seconds on the exponentially growing wait time. Default is 1073741.
  • retry_on_exception (callable) -- A function that returns True if the given exception should trigger a retry.
  • retry_on_result (callable) -- A function that returns True if a retry is needed based on a result.
  • wrap_exception (bool) -- Whether to wrap the non-retryable exceptions in RetryError.
  • stop_func (callable) -- Function that decides when to stop retrying.
  • wait_func (callable) -- Function that looks like f(previous_attempt_number, delay_since_first_attempt_ms) and returns a new time in ms for the next wait period.
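The exponential wait described in the example above ("2^x * 1 second between each retry, up to 10 seconds") can be worked through numerically. This sketch assumes the formula used by the retrying library that the decorator is based on, multiplier * 2^attempt capped at the maximum, with values in seconds; `exponential_wait` is a hypothetical helper, and botoflow's own computation may differ in details such as where the exponent starts.

```python
def exponential_wait(attempt_number, multiplier=1, maximum=10):
    """Seconds to wait after the given 1-based attempt number (sketch)."""
    wait = multiplier * (2 ** attempt_number)
    # Cap the wait so it never exceeds the configured maximum.
    return min(wait, maximum)


# Waits after attempts 1 through 5 with a 1 second multiplier
# and a 10 second cap.
schedule = [exponential_wait(n) for n in range(1, 6)]
print(schedule)  # [2, 4, 8, 10, 10]
```

Under these assumptions the wait doubles until it reaches the cap and then stays at 10 seconds for every later retry.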