Stream API responses directly to disk with Python

Python is the de-facto language for Data Science work. It’s very convenient to rapidly prototype a solution in Python and see if it works. However when faced with setting up the same solution in production, there are new space and time constraints to take into account. You’ll likely find that memory usage is the number one resource constraint you need to pay attention to. So let me share a useful trick to reduce memory usage in the early stages of a Python job.

To get your hands on training data, a very common step is to call an API and get data back in JSON format. For example for fetching tweets from Twitter. The simplest approach when using Python is to send an API call using Requests, store the response in a variable, decode the JSON and save it to disk for later processing.

This works well for small datasets, but when you try this on bigger JSON-based datasets, it results in very high memory usage. The problem is that you buffer data in memory and decode it, before saving to disk. Luckily, there is a better way.

Streaming API responses directly to disk

The Requests library has support for streaming. This enables you to iterate over the results you get as they arrive, instead of waiting for the entire response and buffering it in memory. Here’s a code snippet showing how this can be done:

with, data=ojb, headers=headers, stream=True) as response:
  with'out.gz', mode='wt', encoding='utf-8') as f:
    for chunk in response.iter_content(10240, decode_unicode=True):

With this code pattern the JSON is saved directly to disk in a compressed format as soon as it arrives over the network. The iterator ensures that you process chunks of 10240 bytes at a time, meaning that this is the maximum number of bytes your Python job has in memory at any time. The number of bytes is something you can tune and experiment with to figure out what works best in each case.

The impact of streaming response directly to disk is potentially huge. I’ve seen reductions in memory usage from tens of GB to almost nothing for jobs that migrated to this optimized approach.

One caveat of this approach is that it becomes a bit harder to extract parts of the JSON response if you need to do so. One example is APIs that use pagination, where the start and stop markers are part of the JSON response. A way to handle this is to resort to a simple text search of each chunk before the write call. This can work if what you’re trying to find fits nicely into one chunk. For instance if the very last chunk contains the necessary pagination values.

I believe this optimisation is important to use whenever possible, even if JSON parsing becomes a bit harder. It’s a good way to control the memory usage of a Python job that is fetching big datasets from an API. Using this approach the job should be able to stay within the memory constraints in production while fetching data.

~ Arne ~

Using Facebook Prophet Forecasting Library to Predict the Weather

Facebook recently released a forecasting library for Python and R, called Prophet. It’s designed for forecasting future values of time series of any kind, and is remarkably easy to get started with. One of my favorite data sets are temperature time series, so here I’ll explore how good Prophet is at predicting future temperatures based on past weather observations.

The dataset consists of temperature readings every 10 minutes from my Netatmo Weather Station, stored in InfluxDB over at I extracted the mean temperature per hour for the last year, resulting in approx. 9000 hourly temperature observations.The timestamps are in ISO8601 format: “2016-02-11T08:00:00Z”. The dataset also has gaps due to several shorter periods of malfunctioning data collection and system maintenance. Prophet claims to handle such gaps without issues, so let’s see if it does.

Update: If you want to try this out yourself, here’s the dataset!

Installing Prophet on Ubuntu Linux

Installing Prophet for Python is done using pip. Since Prophet depends on the Stan statistical library and is optimized for speed using C, it needs Cython and PyStan. In addition it depends on NumPy and Pandas, so make sure you have those installed too.

$ sudo -H pip install cython pystan numpy pandas
$ sudo -H pip install fbprophet

These can also be installed without sudo if you don’t have administrative privileges on the system.

Producing the first temperature predictions

To produce the initial predictions, we simply run through the following steps without changing default parameters.

Import packages and prepare input data

import pandas as pd
import numpy as np
from fbprophet import Prophet
df = pd.read_csv('outdoor-temperature-hourly.csv')
df = df[df.temperature != 'DIFF']

Preparing the dataset consists of loading it as a DataFrame using Pandas. The input dataset is a merge of two time series and some of the values are invalid. They are filtered out by excluding all rows with the value DIFF.

df['ds'] = df['time']
df['y'] = df['temperature']
df = df.drop(['name', 'time', 'temperature', 'seriesA', 'seriesB'], axis=1)

Prophet requires one column named “ds” with dates or datetimes, and one column named “y” with numeric values. All other columns are ignored. The two required columns are created by duplicating two existing columns “time” and “temperature”, before all irrelevant columns are dropped from the dataframe. The preview shows the resulting dataframe which is used as input to Prophet. The values are degrees Celcius and timestamps are UTC. The input looks like this:

