In celery, it is very easy to chain and parallelize execution of tasks, e.g. to satisfy this example workflow...
_____
|task1| 1. exec task1
/ \
__/__ __\__
|task2| |task3| 2. parallel task1 & task2
\ /
\ _____/
|task4| 3. exec task4 when both task1 & task2 are done
from celery import chain, group, chord
from orchestrator.tasks.host_tasks import acquire_hosts, prepare_hosts, return_hosts
from orchestrator.tasks.job_tasks import start_job, stop_job
from orchestrator.tasks.vcenter_tasks import acquire_vcenter, return_vcenter
def start_performance_run_workflow(job_id):
print('Starting performance run workflows for job %s' % job_id)
# The effect of this chain, is that only the first one is routed
# to the "regular.priority" queue
# subsequent tasks are routed to "celery" default queue
workflow = chain(
acquire_hosts.si(job_id),
chord(
(prepare_hosts.si(job_id), acquire_vcenter.si(job_id)),
start_job.si(job_id)
)
).apply_async(queue="regular.priority")
No comments:
Post a Comment