Saturday, 12 August 2017

Word clouds: Text analysis for interesting patterns

This post utilises text analysis to identify trends in popular web pages. Of interest are home pages of news sites, Twitter pages of popular personalities and other sites where content changes frequently. Analysing words and projecting to a cloud provides a visual representation of text data which in turn helps to quickly perceive overall changes in trends and interests. Here the cloud is generated on content from Saturday and Sunday.

About the application:

The web application generates a word cloud for each submitted link at a given time. The application first generates a word count for each page using natural language tool kit and stores the results in a database. The counts are converted to a word cloud using opensource javascript libraries. Web links can be added to the application as needed. The word count task is performed asynchronously using celery worker nodes. 

Software built using:

Django1.10/Python3.5, Asynchronous tasks using Celery 4.1.0, Javascript, Natural Language Processing Tool Kit (NLTK), Postgres Database. 3 hosts. One for web application, one for celery workers and Database and third for the message broker. 

Home pages analysed:

RT.com
BBC.com
Google News
Fox News
Indian Prime Minister Twitter Page
Obama Twitter page
President Trump Twitter Page

Celery Django Architecture

>> Word cloud on Saturday to left and Sunday to right

RT.com

BBC.com

Fox News

Google News

Obama


 President Trump and  the Indian Prime Minister Twitter Handles

















Tuesday, 20 June 2017

Band-aid for profiling in a hurry

This post profiles two solutions to a problem to see which one has low latency. As it will be obvious the better solution employs a simple technique and requires no explicit confirmation that it returns faster. Quickly profiling does not hurt either.

The objective is to consecutively calculate the n-th fibonacci where each n is taken from a sequence. Solution 1 is a recursive algorithm. This divides the problem into sub-problems and builds the final solution by combining solutions to individual sub-problems. This solution does not make any other attempts at gaining faster speeds. Solution 2 is also recursive but, uses a map to hold the results of sub-problems encountered during the calculation. That way for each subsequent n in the sequence, the results of sub-problems from the previous run are re-used. The same can be achieved with memoization.

As seen in the profiling results Solution 2 not only saves execution cycles but also saves itself from the overhead of context switching into the sub-problems.

Solution 1 without state


Solution 2 (with state in a class)

Now we profile the two solutions by calculating over a list of 10 numbers and the results are shown below.


Results

Monday, 15 May 2017

Sharded database on a MongoDB cluster

MongoDB is a NoSQL document store. As with other NoSQL databases there are a number of advantages to it including dynamic schema, scalability/performance and no need for ORM layer. A production deployment of MongoDb should be clustered. This will increase scalability and durability for the data. For high volume applications the number of disk operations and network transfers can overwhelm a single node. Example is any back-end for mobile apps with millions of users. On a single node memory is also a precious resource that can become scarce. Finally the whole data set can be too large to fit in one node or one set of nodes. Sharding helps to address these issues. 

This post shows the setup of a sharded cluster and checking the behavior when the primary in a replicaset fails.

A sharded MongoDB topology looks like the following.
There are at least 3 config servers, 1 router and then the shards themselves. Each shard should ideally include at least 3 nodes/servers. One of the nodes in the shard will be elected primary and data replicates to the others in a shard. There can be many shards depending on data set size. The router routes a query to a particular shard after consulting the config servers. So on a minimum there needs to be 7 hosts for a basic sharded cluster. These have to be separate and not vms on the same machine. If these are vms on a single big server in production then we are not addressing the issues that led to the move towards sharding. However the post used vms all on ssd. The setup is described as below.

Setup details

Build 7 vms or configure separate hosts (recommended) with the following. Either an available image can be used or clone using virtualbox.

Config server
192.168.56.30
192.168.56.31
192.168.56.32

Shard's data nodes
192.168.56.40
192.168.56.41
192.168.56.42

Router
192.168.56.20

Each node runs on 64-bit Ubuntu Server 16.04 and uses MongoDB 3.4.1.

Database used is the MoT dataset from https://data.gov.uk/dataset/anonymised_mot_test

Config servers

A) Start the config servers one by one. Log into the config server and run the following command.
mongod --configsvr --replSet config_set1 --dbpath ~/mongo-data --port 27019

In the version of MongoDB used the config servers themselves are a replicaset. mongo-data is the path for storing the data files. Do this for each config server.

Log into one of the config servers in mongo and initialise them as shown:


Shard nodes

B)  On each node in the shard start the mongo daemon as
mongod --shardsvr --dbpath ./mongo-data/ --replSet data_rs_1

Log into one of the data nodes and initialise them as shown