2017-02-26 Prophet - Input dataframe start.png

Fit model and use it to make predictions


Fitting a model to the input data is as simple as “”. To make predictions, you first need a DataFrame with datestamps to predict for. Prophet has a useful make_future_dataframe() method to do just that. By default it generates one row per day, but by setting the frequency parameter to “H” we get hours instead. In this example I generated a dataframe with 50 days of hourly timestamps, starting right after the most recent timestamp in the input dataset.

To make predictions based on the model, all you need to do is call “model.predict(future)”. Using the model and dataframe of future datetimes, Prophet predicts values for each future datetime.

Initial results

Prophet includes built-in plotting of the results using Matplotlib. Here’s the prediction for the hourly temperatures two months into the future, plotted as a continuation of the existing input data:


Pretty impressive output for so little work required! Prophet is fast too – these results were computed in only seconds. The temperature trends are clearly visible with higher hourly temperatures during the summer months than in the winter. The forecast for the next two months says that the temperatures are about to rise from around zero degrees Celcius currently to around 5 degrees in the start of April. However, the uncertainty intervals are quite large in this forecast (around 10 degrees). The somewhat large variations between day and night temperatures might cause this.

Before trying to reduce the uncertainty interval, let’s look at another output of the prediction model: The components of the model.


These subplots show several interesting patterns. The first subplot, “Trend” shows a slight temperature rise on the large scale, from year to year. As the input data only covers one year, this does not qualify as a generic trend. However, it does say that the recorded 2017-temperatures are slightly warmer than the 2016-temperatures.

The weekly trends are a bit strange. According to these results, the start of the week is colder than the rest of the week. As the weather doesn’t care about what day it is, this remains a curiosity.

The last plot shows the seasonal distribution of temperatures during the year of input data. Since I only had one year of input data, this plot follows the data as seen in the main plot pretty closely. Like in the trend subplot, the seasonal distributions would benefit from a lot more input data.

Tuning the model to only cover 2017

The initial results used the entire dataset, but how will Prophet behave if it doesn’t have input data from the same season last year to base the predictions on? Let’s investigate the results of using a smaller time period.

recent = df[df.ds > '2017-01-01']

Here I’m making a new input dataframe by selecting only the rows that have timestamps in 2017. Let’s make a new model and some new predictions too:

model_recent = Prophet()
future_recent = model_recent.make_future_dataframe(periods=24*10, freq='H')
forecast_recent = model_recent.predict(future_recent)

I set the period to make predictions for to 10 days into the future. Since the input data doesn’t cover nearly as much as in the initial results, it makes sense to reduce the number of days to predict for. The results are shown in the following graph.

2017-02-26 Prophet - Hourly Temperatures for 2017 only.png

Since this plot covers a smaller time range, we can see more clearly the daily variations between day and night. Interestingly, Prophet is making the same type of prediction for the coming days as in the previous model; the temperature is going to rise. In this plot we can see why too. At around 2017-02-14 the temperature begun to rise and the last days of input data show temperatures well into the positive Celcius range. Prophet has successfully picked up this trend change and is using that to predict the future.

The Yearly trend subplot confirms that Prophet picked up on the trend change:

2017-02-26 Prophet - Hourly 2017 Temperatures - Yearly component.png

Tuning the model to reduce uncertainty intervals

In the initial results above, the uncertainty intervals were as big as 10 degrees Celcius. This is a bit too much to be useful in a weather forecasting system. To reduce the uncertainty, let’s make the input data a bit more uniform. To avoid having Prophet deal with day/night temperature differences, I filtered out all temperature measurements except for the one at 12:00 UTC each day. The theory is that these values, one per day, will be more uniform and lead to less variance in the model output.

Filtering the measurements could certainly be done using Pandas, but I chose to use the good old shell tools:

$ head -1 outdoor-temperature-hourly.csv > outdoor-temperature-12UTC.csv
$ fgrep "T12" outdoor-temperature-hourly.csv >> outdoor-temperature-12UTC.csv

Here I generate a new CSV file with only the temperature values for timestamps that contain “T12”. In the ISO8601 time format “T” is the date-time separator, so this selects all measurements having 12 as the hour component. I first save line 1 from the original file as line 1 in the new file to not lose column headings and having to tell Pandas about them manually.

