botoflow.decorators.
workflow
(name)¶Using this decorator you can override the workflow name (class name) to use.
This can be very useful when you're writing a newer version of the same workflow, but you want to write it in a different class.
class ExampleWorkflow(WorkflowDefinition):
@execute(version='1.0', execution_start_to_close_timeout=1*MINUTES)
def example_start(self, a, b):
pass
@workflow(name='ExampleWorkflow')
class ExampleWorkflowV2(ExampleWorkflow):
@execute(version='2.0', execution_start_to_close_timeout=1*MINUTES)
def example_start_v2(self, a, b, c):
pass
In the example above, you have two classes that handle ExampleWorkflow, but with version 1.0 and version 2.0, which you can start by calling the corresponding example_start() and example_start_v2() class methods.
botoflow.decorators.
activities
(task_list='USE_WORKER_TASK_LIST', task_priority=None, activity_name_prefix='', heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, data_converter=None)¶This decorator can be used to declare a set of activity types.
Each method on the class annotated with this decorator represents an
activity type. An interface cannot have both @workflow
and @activities
decorators.
USE_WORKER_TASK_LIST
by default. This is a special value which
indicates that the task list used by the worker, which is performing the registration, should be used.activities()
.AbstractDataConverter
) -- Specifies the type of the DataConverter to use for
serializing/deserializing data when creating tasks of this activity type and its results.
Set to None by default, which indicates that the JsonDataConverter should be used.botoflow.decorators.
execute
(version, execution_start_to_close_timeout, task_list='USE_WORKER_TASK_LIST', task_priority=None, task_start_to_close_timeout=30, child_policy='TERMINATE', data_converter=None, description=None, skip_registration=False)¶This decorator indicates the entry point of the workflow.
The entry point of the workflow can be invoked remotely by your application
using :py:class`~botoflow.workflow_starter.workflow_starter` or direct API call from boto3
or
AWS CLI.
USE_WORKER_TASK_LIST
by default. This is
a special value which indicates that the task list used by the worker,
which is performing the registration, should be used.CHILD_TERMINATE
.AbstractDataConverter
) -- Specifies the type of the DataConverter to use for
serializing/deserializing data when sending requests to and receiving
results from workflow executions of this workflow type. Set to None
by default, which indicates that the JsonDataConverter should be used.botoflow.decorators.
activity
(version, name=None, task_list='USE_WORKER_TASK_LIST', task_priority=None, heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, description=None, skip_registration=False, manual=False)¶Indicates an activity type
activity()
, the framework will not automatically prepend a
prefix to it. You are free to use your own naming scheme.USE_WORKER_TASK_LIST
by
default. This is a special value which indicates that the task list
used by the worker, which is performing the registration, should be
used.activities()
.botoflow.decorators.
manual_activity
(version, name=None, task_list='USE_WORKER_TASK_LIST', task_priority=None, heartbeat_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, schedule_to_close_timeout=None, description=None, skip_registration=False)¶Indicates a manual activity type
activity()
, the framework will not automatically prepend a
prefix to it. You are free to use your own naming scheme.USE_WORKER_TASK_LIST
by
default. This is a special value which indicates that the task list
used by the worker, which is performing the registration, should be
used.activities()
.botoflow.decorators.
signal
(name=None)¶When used on a method in the WorkflowDefinition subclass, identifies a signal that can be received by executions of the workflow. Use of this decorator is required to define a signal method.
botoflow.decorators.
retry_activity
(stop_max_attempt_number=None, stop_max_delay=None, wait_fixed=None, wait_random_min=None, wait_random_max=None, wait_incrementing_start=None, wait_incrementing_increment=None, wait_exponential_multiplier=None, wait_exponential_max=None, retry_on_exception=None, retry_on_result=None, wrap_exception=False, stop_func=None, wait_func=None)¶Decorator to indicate retrying policy for the activity
Based on the retrying library, but uses seconds instead of milliseconds for waiting as SWF does not support subsecond granularity as well as to make it consistent with the rest of botoflow.
Examples:
@retry_activity
@activity(version='1.0', start_to_close_timeout=float('inf'))
def never_give_up_never_surrender_activity():
print("Retry forever ignoring Exceptions, don't "
"wait between retries")
@activity(version='1.0', start_to_close_timeout=float('inf'))
@retry_activity
def never_give_up_never_surrender_activity():
print("Retry forever ignoring Exceptions, don't "
"wait between retries and don't care about order")
@retry_activity(stop_max_attempt_number=7)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def stop_after_7_attempts_activity():
print("Stopping after 7 attempts")
@retry_activity(stop_max_delay=10*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def stop_after_10_s_activity():
print("Stopping after 10 seconds")
@retry_activity(wait_random_min=1*SECONDS,
wait_random_max=2*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def wait_random_1_to_2_s_activity():
print("Randomly wait 1 to 2 seconds between retries")
@retry_activity(wait_exponential_multiplier=1*SECONDS,
wait_exponential_max=10*SECONDS)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def wait_exponential_1_activity():
print("Wait 2^x * 1 second between each retry, up to 10 "
"seconds, then 10 seconds afterwards")
@retry_activity(retry_on_exception=retry_on_exception(IOError, OSError))
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_io_os_error():
print("Retry forever with no wait if an IOError or OSError "
"occurs, raise any other errors")
def retry_if_io_error(exception):
"""Return True if we should retry (in this case when it's an IOError),
False otherwise
"""
if isinstance(exception, ActivityTaskFailedError):
return isinstance(exception.cause, IOError)
@retry_activity(retry_on_exception=retry_if_io_error)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_io_error():
print("Retry forever with no wait if an IOError occurs, "
"raise any other errors")
def retry_if_result_none(result):
"""Return True if we should retry (in this case when result is None),
False otherwise
"""
return result is None
@retry_activity(retry_on_result=retry_if_result_none)
@activity(version='1.0', start_to_close_timeout=10*MINUTES)
def might_return_none():
print("Retry forever ignoring Exceptions with no wait if "
"return value is None")