AWS SWF Tutorial with Python

I’m preparing to get the “AWS Certified Developer – Associate” certification and the one thing I have no experience with is Simple Workflow Service, and this is mentioned throughout their training materials and exam prep. Unfortunately I’m not a fan of their tutorial so for me to learn how SWF actually works I am going to start from scratch and hack it until it works using Python and EC2. Below I will document my discoveries, but first what are we building?

As such, I came up with the most useless use case of all time: I am going to build a workflow that does the following:

  1. Takes text input
  2. Makes text lowercase
  3. Stores text in dynamodb (mocking this method to do nothing)

Basic settings and session method

The function below will handle all of the authentication and settings for the small SWF project we’re going to build.

#creds.py
"""
Stores credentials and global settings
"""
from botocore.client import Config
from boto3.session import Session

def init_aws_session(service):
    """
    Returns AWS Client Session for Given Service

    Builds a boto3 ``Session`` from the credentials below (region fixed to
    ``us-east-1``) and returns a low-level client for ``service``.

    :param service: AWS service name understood by boto3, e.g. ``'swf'``
    :returns: boto3 client for the requested service, created with the
        module-level ``config`` timeouts
    """

    # Amazon API Creds and Settings
    access_key_id = ''
    secret_access_key = ''

    # Create Session
    session = Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        region_name='us-east-1',
    )

    # Pass the module-level timeout configuration to the client. It was
    # previously defined but never used, so the 70 second timeouts were not
    # applied. SWF polls are long polls (see the SWF API reference), so the
    # read timeout has to outlast them.
    client = session.client(service, config=config)
    return client

# Global Configuration Settings
# Names of the SWF domain, workflow type and task list. The registration
# script, the decider and the activity worker import these from this module.
DOMAIN = 'a-test-domain'
WORKFLOW = 'a-test-workflow'
TASKLIST = 'a-test-tasklist'

# Update Timeouts
# botocore client configuration with connect and read timeouts of 70.
config = Config(connect_timeout=70, read_timeout=70)

Register your domain, workflows and activities

To make things easy for you to follow along I’ve gone ahead and automated the setup of the Domain, Workflows and Activities below.

#!/usr/bin/env python
"""
Registers domains, workflows and activities
"""
from botocore.exceptions import ClientError
from creds import init_aws_session, DOMAIN, WORKFLOW
import boto3

def register_domain(session):
    """
    Registers the SWF domain named by DOMAIN

    :param session: boto3 SWF client
    :raises ClientError: for any failure other than the domain already
        being registered
    """
    try:
        session.register_domain(
            name=DOMAIN,
            description='Testing domain for learning SWF',
            workflowExecutionRetentionPeriodInDays='10',
        )
    except ClientError as err:
        # Only "already exists" is benign. Credential, permission or
        # throttling errors must not be reported as "already registered".
        if err.response.get('Error', {}).get('Code') != 'DomainAlreadyExistsFault':
            raise
        print("[*] Domain '{}' is already registered".format(DOMAIN))
    else:
        print("[*] Registered domain '{}' successfully".format(DOMAIN))

def register_workflow(session):
    """
    Registers version 1.0 of the workflow type named by WORKFLOW in DOMAIN

    :param session: boto3 SWF client
    :raises ClientError: for any failure other than the workflow type
        already being registered
    """
    try:
        session.register_workflow_type(
            domain=DOMAIN,
            name=WORKFLOW,
            version='1.0',
            description='Test workflow for learning SWF',
        )
    except ClientError as err:
        # Anything other than "type already exists" is a real failure and
        # is re-raised instead of being reported as a duplicate.
        if err.response.get('Error', {}).get('Code') != 'TypeAlreadyExistsFault':
            raise
        print("[*] Workflow Type '{}' is already registered".format(WORKFLOW))
    else:
        print("[*] Registered workflow type '{}'".format(WORKFLOW))

def register_activities(session):
    """
    Registers version 1.0 of each activity type used by the workflow

    :param session: boto3 SWF client
    :raises ClientError: for any failure other than an activity type
        already being registered
    """
    activities = ['make_lowercase', 'store_in_db']
    for act in activities:
        try:
            session.register_activity_type(
                domain=DOMAIN,
                name=act,
                version='1.0',
            )
        except ClientError as err:
            # A duplicate registration is fine and the loop carries on;
            # every other error is re-raised.
            if err.response.get('Error', {}).get('Code') != 'TypeAlreadyExistsFault':
                raise
            print("[*] Activity Type '{}' already registered".format(act))
        else:
            print("[*] Registered activity type '{}'".format(act))

if __name__ == '__main__':
    # One SWF client is shared by all three registration steps.
    session = init_aws_session('swf')
    register_domain(session)
    register_workflow(session)
    register_activities(session)

Adding tasks to the tasklist

This initially threw me off because I thought it would be pretty self-explanatory; it is not. A task list actually relates to workflow executions, so in order to add a task to the task list you need to start an execution.