Back in my Jupyter Notebook, the new input data is loaded into a dataframe like this:

df2 = pd.read_csv('outdoor-temperature-12UTC.csv', na_values = 'DIFF', usecols = ['time', 'temperature'])
df2['ds'] = df2['time']
df2['y'] = df2['temperature']
df2 = df2.drop('time', axis=1)
df2 = df2.drop('temperature', axis=1)

You might notice slight variations compared to the first example above. Here I added arguments to Pandas read_csv() to ignore lines with value DIFF and ignore all columns except the two with datetimes and values. Either way works, but I think this variant is a bit cleaner. To make sure we only got one temperature value per day, let’s have a look at the dataframe:

2017-02-26 Prophet - Daily Temperatures.png

Looks good. By now you know the drill for making Prophet generate predictions:

model2 = Prophet()
future2 = model2.make_future_dataframe(periods=30)
forecast2 = model2.predict(future2)

The results


This is better. The uncertanty interval is down by approx. 30% to about 7 degrees. That means filtering the input data to one value per day increased the usefulness of the predictions, even though Prophet had less data to base predictions on.

The components of the model look similar to the first example:


Note that the first subplot, “Trend”, has higher values on the Y-axis than in the first example. That might be because I filtered out a lot of colder temperatures. 12 UTC is not the median temperature value during a typical day, it’s closer to the maximum (which is in the afternoon / early evening).

In addition to the subplots, we can inspect the predictions and components in depth by looking at the dataframe with predictions. The most interesting part is the timestamps in the future, so filter for them first:

import datetime
forecast2[forecast2.ds >]

This returns a dataframe with details about all predictions for all timestamps from now on. Here’s a screenshot of some of the contents (certain columns removed for brewity):

2017-02-26 Prophet - Inspecting Prediction Component Values.png

For each timestamp, you get the predicted value (“yhat”) in the rightmost column and all the components making up the prediction. For example we can see that the yearly components are well below zero degrees Celcius. This makes sense in the winter season. In addition, “yhat_lower” and “yhat_upper” show the exact range of the uncertainty interval.

Pro tip: Plot components with uncertainty intervals too

In the previous examples all the component subplots lack uncertainty intervals. Prophet can generate such intervals too, at the cost of longer computation time. This is done by adding a parameter to the model creation:

model2 = Prophet(mcmc_samples=500)

This gives you full Bayesian sampling and takes longer to complete, however it’s still just in the range of some minutes.

With this activated, we get component subplots with uncertainty intervals:

2017-02-26 Prophet - Uncertainty intervals for components.png

These plots reuse the filtered input data from the previous example, with one temperature value per day. We can now reconsider the strange weekly trend in the middle subplot, where the start of the week seems to be colder than the rest. The uncertainty is quite big in that plot. In addition the zero-degrees line is well within the uncertainty interval, for all days of the week. This means it’s not possible to say that there is any difference between the various weekdays.

Closing remarks

Using Prophet to generate predictions turned out to be very easy, and there are several ways to adjust the predictions and inspect the results. While there are other libraries that have more functionality and flexibility, Prophet hits the sweet spot of predictive power versus ease of use. No more looking at weird plots of predicted values because you chose the wrong algorithm for your use case. And no more spending hours to fix input data that has gaps or timestamps in the wrong format.

The Prophet forecasting library seems very well designed. It’s now my new favorite tool for ad-hoc trend analysis and forecasting!

~ Arne ~

Visualize Your Netatmo Data with Grafana

Netatmo Weather Station is the ultimate personal weather station and gives you access to your measurements both through apps and a slick web interface. In addition to that, Netatmo has Developer APIs for access to raw data. Owning a Netatmo station, I just had to try to set up a Grafana dashboard with my weather data. Grafana is both flexible and has great-looking dashboards, so it should be a good fit for weather measurements. Keep on reading for tips on how to do this yourself too!

If you just want to see your Netatmo data in a beautiful dashboard, go to my free web service to register for an account where the dashboard is already set up for you. And if you don’t have a Netatmo Weather Station yet but want to try anyway, create a Netatmo account and go to the publicly available Netatmo HQ station in Paris to associate the station with your account.

Netatmo Grafana dashboard big

