Spark cluster on OpenStack with multi-user Jupyter Notebook

Spark on OpenStack with Jupyter

Apache Spark is gaining traction as the de facto analysis suite for big data, especially for those using Python. Spark has a rich API for Python and several very useful built-in libraries like MLlib for machine learning and Spark Streaming for real-time analysis. Jupyter (formerly IPython Notebook) is a convenient interface for exploratory data analysis and all kinds of other analytic tasks using Python. In this post I'll show step by step how to set up a Spark cluster on OpenStack and configure Jupyter with multi-user access and an easy-to-use PySpark profile.

Setting up a Spark cluster on OpenStack VMs

To get a Spark standalone cluster up and running, all you need to do is spawn some VMs and start Spark as master on one of them and slave on the others. They will automatically form a cluster that you can connect to from Python, Java and Scala applications using the IP address of the master node. Sounds easy enough, right? But there are some pitfalls, so read on for tips on how to avoid them.

To create the first VM, which will be the master node, I use the OpenStack command line client. We'll get that node up and running first. The distribution is Ubuntu 14.04 Trusty. Cloud-init is an easy way to bootstrap nodes and get the necessary software installed and set up. To install and start a Spark master node I use the following Cloud-init script:

#!/bin/bash
#
# Cloud-init script to get Spark Master up and running
#
SPARK_VERSION="1.5.0"
APACHE_MIRROR="apache.uib.no"
LOCALNET="10.20.30.0/24"

# Firewall setup
ufw allow from $LOCALNET
ufw allow 80/tcp
ufw allow 443/tcp
ufw allow 4040:4050/tcp
ufw allow 7077/tcp
ufw allow 8080/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Configure Spark master
cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' /opt/spark/conf/spark-env.sh

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark Master with IP address of eth0 as the address to use
/opt/spark/sbin/start-master.sh -h $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/)

Save this as init-spark-master.sh for use with the OpenStack command line client. The script first adds some firewall rules to allow access to the different components and installs the OpenJDK dependency. Next, a Spark tarball is downloaded, unpacked and made available under /opt/spark on the host. The tarball is prepackaged with Hadoop v1 libraries (note the “hadoop1.tgz” suffix), so adjust this if you need Hadoop v2 instead.

The only configuration of Spark we need at this point is to set the options "spark.deploy.defaultCores" and "spark.executor.memory". They are used to configure how many resources each application will get when it starts. Since the goal is to set up a multi-user environment with Jupyter notebooks, we need to limit the total number of CPU cores and the amount of RAM that each notebook will use. Each notebook is an "application" on the cluster for as long as the notebook is active (i.e. until it is shut down by the user). If we don't limit the resource allocation, the first notebook created will allocate all available CPU cores on each worker, leaving no CPU cores free for the next user. In addition, the default RAM allocation for each app is only 512 MB on each worker node, which might be a bit too small, so we bump that up to 2 GB.

The echo line adds the hostname "spark-master" to /etc/hosts with a reference to the IP address of the VM. Spark tries to resolve the local hostname on startup. Without a resolvable hostname you might encounter "Name or service not known" errors, resulting in Java exceptions and exits.

On the last line the Spark master process is started. The master process is given the IP address of the local host as an argument to make sure it binds to the correct interface. The IP address is extracted from the output of the “ip addr” command.

One way to launch the master VM with the Cloud-init script is like this:

# Install OpenStack client if not present already
sudo apt-get -y install python-openstackclient

# Customize these values to match your OpenStack cluster
OS_IMAGE="f9c9b0dc-6407-4ac2-ad0e-45cb4e47bb01"
OS_NETID="df7cc182-8794-4134-b700-1fb8f1fbf070"
OS_KEYNAME="arnes"

# Create Spark master VM
openstack server create --flavor m1.medium --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-master.sh spark-master

If you prefer using the web UI (Horizon), you can just as easily paste the Cloud-init script into the text box on the Post-Creation tab of the Launch Instance dialog and achieve the same result.

It will take a few minutes for the Spark master VM to finish bootstrapping. When it's done, the Spark master UI will be available on port 8080. Remember to associate a floating IP with the VM to be able to access it from outside the OpenStack project:

openstack ip floating add 10.1.1.1 spark-master

Verify that the Spark master UI is reachable and displays metadata about the cluster. If the UI is not reachable, first check that your Security Group rules allow port 8080 to the Spark master VM. Second, check the Cloud-init logs on the VM to ensure all parts of the initialization succeeded. You’ll find the Cloud-init log file on the VM as /var/log/cloud-init.log and the output from the Cloud-init script in /var/log/cloud-init-output.log. You can also try to re-run parts of the Cloud-init script with sudo to narrow down any issues with the initialization. When initialization succeeds the Spark master UI will look like this:

Spark master UI with no workers

As expected there are no workers alive yet, so let's initialize some. To do so we use a slightly modified version of the Cloud-init script above. The main difference is the startup command, which is now /opt/spark/sbin/start-slave.sh with the address of the master as the only argument. Remember to adjust the variables below to your IP range and master IP address.

#!/bin/bash
#
# Cloud-init script to get Spark Worker up and running
#
SPARK_VERSION="1.5.0"
APACHE_MIRROR="apache.uib.no"
LOCALNET="10.20.30.0/24"
SPARK_MASTER_IP="10.20.30.178"

# Firewall setup
ufw allow from $LOCALNET
ufw allow 8081/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts
# Start Spark worker with address of Spark master to join cluster 
/opt/spark/sbin/start-slave.sh spark://$SPARK_MASTER_IP:7077

Save this as init-spark-worker.sh. Notice how the last line starts a slave with the address of the cluster master on port 7077 as the only argument. Since the Cloud-init script has no worker-specific details, it is easy to expand the cluster just by creating more worker VMs initialized with the same Cloud-init script. Let's create the first worker:

# Customize these values to match your OpenStack cluster
OS_IMAGE="f9c9b0dc-6407-4ac2-ad0e-45cb4e47bb01"
OS_NETID="df7cc182-8794-4134-b700-1fb8f1fbf070"
OS_KEYNAME="arnes"

# Create first Spark worker VM, this time with flavor m1.large
openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker1

Pause for a moment and let the worker creation process finish, to ensure that Cloud-init does the necessary work without errors. There is no point in initializing more workers until the process is proven to work as expected. Again, it can be useful to check /var/log/cloud-init.log and /var/log/cloud-init-output.log on the new VM to verify that Cloud-init does what it's supposed to do. On success you'll see the worker in the Spark master UI:

Spark master UI with one worker registered

Create some more worker nodes to scale the cluster to handle more parallel tasks:

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker2

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker3

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker4

Verify that the new worker nodes show up in the Spark master UI before continuing.

Installing Jupyter and JupyterHub

A shiny new Spark cluster is fine, but we also need interfaces to be able to use it. Spark comes prepackaged with shells for Scala and Python where a connection to a cluster is already set up. The same level of usability can be achieved with Jupyter (formerly IPython Notebook), so that when you open a new notebook a connection to the Spark cluster (a SparkContext) is established for you. The SparkContext is available through the variable "sc" in the notebook, ready to use by calling sc.textFile() to create an RDD, for instance.
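
For example, the first cell in a new PySpark notebook could look something like the sketch below (the log file path is just an illustration, not from the original post):

# 'sc' is already a SparkContext connected to the cluster when the
# PySpark kernel starts, so it can be used directly in the first cell
logfile = sc.textFile("/tmp/example.log")              # hypothetical input file
errors = logfile.filter(lambda line: "ERROR" in line)  # transformation (lazy)
print(errors.count())                                  # action, runs as a job on the cluster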

JupyterHub is a multi-user server for Jupyter notebooks. It makes it possible for several users to use Jupyter independently and have their own notebooks and files in their home directories instead of a shared storage directory for all notebooks. However, this requires that each user has a user account on the VM where JupyterHub is running. Add user accounts for relevant users now if needed. JupyterHub uses Unix (PAM) authentication by default, meaning that it relays the username and password to the underlying authentication system on the VM for the credential check.

In this deployment JupyterHub is installed on the Spark master VM and launched there. It could run on a separate VM, but there is normally no need for that since the Spark master process does not require many resources. The VM where Jupyter notebooks are executed is called the "driver" in Spark, and it will require some processing power and memory, depending on the use case.

SSH into the Spark master VM and run the following set of commands:

# Install pip3 and other dependencies
sudo apt-get -y install python3-pip npm nodejs-legacy
sudo npm install -g configurable-http-proxy

# Install JupyterHub and Jupyter
sudo pip3 install jupyterhub
sudo pip3 install "ipython[notebook]"

pip3 is used instead of pip because JupyterHub depends on Python >= 3.3. After installing all software and dependencies, start the JupyterHub service:

sudo jupyterhub --port 80

The benefit of having JupyterHub listen on port 80 instead of the default port 8000 should be obvious, but it requires that you start the service as root. In addition, you might want to look into securing JupyterHub with an SSL certificate and having it listen on port 443, since it asks for passwords when users log in. When you have the necessary certificate and keys on the VM, the service can be started like this instead:

sudo jupyterhub --port 443 --ssl-key hub.key --ssl-cert hub.pem

Now try to open the JupyterHub login page on the floating IP address of the VM and log in. After login you should be greeted with an empty home directory with no notebooks. A new notebook can be created by clicking “New” on the right above the notebook list.

Jupyter - notebook list empty

If you create a new notebook, you'll notice that the only supported kernel is Python 3 at the moment. We need to add PySpark to that list to be able to use the Spark cluster from Jupyter.

Configuring Jupyter for PySpark

Jupyter relies on kernels to execute code. The default kernel is Python, but many other languages can be added. To use the Spark cluster from Jupyter we add a separate kernel called PySpark. In addition, kernels can run specific commands on startup, which in this case is used to initialize the SparkContext. First, some dependencies need to be installed:

sudo apt-get -y install python-dev python-setuptools
sudo easy_install pip
sudo pip install py4j
sudo pip install "ipython[notebook]"

It might seem odd to install ipython[notebook] as a dependency, but the reason is that IPython/Jupyter contains a number of Python support modules that kernels rely on. Previously, when we installed using pip3, we got the Python 3 versions of those modules. When installing again with pip, we get the Python 2 versions. PySpark depends on Python 2.

To add PySpark as a kernel, a file containing a kernel definition must be created. Kernel definitions are JSON files in a specific directory. Kernels can either be enabled globally for all users or for one user only, depending on where the definition file is placed. We want the PySpark kernel to be available for all users, so we’ll add it under /usr/local/share/jupyter/kernels/ like this:

sudo mkdir -p /usr/local/share/jupyter/kernels/pyspark/
cat <<EOF | sudo tee /usr/local/share/jupyter/kernels/pyspark/kernel.json
{
 "display_name": "PySpark",
 "language": "python",
 "argv": [
  "/usr/bin/python2",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ],
 "env": {
  "SPARK_HOME": "/opt/spark/",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/py4j-0.8.2.1-src.zip",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/shell.py",
  "PYSPARK_SUBMIT_ARGS": "--master spark://10.20.30.178:7077 pyspark-shell"
 }
}
EOF

This kernel definition ensures that the Spark built-in "pyspark-shell" is started under the hood as the process where our code will be executed. Notice how the address of the Spark cluster, "spark://10.20.30.178:7077", is passed as an argument. Remember to customize that address to your specific environment. The address references the Spark master VM (the same host as Jupyter runs on), but could just as easily reference an external host, for instance if you wanted to set up Jupyter on a separate OpenStack VM or if you already have a Spark cluster running somewhere else that you want to connect to. The Spark master UI shows the right URL to use to connect, right below the Spark logo. Note that the Spark workers depend on being able to establish connections back to the host where the driver process runs (the Jupyter notebook), which may not be possible depending on the firewall setup when connecting to a remote Spark cluster. This is the reason a firewall rule allowing all traffic on the local network (10.20.30.0/24 in my case) is added by Cloud-init on all the Spark VMs.

After adding the kernel definition file for PySpark you’ll have to refresh the Jupyter homepage to see the new kernel in the list. No need to restart JupyterHub.

Debugging PySpark startup errors

If you get an error message about "Dead kernel" when creating new notebooks with the PySpark kernel, there might be several causes. For instance, the VM running Jupyter might not be able to connect to the Spark cluster, or it might lack some dependencies (packages/modules) needed to initialize the SparkContext. To debug kernel startup issues, first check the output from JupyterHub in the terminal where you started it (it might be smart to keep that terminal open until everything works as expected). For example, JupyterHub will output log lines like this when a Python module is missing:

[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
...
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
[W 2015-09-20 20:31:39.295 ubuntu kernelmanager:52] Kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f died, removing from map.
ERROR:root:kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f restarted failed!
[W 2015-09-20 20:31:39.406 ubuntu handlers:441] Kernel deleted before session

This error occurred because I hadn't installed ipython[notebook] using pip yet, so the Python 2 modules needed by PySpark were not available. Notice how the error message states that it is /usr/bin/python2 that reports the error. Jupyter tries to restart the kernel a total of five times, but hits the same error every time and eventually gives up. In the notebook UI this is shown as a "Dead kernel" message.

Other errors can pop up in the Spark logs on master or worker nodes. Spark logs to /opt/spark/logs, so have a look there if anything is malfunctioning. The Spark master node logs every new application that is started on the Spark cluster, so if you don’t see output there when opening a new notebook with the PySpark profile, something is not right.

One last debugging tip is to try to start the PySpark shell from Bash on the VM where Jupyter runs. It is useful to inspect what happens when the PySpark shell starts. Here is an example of output when a dependency is missing:

$ python2 /opt/spark/python/pyspark/shell.py
Traceback (most recent call last):
 File "/opt/spark/python/pyspark/shell.py", line 28, in <module>
 import py4j
ImportError: No module named py4j

Remember to use Python 2 when starting the shell. The above command mimics what Jupyter does behind the scenes when a new notebook is created.

Ready to use PySpark in Jupyter

If everything went according to plan, you now have a Spark cluster which you can easily use from Jupyter notebooks just by creating them with the PySpark profile 🙂 The variable "sc" is initialized as a SparkContext connected to the Spark cluster and you can start exploring the rich Spark API for data transformation and analysis. Here's a screenshot from a notebook where I extracted response time numbers from Varnish NCSA logs (web cache server logs) and computed common statistics like mean and standard deviation for the response time of each backend in use by the cache server:

Example use of PySpark in Jupyter
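
The notebook is only shown as a screenshot, but the gist of that kind of computation could look roughly like the sketch below. The log path, the field positions and the parsing are assumptions, not the actual code from the screenshot:

# Sketch: extract (backend, response time) pairs from the cache logs
# and compute per-backend statistics. Field positions are assumed.
lines = sc.textFile("/data/varnishncsa.log")        # hypothetical log path

def parse(line):
    fields = line.split()
    return (fields[-2], float(fields[-1]))          # (backend, response time)

pairs = lines.map(parse)
for backend in pairs.keys().distinct().collect():
    stats = pairs.filter(lambda kv, b=backend: kv[0] == b).values().stats()
    print('{}: mean={} stdev={}'.format(backend, stats.mean(), stats.stdev()))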

~ Arne ~

Spark – How to fix “WARN TaskSchedulerImpl: Initial job has not accepted any resources”

Apache Spark and Firewalls

When setting up Apache Spark on your own cluster, in my case on OpenStack VMs, a common pitfall is the following error message:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

This error can pop up in the log output of the interactive Python Spark shell or Jupyter (formerly IPython Notebook) after starting a PySpark session and trying to perform any kind of Spark action (like .count() or .take() on an RDD), rendering PySpark unusable.

As the error message suggests, I investigated resource shortages first. The Spark Master UI reported that my PySpark shell had allocated all the available CPU cores and a small portion of the available memory. I therefore lowered the number of CPU cores for each Spark application on the cluster, by adding the following line in spark-env.sh on the master node and restarting the master:

SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"

After this change my PySpark shell was limited to 4 of the 16 CPU cores in my cluster at that time, instead of reserving all available cores (the default setting). However, even though the Spark UI now reported that there would be enough free CPU cores and memory to actually run some Spark actions, the error message still popped up and no Spark actions would execute.

While debugging this issue, I came across a Spark-user mailing list post by Marcelo Vanzin of Cloudera where he outlines two possible causes for this particular error:

"...
- You're requesting more resources than the master has available, so
your executors are not starting. Given your explanation this doesn't
seem to be the case.

- The executors are starting, but are having problems connecting 
back to the driver. In this case, you should be able to see 
errors in each executor's log file.
..."

The second of these was causing the error in my case. The host firewall on the host where I ran my PySpark shell rejected the connection attempts back from the worker nodes. After allowing all traffic between all nodes involved, the problem was resolved! The driver host was another VM in the same OpenStack project, so allowing all traffic between the VMs in the same project was acceptable security-wise.

The error message is not particularly useful in the case where executors are unable to connect back to the driver. If you encounter the same error message, remember to check firewall logs from all involved firewalls (host and/or network firewalls).

On a side note, this requirement of Spark to connect back from executors to the driver makes it harder to set up a Spark cluster in a secure way. Unless the driver is in the same security zone as the Spark cluster, it may not be possible to allow the Spark cluster workers to establish connections to the driver host on arbitrary ports. Hopefully the Apache Spark project will address this limitation in a future release, by making sure all necessary connections are established by the driver (client host) only.

~ Arne ~

Analyzing Popular Topics In My Twitter Timeline using Apache Spark

Word cloud of Twitter hashtags
Most popular Twitter topics, generated using Apache Spark and Wordle.net

Over the last few weeks I've dived into data analysis using Apache Spark. Spark is a framework for efficient, distributed analysis of data, built on the Hadoop platform but with much more flexibility than classic Hadoop MapReduce. To showcase some of the functionality I'll walk you through an analysis of Twitter data. The code is available as an IPython Notebook on Github.

The question I want to answer using Spark is: What topics are people currently tweeting about? The people in this case are the ones I follow on Twitter, at the moment approximately 600 Twitter users. They represent a diverse set of interests mirroring the topics I'm interested in, such as data analysis, machine learning, networking technology, infrastructure, motor sports and so on. By extracting the hashtags they've used in tweets over the last week and doing a standard word count, I'll generate a list of the most popular topics right now.

The amount of functionality for data analysis in Spark is impressive. Spark features a long list of available transformations and actions, such as map, filter, reduce, several types of joins, cogroup, sum, union, intersect and so on. In addition, Spark has a machine learning library with a growing number of models and algorithms. For instance, Spark MLlib includes everything needed to do the Linear Regression example I did on AWS in my previous blog post. Spark also comes with a Streaming component where batch analysis pipelines easily can be set up to run as real-time analysis jobs instead. Compared to classic Hadoop MapReduce, Spark is not only more flexible but also much faster, thanks to in-memory analysis.

To do the Twitter analysis, I first fetched about 24000 tweets from the Twitter API using a Python module called Tweepy:
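
The exact fetch code is in the linked notebook; a minimal sketch of the loop might look like this, assuming an authorized tweepy API object called api, a pymongo database connection called db, a hypothetical bookkeeping field and a per-user tweet limit (all of these are assumptions):

import tweepy

# Loop over all users I follow ("friends" in Twitter terms)
for user_id in api.friends_ids():
    # Skip users that were already processed in an earlier run
    if db.tweets.find_one({'fetched_for': user_id}):
        continue
    # Fetch recent tweets for this user and save them to MongoDB
    for status in tweepy.Cursor(api.user_timeline, user_id=user_id).items(50):
        tweet = status._json               # the raw tweet as a dict
        tweet['fetched_for'] = user_id     # hypothetical bookkeeping field
        db.tweets.insert_one(tweet)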

Each tweet was saved to a local MongoDB instance for persistence. The loop first checks the database to see if that user has been processed already, to save time if the loop has to be run several times. Due to rate limiting of the Twitter API it took about 2 hours to download the dataset. By the way, "friends" is the term Twitter uses to refer to the list of users that a user follows.

The code snippet above depends on a valid, authorized session with the Twitter API and an established connection to MongoDB. See the IPython Notebook for the necessary code to establish those connections. The Python modules "tweepy" and "pymongo" need to be installed as dependencies, preferably using pip to get the latest versions.

With the tweets saved in MongoDB, we are ready to start doing some filtering and analysis on them. First, the set of tweets needs to be loaded into Spark and filtered:
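
The snippet itself is part of the linked notebook; a sketch of it, reconstructed from the explanation that follows, could look like this (the variable tweets, holding the list of tweet dicts loaded from MongoDB, and the precomputed limit_unixtime are assumed to be defined earlier):

import time
from dateutil import parser

# Load the list of tweets into Spark
allTweetsRDD = sc.parallelize(tweets)

# Keep only tweets that have at least one hashtag
tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)

# Keep only tweets that are recent enough (within the one-week window)
filteredTweetsRDD = tweetsWithTagsRDD.filter(
    lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)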

In the code snippet above I use sc.parallelize() to load a Python list into Spark, but I could just as easily have used sc.textFile() to load data from a file on disk or sc.newAPIHadoopFile() to load a file from HDFS. Spark also supports use of Hadoop connectors to load data directly from other systems such as MongoDB, but that connector unfortunately does not support PySpark yet. In this case the dataset fits in the memory of the Python process so I can use sc.parallelize() to load it into Spark, but if I'd like to run the analysis on a longer timespan than one week that would not be feasible. To see how the MongoDB connector can be used with Python, check out this example code by @notdirkhesse which he demonstrated as part of his excellent Spark talk in June.

"sc" is the SparkContext object, used to communicate with the Spark API from Python. I'm using a Vagrant box with Spark set up and "sc" initialized automatically, which was provided as part of the very interesting Spark MOOCs CS100 and CS190 (BerkeleyX Big Data X-Series). The SparkContext can be initialized to use remote clusters running on EC2 or Databricks instead of a local Vagrant box, which is how you'd scale out the computations.

Spark has a concept of RDDs, Resilient Distributed Datasets. An RDD represents an entire dataset regardless of how it is distributed around on the cluster of nodes. RDDs are immutable, so a transformation on an RDD returns a new RDD with the results. The last two lines of the code snippet above are transformations to filter() the dataset. An important point to note about Spark is that all transformations are lazily evaluated, meaning they are not computed until an action is called on the resulting RDD. The two filter statements are only recorded by Spark so that it knows how to generate the resulting RDDs when needed.

Let’s inspect the filters in a bit more detail:

tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)

The first filter transformation is called on allTweetsRDD, which is the RDD that represents the entire dataset of tweets. For each of the tweets in allTweetsRDD, the lambda expression is evaluated. Only those tweets where the expression evaluates to True are returned and included in tweetsWithTagsRDD. All other tweets are silently discarded.

filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)

The second filter transformation is a bit more complex due to the datetime calculations, but follows the same pattern as the first. It is called on tweetsWithTagsRDD, the result of the first transformation, and checks if the tweet timestamp in the "created_at" field is recent enough to be within the time window I defined (one week). The tweet timestamp is parsed using python-dateutil, converted to unixtime and compared to the precomputed limit.
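
The precomputed limit is not shown in this post, but it could be derived roughly like this (the one-week window matches the text; the variable names are assumptions):

import time
from datetime import datetime, timedelta

# Tweets older than one week should be filtered out
limit = datetime.now() - timedelta(days=7)
limit_unixtime = time.mktime(limit.timetuple())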

For those of you who are already acquainted with Spark, the following syntax might make more sense:

filteredTweetsRDD = (allTweetsRDD
                     .filter(lambda t: len(t['entities']['hashtags']) > 0)
                     .filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)
                    )

The inspiration from functional programming in Spark's programming model is apparent here, with enclosing parentheses around the entire statement in addition to the use of lambda functions. The resulting filteredTweetsRDD is the same as before. However, by assigning a variable name to the result of each filter transformation, it's easy to compute counts:

tweetCount = allTweetsRDD.count()
withTagsCount = tweetsWithTagsRDD.count()
filteredCount = filteredTweetsRDD.count()

count() is an example of an action in Spark, so when I execute these statements the filter transformations above are also computed. The counts revealed the following about my dataset:

  • Total number of tweets: 24271
  • Tweets filtered away due to no hashtags: 17150
  • Of the tweets that had hashtags, 4665 were too old
  • Resulting set of tweets to analyze: 2456

Now we’re ready to do the data analysis part! With a filtered set of 2456 tweets in filteredTweetsRDD, I proceed to extract all hashtags and do a word count to find the most popular tags:
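
The snippet is again in the linked notebook; pieced together from the fragments discussed below, it amounts to something like this (the variable name countsRDD matches the one used further down):

countsRDD = (filteredTweetsRDD
             .flatMap(lambda tweet: [hashtag['text'].lower()
                                     for hashtag in tweet['entities']['hashtags']])
             .map(lambda tag: (tag, 1))
             .reduceByKey(lambda a, b: a + b)
            )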

What’s happening here is that I’m creating a new Pair RDD consisting of tuples of (hashtag, count). The first step is to extract all hashtags with a flatMap(), and remember that every tweet can contain a list of multiple tags.

filteredTweetsRDD.flatMap(lambda tweet: [hashtag['text'].lower() \
    for hashtag in tweet['entities']['hashtags']])

A flatMap() transformation is similar to a map(), which passes each element of an RDD through a user-supplied function. In contrast to map, flatMap ensures that the result is a flat list instead of a nested data structure, like a list of lists for instance. Since the analysis I'm doing doesn't care which tweet has which hashtags, a simple list is sufficient. The lambda function does a list comprehension to extract the "text" field of each hashtag in the data structure and lowercase it. The data structure for tweets looks like this:

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Sun Jul 12 19:29:09 +0000 2015',
 u'entities': {u'hashtags': [{u'indices': [75, 83], 
                              u'text': u'TurnAMC'},
                             {u'indices': [139, 140], 
                              u'text': u'RenewTURN'}],
               u'symbols': [],
               u'urls': [],
...

So the result of the lambda function on this tweet would be:

['turnamc', 'renewturn']

After the flatMap(), a standard word count using map() and reduceByKey() follows:

.map(lambda tag: (tag, 1))
.reduceByKey(lambda a, b: a + b)

A word count is the "Hello World" equivalent for Spark. First, each hashtag is transformed to a key-value tuple of (hashtag, 1). Second, all tuples with the same key are reduced using the lambda function, which takes two counts and returns the sum. Spark runs both map() and reduceByKey() in parallel on the data partition residing on each worker node in a cluster, before the results of the local reduceByKey() are shuffled so that all values belonging to a key are processed by one worker. This behaviour mimics the use of a Combiner in classic Hadoop MapReduce. Since both map() and reduceByKey() are transformations, the result is a new RDD.

To actually perform the computations and get results, I call the action takeOrdered() with a custom sort function to get the top 20 hashtags by count. The sort function simply orders key-value pairs descending by value.
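
A sketch of that call, with a key function matching the description (the exact code is in the notebook):

# Take the 20 tags with the highest counts, ordered descending by count
top20 = countsRDD.takeOrdered(20, key=lambda (tag, count): -count)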

The 20 most used hashtags in my dataset turned out to be:

[(u'bigdata', 114),
 (u'openstack', 92),
 (u'gophercon', 71),
 (u'machinelearning', 68),
 (u'sdn', 66),
 (u'datascience', 58),
 (u'docker', 56),
 (u'dtm', 46),
 (u'audisport', 44),
 (u'dtmzandvoort', 42),
 (u'hpc', 40),
 (u'welcomechallenges', 38),
 (u'devops', 37),
 (u'analytics', 36),
 (u'awssummit', 36),
 (u'infosec', 33),
 (u'security', 32),
 (u'openstacknow', 29),
 (u'renewturn', 29),
 (u'mobil1scgp', 28)]

In this list it’s easy to recognize several of the interests I mentioned earlier. Big Data is the top hashtag, which together with Machine Learning and Data Science make up a significant portion of the interesting tweets I see in my Twitter timeline. OpenStack is another top hashtag, which is a natural topic given my current job in infrastructure. SDN is a closely related topic and an integrated part of the OpenStack scene. Docker is taking over in the infrastructure world and the DevOps mindset that follows with it is also a popular topic.

What’s interesting to see is that conferences spark a lot of engagement on Twitter. Both GopherCon and AWS Summit turn up in the top 20 list since they took place during the week of tweets I analyzed. The same goes for motor sports (hashtags DTM Zandvoort, Audi Sport, Welcome Challenges), although in that case it’s the professional teams, in contrast to conference goers, that make sure their Twitter followers are constantly updated on standings and news.

As I’m sure you’ve noticed, the word cloud at the beginning of this blog post is generated from the list of hashtags in the dataset and their counts. To finish off the analysis I also computed the average number of hashtags per tweet that had at least one hashtag:

# Count the number of hashtags used
totalHashtags = countsRDD.map(lambda (key, value): value) \
                         .reduce(lambda a, b: a + b)

# Compute average number of hashtags per tweet
print(('A total of {} hashtags gives an average number of '
       'tags per tweet at {}.').format(totalHashtags,
      round(totalHashtags / float(filteredTweetsRDD.count()), 2)))

Here I do another map + reduce, but this time the map function extracts the count for each hashtag and the reduce function sums it all up. It is very easy to build such pipelines of transformations to get the desired results. The speed and flexibility of Spark lowers the barrier to entry and invites the user to do more such computations. In closing, here are the numbers:

  • A total of 4030 hashtags analyzed
  • An average number of tags per tweet at 1.64

~ Arne ~