Monday, 15 May 2017

Sharded database on a MongoDB cluster

MongoDB is a NoSQL document store. As with other NoSQL databases there are a number of advantages to it including dynamic schema, scalability/performance and no need for ORM layer. A production deployment of MongoDb should be clustered. This will increase scalability and durability for the data. For high volume applications the number of disk operations and network transfers can overwhelm a single node. Example is any back-end for mobile apps with millions of users. On a single node memory is also a precious resource that can become scarce. Finally the whole data set can be too large to fit in one node or one set of nodes. Sharding helps to address these issues. 

This post shows the setup of a sharded cluster and checking the behavior when the primary in a replicaset fails.

A sharded MongoDB topology looks like the following.
There are at least 3 config servers, 1 router and then the shards themselves. Each shard should ideally include at least 3 nodes/servers. One of the nodes in the shard will be elected primary and data replicates to the others in a shard. There can be many shards depending on data set size. The router routes a query to a particular shard after consulting the config servers. So on a minimum there needs to be 7 hosts for a basic sharded cluster. These have to be separate and not vms on the same machine. If these are vms on a single big server in production then we are not addressing the issues that led to the move towards sharding. However the post used vms all on ssd. The setup is described as below.

Setup details

Build 7 vms or configure separate hosts (recommended) with the following. Either an available image can be used or clone using virtualbox.

Config server
192.168.56.30
192.168.56.31
192.168.56.32

Shard's data nodes
192.168.56.40
192.168.56.41
192.168.56.42

Router
192.168.56.20

Each node runs on 64-bit Ubuntu Server 16.04 and uses MongoDB 3.4.1.

Database used is the MoT dataset from https://data.gov.uk/dataset/anonymised_mot_test

Config servers

A) Start the config servers one by one. Log into the config server and run the following command.
mongod --configsvr --replSet config_set1 --dbpath ~/mongo-data --port 27019

In the version of MongoDB used the config servers themselves are a replicaset. mongo-data is the path for storing the data files. Do this for each config server.

Log into one of the config servers in mongo and initialise them as shown:


Shard nodes

B)  On each node in the shard start the mongo daemon as
mongod --shardsvr --dbpath ./mongo-data/ --replSet data_rs_1

Log into one of the data nodes and initialise them as shown


Router

C) Start the router. On the router node 

mongos --configdb config_set1/192.168.56.30:27019,192.168.56.31:27019,192.168.56.32:27019

Adding shards

D) Connect to the router and add the shard to the cluster.


View the config server to confirm that the shard was added. This shows up in the SHARDING logs.


Shard the database

E) Enable sharding for the database and collection in that order. Again connect to the router to do the following.

mongo> sh.enableSharding("mot")

Then enable sharding for the collection. Also specify the key field that needs to be used for distributing the data between shards.


F) Check the shard status. On the router do a sh.status(). This should look like


Verify

G) Connect a client to the router and populate the database. After this the shard status also shows chunks of data.


H) Check the config on the cluster. Connect MongoDB Compass to the router and look into the config db.



Note that an index is created on the DB for the shard key. This is shown below.


Electing a primary on failure:


Setting the above up can be time consuming but it is all worth it as when the primary node of the shard is killed, one of the other data nodes assumes the primary position through an election. So we go and kill the primary node process. This result in heart beat failure and after waiting for a specific time frame one of the secondaries assumes primary position. This is shown below.

The primary was node 192.168.56.40.


On failure of that 192.168.56.42 is elected as primary. This shows up on the 40 node when it comes back online.



Although queries during primary election can fail it can be caught in exceptions and tired again in the application. This requires no mentioning but the daemons should be run as service.

Reference:

MongoDB docs at

https://docs.mongodb.com/manual/tutorial/deploy-shard-cluster/

This can be a bit outdated but is a good read too
https://www.digitalocean.com/community/tutorials/how-to-create-a-sharded-cluster-in-mongodb-using-an-ubuntu-12-04-vps

Saturday, 22 April 2017

Distributed Processing: Python celery for 1 M tasks

This post is on python distributed processing using celery and a basic profiling of the same. Also some notes on celery is posted at the end. 

Celery is a distributed task queue system in Python. While custom multiprocessing code can be written, for well defined tasks it is better to leverage a framework than re-invent. Celery works together with a message broker from where enqueued tasks are consumed.

Producers of tasks are programs that want one or more tasks done. Workers are consumers of (execute) these tasks. The tasks are submitted to a message broker like Rabbit-MQ. Workers take the tasks from the broker and execute them. The results of tasks can either be ignored or stored some where. Usually this is memcached or a database.

Versions used from pip in virtual env:

celery==3.1.24
flower==0.9.1
python-memcached==1.58

Ubuntu Server 16.04  on which workers run has 8 cores and 16 GB RAM.
Message broker runs on vm with 4 cores and ~ 5 GB RAM. 

For this post 1 Million tasks are run on 2 worker processes. Each worker has 3 queues and the particular task is routed to a particular queue via configuration. Each task is to calculate the nth fibonacci using a recursive algorithm. Since this algorithm does not use any optimization techniques it takes a while to run and simulates a cpu intensive task. Thus demanding distribution of load!

- Message broker is rabbit-mq server and runs on a different virtual machine.
- Workers run on another machine as a service. Named w1 and w2.

Concurrency is set to default which is the number of processors i.e 8 on the machine. A screen of the processors being used is immediately visible as shown below.



Three queues on the workers are shown below. Celery has not yet got the notion of task priorities so color coded queues are used here. The names of the queues can be anything as long as each queue is being used for a particular subset of tasks based on cpu intensive, io driven and the like.



Flower is a tool used to monitor celery tasks. It shows details per worker and also graphs in a monitor page. The flower status page is shown below after start.



10 different threads are used to post 1 M fibonacci tasks. Ids of submitted tasks are stored and results later retrieved based on these.  The same screen after 1 M tasks have finished is shown below.

Profiling:
At the point where all the tasks had succeeded cProfile run at the client shows

The function calls also include calls for most of the results but not all.

Notes:

1. Celery version 4.0.2 has an issue which causes the workers to crash after a restart and that too when there are pending messages in the message queue. 4.0.2 from PyPi exhibited this issue and so 3.1.24 is used. This issue is described in more detail here at https://github.com/celery/celery/pull/3752. There are a number of related issues too. However there is a merge for the same but is not yet available from PyPi at the time of this writing.

2. For a source structure <project>/src/<celery package with the Celery app>, the worker or the service needs to be triggered from the folder src (in this case).

3. Each worker can be configured to take tasks from a particular queue. 

4. By running workers on multiple virtual machines, the solution becomes more distributed.

Celery project links:
1. http://docs.celeryproject.org/en/latest/index.html
2. https://github.com/celery/celery/