To get started with Grafana on your own server or laptop, the first thing to do is to get access to measurement data from your Netatmo station. Register for developer access at Netatmo to get that. After creating an account, continue to “Create an App” to get the necessary client ID and client secret.

Authentication and tokens

Using the client credentials, there are basically two modes of authentication, an easy one and a more complicated one. The easy one is called Client Credentials in the Netatmo docs and gives you an OAuth2 access token in just one step. Just send a request to the Netatmo API with your client credentials and your Netatmo username+password to get a token. The access token is then used in API calls to get measurement data for your station. The more complicated method is called Authorization Code and makes it possible for your application (“client”) to request access to other Netatmo weather stations than your own by redirecting the owner of that station to an OAuth2 authorization webpage.

Using Python, the following code issues a POST request for an access token the easy way. Before running it, replace the NETATMO_* constants with your values for client ID, client secret, username and password.

 data = dict(grant_type='password', client_id=NETATMO_CLIENT_ID,
        client_secret=NETATMO_CLIENT_SECRET, username=NETATMO_USERNAME,
        password=NETATMO_PASSWORD, scope='read_station')

 resp ='', data=data)
 if resp.status_code == 200:
     token = resp.json()
     token['expiry'] = int(time.time()) + token['expires_in']

Now you have a “token” variable as a dictionary with several fields.

The returned token data comes with an expires_in field that says how many seconds the token is valid. In the last line above I transform that into an expiry field containing the unixtime of expiry. That’s necessary to be able to check periodically if the token needs to be refreshed. Here is an example of a token dictionary including the additional expiry field:

{u'access_token': u'abcdefgh|1234567890',
 u'expire_in': 10800,
 u'expires_in': 10800,
 u'expiry': 1468168541,
 u'refresh_token': u'ababababab|2727272727',
 u'scope': [u'read_station']}

When the expiry time gets close, the refresh_token is used to renew the access_token by issuing another POST request:

data = dict(grant_type='refresh_token', refresh_token=token['refresh_token'], client_id=NETATMO_CLIENT_ID, client_secret=NETATMO_CLIENT_SECRET)
resp ='', data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Measurement JSON data format

With a valid access_token we can fetch the interesting measurements from Netatmo. The APIs contain several methods for different Netatmo products, but for the Weather Station only /api/getstationdata is needed. Issue a GET request to see all the available measurements:

resp = requests.get('' + token['access_token'])
if resp.status_code == 200:
    data = resp.json()