def create_task(session):
    """
    Starts one execution of WORKFLOW (version 1.0) on TASKLIST

    The execution gets a random UUID as its workflow id and the fixed
    demo input 'HeLlO wOrLd'.

    Returns True when the call answers with HTTP 200, otherwise None
    """
    execution_id = str(uuid.uuid4())
    workflow_type = {'name': WORKFLOW, 'version': '1.0'}
    task_list = {'name': TASKLIST}

    reply = session.start_workflow_execution(
        domain=DOMAIN,
        workflowId=execution_id,
        workflowType=workflow_type,
        taskList=task_list,
        executionStartToCloseTimeout='600',
        taskStartToCloseTimeout='600',
        childPolicy='TERMINATE',
        input='HeLlO wOrLd',
    )

    status_code = reply['ResponseMetadata']['HTTPStatusCode']
    return True if status_code == 200 else None

Implement decider

A decider is an implementation of the coordination logic of your workflow type that runs during the execution of your workflow. You can run multiple deciders for a single workflow type.

The decider polls the taskList and then allows you to define your logic of what activities can be run on it. Below is a simple implementation with the task creation method baked in for testing.

#!/usr/bin/env python
"""
AWS SWF Decider - Python
"""
from botocore.exceptions import ClientError
from botocore.vendored.requests.exceptions import ReadTimeout
from boto3.session import Session
from creds import init_aws_session, DOMAIN, WORKFLOW, TASKLIST
import boto3, uuid

def create_task(session):
    """
    Starts one execution of WORKFLOW (version 1.0) on TASKLIST

    The execution gets a random UUID as its workflow id and the fixed
    demo input 'HeLlO wOrLd'.

    Returns True when the call answers with HTTP 200, otherwise None
    """
    execution_id = str(uuid.uuid4())
    workflow_type = {'name': WORKFLOW, 'version': '1.0'}
    task_list = {'name': TASKLIST}

    reply = session.start_workflow_execution(
        domain=DOMAIN,
        workflowId=execution_id,
        workflowType=workflow_type,
        taskList=task_list,
        executionStartToCloseTimeout='600',
        taskStartToCloseTimeout='600',
        childPolicy='TERMINATE',
        input='HeLlO wOrLd',
    )

    status_code = reply['ResponseMetadata']['HTTPStatusCode']
    return True if status_code == 200 else None

def poll_for_tasks(session):
    """
    Polls TASKLIST once for a decision task, identifying as 'decider-1'

    This function only polls; interpreting the response and scheduling
    activities is done by decider().

    :param session: boto3 SWF client
    :returns: the raw poll_for_decision_task response
    """
    # Parenthesised single-argument print runs on both Python 2 and 3.
    print("Starting poll on tasklist '{}'".format(TASKLIST))
    response = session.poll_for_decision_task(
        domain=DOMAIN,
        taskList={
            'name': TASKLIST,
        },
        identity='decider-1',
    )
    return response

def _schedule_activity(session, task_token, activity_name, activity_input,
                       control=None):
    """
    Completes a decision task by scheduling one activity on TASKLIST

    :param session: boto3 SWF client
    :param task_token: token of the decision task being answered
    :param activity_name: registered activity type name (version '1.0')
    :param activity_input: string handed to the activity, or None for none
    :param control: optional 'control' value recorded with the decision
    """
    attributes = {
        'activityType': {
            'name': activity_name,
            'version': '1.0',
        },
        'activityId': 'activityid-{}'.format(str(uuid.uuid4())),
        'scheduleToCloseTimeout': 'NONE',
        'scheduleToStartTimeout': 'NONE',
        'startToCloseTimeout': 'NONE',
        'heartbeatTimeout': 'NONE',
        'taskList': {
            'name': TASKLIST,
        },
    }
    if control is not None:
        attributes['control'] = control
    # The client validates 'input' as a string, so leave the key out
    # entirely when there is nothing to pass along.
    if activity_input is not None:
        attributes['input'] = activity_input

    session.respond_decision_task_completed(
        taskToken=task_token,
        decisions=[
            {
                'decisionType': 'ScheduleActivityTask',
                'scheduleActivityTaskDecisionAttributes': attributes,
            }
        ],
    )


