Saturday 22 April 2017

Distributed Processing: Python celery for 1 M tasks

This post is on python distributed processing using celery and a basic profiling of the same. Also some notes on celery is posted at the end. 

Celery is a distributed task queue system in Python. While custom multiprocessing code can be written, for well defined tasks it is better to leverage a framework than re-invent. Celery works together with a message broker from where enqueued tasks are consumed.

Producers of tasks are programs that want one or more tasks done. Workers are consumers of (execute) these tasks. The tasks are submitted to a message broker like Rabbit-MQ. Workers take the tasks from the broker and execute them. The results of tasks can either be ignored or stored some where. Usually this is memcached or a database.

Versions used from pip in virtual env:

celery==3.1.24
flower==0.9.1
python-memcached==1.58

Ubuntu Server 16.04  on which workers run has 8 cores and 16 GB RAM.
Message broker runs on vm with 4 cores and ~ 5 GB RAM. 

For this post 1 Million tasks are run on 2 worker processes. Each worker has 3 queues and the particular task is routed to a particular queue via configuration. Each task is to calculate the nth fibonacci using a recursive algorithm. Since this algorithm does not use any optimization techniques it takes a while to run and simulates a cpu intensive task. Thus demanding distribution of load!

- Message broker is rabbit-mq server and runs on a different virtual machine.
- Workers run on another machine as a service. Named w1 and w2.

Concurrency is set to default which is the number of processors i.e 8 on the machine. A screen of the processors being used is immediately visible as shown below.



Three queues on the workers are shown below. Celery has not yet got the notion of task priorities so color coded queues are used here. The names of the queues can be anything as long as each queue is being used for a particular subset of tasks based on cpu intensive, io driven and the like.



Flower is a tool used to monitor celery tasks. It shows details per worker and also graphs in a monitor page. The flower status page is shown below after start.



10 different threads are used to post 1 M fibonacci tasks. Ids of submitted tasks are stored and results later retrieved based on these.  The same screen after 1 M tasks have finished is shown below.

Profiling:
At the point where all the tasks had succeeded cProfile run at the client shows

The function calls also include calls for most of the results but not all.

Notes:

1. Celery version 4.0.2 has an issue which causes the workers to crash after a restart and that too when there are pending messages in the message queue. 4.0.2 from PyPi exhibited this issue and so 3.1.24 is used. This issue is described in more detail here at https://github.com/celery/celery/pull/3752. There are a number of related issues too. However there is a merge for the same but is not yet available from PyPi at the time of this writing.

2. For a source structure <project>/src/<celery package with the Celery app>, the worker or the service needs to be triggered from the folder src (in this case).

3. Each worker can be configured to take tasks from a particular queue. 

4. By running workers on multiple virtual machines, the solution becomes more distributed.

Celery project links:
1. http://docs.celeryproject.org/en/latest/index.html
2. https://github.com/celery/celery/