Router

C) Start the router. On the router node 

mongos --configdb config_set1/192.168.56.30:27019,192.168.56.31:27019,192.168.56.32:27019

Adding shards

D) Connect to the router and add the shard to the cluster.


View the config server to confirm that the shard was added. This shows up in the SHARDING logs.


Shard the database

E) Enable sharding for the database and collection in that order. Again connect to the router to do the following.

mongo> sh.enableSharding("mot")

Then enable sharding for the collection. Also specify the key field that needs to be used for distributing the data between shards.


F) Check the shard status. On the router do a sh.status(). This should look like


Verify

G) Connect a client to the router and populate the database. After this the shard status also shows chunks of data.


H) Check the config on the cluster. Connect MongoDB Compass to the router and look into the config db.



Note that an index is created on the DB for the shard key. This is shown below.


Electing a primary on failure:


Setting the above up can be time consuming but it is all worth it as when the primary node of the shard is killed, one of the other data nodes assumes the primary position through an election. So we go and kill the primary node process. This result in heart beat failure and after waiting for a specific time frame one of the secondaries assumes primary position. This is shown below.

The primary was node 192.168.56.40.


On failure of that 192.168.56.42 is elected as primary. This shows up on the 40 node when it comes back online.



Although queries during primary election can fail it can be caught in exceptions and tired again in the application. This requires no mentioning but the daemons should be run as service.

Reference:

MongoDB docs at

https://docs.mongodb.com/manual/tutorial/deploy-shard-cluster/

This can be a bit outdated but is a good read too
https://www.digitalocean.com/community/tutorials/how-to-create-a-sharded-cluster-in-mongodb-using-an-ubuntu-12-04-vps

Saturday, 22 April 2017

Distributed Processing: Python celery for 1 M tasks

This post is on python distributed processing using celery and a basic profiling of the same. Also some notes on celery is posted at the end. 

Celery is a distributed task queue system in Python. While custom multiprocessing code can be written, for well defined tasks it is better to leverage a framework than re-invent. Celery works together with a message broker from where enqueued tasks are consumed.

Producers of tasks are programs that want one or more tasks done. Workers are consumers of (execute) these tasks. The tasks are submitted to a message broker like Rabbit-MQ. Workers take the tasks from the broker and execute them. The results of tasks can either be ignored or stored some where. Usually this is memcached or a database.

Versions used from pip in virtual env:

celery==3.1.24
flower==0.9.1
python-memcached==1.58

Ubuntu Server 16.04  on which workers run has 8 cores and 16 GB RAM.
Message broker runs on vm with 4 cores and ~ 5 GB RAM. 

For this post 1 Million tasks are run on 2 worker processes. Each worker has 3 queues and the particular task is routed to a particular queue via configuration. Each task is to calculate the nth fibonacci using a recursive algorithm. Since this algorithm does not use any optimization techniques it takes a while to run and simulates a cpu intensive task. Thus demanding distribution of load!

- Message broker is rabbit-mq server and runs on a different virtual machine.
- Workers run on another machine as a service. Named w1 and w2.

Concurrency is set to default which is the number of processors i.e 8 on the machine. A screen of the processors being used is immediately visible as shown below.



Three queues on the workers are shown below. Celery has not yet got the notion of task priorities so color coded queues are used here. The names of the queues can be anything as long as each queue is being used for a particular subset of tasks based on cpu intensive, io driven and the like.



Flower is a tool used to monitor celery tasks. It shows details per worker and also graphs in a monitor page. The flower status page is shown below after start.



10 different threads are used to post 1 M fibonacci tasks. Ids of submitted tasks are stored and results later retrieved based on these.  The same screen after 1 M tasks have finished is shown below.

Profiling:
At the point where all the tasks had succeeded cProfile run at the client shows

The function calls also include calls for most of the results but not all.

Notes:

1. Celery version 4.0.2 has an issue which causes the workers to crash after a restart and that too when there are pending messages in the message queue. 4.0.2 from PyPi exhibited this issue and so 3.1.24 is used. This issue is described in more detail here at https://github.com/celery/celery/pull/3752. There are a number of related issues too. However there is a merge for the same but is not yet available from PyPi at the time of this writing.

2. For a source structure <project>/src/<celery package with the Celery app>, the worker or the service needs to be triggered from the folder src (in this case).

3. Each worker can be configured to take tasks from a particular queue. 

4. By running workers on multiple virtual machines, the solution becomes more distributed.

Celery project links:
1. http://docs.celeryproject.org/en/latest/index.html
2. https://github.com/celery/celery/