def decider(session, response):
    """
    Handle Dispatching to Activity Worker
    and overall process flow

    Looks at the most recent non-Decision event in the history:
    a fresh execution is sent to 'make_lowercase', a completed
    'make_lowercase' is followed by 'store_in_db', and any other
    completed activity ends the workflow.

    :param session: boto3 SWF client
    :param response: result of poll_for_decision_task
    :returns: False when the poll returned no task, otherwise None
    """
    # A poll that timed out carries no usable task token (missing or empty).
    if not response.get('taskToken'):
        print("Poll timed out without returning a task")
        return False

    task_token = response['taskToken']
    events = response.get('events', [])

    # Get Event History and Last Event (Decision* events are bookkeeping
    # about the decider itself and say nothing about workflow progress)
    event_history = [
        event for event in events
        if not event['eventType'].startswith('Decision')
    ]
    if not event_history:
        # Nothing to act on; previously this raised IndexError.
        return None
    last_event = event_history[-1]

    # Start once Workflow Executed
    if last_event['eventType'] == 'WorkflowExecutionStarted':

        # Get initial input and send it to the first worker
        started_attrs = last_event['workflowExecutionStartedEventAttributes']
        _schedule_activity(
            session,
            task_token,
            'make_lowercase',
            started_attrs.get('input'),
            control='MakeLowerCase',
        )

    # Pickup and dispatch to the appropriate worker
    elif last_event['eventType'] == 'ActivityTaskCompleted':
        completed_attrs = last_event['activityTaskCompletedEventAttributes']

        # Find the event that scheduled the finished activity by its id
        # rather than by list position, so the lookup does not depend on
        # the history starting at event id 1.
        events_by_id = dict((event['eventId'], event) for event in events)
        scheduled_event = events_by_id[completed_attrs['scheduledEventId']]
        activity_attrs = scheduled_event['activityTaskScheduledEventAttributes']
        activity_name = activity_attrs['activityType']['name']

        # Get the result from the last activity
        result = completed_attrs.get('result')

        # Determine Next Activity
        # Here you can define your "coordination logic"
        # Per docs at http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-dev-deciders.html#swf-dg-apply-coord-logic
        next_activity = None
        if activity_name == 'make_lowercase':
            next_activity = 'store_in_db'

        # Send data to next activity
        if next_activity is not None:
            _schedule_activity(session, task_token, next_activity, result)
            print("Task passed to worker '{}'".format(next_activity))

        # Otherwise Complete workflow
        else:
            session.respond_decision_task_completed(
                taskToken=task_token,
                decisions=[
                    {
                        'decisionType': 'CompleteWorkflowExecution',
                        'completeWorkflowExecutionDecisionAttributes': {
                            'result': 'success'
                        }
                    }
                ]
            )
            print("Workflow Completed!")

    # Any other event type is left undecided, exactly as before.
    return None

if __name__ == '__main__':
    swf = init_aws_session('swf')
    # Start one execution, then keep deciding for as long as the script runs.
    if create_task(swf):
        while True:
            try:
                response = poll_for_tasks(swf)
                decider(swf, response)
            except ReadTimeout:
                # Report the timeout and simply poll again.
                print("Read Timeout while polling...")

Above, we continuously poll for new workflow executions and pass each one to the ‘make_lowercase’ worker once it is found. After the first activity has completed, we determine the next activity based on the previous one and move forward; otherwise, we mark the workflow as completed.

Our activity worker

So we have seen how our process flow is managed with the decider, but what about actually working with the data and doing things? This is done with what is known as an “Activity Worker”. Below is a fairly simple one that handles both activities, “make_lowercase” and “store_in_db”.

#!/usr/bin/env python
"""
AWS SWF Worker - Example
"""
from botocore.exceptions import ClientError
from botocore.vendored.requests.exceptions import ReadTimeout
from boto3.session import Session
from creds import init_aws_session, DOMAIN, TASKLIST
import boto3, uuid

def make_lowercase(text):
    """
    Returns the given text converted to lowercase
    """
    lowered = text.lower()
    return lowered

def store_in_db(text):
    """
    Stand-in for a DynamoDB put: nothing is written anywhere,
    the text is simply handed back in lowercase
    """
    stored_value = text.lower()
    return stored_value

def poll_for_tasks(session):
    """
    Poll for activity task

    Polls TASKLIST once as 'worker-1'. When a task arrives, runs the
    function matching its activity type on the task input and reports
    the result back to SWF.

    :param session: boto3 SWF client
    :returns: None
    """
    print("Polling for activity tasks on '{}'".format(TASKLIST))
    task = session.poll_for_activity_task(
        domain=DOMAIN,
        taskList={
            'name': TASKLIST,
        },
        identity='worker-1'
    )

    # A poll that timed out carries no usable task token (missing or empty).
    if not task.get('taskToken'):
        print("Poll timed out no new task")
        return

    if 'activityId' not in task:
        print("Unable to find activityId")
        return

    # Define a map of our activities to methods/funcs
    handlers = {
        'make_lowercase': make_lowercase,
        'store_in_db': store_in_db,
    }
    activity_name = task['activityType']['name']
    activity = handlers.get(activity_name)

    if activity is None:
        # Previously this fell through to calling None and crashed the
        # worker. Tell SWF the task failed so the decider is informed.
        reason = "Unknown activity type '{}'".format(activity_name)
        session.respond_activity_task_failed(
            taskToken=task['taskToken'],
            reason=reason[:256],  # SWF caps the reason at 256 characters
        )
        print("No handler for activity '{}'".format(activity_name))
        return

    # A task scheduled without input is treated as an empty string.
    new_string = activity(task.get('input') or '')

    # Let the decider know we are done with said task
    session.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result=new_string
    )
    print("Completed task")

if __name__ == '__main__':
    swf = init_aws_session('swf')
    # Keep working tasks for as long as the script runs.
    while True:
        try:
            poll_for_tasks(swf)
        except ReadTimeout:
            # Report the timeout the same way the decider loop does,
            # instead of swallowing it silently, then poll again.
            print("Read Timeout while polling...")

This was a good two days of hacking around and reading documentation to get this to work. Hopefully this spells out the bare minimum of what SWF does, and how you can get started with it in Python.

Write a Comment

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.