The data structure returned has a lot of fields and varies by the number of extra modules attached to your Netatmo station (and even more if you’ve connected more than one station to your Netatmo account, like the public Netatmo HQ station in Paris). Here’s an excerpt of data returned in JSON format:

    [{u'_id': u'70:ee:aa:bb:cc:dd',
      u'co2_calibrating': False,
        {u'AbsolutePressure': 985,
        u'CO2': 431,
        u'Humidity': 46,
        u'Noise': 37,
        u'Pressure': 1001.9,
        u'Temperature': 26.3,
        u'date_max_temp': 1468101837,
        u'date_min_temp': 1468125907,
        u'max_temp': 26.7,
        u'min_temp': 24.8,
        u'pressure_trend': u'stable',
        u'temp_trend': u'stable',
        u'time_utc': 1468157806},
        [{u'_id': u'02:00:aa:bb:cc:dd',
            {u'Humidity': 52,
            u'Temperature': 22.8,
            u'date_max_temp': 1468127398,
            u'date_min_temp': 1468115964,
            u'max_temp': 26,
            u'min_temp': 9.9,
            u'temp_trend': u'down',
            u'time_utc': 1468157799},

The dashboard_data section has the actual readings, while data_type informs us of the measurement types that this station reports. Values are reported in the unit the user selected on setup, meaning they could be Fahrenheit instead of Celcius for instance. A separate user part of the returned JSON has details about which units are used.

In addition to the data from the indoor Weather Station, stations also have a modules parameter which holds measurements from all connected modules (outdoor module, rain gauge, wind gauge and so on). As seen above, for each module the JSON fields are the same as for the station, with the measurements in dashboard_data and reported measurements in data_type. This greatly simplifies parsing of the JSON response, as you can use the same code for parsing the devices list as for each entry in the modules list.

Storing data in InfluxDB

InfluxDB is a time series database with high performance, good compression and an easy-to-use write API and query language. After installing and starting it up with default config options, it’s ready to use as a data store for time-series data like weather measurements. The write API is available through HTTP. To write to InfluxDB, issue POST requests with the actual data as newline-delimited strings in the body of the request. InfluxDB documentation refers to this as the line protocol. An example write request can look like this:

payload = """
Humidity,station_name=MyStation,module_name=Outdoors value=52 1468157799
Temperature,station_name=MyStation,module_name=Outdoors value=22.8 1468157799
Rain,station_name=MyStation,module_name=Rain_Gauge value=0 1468157799

resp ='http://localhost:8086/write?precision=s&db=netatmo', data=payload)

This will save three measurements into time series named Humidity, Temperature and Rain in database netatmo. The value field is the actual measurement and the timestamp is from the time_utc field alongside the measurements. It’s trivial to convert the returned JSON into the line format that InfluxDB expects.

The station_name and module_name are custom tags attached to the time series to make it possible to distinguish between different stations and modules. The tags are available for filtering in Grafana using WHERE statements. Station names and module names defined when setting up the Netatmo Weather Station are available in the returned JSON from the Netatmo API.

Setting up a Grafana dashboard

After downloading and installing Grafana, go to the Datasource part of the web UI and create a new data source with the following settings:

Grafana - Data source setup

The User and Password under InfluxDB Details are root/root, but are not really used unless InfluxDB authentication was configured with non-default settings before starting up the database.

With a data source in place, the next step is to create a dashboard. There are many ways to visualize weather data, but at least add graphs for each time series you’ve stored in InfluxDB. That way you get a feel for how the metric changes over time. For some metrics the trends are most interesting, for other metrics only the current value is necessary to display. If everything works as expected, you should get suggestions when you set up the metric queries in Grafana, like this:

Create graph - Suggestions

Under the WHERE section you can filter on tags associated with each time series, like for example the module name to only get outdoor temperatures instead of both indoor and outdoor.

Awesome visualizations

With Grafana and InfluxDB set up to store data from your Netatmo Weather Station, you can create some pretty awesome visualizations. Like for instance this outdoor temperature graph over several months, with a moving_average() layered on top:

Outdoor Temperature with Moving Average.png

This makes it easy to see that the temperature moved in the right direction during these months, and that there were some periods with higher average temperature than others (the two first weeks of May in particular).

If you’re interested in more awesome visualizations of Netatmo Weather Station data, head over to my web service to get your own personalized weather dashboard!

~ Arne ~

Spark cluster on OpenStack with multi-user Jupyter Notebook

Spark on OpenStack with Jupyter

Apache Spark is gaining traction as the defacto analysis suite for big data, especially for those using Python. Spark has a rich API for Python and several very useful built-in libraries like MLlib for machine learning and Spark Streaming for realtime analysis. Jupyter (formerly IPython Notebook) is a convenient interface to perform exploratory data analysis and all kinds of other analytic tasks using Python. In this post I’ll show step-by-step how to set up a Spark cluster on OpenStack and configure Jupyter with multi-user access and an easy-to-use PySpark profile.

Setting up a Spark cluster on OpenStack VMs

To get a Spark standalone cluster up and running, all you need to do is spawn some VMs and start Spark as master on one of them and slave on the others. They will automatically form a cluster that you can connect to from Python, Java and Scala applications using the IP address of the master node. Sounds easy enough, right? But there are some pitfalls, so read on for tips on how to avoid them.

To create the first VM to be used as master node, I use the OpenStack command line client. We’ll get that node up and running first. The distribution is Ubuntu 14.04 Trusty. Cloud-init is an easy way to perform bootstrapping of nodes to get the necessary software installed and set up. To install and start a Spark master node I use the following Cloud-init script:

# Cloud-init script to get Spark Master up and running

# Firewall setup
ufw allow from $LOCALNET
ufw allow 80/tcp
ufw allow 443/tcp
ufw allow 4040:4050/tcp
ufw allow 7077/tcp
ufw allow 8080/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Configure Spark master
cp /opt/spark/conf/ /opt/spark/conf/
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' /opt/spark/conf/

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark Master with IP address of eth0 as the address to use
/opt/spark/sbin/ -h $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/)

Save this as for use with the OpenStack command line client. The script first adds some firewall rules to allow access to the different components and installs the OpenJDK dependency. Next, a Spark tarball is downloaded, unpacked and made available under /opt/spark on the host. The tarball is prepackaged with Hadoop v1 libraries (note the “hadoop1.tgz” suffix), so adjust this if you need Hadoop v2 instead.

The only configuration of Spark we need at this point is to set the options “spark.deploy.defaultCores” and “spark.executor.memory”. They are used to configure how much resources each application will get when it starts. Since the goal is to set up a multi-user environment with Jupyter notebooks, we need to limit the total amount of CPU cores and RAM that each notebook will use. Each notebook is an “application” on the cluster for as long as the notebook is active (i.e until it is shutdown by the user). If we don’t limit the resource allocation, the first notebook created will allocate all available CPU cores on each worker, leaving no CPU cores free for the next user. In addition, the default RAM allocation for each app is only 512 MB on each worker node, which might be a bit too small, so we bump that up to 2 GB.

The echo line adds “spark-master” to /etc/hosts with a reference to the IP address of the VM. Spark tries to resolve the local hostname on startup. Without a resolvable hostname you might encounter “Name or service not known”-errors, resulting in Java exceptions and exits.

On the last line the Spark master process is started. The master process is given the IP address of the local host as an argument to make sure it binds to the correct interface. The IP address is extracted from the output of the “ip addr” command.

One way to launch the master VM with the Cloud-init script is like this:

# Install OpenStack client if not present already
sudo apt-get -y install python-openstackclient

# Customize these values to match your OpenStack cluster

# Create Spark master VM
openstack server create --flavor m1.medium --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-master

If you rather prefer using the web UI (Horizon), you could just as easily paste the Cloud-init script into the text box on the Post-Creation tab of the Launch Instance dialog and archieve the same result.

It will take some minutes for the Spark master VM to finish bootstrapping. When it’s done the Spark master UI will be available on port 8080. Remember to associate a floating IP to the VM to be able to access it from outside the OpenStack project:

openstack ip floating add spark-master

Verify that the Spark master UI is reachable and displays metadata about the cluster. If the UI is not reachable, first check that your Security Group rules allow port 8080 to the Spark master VM. Second, check the Cloud-init logs on the VM to ensure all parts of the initialization succeeded. You’ll find the Cloud-init log file on the VM as /var/log/cloud-init.log and the output from the Cloud-init script in /var/log/cloud-init-output.log. You can also try to re-run parts of the Cloud-init script with sudo to narrow down any issues with the initialization. When initialization succeeds the Spark master UI will look like this:

Spark master UI with no workers

As expected there are no workers alive yet, so let’s initialize some. To do so we use a slightly modified version of the Cloud-init script above. The main difference is the startup command, which is now /opt/spark/sbin/ with the address to the master as the only argument. Remember to adjust the variables below to your IP range and master IP address.

# Cloud-init script to get Spark Worker up and running

# Firewall setup
ufw allow from $LOCALNET
ufw allow 8081/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts
# Start Spark worker with address of Spark master to join cluster 
/opt/spark/sbin/ spark://$SPARK_MASTER_IP:7077

Save this as Notice how the last line starts up a slave with the address to the cluster master on port 7077 as the only argument. Since the Cloud-init script has no worker-specific details, it is easy to expand the cluster just by creating more worker VMs initialized with the same Cloud-init script. Let’s create the first worker:

# Customize these values to match your OpenStack cluster

# Create first Spark worker VM, this time with flavor m1.large
openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker1

Pause for a moment to let the worker creation process finish to ensure that Cloud-init does the necessary work without errors. There is no point in initializing more workers until the process is proven to work as expected. Again, it can be useful to check /var/log/cloud-init.log and /var/log/cloud-init-output.log on the new VM to verify that Cloud-init does what it’s supposed to do. On success you’ll see the worker in the Spark master UI:

Spark master UI with one worker registered

Create some more worker nodes to scale the cluster to handle more parallel tasks:

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker2

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker3

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker4

Verify that the new worker nodes show up in the Spark master UI before continuing.

Installing Jupyter and JupyterHub

A shiny new Spark cluster is fine, but we also need interfaces to be able to use it. Spark comes prepackaged with shells for Scala and Python where connection to a cluster is already set up. The same level of usability is possible to get with Jupyter (formerly IPython Notebook), so that when you open a new notebook a connection to the Spark cluster (a SparkContext) is established for you. The SparkContext is available through the variable “sc” in the notebook, ready to use by calling sc.textFile() to create an RDD, for instance.

JupyterHub is a multi-user server for Jupyter notebooks. That makes it possible for several users to use Jupyter independently and have their own notebooks and files in their home directory instead of a shared storage directory for all notebooks. However, this requires that each user has a user account on the VM where JupyterHub is running. Add user accounts for relevant users now if needed. JupyterHub uses unix authentication, meaning that it relays the username and password to the underlying authentication system on the VM for credential check.

In this deployment JupyterHub is installed on the Spark master VM and launched there. It could run on a separate VM, but there is normally no need for that since the Spark master process does not require that much resources. The VM where Jupyter notebooks are executed are called the “driver” in Spark, and that will require some processing power and memory use, depending on the use case.

SSH into the Spark master VM and run the following set of commands:

# Install pip3 and other dependencies
sudo apt-get -y install python3-pip npm nodejs-legacy
sudo npm install -g configurable-http-proxy

# Install JupyterHub and Jupyter
sudo pip3 install jupyterhub
sudo pip3 install "ipython[notebook]"

pip3 is used instead of pip because JupyterHub depends on Python >= 3.3. After installing all software and dependencies, start the JupyterHub service:

sudo jupyterhub --port 80

The benefit of having JupyterHub listen on port 80 instead of the default port 8000 should be obvious, but it requires that you start the service as root. In addition you might want to look into securing JupyterHub with an SSL certificate and have it listen on port 443, since it asks for passwords when users log in. When you have the necessary certificate and keys on the VM, the service can be started like this instead:

sudo jupyterhub --port 443 --ssl-key hub.key --ssl-cert hub.pem

Now try to open the JupyterHub login page on the floating IP address of the VM and log in. After login you should be greeted with an empty home directory with no notebooks. A new notebook can be created by clicking “New” on the right above the notebook list.

Jupyter - notebook list empty

If you create a new notebook, you’ll notice that the only supported kernel is Python3 at the moment. We need to add PySpark to that list to be able to use the Spark cluster from Jupyter.

Configuring Jupyter for PySpark

Jupyter relies on kernels to execute code. The default kernel is Python, but many other languages can be added. To use the Spark cluster from Jupyter we add a separate kernel called PySpark. In addition, kernels can run specific commands on startup, which in this case is used to initialize the SparkContext. First, some dependencies need to be installed:

sudo apt-get -y install python-dev python-setuptools
sudo easy_install pip
sudo pip install py4j
sudo pip install "ipython[notebook]"

It might seem odd to install ipython[notebook] as a dependency, but the reason is that IPython/Jupyter contains a number of Python support modules that kernels rely on. Previously when we installed using pip3, we got the Python3 versions of those modules. When installing again with pip, we get Python2 versions. PySpark depends on Python2.

To add PySpark as a kernel, a file containing a kernel definition must be created. Kernel definitions are JSON files in a specific directory. Kernels can either be enabled globally for all users or for one user only, depending on where the definition file is placed. We want the PySpark kernel to be available for all users, so we’ll add it under /usr/local/share/jupyter/kernels/ like this:

sudo mkdir -p /usr/local/share/jupyter/kernels/pyspark/
cat <<EOF | sudo tee /usr/local/share/jupyter/kernels/pyspark/kernel.json
 "display_name": "PySpark",
 "language": "python",
 "argv": [
 "env": {
  "SPARK_HOME": "/opt/spark/",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/",
  "PYSPARK_SUBMIT_ARGS": "--master spark:// pyspark-shell"

This kernel definition ensures that the Spark built-in “pyspark-shell” is started under the hood as the process where our code will be executed. Notice how the address to the Spark cluster, “spark://”, is sent as an argument. Remember to customize that address to your specific environment. The address references the Spark master VM (the same host as Jupyter runs on), but could just as easily reference an external host. For instance if you wanted to setup Jupyter on a separate OpenStack VM, or if you already have a Spark cluster running somewhere else that you want to connect to. The Spark master UI shows the right URL to use to connect, right below the Spark logo. Note that the Spark workers depend on being able to establish connections back to the host where the driver process runs (the Jupyter notebook), which may not be possible depending on the firewall setup when connecting to a remote Spark cluster. This is the reason a firewall rule allowing all traffic on the local network ( in my case) is added by Cloud-init on all the Spark VMs.

After adding the kernel definition file for PySpark you’ll have to refresh the Jupyter homepage to see the new kernel in the list. No need to restart JupyterHub.

Debugging PySpark startup errors

If you get an error message about “Dead kernel” when creating new notebooks with the PySpark kernel, there might be several causes. For instance the VM running Jupyter might not be able to connect to the Spark cluster. Or it might lack some dependencies (packages/modules) to initialize the SparkContext. To debug kernel startup issues, first check the output from JupyterHub in the terminal where you started it (might be smart to keep that terminal open until everything works as expected). JupyterHub will for example output log lines like this when a Python module is missing:

[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
[W 2015-09-20 20:31:39.295 ubuntu kernelmanager:52] Kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f died, removing from map.
ERROR:root:kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f restarted failed!
[W 2015-09-20 20:31:39.406 ubuntu handlers:441] Kernel deleted before session

This error occured because I hadn’t installed ipython[notebook] using pip yet, so the Python2 modules needed by PySpark were not available. Notice how the error message states that it is /usr/bin/python2 that reports the error. Jupyter tries to restart the kernel a total of five times, but hits the same error every time and eventually gives up. In the notebook UI this is shown as a “Dead kernel” message.

Other errors can pop up in the Spark logs on master or worker nodes. Spark logs to /opt/spark/logs, so have a look there if anything is malfunctioning. The Spark master node logs every new application that is started on the Spark cluster, so if you don’t see output there when opening a new notebook with the PySpark profile, something is not right.

One last debugging tip is to try to start the PySpark shell from Bash on the VM where Jupyter runs. It is useful to inspect what happens when the PySpark shell starts. Here is an example of output when a dependency is missing:

$ python2 /opt/spark/python/pyspark/
Traceback (most recent call last):
 File "/opt/spark/python/pyspark/", line 28, in <module>
 import py4j
ImportError: No module named py4j

Remember to use Python2 when starting the shell. The above command mimics what Jupyter does behind the scenes when a new notebook is created.

Ready to use PySpark in Jupyter

If everything went according to plan, you now have a Spark cluster which you can easily use from Jupyter notebooks just by creating them with the PySpark profile 🙂 The variable “sc” is initialized as a SparkContext connected to the Spark cluster and you can start exploring the rich Spark API for data transformation and analysis. Here’s a screenshot from a notebook where I extracted responsetime numbers from Varnish NCSA logs (web cache server logs) and computed common statistics like mean and standard deviation for the responsetime of each backend in use by the cache server:

Example use of PySpark in Jupyter

~ Arne ~

New Skill Sets Needed for Network Specialists

I share a lot of the views presented by @netmanchris in his plan for technology areas to focus on and the follow-up post It Generalist or Network Specialist?. I started out in this field as a Networking Specialist focusing on the traditional areas like switching, routing and firewalls. However, over time the need for automation, scripting and data analysis popped up more and more in my day job, to be able to automate manual tasks and improve the quality of networking services like firewall rulesets.

I’ve written about a use case for data analysis in networking in a previous post. Another example I want to highlight is the need for automatically updating firewall object-groups with new IP addresses as DNS entries for remote services change. Not all firewalls support DNS names as destination in a firewall rule. When a server need access to a specific remote site, and that site changes IP addresses from time to time, the IP adresses in the firewall config need to change too. To facilitate this I implemented a solution using Python which keeps object-groups in the firewall config in sync with IP addresses in DNS replies.

So far, these examples are from the traditional networking field. There is a new kind of Networking in the making with the invention of SDN solutions and the pervasive virtualization of everything, not just servers. Almost everything in this new Networking has APIs to support automation and requires us to learn some programming to be able to implement efficient solutions.

Python has emerged as the language of choice to sysadmins and net admins. To be able to use Python efficiently, basic knowledge of relevant modules is key. For example Paramiko for using SSH in scripts and IPy to handle IP addresses and subnets easily. To interface with APIs there may be specific modules available like python-neutronclient for the OpenStack Neutron API, if not you can always use HTTP via Requests.

In addition to Python I see knowledge of Git, Chef/Puppet, hypervisors, Linux networking, OVS, OpenStack and so on as very important in this new infrastructure-as-code movement. The new networking engineer role will be a hybrid sysadmin/netadmin/devop role and brings with it opportunities to make every day at work even more interesting!

~ Arne ~