9.08.2017

Celery Tasks: Parallel and Chained Execution Workflow

In Celery, it is very easy to chain and parallelize the execution of tasks, e.g. to satisfy this example workflow:
      _____
     |task1|      1. exec task1
    /      \
 __/__    __\__
|task2|  |task3|  2. exec task2 & task3 in parallel
   \        /
    \ _____/
     |task4|      3. exec task4 when both task2 & task3 are done
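
In terms of Celery primitives, that diagram is a chain whose second step is a chord: the chord header runs task2 and task3 in parallel, and the chord callback (task4) fires once both have finished. Here is a minimal, self-contained sketch of that shape, assuming a Redis broker and trivial tasks named task1..task4 (both are illustrative, not part of the project code):

from celery import Celery, chain, chord, group

app = Celery('demo', broker='redis://localhost:6379/0')


@app.task
def task1():
    print('task1')


@app.task
def task2():
    print('task2')


@app.task
def task3():
    print('task3')


@app.task
def task4():
    print('task4')


# chain(...) runs task1 first; the chord runs task2/task3 in parallel
# and only calls task4 after both results are in.
workflow = chain(
    task1.si(),
    chord(group(task2.si(), task3.si()), task4.si())
)
workflow.apply_async()

The actual workflow from this project follows the same pattern: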

from celery import chain, group, chord
from orchestrator.tasks.host_tasks import acquire_hosts, prepare_hosts, return_hosts
from orchestrator.tasks.job_tasks import start_job, stop_job
from orchestrator.tasks.vcenter_tasks import acquire_vcenter, return_vcenter


def start_performance_run_workflow(job_id):
    print('Starting performance run workflow for job %s' % job_id)
    # The effect of this chain is that only the first task is routed
    # to the "regular.priority" queue; subsequent tasks are routed to
    # the default "celery" queue.
    workflow = chain(
        acquire_hosts.si(job_id),
        chord(
            group(prepare_hosts.si(job_id), acquire_vcenter.si(job_id)),
            start_job.si(job_id)
        )
    ).apply_async(queue="regular.priority")
    return workflow
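
If every task in the workflow should run on the priority queue rather than only the first one, the queue can instead be pinned on each signature with .set(). A minimal sketch of that variant, reusing the same tasks (the function name is illustrative):

def start_priority_run_workflow(job_id):
    # Hypothetical variant: pin the queue on every signature so the
    # chord header and callback also land on "regular.priority"
    # instead of falling back to the default "celery" queue.
    workflow = chain(
        acquire_hosts.si(job_id).set(queue="regular.priority"),
        chord(
            group(
                prepare_hosts.si(job_id).set(queue="regular.priority"),
                acquire_vcenter.si(job_id).set(queue="regular.priority"),
            ),
            start_job.si(job_id).set(queue="regular.priority"),
        )
    )
    return workflow.apply_async()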
