Using Facebook Prophet Forecasting Library to Predict the Weather

Facebook recently released a forecasting library for Python and R, called Prophet. It’s designed for forecasting future values of time series of any kind and is remarkably easy to get started with. Temperature time series are among my favorite data sets, so here I’ll explore how good Prophet is at predicting future temperatures based on past weather observations.

The dataset consists of temperature readings every 10 minutes from my Netatmo Weather Station, stored in InfluxDB over at GrafAtmo.com. I extracted the mean temperature per hour for the last year, resulting in approx. 9000 hourly temperature observations. The timestamps are in ISO8601 format: “2016-02-11T08:00:00Z”. The dataset also has gaps due to several shorter periods of malfunctioning data collection and system maintenance. Prophet claims to handle such gaps without issues, so let’s see if it does.

Update: If you want to try this out yourself, here’s the dataset!

Installing Prophet on Ubuntu Linux

Installing Prophet for Python is done using pip. Since Prophet builds on the Stan statistical library and uses compiled C++ code for speed, it needs Cython and PyStan. In addition it depends on NumPy and Pandas, so make sure you have those installed too.

$ sudo -H pip install cython pystan numpy pandas
$ sudo -H pip install fbprophet

These can also be installed without sudo if you don’t have administrative privileges on the system.

Producing the first temperature predictions

To produce the initial predictions, we simply run through the following steps without changing default parameters.

Import packages and prepare input data

import pandas as pd
import numpy as np
from fbprophet import Prophet

# Load the hourly temperatures and drop rows with the invalid value DIFF
df = pd.read_csv('outdoor-temperature-hourly.csv')
df = df[df.temperature != 'DIFF']

Preparing the dataset consists of loading it as a DataFrame using Pandas. The input dataset is a merge of two time series and some of the values are invalid. They are filtered out by excluding all rows with the value DIFF.

df['ds'] = df['time']
df['y'] = df['temperature']
df = df.drop(['name', 'time', 'temperature', 'seriesA', 'seriesB'], axis=1)

Prophet requires one column named “ds” with dates or datetimes, and one column named “y” with numeric values. All other columns are ignored. The two required columns are created by duplicating the existing columns “time” and “temperature”, before all irrelevant columns are dropped from the dataframe. The preview shows the resulting dataframe which is used as input to Prophet. The values are degrees Celsius and timestamps are UTC. The input looks like this:

2017-02-26 Prophet - Input dataframe start.png

Fit model and use it to make predictions

2017-02-26-prophet-fit-input-data-and-predict

Fitting a model to the input data is as simple as “model.fit(df)”. To make predictions, you first need a DataFrame with datestamps to predict for. Prophet has a useful make_future_dataframe() method to do just that. By default it generates one row per day, but by setting the frequency parameter to “H” we get hours instead. In this example I generated a dataframe with 50 days of hourly timestamps, starting right after the most recent timestamp in the input dataset.

To make predictions based on the model, all you need to do is call “model.predict(future)”. Using the model and dataframe of future datetimes, Prophet predicts values for each future datetime.
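The original code for these two steps is shown as a screenshot above; a minimal sketch of the equivalent Python, using the parameter values described in the text, could look like this:

model = Prophet()
model.fit(df)

# 50 days of hourly timestamps, starting right after the last input timestamp
future = model.make_future_dataframe(periods=24 * 50, freq='H')
forecast = model.predict(future)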

Initial results

Prophet includes built-in plotting of the results using Matplotlib. Here’s the prediction for the hourly temperatures two months into the future, plotted as a continuation of the existing input data:

2017-02-26-prophet-predicted-temperatures-two-months-ahead

Pretty impressive output for so little work required! Prophet is fast too – these results were computed in only seconds. The temperature trends are clearly visible, with higher hourly temperatures during the summer months than in the winter. The forecast for the next two months says that the temperatures are about to rise from around zero degrees Celsius currently to around 5 degrees at the start of April. However, the uncertainty intervals are quite large in this forecast (around 10 degrees). The somewhat large variations between day and night temperatures might cause this.

Before trying to reduce the uncertainty interval, let’s look at another output of the prediction model: The components of the model.

2017-02-26-prophet-plot-components-for-hourly-temperature-forecast

These subplots show several interesting patterns. The first subplot, “Trend”, shows a slight temperature rise on the large scale, from year to year. As the input data only covers one year, this does not qualify as a generic trend. However, it does say that the recorded 2017 temperatures are slightly warmer than the 2016 temperatures.

The weekly trends are a bit strange. According to these results, the start of the week is colder than the rest of the week. As the weather doesn’t care about what day it is, this remains a curiosity.

The last plot shows the seasonal distribution of temperatures during the year of input data. Since I only had one year of input data, this plot follows the data as seen in the main plot pretty closely. Like in the trend subplot, the seasonal distributions would benefit from a lot more input data.
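Both the forecast plot and the component subplots above come from Prophet’s built-in plotting helpers; a minimal sketch, reusing the model and forecast from before:

# Forecast plot: observations, predictions and uncertainty interval
model.plot(forecast)

# Trend, weekly and yearly component subplots
model.plot_components(forecast)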

Tuning the model to only cover 2017

The initial results used the entire dataset, but how will Prophet behave if it doesn’t have input data from the same season last year to base the predictions on? Let’s investigate the results of using a smaller time period.

recent = df[df.ds > '2017-01-01']

Here I’m making a new input dataframe by selecting only the rows that have timestamps in 2017. Let’s make a new model and some new predictions too:

model_recent = Prophet()
model_recent.fit(recent)
future_recent = model_recent.make_future_dataframe(periods=24*10, freq='H')
forecast_recent = model_recent.predict(future_recent)

I set the prediction period to 10 days into the future. Since the input data doesn’t cover nearly as much as in the initial results, it makes sense to reduce the number of days to predict for. The results are shown in the following graph.

2017-02-26 Prophet - Hourly Temperatures for 2017 only.png

Since this plot covers a smaller time range, we can see more clearly the daily variations between day and night. Interestingly, Prophet is making the same type of prediction for the coming days as in the previous model: the temperature is going to rise. In this plot we can see why, too. Around 2017-02-14 the temperature began to rise, and the last days of input data show temperatures well into the positive Celsius range. Prophet has successfully picked up this trend change and is using that to predict the future.

The Yearly trend subplot confirms that Prophet picked up on the trend change:

2017-02-26 Prophet - Hourly 2017 Temperatures - Yearly component.png

Tuning the model to reduce uncertainty intervals

In the initial results above, the uncertainty intervals were as big as 10 degrees Celsius. This is a bit too much to be useful in a weather forecasting system. To reduce the uncertainty, let’s make the input data a bit more uniform. To avoid having Prophet deal with day/night temperature differences, I filtered out all temperature measurements except for the one at 12:00 UTC each day. The theory is that these values, one per day, will be more uniform and lead to less variance in the model output.

Filtering the measurements could certainly be done using Pandas, but I chose to use the good old shell tools:

$ head -1 outdoor-temperature-hourly.csv > outdoor-temperature-12UTC.csv
$ fgrep "T12" outdoor-temperature-hourly.csv >> outdoor-temperature-12UTC.csv

Here I generate a new CSV file with only the temperature values for timestamps that contain “T12”. In the ISO8601 time format “T” is the date-time separator, so this selects all measurements having 12 as the hour component. I first save line 1 from the original file as line 1 in the new file, so I don’t lose the column headings and have to tell Pandas about them manually.
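For reference, the same filtering could be done in a few lines of Pandas, assuming the ISO8601 strings are still available in the “time” column of the original CSV:

import pandas as pd

hourly = pd.read_csv('outdoor-temperature-hourly.csv')
noon_only = hourly[hourly['time'].str.contains('T12')]
noon_only.to_csv('outdoor-temperature-12UTC.csv', index=False)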

Back in my Jupyter Notebook, the new input data is loaded into a dataframe like this:

df2 = pd.read_csv('outdoor-temperature-12UTC.csv', na_values = 'DIFF', usecols = ['time', 'temperature'])
df2['ds'] = df2['time']
df2['y'] = df2['temperature']
df2 = df2.drop('time', axis=1)
df2 = df2.drop('temperature', axis=1)

You might notice slight variations compared to the first example above. Here I added arguments to Pandas read_csv() to treat the value DIFF as missing data and to load only the two columns with datetimes and values. Either way works, but I think this variant is a bit cleaner. To make sure we only got one temperature value per day, let’s have a look at the dataframe:

2017-02-26 Prophet - Daily Temperatures.png

Looks good. By now you know the drill for making Prophet generate predictions:

model2 = Prophet()
model2.fit(df2)
future2 = model2.make_future_dataframe(periods=30)
forecast2 = model2.predict(future2)

The results

2017-02-26-prophet-daily-temperature-forecast

This is better. The uncertainty interval is down by approx. 30% to about 7 degrees. That means filtering the input data to one value per day increased the usefulness of the predictions, even though Prophet had less data to base predictions on.

The components of the model look similar to the first example:

2017-02-26-prophet-daily-temperature-forecast-components

Note that the first subplot, “Trend”, has higher values on the Y-axis than in the first example. That might be because I filtered out a lot of colder temperatures: 12 UTC is not the median temperature of a typical day, it’s closer to the maximum (which occurs in the afternoon / early evening).

In addition to the subplots, we can inspect the predictions and components in depth by looking at the dataframe with predictions. The most interesting part is the timestamps in the future, so filter for them first:

import datetime
forecast2[forecast2.ds > datetime.datetime.now()]

This returns a dataframe with details about all predictions for all timestamps from now on. Here’s a screenshot of some of the contents (certain columns removed for brevity):

2017-02-26 Prophet - Inspecting Prediction Component Values.png

For each timestamp, you get the predicted value (“yhat”) in the rightmost column and all the components making up the prediction. For example, we can see that the yearly components are well below zero degrees Celsius. This makes sense in the winter season. In addition, “yhat_lower” and “yhat_upper” show the exact range of the uncertainty interval.
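A short sketch of how to narrow the forecast dataframe down to just those columns:

import datetime

future_rows = forecast2[forecast2.ds > datetime.datetime.now()]
print(future_rows[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].head())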

Pro tip: Plot components with uncertainty intervals too

In the previous examples all the component subplots lack uncertainty intervals. Prophet can generate such intervals too, at the cost of longer computation time. This is done by adding a parameter to the model creation:

model2 = Prophet(mcmc_samples=500)

This gives you full Bayesian sampling and takes longer to complete; still, it finishes within a few minutes.

With this activated, we get component subplots with uncertainty intervals:

2017-02-26 Prophet - Uncertainty intervals for components.png

These plots reuse the filtered input data from the previous example, with one temperature value per day. We can now reconsider the strange weekly trend in the middle subplot, where the start of the week seems to be colder than the rest. The uncertainty is quite big in that plot. In addition, the zero-degree line is well within the uncertainty interval for all days of the week. This means it’s not possible to say that there is any difference between the various weekdays.

Closing remarks

Using Prophet to generate predictions turned out to be very easy, and there are several ways to adjust the predictions and inspect the results. While there are other libraries that have more functionality and flexibility, Prophet hits the sweet spot of predictive power versus ease of use. No more looking at weird plots of predicted values because you chose the wrong algorithm for your use case. And no more spending hours to fix input data that has gaps or timestamps in the wrong format.

The Prophet forecasting library seems very well designed. It’s now my new favorite tool for ad-hoc trend analysis and forecasting!

~ Arne ~


Visualize Your Netatmo Data with Grafana

Netatmo Weather Station is the ultimate personal weather station and gives you access to your measurements both through apps and a slick web interface. In addition to that, Netatmo has Developer APIs for access to raw data. Owning a Netatmo station, I just had to try to set up a Grafana dashboard with my weather data. Grafana is both flexible and has great-looking dashboards, so it should be a good fit for weather measurements. Keep on reading for tips on how to do this yourself too!

If you just want to see your Netatmo data in a beautiful dashboard, go to my free web service grafatmo.com to register for an account where the dashboard is already set up for you. And if you don’t have a Netatmo Weather Station yet but want to try anyway, create a Netatmo account and go to the publicly available Netatmo HQ station in Paris to associate the station with your account.

Netatmo Grafana dashboard big

To get started with Grafana on your own server or laptop, the first thing to do is to get access to measurement data from your Netatmo station. Register for developer access at Netatmo to get that. After creating an account, continue to “Create an App” to get the necessary client ID and client secret.

Authentication and tokens

Using the client credentials, there are basically two modes of authentication, an easy one and a more complicated one. The easy one is called Client Credentials in the Netatmo docs and gives you an OAuth2 access token in just one step. Just send a request to the Netatmo API with your client credentials and your Netatmo username+password to get a token. The access token is then used in API calls to get measurement data for your station. The more complicated method is called Authorization Code and makes it possible for your application (“client”) to request access to other Netatmo weather stations than your own by redirecting the owner of that station to an OAuth2 authorization webpage.

Using Python, the following code issues a POST request for an access token the easy way. Before running it, replace the NETATMO_* constants with your values for client ID, client secret, username and password.

import time
import requests

data = dict(grant_type='password', client_id=NETATMO_CLIENT_ID,
            client_secret=NETATMO_CLIENT_SECRET, username=NETATMO_USERNAME,
            password=NETATMO_PASSWORD, scope='read_station')

resp = requests.post('https://api.netatmo.com/oauth2/token', data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Now you have a “token” variable as a dictionary with several fields.

The returned token data comes with an expires_in field that says how many seconds the token is valid. In the last line above I transform that into an expiry field containing the unixtime of expiry. That’s necessary to be able to check periodically if the token needs to be refreshed. Here is an example of a token dictionary including the additional expiry field:

{u'access_token': u'abcdefgh|1234567890',
 u'expire_in': 10800,
 u'expires_in': 10800,
 u'expiry': 1468168541,
 u'refresh_token': u'ababababab|2727272727',
 u'scope': [u'read_station']}
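A simple helper, based on the expiry field computed above, can decide when that renewal is due (a sketch, refreshing a few minutes before the token actually expires):

import time

def needs_refresh(token, margin=300):
    # True when less than 'margin' seconds remain of the token lifetime
    return int(time.time()) > token['expiry'] - margin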

When the expiry time gets close, the refresh_token is used to renew the access_token by issuing another POST request:

data = dict(grant_type='refresh_token', refresh_token=token['refresh_token'], client_id=NETATMO_CLIENT_ID, client_secret=NETATMO_CLIENT_SECRET)
resp = requests.post('https://api.netatmo.com/oauth2/token', data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Measurement JSON data format

With a valid access_token we can fetch the interesting measurements from Netatmo. The APIs contain several methods for different Netatmo products, but for the Weather Station only /api/getstationsdata is needed. Issue a GET request to see all the available measurements:

resp = requests.get('https://api.netatmo.com/api/getstationsdata?access_token=' + token['access_token'])
if resp.status_code == 200:
    data = resp.json()

The data structure returned has a lot of fields and varies by the number of extra modules attached to your Netatmo station (and even more if you’ve connected more than one station to your Netatmo account, like the public Netatmo HQ station in Paris). Here’s an excerpt of data returned in JSON format:

{u'body': 
  {u'devices': 
    [{u'_id': u'70:ee:aa:bb:cc:dd',
      u'co2_calibrating': False,
      u'dashboard_data': 
        {u'AbsolutePressure': 985,
        u'CO2': 431,
        u'Humidity': 46,
        u'Noise': 37,
        u'Pressure': 1001.9,
        u'Temperature': 26.3,
        u'date_max_temp': 1468101837,
        u'date_min_temp': 1468125907,
        u'max_temp': 26.7,
        u'min_temp': 24.8,
        u'pressure_trend': u'stable',
        u'temp_trend': u'stable',
        u'time_utc': 1468157806},
      u'data_type': 
        [u'Temperature',
        u'CO2',
        u'Humidity',
        u'Noise',
        u'Pressure'],
...
      u'modules': 
        [{u'_id': u'02:00:aa:bb:cc:dd',
          u'dashboard_data': 
            {u'Humidity': 52,
            u'Temperature': 22.8,
            u'date_max_temp': 1468127398,
            u'date_min_temp': 1468115964,
            u'max_temp': 26,
            u'min_temp': 9.9,
            u'temp_trend': u'down',
            u'time_utc': 1468157799},
          u'data_type': 
            [u'Temperature',
            u'Humidity'],
...

The dashboard_data section has the actual readings, while data_type informs us of the measurement types that this station reports. Values are reported in the unit the user selected on setup, meaning they could be Fahrenheit instead of Celsius, for instance. A separate user part of the returned JSON has details about which units are used.

In addition to the data from the indoor Weather Station, stations also have a modules parameter which holds measurements from all connected modules (outdoor module, rain gauge, wind gauge and so on). As seen above, for each module the JSON fields are the same as for the station, with the measurements in dashboard_data and reported measurements in data_type. This greatly simplifies parsing of the JSON response, as you can use the same code for parsing the devices list as for each entry in the modules list.
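As a sketch, that shared parsing logic could look like this, using only the fields shown in the JSON excerpt above (data is the decoded getstationsdata response):

def extract_readings(section):
    # Works for both the station entry and each module entry: pick the
    # measurement types listed in data_type out of dashboard_data
    dashboard = section['dashboard_data']
    timestamp = dashboard['time_utc']
    return [(mtype, dashboard[mtype], timestamp)
            for mtype in section['data_type'] if mtype in dashboard]

station = data['body']['devices'][0]
readings = extract_readings(station)
for module in station.get('modules', []):
    readings.extend(extract_readings(module))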

Storing data in InfluxDB

InfluxDB is a time series database with high performance, good compression and an easy-to-use write API and query language. After installing and starting it up with default config options, it’s ready to use as a data store for time-series data like weather measurements. The write API is available through HTTP. To write to InfluxDB, issue POST requests with the actual data as newline-delimited strings in the body of the request. InfluxDB documentation refers to this as the line protocol. An example write request can look like this:

payload = """
Humidity,station_name=MyStation,module_name=Outdoors value=52 1468157799
Temperature,station_name=MyStation,module_name=Outdoors value=22.8 1468157799
Rain,station_name=MyStation,module_name=Rain_Gauge value=0 1468157799
"""

resp = requests.post('http://localhost:8086/write?precision=s&db=netatmo', data=payload)

This will save three measurements into time series named Humidity, Temperature and Rain in database netatmo. The value field is the actual measurement and the timestamp is from the time_utc field alongside the measurements. It’s trivial to convert the returned JSON into the line format that InfluxDB expects.
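The conversion can be a simple string formatting step over (measurement, value, timestamp) tuples, for example the ones produced by the parsing sketch earlier; the tag values here are just examples:

readings = [('Temperature', 22.8, 1468157799), ('Humidity', 52, 1468157799)]

lines = ['{},station_name=MyStation,module_name=Outdoors value={} {}'.format(m, v, ts)
         for m, v, ts in readings]
payload = '\n'.join(lines)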

The station_name and module_name are custom tags attached to the time series to make it possible to distinguish between different stations and modules. The tags are available for filtering in Grafana using WHERE statements. Station names and module names defined when setting up the Netatmo Weather Station are available in the returned JSON from the Netatmo API.

Setting up a Grafana dashboard

After downloading and installing Grafana, go to the Datasource part of the web UI and create a new data source with the following settings:

Grafana - Data source setup

The User and Password under InfluxDB Details are root/root, but are not really used unless InfluxDB authentication was configured with non-default settings before starting up the database.

With a data source in place, the next step is to create a dashboard. There are many ways to visualize weather data, but at least add graphs for each time series you’ve stored in InfluxDB. That way you get a feel for how the metric changes over time. For some metrics the trends are most interesting, for other metrics only the current value is necessary to display. If everything works as expected, you should get suggestions when you set up the metric queries in Grafana, like this:

Create graph - Suggestions

Under the WHERE section you can filter on tags associated with each time series, like for example the module name to only get outdoor temperatures instead of both indoor and outdoor.

Awesome visualizations

With Grafana and InfluxDB set up to store data from your Netatmo Weather Station, you can create some pretty awesome visualizations. Like for instance this outdoor temperature graph over several months, with a moving_average() layered on top:

Outdoor Temperature with Moving Average.png

This makes it easy to see that the temperature moved in the right direction during these months, and that there were some periods with higher average temperature than others (the two first weeks of May in particular).

If you’re interested in more awesome visualizations of Netatmo Weather Station data, head over to my web service grafatmo.com to get your own personalized weather dashboard!

~ Arne ~

Spark cluster on OpenStack with multi-user Jupyter Notebook

Spark on OpenStack with Jupyter

Apache Spark is gaining traction as the de facto analysis suite for big data, especially for those using Python. Spark has a rich API for Python and several very useful built-in libraries like MLlib for machine learning and Spark Streaming for realtime analysis. Jupyter (formerly IPython Notebook) is a convenient interface to perform exploratory data analysis and all kinds of other analytic tasks using Python. In this post I’ll show step-by-step how to set up a Spark cluster on OpenStack and configure Jupyter with multi-user access and an easy-to-use PySpark profile.

Setting up a Spark cluster on OpenStack VMs

To get a Spark standalone cluster up and running, all you need to do is spawn some VMs and start Spark as master on one of them and slave on the others. They will automatically form a cluster that you can connect to from Python, Java and Scala applications using the IP address of the master node. Sounds easy enough, right? But there are some pitfalls, so read on for tips on how to avoid them.

To create the first VM to be used as master node, I use the OpenStack command line client. We’ll get that node up and running first. The distribution is Ubuntu 14.04 Trusty. Cloud-init is an easy way to perform bootstrapping of nodes to get the necessary software installed and set up. To install and start a Spark master node I use the following Cloud-init script:

#!/bin/bash
#
# Cloud-init script to get Spark Master up and running
#
SPARK_VERSION="1.5.0"
APACHE_MIRROR="apache.uib.no"
LOCALNET="10.20.30.0/24"

# Firewall setup
ufw allow from $LOCALNET
ufw allow 80/tcp
ufw allow 443/tcp
ufw allow 4040:4050/tcp
ufw allow 7077/tcp
ufw allow 8080/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Configure Spark master
cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' /opt/spark/conf/spark-env.sh

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark Master with IP address of eth0 as the address to use
/opt/spark/sbin/start-master.sh -h $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/)

Save this as init-spark-master.sh for use with the OpenStack command line client. The script first adds some firewall rules to allow access to the different components and installs the OpenJDK dependency. Next, a Spark tarball is downloaded, unpacked and made available under /opt/spark on the host. The tarball is prepackaged with Hadoop v1 libraries (note the “hadoop1.tgz” suffix), so adjust this if you need Hadoop v2 instead.

The only configuration of Spark we need at this point is to set the options “spark.deploy.defaultCores” and “spark.executor.memory”. They are used to configure how many resources each application will get when it starts. Since the goal is to set up a multi-user environment with Jupyter notebooks, we need to limit the number of CPU cores and the amount of RAM that each notebook will use. Each notebook is an “application” on the cluster for as long as the notebook is active (i.e. until it is shut down by the user). If we don’t limit the resource allocation, the first notebook created will allocate all available CPU cores on each worker, leaving no CPU cores free for the next user. In addition, the default RAM allocation for each app is only 512 MB on each worker node, which might be a bit too small, so we bump that up to 2 GB.

The echo line adds the hostname (here “spark-master”) to /etc/hosts with a reference to the IP address of the VM. Spark tries to resolve the local hostname on startup. Without a resolvable hostname you might encounter “Name or service not known” errors, resulting in Java exceptions and exits.

On the last line the Spark master process is started. The master process is given the IP address of the local host as an argument to make sure it binds to the correct interface. The IP address is extracted from the output of the “ip addr” command.

One way to launch the master VM with the Cloud-init script is like this:

# Install OpenStack client if not present already
sudo apt-get -y install python-openstackclient

# Customize these values to match your OpenStack cluster
OS_IMAGE="f9c9b0dc-6407-4ac2-ad0e-45cb4e47bb01"
OS_NETID="df7cc182-8794-4134-b700-1fb8f1fbf070"
OS_KEYNAME="arnes"

# Create Spark master VM
openstack server create --flavor m1.medium --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-master.sh spark-master

If you prefer using the web UI (Horizon) instead, you could just as easily paste the Cloud-init script into the text box on the Post-Creation tab of the Launch Instance dialog and achieve the same result.

It will take some minutes for the Spark master VM to finish bootstrapping. When it’s done the Spark master UI will be available on port 8080. Remember to associate a floating IP to the VM to be able to access it from outside the OpenStack project:

openstack ip floating add 10.1.1.1 spark-master

Verify that the Spark master UI is reachable and displays metadata about the cluster. If the UI is not reachable, first check that your Security Group rules allow port 8080 to the Spark master VM. Second, check the Cloud-init logs on the VM to ensure all parts of the initialization succeeded. You’ll find the Cloud-init log file on the VM as /var/log/cloud-init.log and the output from the Cloud-init script in /var/log/cloud-init-output.log. You can also try to re-run parts of the Cloud-init script with sudo to narrow down any issues with the initialization. When initialization succeeds the Spark master UI will look like this:

Spark master UI with no workers

As expected there are no workers alive yet, so let’s initialize some. To do so we use a slightly modified version of the Cloud-init script above. The main difference is the startup command, which is now /opt/spark/sbin/start-slave.sh with the address to the master as the only argument. Remember to adjust the variables below to your IP range and master IP address.

#!/bin/bash
#
# Cloud-init script to get Spark Worker up and running
#
SPARK_VERSION="1.5.0"
APACHE_MIRROR="apache.uib.no"
LOCALNET="10.20.30.0/24"
SPARK_MASTER_IP="10.20.30.178"

# Firewall setup
ufw allow from $LOCALNET
ufw allow 8081/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts
# Start Spark worker with address of Spark master to join cluster 
/opt/spark/sbin/start-slave.sh spark://$SPARK_MASTER_IP:7077

Save this as init-spark-worker.sh. Notice how the last line starts up a slave with the address to the cluster master on port 7077 as the only argument. Since the Cloud-init script has no worker-specific details, it is easy to expand the cluster just by creating more worker VMs initialized with the same Cloud-init script. Let’s create the first worker:

# Customize these values to match your OpenStack cluster
OS_IMAGE="f9c9b0dc-6407-4ac2-ad0e-45cb4e47bb01"
OS_NETID="df7cc182-8794-4134-b700-1fb8f1fbf070"
OS_KEYNAME="arnes"

# Create first Spark worker VM, this time with flavor m1.large
openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker1

Pause for a moment to let the worker creation process finish to ensure that Cloud-init does the necessary work without errors. There is no point in initializing more workers until the process is proven to work as expected. Again, it can be useful to check /var/log/cloud-init.log and /var/log/cloud-init-output.log on the new VM to verify that Cloud-init does what it’s supposed to do. On success you’ll see the worker in the Spark master UI:

Spark master UI with one worker registered

Create some more worker nodes to scale the cluster to handle more parallel tasks:

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker2

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker3

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data init-spark-worker.sh spark-worker4

Verify that the new worker nodes show up in the Spark master UI before continuing.

Installing Jupyter and JupyterHub

A shiny new Spark cluster is fine, but we also need interfaces to be able to use it. Spark comes prepackaged with shells for Scala and Python where connection to a cluster is already set up. The same level of usability is possible to get with Jupyter (formerly IPython Notebook), so that when you open a new notebook a connection to the Spark cluster (a SparkContext) is established for you. The SparkContext is available through the variable “sc” in the notebook, ready to use by calling sc.textFile() to create an RDD, for instance.
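In other words, once a notebook is opened with the PySpark kernel described later, something like this works right away (the file path is just an example):

# 'sc' is already a SparkContext connected to the Spark cluster
lines = sc.textFile('file:///opt/spark/README.md')
print(lines.count())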

JupyterHub is a multi-user server for Jupyter notebooks. That makes it possible for several users to use Jupyter independently and have their own notebooks and files in their home directory instead of a shared storage directory for all notebooks. However, this requires that each user has a user account on the VM where JupyterHub is running. Add user accounts for relevant users now if needed. JupyterHub uses unix authentication, meaning that it relays the username and password to the underlying authentication system on the VM for credential check.

In this deployment JupyterHub is installed on the Spark master VM and launched there. It could run on a separate VM, but there is normally no need for that since the Spark master process does not require that many resources. The VM where Jupyter notebooks are executed is called the “driver” in Spark, and it will require some processing power and memory, depending on the use case.

SSH into the Spark master VM and run the following set of commands:

# Install pip3 and other dependencies
sudo apt-get -y install python3-pip npm nodejs-legacy
sudo npm install -g configurable-http-proxy

# Install JupyterHub and Jupyter
sudo pip3 install jupyterhub
sudo pip3 install "ipython[notebook]"

pip3 is used instead of pip because JupyterHub depends on Python >= 3.3. After installing all software and dependencies, start the JupyterHub service:

sudo jupyterhub --port 80

The benefit of having JupyterHub listen on port 80 instead of the default port 8000 should be obvious, but it requires that you start the service as root. In addition you might want to look into securing JupyterHub with an SSL certificate and have it listen on port 443, since it asks for passwords when users log in. When you have the necessary certificate and keys on the VM, the service can be started like this instead:

sudo jupyterhub --port 443 --ssl-key hub.key --ssl-cert hub.pem

Now try to open the JupyterHub login page on the floating IP address of the VM and log in. After login you should be greeted with an empty home directory with no notebooks. A new notebook can be created by clicking “New” on the right above the notebook list.

Jupyter - notebook list empty

If you create a new notebook, you’ll notice that the only supported kernel is Python3 at the moment. We need to add PySpark to that list to be able to use the Spark cluster from Jupyter.

Configuring Jupyter for PySpark

Jupyter relies on kernels to execute code. The default kernel is Python, but many other languages can be added. To use the Spark cluster from Jupyter we add a separate kernel called PySpark. In addition, kernels can run specific commands on startup, which in this case is used to initialize the SparkContext. First, some dependencies need to be installed:

sudo apt-get -y install python-dev python-setuptools
sudo easy_install pip
sudo pip install py4j
sudo pip install "ipython[notebook]"

It might seem odd to install ipython[notebook] as a dependency, but the reason is that IPython/Jupyter contains a number of Python support modules that kernels rely on. Previously when we installed using pip3, we got the Python3 versions of those modules. When installing again with pip, we get Python2 versions. PySpark depends on Python2.

To add PySpark as a kernel, a file containing a kernel definition must be created. Kernel definitions are JSON files in a specific directory. Kernels can either be enabled globally for all users or for one user only, depending on where the definition file is placed. We want the PySpark kernel to be available for all users, so we’ll add it under /usr/local/share/jupyter/kernels/ like this:

sudo mkdir -p /usr/local/share/jupyter/kernels/pyspark/
cat <<EOF | sudo tee /usr/local/share/jupyter/kernels/pyspark/kernel.json
{
 "display_name": "PySpark",
 "language": "python",
 "argv": [
  "/usr/bin/python2",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ],
 "env": {
  "SPARK_HOME": "/opt/spark/",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/py4j-0.8.2.1-src.zip",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/shell.py",
  "PYSPARK_SUBMIT_ARGS": "--master spark://10.20.30.178:7077 pyspark-shell"
 }
}
EOF

This kernel definition ensures that the Spark built-in “pyspark-shell” is started under the hood as the process where our code will be executed. Notice how the address to the Spark cluster, “spark://10.20.30.178:7077”, is sent as an argument. Remember to customize that address to your specific environment. The address references the Spark master VM (the same host as Jupyter runs on), but could just as easily reference an external host. For instance if you wanted to setup Jupyter on a separate OpenStack VM, or if you already have a Spark cluster running somewhere else that you want to connect to. The Spark master UI shows the right URL to use to connect, right below the Spark logo. Note that the Spark workers depend on being able to establish connections back to the host where the driver process runs (the Jupyter notebook), which may not be possible depending on the firewall setup when connecting to a remote Spark cluster. This is the reason a firewall rule allowing all traffic on the local network (10.20.30.0/24 in my case) is added by Cloud-init on all the Spark VMs.

After adding the kernel definition file for PySpark you’ll have to refresh the Jupyter homepage to see the new kernel in the list. No need to restart JupyterHub.

Debugging PySpark startup errors

If you get an error message about “Dead kernel” when creating new notebooks with the PySpark kernel, there might be several causes. For instance the VM running Jupyter might not be able to connect to the Spark cluster. Or it might lack some dependencies (packages/modules) to initialize the SparkContext. To debug kernel startup issues, first check the output from JupyterHub in the terminal where you started it (might be smart to keep that terminal open until everything works as expected). JupyterHub will for example output log lines like this when a Python module is missing:

[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
...
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
[W 2015-09-20 20:31:39.295 ubuntu kernelmanager:52] Kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f died, removing from map.
ERROR:root:kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f restarted failed!
[W 2015-09-20 20:31:39.406 ubuntu handlers:441] Kernel deleted before session

This error occurred because I hadn’t installed ipython[notebook] using pip yet, so the Python2 modules needed by PySpark were not available. Notice how the error message states that it is /usr/bin/python2 that reports the error. Jupyter tries to restart the kernel a total of five times, but hits the same error every time and eventually gives up. In the notebook UI this is shown as a “Dead kernel” message.

Other errors can pop up in the Spark logs on master or worker nodes. Spark logs to /opt/spark/logs, so have a look there if anything is malfunctioning. The Spark master node logs every new application that is started on the Spark cluster, so if you don’t see output there when opening a new notebook with the PySpark profile, something is not right.

One last debugging tip is to try to start the PySpark shell from Bash on the VM where Jupyter runs. It is useful to inspect what happens when the PySpark shell starts. Here is an example of output when a dependency is missing:

$ python2 /opt/spark/python/pyspark/shell.py
Traceback (most recent call last):
 File "/opt/spark/python/pyspark/shell.py", line 28, in <module>
 import py4j
ImportError: No module named py4j

Remember to use Python2 when starting the shell. The above command mimics what Jupyter does behind the scenes when a new notebook is created.

Ready to use PySpark in Jupyter

If everything went according to plan, you now have a Spark cluster which you can easily use from Jupyter notebooks just by creating them with the PySpark profile 🙂 The variable “sc” is initialized as a SparkContext connected to the Spark cluster and you can start exploring the rich Spark API for data transformation and analysis. Here’s a screenshot from a notebook where I extracted response time numbers from Varnish NCSA logs (web cache server logs) and computed common statistics like mean and standard deviation for the response time of each backend in use by the cache server:

Example use of PySpark in Jupyter
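A sketch of that kind of computation, assuming the backend name and response time are the last two whitespace-separated fields of each log line (adjust the parsing to your varnishncsa output format):

import math

logsRDD = sc.textFile('/data/varnish-ncsa.log')  # example path

def parse(line):
    # Assumed layout: backend name and response time are the last two fields
    fields = line.split()
    return (fields[-2], float(fields[-1]))

def stats(times):
    times = list(times)
    mean = sum(times) / len(times)
    stddev = math.sqrt(sum((t - mean) ** 2 for t in times) / len(times))
    return (mean, stddev)

backendStatsRDD = logsRDD.map(parse).groupByKey().mapValues(stats)
print(backendStatsRDD.collect())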

~ Arne ~

Spark – How to fix “WARN TaskSchedulerImpl: Initial job has not accepted any resources”

Apache Spark and Firewalls

When setting up Apache Spark on your own cluster, in my case on OpenStack VMs, a common pitfall is the following error message:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

This error can pop up in the log output of the interactive Python Spark shell or Jupyter (formerly IPython Notebook) after starting a PySpark session and trying to perform any kind of Spark action (like .count() or .take() on an RDD), rendering PySpark unusable.

As the error message suggests, I investigated resource shortages first. The Spark Master UI reported that my PySpark shell had allocated all the available CPU cores and a small portion of the available memory. I therefore lowered the number of CPU cores for each Spark application on the cluster, by adding the following line in spark-env.sh on the master node and restarting the master:

SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"

After this change my PySpark shell was limited to 4 CPU cores of the 16 CPU cores in my cluster at that time, instead of reserving all available cores (the default setting). However, even though the Spark UI now reported there would be enough free CPU cores and memory to actually run some Spark actions, the error message still popped up and no Spark actions would execute.

While debugging this issue, I came across a Spark-user mailing list post by Marcelo Vanzin of Cloudera where he outlines two possible causes for this particular error:

"...
- You're requesting more resources than the master has available, so
your executors are not starting. Given your explanation this doesn't
seem to be the case.

- The executors are starting, but are having problems connecting 
back to the driver. In this case, you should be able to see 
errors in each executor's log file.
..."

The second of these was causing this error in my case. The host firewall on the host where I ran my PySpark shell rejected the connection attempts back from the worker nodes. After allowing all traffic between all nodes involved, the problem was resolved! The driver host was another VM in the same OpenStack project, so allowing all traffic between the VMs in the same project was OK to do security-wise.

The error message is not particularly useful in the case where executors are unable to connect back to the driver. If you encounter the same error message, remember to check firewall logs from all involved firewalls (host and/or network firewalls).

On a side note, this requirement of Spark to connect back from executors to the driver makes it harder to set up a Spark cluster in a secure way. Unless the driver is in the same security zone as the Spark cluster, it may not be possible to allow the Spark cluster workers to establish connections to the driver host on arbitrary ports. Hopefully the Apache Spark project will address this limitation in a future release, by making sure all necessary connections are established by the driver (client host) only.

~ Arne ~

Analyzing Popular Topics In My Twitter Timeline using Apache Spark

Word cloud of Twitter hashtags
Most popular Twitter topics, generated using Apache Spark and Wordle.net

Over the last weeks I’ve dived into data analysis using Apache Spark. Spark is a framework for efficient, distributed analysis of data, built on the Hadoop platform but with much more flexibility than classic Hadoop MapReduce. To showcase some of the functionality I’ll walk you through an analysis of Twitter data. The code is available as an IPython Notebook on Github.

The question I want to answer using Spark is: What topics are people currently tweeting about? The people are in this case the ones I follow on Twitter, at the moment approx. 600 Twitter users. They represent a diverse set of interests mirroring the topics I’m interested in, such as data analysis, machine learning, networking technology, infrastructure, motor sports and so on. By extracting the hashtags they’ve used in tweets the last week and do a standard word count I’ll generate a list of the most popular topics right now.

The amount of functionality for data analysis in Spark is impressive. Spark features a long list of available transformations and actions, such as map, filter, reduce, several types of joins, cogroup, sum, union, intersect and so on. In addition, Spark has a machine learning library with a growing number of models and algorithms. For instance, Spark MLlib includes everything needed to do the Linear Regression example I did on AWS in my previous blog post. Spark also comes with a Streaming component where batch analysis pipelines easily can be set up to run as realtime analysis jobs instead. Compared to classic Hadoop MapReduce, Spark is not only more flexible but also much faster, thanks to in-memory based analysis.

To do the Twitter analysis, I first fetched about 24000 tweets from the Twitter API using a Python module called Tweepy:
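The collection code is embedded in the notebook; a sketch of the loop, with placeholder credentials and illustrative MongoDB collection names, could look like this:

import time
import tweepy
from pymongo import MongoClient

# Placeholder credentials; see the notebook for the real auth and connection setup
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
db = MongoClient().twitter

for friend_id in api.friends_ids():
    # Skip users whose tweets were already stored on a previous run
    if db.users.find_one({'user_id': friend_id}):
        continue
    for tweet in api.user_timeline(user_id=friend_id, count=200):
        db.tweets.insert_one(tweet._json)
    db.users.insert_one({'user_id': friend_id})
    time.sleep(6)  # keep well within the Twitter API rate limits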

Each tweet was saved to a local MongoDB instance for persistence. The loop first checks the database to see if that user has been processed already, to save time if the loop has to be run several times. Due to rate limiting of the Twitter API it took about 2 hours to download the dataset. By the way, the term “friends” is the word Twitter uses to reference the list of users that a user follows.

The code snippet above depends on a valid, authorized API session with the Twitter API and an established connection to MongoDB. See the IPython Notebook for the necessary code to establish those connections. Of the dependencies the Python modules “tweepy” and “pymongo” need to be installed, preferably using pip to get the latest versions.

With the tweets saved in MongoDB, we are ready to start doing some filtering and analysis on them. First, the set of tweets needs to be loaded into Spark and filtered:
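The loading and filtering snippet is also embedded in the notebook; a sketch of it, assuming the pymongo connection from the collection step and the preinitialized SparkContext “sc”:

import time
from dateutil import parser

# Pull the stored tweets out of MongoDB into a plain Python list and keep
# only tweets with hashtags from the last week
tweets = list(db.tweets.find())
limit_unixtime = time.time() - 7 * 24 * 3600

allTweetsRDD = sc.parallelize(tweets)
tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)
filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)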

In the code snippet above I use sc.parallelize() to load a Python list into Spark, but I could just as easily have used sc.textFile() to load data from a file on disk or sc.newAPIHadoopFile() to load a file from HDFS. Spark also supports use of Hadoop connectors to load data directly from other systems such as MongoDB, but that connector unfortunately does not support PySpark yet. In this case the dataset fits in memory of the Python process so I can use sc.parallelize() to load it into Spark, but if I’d like to run the analysis on a longer timespan than one week that would not be feasible. To see how the MongoDB connector can be used with Python, check out this example code by @notdirkhesse which he demonstrated as part of his excellent Spark talk in June.

“sc” is the SparkContext object, which is the object used to communicate with the Spark API from Python. I’m using a Vagrant box with Spark set up and “sc” initialized automatically, which was provided as part of the very interesting Spark MOOCs CS100 and CS190 (BerkeleyX Big Data X-Series). The SparkContext can be initialized to use remote clusters running on EC2 or Databricks instead of a local Vagrant box, which is how you’d scale out the computations.

Spark has a concept of RDDs, Resilient Distributed Datasets. An RDD represents an entire dataset regardless of how it is distributed around on the cluster of nodes. RDDs are immutable, so a transformation on an RDD returns a new RDD with the results. The last two lines of the code snippet above are transformations to filter() the dataset. An important point to note about Spark is that all transformations are lazily evaluated, meaning they are not computed until an action is called on the resulting RDD. The two filter statements are only recorded by Spark so that it knows how to generate the resulting RDDs when needed.

Let’s inspect the filters in a bit more detail:

tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)

The first filter transformation is called on allTweetsRDD, which is the RDD that represents the entire dataset of tweets. For each of the tweets in allTweetsRDD, the lambda expression is evaluated. Only those tweets where the expression equals True are returned to be included in tweetsWithTagsRDD. All other tweets are silently discarded.

filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)

The second filter transformation is a bit more complex due to the datetime calculations, but follows the same pattern as the first. It is called on tweetsWithTagsRDD, the results of the first transformation, and checks if the tweet timestamp in the “created_at” field is recent enough to be within the time window I defined (one week). The tweet timestamp is parsed using python-dateutil, converted to unixtime and compared to the precomputed limit.

For those of you who are already acquainted with Spark, the following syntax might make more sense:

filteredTweetsRDD = (allTweetsRDD
                     .filter(lambda t: len(t['entities']['hashtags']) > 0)
                     .filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)
                    )

The inspiration from functional programming in Spark’s programming model is apparent here, with enclosing parentheses around the entire statement in addition to the use of lambda functions. The resulting filteredTweetsRDD is the same as before. However, by assigning a variable name to the results of each filter transformation, it’s easy to compute counts:

tweetCount = allTweetsRDD.count()
withTagsCount = tweetsWithTagsRDD.count()
filteredCount = filteredTweetsRDD.count()

count() is an example of an action in Spark, so when I execute these statements the filter transformations above are also computed. The counts revealed the following about my dataset:

  • Total number of tweets: 24271
  • Tweets filtered away due to no hashtags: 17150
  • Of the tweets that had hashtags, 4665 were too old
  • Resulting set of tweets to analyze: 2456

Now we’re ready to do the data analysis part! With a filtered set of 2456 tweets in filteredTweetsRDD, I proceed to extract all hashtags and do a word count to find the most popular tags:
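The pipeline code is embedded in the notebook; assembled from the fragments discussed below, a sketch of it looks like this:

countsRDD = (filteredTweetsRDD
             .flatMap(lambda tweet: [hashtag['text'].lower()
                                     for hashtag in tweet['entities']['hashtags']])
             .map(lambda tag: (tag, 1))
             .reduceByKey(lambda a, b: a + b))

# Action: compute the counts and return the top 20 hashtags
top20 = countsRDD.takeOrdered(20, key=lambda pair: -pair[1])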

What’s happening here is that I’m creating a new Pair RDD consisting of tuples of (hashtag, count). The first step is to extract all hashtags with a flatMap(), and remember that every tweet can contain a list of multiple tags.

filteredTweetsRDD.flatMap(lambda tweet: [hashtag['text'].lower() \
    for hashtag in tweet['entities']['hashtags']])

A flatMap() transformation is similar to a map(), which passes each element of an RDD through a user-supplied function. In contrast to map, flatMap ensures that the result is a flat list instead of a nested data structure – like a list of lists, for instance. Since the analysis I’m doing doesn’t care which tweet has which hashtags, a simple list is sufficient. The lambda function does a list comprehension to extract the “text” field of each hashtag in the data structure and lowercase it. The data structure for tweets looks like this:

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Sun Jul 12 19:29:09 +0000 2015',
 u'entities': {u'hashtags': [{u'indices': [75, 83], 
                              u'text': u'TurnAMC'},
                             {u'indices': [139, 140], 
                              u'text': u'RenewTURN'}],
               u'symbols': [],
               u'urls': [],
...

So the result of the lambda function on this tweet would be:

['turnamc', 'renewturn']

After the flatMap(), a standard word count using map() and reduceByKey() follows:

.map(lambda tag: (tag, 1))
.reduceByKey(lambda a, b: a + b)

A word count is the “Hello World”-equivalent for Spark. First, each hashtag is transformed to a key-value tuple of (hashtag, 1). Second, all tuples with the same key are reduced using the lambda function, which takes two counts and returns the sum. Spark runs both map() and reduceByKey() in parallel on the data partition residing on each worker node in a cluster, before the results of the local reduceByKey() are shuffled so that all values belonging to a key is processed by one worker. This behaviour mimics the use of a Combiner in classic Hadoop MapReduce. Since both map() and reduceByKey() are transformations, the result is a new RDD.

To actually perform the computations and get results, I call the action takeOrdered() with a custom sort function to get the top 20 hashtags by count. The sort function simply orders key-value pairs descending by value.

The 20 most used hashtags in my dataset turned out to be:

[(u'bigdata', 114),
 (u'openstack', 92),
 (u'gophercon', 71),
 (u'machinelearning', 68),
 (u'sdn', 66),
 (u'datascience', 58),
 (u'docker', 56),
 (u'dtm', 46),
 (u'audisport', 44),
 (u'dtmzandvoort', 42),
 (u'hpc', 40),
 (u'welcomechallenges', 38),
 (u'devops', 37),
 (u'analytics', 36),
 (u'awssummit', 36),
 (u'infosec', 33),
 (u'security', 32),
 (u'openstacknow', 29),
 (u'renewturn', 29),
 (u'mobil1scgp', 28)]

In this list it’s easy to recognize several of the interests I mentioned earlier. Big Data is the top hashtag, which together with Machine Learning and Data Science make up a significant portion of the interesting tweets I see in my Twitter timeline. OpenStack is another top hashtag, which is a natural topic given my current job in infrastructure. SDN is a closely related topic and an integrated part of the OpenStack scene. Docker is taking over in the infrastructure world and the DevOps mindset that follows with it is also a popular topic.

What’s interesting to see is that conferences spark a lot of engagement on Twitter. Both GopherCon and AWS Summit turn up in the top 20 list since they took place during the week of tweets I analyzed. The same goes for motor sports (hashtags DTM Zandvoort, Audi Sport, Welcome Challenges), although in that case it’s the professional teams, in contrast to conference goers, that make sure their Twitter followers are constantly updated on standings and news.

As I’m sure you’ve noticed, the word cloud at the beginning of this blog post is generated from the list of hashtags in the dataset and their counts. To finish off the analysis I also computed the average number of hashtags per tweet that had at least one hashtag:

# Count the number of hashtags used
totalHashtags = countsRDD.map(lambda (key, value): value) \
                         .reduce(lambda a, b: a + b)

# Compute average number of hashtags per tweet
print(('A total of {} hashtags gives an average number of '
       'tags per tweet at {}.').format(
          totalHashtags,
          round(totalHashtags / float(filteredTweetsRDD.count()), 2)))

Here I do another map + reduce, but this time the map function extracts the count for each hashtag and the reduce function sums it all up. It is very easy to build such pipelines of transformations to get the desired results. The speed and flexibility of Spark lowers the entry point and invites the user to do more such computations. In closing, here are the numbers:

  • A total of 4030 hashtags analyzed
  • An average number of tags per tweet at 1.64

~ Arne ~

Using Amazon Machine Learning to Predict the Weather

Amazon recently launched their Machine Learning service, so I thought I’d take it for a spin. Machine Learning (ML) is all about predicting future data based on patterns in existing data. As an experiment I wanted to see if machine learning would be able to predict the weather of tomorrow based on weather observations. Weather systems travel large distances on a time scale of hours and days, so recent weather observations from around the country can be used to predict the future weather of one specific site. Meteorological institutes do this every day by running complex weather models on hundreds of nodes in large HPC clusters. I don’t expect machine learning to produce quite as good results as those models do, but thought it would be fun to see how close ML could get.

Weather Map
Weather forecast from yr.no, delivered by the Norwegian Meteorological Institute and the NRK

The Amazon Machine Learning service makes it easy to get started and reduces the time it takes to get actionable insights from data. The service comes with tutorials, developer guides, a very useful explanation of machine learning concepts and enough tips to guide anyone through their first steps in the world of machine learning. There is a sample dataset you can use to create your first prediction model, but if you want, you can follow along with my journey in this post using my dataset instead. The source code to generate it is on Github, and all you need to generate CSV files with weather observations is a free API key from Wunderground.com.

Defining the use case and dataset

Before diving into coding and machine learning, it’s important to define the use case as clearly as possible. The overall goal is to test whether machine learning is a viable approach to weather forecasting. To do that, I chose to predict the temperature tomorrow at 12:00 UTC in Oslo, the capital of Norway. The dataset I’ve chosen is weather observations from five cities in Norway, scattered around the southern half of the country. The weather in Oslo usually comes from the west, so I include observations from cities like Stavanger and Bergen in the dataset.

The layout of the dataset is important. Amazon Machine Learning treats every line in the dataset (CSV file) as a separate record and processes them randomly. For each record, it tries to predict the target value (temperature in Oslo the next day) from the variables present in that record. This means that you can’t rely on any connections between records, for example that the temperature measured at 10 AM may be similar to the temperature at 11 AM.

To create a dataset with enough data in each record to be able to predict the target value, I append all weather observations with the same timestamp, regardless of location, to the same record. This means that for any given timestamp there will, for instance, be five temperatures, five wind measurements and so on. To be able to distinguish between the cities in the dataset, I prefixed each column (each variable) with the first letter of the city name, so the original variable “tempm” (containing the temperature) becomes “o_tempm” for Oslo.
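To make that record layout concrete, here is a minimal sketch of the merging step. The city list is the five cities used in this post, but the helper function, field selection and example values are illustrative, not the actual script:

# Merge observations from all five cities for one timestamp into a single
# record, prefixing each field with the first letter of the city name.
CITIES = ['oslo', 'bergen', 'stavanger', 'kristiansand', 'trondheim']

def merge_observations(obs_by_city):
    """obs_by_city maps city name -> dict of observation fields for one timestamp."""
    record = {}
    for city in CITIES:
        prefix = city[0] + '_'                     # 'oslo' -> 'o_'
        for field, value in obs_by_city[city].items():
            record[prefix + field] = value         # 'tempm' -> 'o_tempm'
    return record

# Example: the Oslo temperature ends up in the column 'o_tempm'
merged = merge_observations({
    'oslo':         {'tempm': '12', 'hum': '36'},
    'bergen':       {'tempm': '10', 'hum': '80'},
    'stavanger':    {'tempm': '11', 'hum': '75'},
    'kristiansand': {'tempm': '13', 'hum': '60'},
    'trondheim':    {'tempm': '9',  'hum': '70'},
})
print(merged['o_tempm'])   # '12'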

Creating the training dataset

Machine Learning works by creating a model from a training dataset where the target value to predict is already known. Since I want to predict a numerical value, Amazon ML defaults to a linear regression model. That is, it tries to build a formula that can output the target value, using individual weights for each variable in a record that tell the model how that variable is related to the target value. Some variables get a weight of zero, meaning they are not related to the target value at all, while others get non-zero weights, positive or negative. To be able to determine these weights, there must be a sufficient amount of training data.

To create a sufficiently large training dataset, I needed weather observations spanning some time, at least 14 days. Fortunately, Wunderground.com has a JSON API that is really easy to use, and their history endpoint can provide weather observations both for the current date and for dates back in time. I use that to collect observations for the last two weeks for all five cities. Since the free tier of their API restricts usage to 500 calls a day and a maximum of 10 calls per minute, the script I made to generate the dataset has to wait some seconds between each API call. To limit API usage and be able to rerun the script, I cache weather observations on disk, since I don’t expect past weather observations to change.
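As a rough sketch of that collection loop (written from memory of the Wunderground history API; the API key, cache layout and response handling are assumptions, not the actual script on Github):

import json
import os
import time

import requests

API_KEY = 'YOUR_WUNDERGROUND_KEY'   # placeholder, get your own free key
HISTORY_URL = 'http://api.wunderground.com/api/{key}/history_{date}/q/Norway/{city}.json'

def get_history(city, date, cache_dir='cache'):
    """Fetch one day of observations for a city, caching the raw JSON on disk."""
    cache_file = os.path.join(cache_dir, '{}-{}.json'.format(city, date))
    if os.path.exists(cache_file):
        # Past observations don't change, so cached files never expire
        with open(cache_file) as f:
            return json.load(f)
    response = requests.get(HISTORY_URL.format(key=API_KEY, date=date, city=city))
    data = response.json()
    if not os.path.isdir(cache_dir):
        os.makedirs(cache_dir)
    with open(cache_file, 'w') as f:
        json.dump(data, f)
    time.sleep(7)   # stay well below the limit of 10 calls per minute
    return data

# Each element of history -> observations is a dict like the example shown below
observations = get_history('Oslo', '20150529')['history']['observations']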

A weather observation returned by their API has the following syntax:

{'conds': 'Light Rain Showers',
 'date': {'hour': '14',
 'mday': '29',
 'min': '00',
 'mon': '05',
 'pretty': '2:00 PM CEST on May 29, 2015',
 'tzname': 'Europe/Oslo',
 'year': '2015'},
 'dewpti': '35',
 'dewptm': '2',
 'fog': '0',
 'hail': '0',
 'heatindexi': '-9999',
 'heatindexm': '-9999',
 'hum': '36',
 'icon': 'rain',
 'metar': 'AAXX 29121 01384 11786 52108 10121 20015 51007 60001 78082 84260',
 'precipi': '',
 'precipm': '',
 'pressurei': '',
 'pressurem': '',
 'rain': '1',
 'snow': '0',
 'tempi': '54',
 'tempm': '12',
 'thunder': '0',
 'tornado': '0',
 'utcdate': {'hour': '12',
 'mday': '29',
 'min': '00',
 'mon': '05',
 'pretty': '12:00 PM GMT on May 29, 2015',
 'tzname': 'UTC',
 'year': '2015'},
 'visi': '37',
 'vism': '60',
 'wdird': '210',
 'wdire': 'SSW',
 'wgusti': '',
 'wgustm': '',
 'windchilli': '-999',
 'windchillm': '-999',
 'wspdi': '17.9',
 'wspdm': '28.8'}

As you can see, this is the observation for May 29th at 12:00 UTC, and the temperature was 12 degrees Celsius (“tempm”, where m stands for the metric system). The API returns a list of observations for a given place and date. The list contains observation data for at least every hour, and for some places as often as three times per hour. For each date and time, I combine the observations from all five places into one long record. If there isn’t data from all five places for that timestamp, I skip it, to make sure that all machine learning records have a sufficient amount of data. Lastly, for all records belonging to the training set, I append the target value (the next day’s temperature) as the last field. That way, I get a file “dataset.csv” with known target values that can be used to train the model. Here is an example record, with the last number (8) being the target to predict:

20150515-22:00,,0,0,0,,56,,,0,0,6,0,0,10,180,South,,,7.2,Clear,3,0,0,,64,,1015,0,0,7,0,0,35,220,SW,,,10.8,,5,0,0,,76,,1014,0,0,8,0,0,,190,South,,,10.8,Clear,3,0,0,,68,,1013,0,0,7,0,0,30,190,South,,,18.0,Clear,-1,0,0,,66,,1013,0,0,3,0,0,45,130,SE,,,14.4,8

This way, I get a dataset where each timestamp is a separate record, and all timestamps belonging to one date get the next day’s temperature in Oslo at 12 UTC as the target value. In total, that’s 844 records for 14 days of observations.

In addition, the script outputs a file “testset.csv” with the last day of observations where the target value is unknown and should be predicted by the model.
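Condensed to its essence, the split between the two files looks roughly like this. It is a sketch with simplified column handling; “records” and “targets” are assumed to have been built by the merging and target-lookup steps described above:

import csv

def write_datasets(records, targets, last_day):
    """records: {timestamp: merged observation dict},
    targets: {date: next day's Oslo temperature at 12 UTC},
    last_day: the date (YYYYMMDD) that goes into the testset."""
    columns = sorted(next(iter(records.values())).keys())
    header = ['datetime_utc'] + columns + ['target_o_tempm']
    with open('dataset.csv', 'w') as train, open('testset.csv', 'w') as test:
        train_csv = csv.writer(train)
        test_csv = csv.writer(test)
        train_csv.writerow(header)
        test_csv.writerow(header)
        for ts, record in sorted(records.items()):
            row = [ts] + [record.get(c, '') for c in columns]
            if ts.startswith(last_day):
                test_csv.writerow(row + [''])                 # target unknown
            else:
                train_csv.writerow(row + [targets[ts[:8]]])   # known next-day temp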

Upload both CSV files to Amazon S3 before continuing, as Amazon Machine Learning is only able to use input data from S3 (or Redshift, but in that case Amazon exports it to S3 before using it). Be sure to select the “US Standard” region of S3, as the Machine Learning service is only available in their North Virginia location at the moment. To reduce costs it is important that the S3 datasets are in the same region as the Machine Learning service.
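If you prefer scripting the upload, a minimal boto3 sketch could look like this; the bucket name is a placeholder, and “US Standard” corresponds to the us-east-1 region:

import boto3

# US Standard is the us-east-1 region, same as the Machine Learning service
s3 = boto3.client('s3', region_name='us-east-1')
for filename in ('dataset.csv', 'testset.csv'):
    s3.upload_file(filename, 'my-ml-weather-bucket', 'weather/' + filename)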

Create Amazon Machine Learning datasources

The datasource is an object used by the Machine Learning model to access the data in S3. Datasource objects contain a schema that tells the model what type of field each variable is (numeric, binary, categorical, text). This schema is autodetected when you create a datasource, but may need some review before continuing to make sure all field types were classified correctly.

To create a datasource, first log into AWS to get access to Amazon Machine Learning. Select Create New -> Datasource. You are then asked for the path to the dataset in S3 and to give it a name, for instance “Weather observations”:

Create datasource

The dataset is verified and the schema is auto-generated. The next step is to make any adjustments to the schema if needed. Remember to say Yes to “Does the first line in your CSV contain the column names?”. I found that most fields containing actual data were correctly categorized, but fields with little or no data were not. There are some observation fields, like for instance “tornado”, that are normally zero for all five places and all times in my dataset. That field is binary but often autodetected as numeric (probably not an issue, since the field has no relevant data). The field “precipm” is numeric, but as it’s often blank (no precipitation detected) it can be mislabeled as categorical. Remember to go through all variables to check for misdetections like these; in my dataset there are 97 variables to check.

The third step is to define a target, which is the variable “target_o_tempm” in my dataset. When selected, Amazon Machine Learning informs you that “ML models trained from this datasource will use Numerical regression.” The last step in datasource creation is to define which field will be used as the row identifier, in this case “datetime_utc”. The row identifier will be used to label output from the machine learning model, so it’s handy to use a value that’s unique to each record. The field selected as row identifier will be classified as Categorical. Review the settings and click Finish. It may take some minutes for the datasource to reach the status Completed, since Amazon does quite a bit of data analysis in the background. For each variable, the range of values is detected and statistics like the mean and median are computed.

At this point, I suggest you go ahead and create another datasource for the testset while you wait. The testset needs to have the same schema as the dataset used to train the model, which means that every field must have the same classification in both datasources. Therefore it’s smart to create the testset datasource right away, while you still remember how each variable should be classified.
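The console is the easiest way to do this, but for completeness, here is a rough sketch of how the same two datasources could be created with boto3’s machinelearning client. The IDs, bucket and schema file location are placeholders, and the schema file itself (which declares every field’s type, the target and the row identifier) has to be written in Amazon ML’s JSON schema format, so treat this as an outline rather than a finished recipe:

import boto3

ml = boto3.client('machinelearning', region_name='us-east-1')

for ds_id, key in [('weather-observations', 'weather/dataset.csv'),
                   ('weather-testset',      'weather/testset.csv')]:
    ml.create_data_source_from_s3(
        DataSourceId=ds_id,
        DataSourceName=ds_id,
        DataSpec={
            'DataLocationS3': 's3://my-ml-weather-bucket/' + key,
            # Both datasources reuse the same schema file, so every field is
            # guaranteed to get the same classification in both of them
            'DataSchemaLocationS3': 's3://my-ml-weather-bucket/weather/weather.csv.schema',
        },
        ComputeStatistics=True,
    )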

In my case I got 30 binary attributes (variables), 11 categorical attributes and 56 numeric attributes. The review page for datasource creation lists that information. Make sure the numbers match for the dataset and testset:

Create datasource - review page

Creating the Machine Learning model

So, now you’ve got two datasources and can start model training. Most of the hard work of creating a dataset and the necessary datasources is already done at this point, so it’s time to put all this data to work and create the model that is going to generate predictions.

Select Create New -> ML Model on the Machine Learning Dashboard page. The first step is to select the datasource you created for the dataset (use the “I already created a datasource pointing to my S3 data” option). The second step is the model settings, where the defaults are just fine for our use. Amazon splits the training dataset into two parts (a 70-30 split) to be able to both train the model and evaluate its performance. Evaluation is done using the last 30% of the dataset. The last step is to review the settings, and again, the defaults are fine. The advanced settings include options like how many passes ML should do over the dataset (default 10) and how it should do regularization. More on that below; for now, just click Finish to create the model. This will again take some time, as Amazon performs a lot of computations behind the scenes to train the model and evaluate its performance.
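For reference, the corresponding API call through boto3 would look roughly like the sketch below. Note that the console also sets up the 70/30 evaluation split for you; when using the API you would create the split datasources and an evaluation yourself. The IDs are the placeholders from earlier, and the parameter keys for passes and regularization are my best understanding of the SGD training parameters, so check them against Amazon’s documentation:

import boto3

ml = boto3.client('machinelearning', region_name='us-east-1')

ml.create_ml_model(
    MLModelId='oslo-temperature-model',
    MLModelName='Oslo temperature model',
    MLModelType='REGRESSION',                     # numeric target -> linear regression
    TrainingDataSourceId='weather-observations',
    Parameters={
        'sgd.maxPasses': '10',                    # number of passes over the data
        'sgd.l2RegularizationAmount': '1e-6',     # mild L2 regularization
    },
)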

Regularization is a technique used to avoid overfitting of the model to the training dataset. Machine learning models are prone to both underfitting and overfitting problems. Underfitting means the model has failed at capturing the relation between input variables and target variable, so it is poor at predicting the target value. Overfitting also gives poor predictions and is a state where the model follows the training dataset too closely. The model remembers the training data instead of capturing the generic relations between variables. If for instance the target value in the training data fluctuates back and forth, the model output also fluctuates in the same manner. The result is errors and noise in the model output. When used on real data where the target value is unknown, the model will not be able to predict the value in a consistent way. This image helps explain the issue in a beautiful way:

Machine learning - overfitting
Underfitting vs overfitting a machine learning model. Image source: http://knewt.ly/1dDB111

So to avoid overfitting, regularization is used. Regularization can be performed in multiple ways. Common techniques involve penalizing extreme variable weights and setting small weights to zero, to make the model less dependent on the training data and better at predicting unknown target values.
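To make the effect of regularization concrete, here is a tiny illustration outside the Amazon workflow, using scikit-learn: an ordinary least-squares fit versus an L2-regularized (ridge) fit on noisy data, where the penalty pulls the weights towards zero, most visibly for the irrelevant variables:

import numpy as np
from sklearn.linear_model import LinearRegression, Ridge

rng = np.random.RandomState(0)
X = rng.randn(30, 10)                # 30 samples, 10 noisy input variables
y = 3 * X[:, 0] + rng.randn(30)      # only the first variable really matters

plain = LinearRegression().fit(X, y)
ridge = Ridge(alpha=10.0).fit(X, y)  # alpha controls the penalty strength

# The ridge weights are shrunk towards zero relative to the plain fit,
# which makes the model less sensitive to noise in the training data
print(np.round(plain.coef_, 2))
print(np.round(ridge.coef_, 2))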

Exploring model performance

Remember that Amazon did an automatic split of the training data, using 70% of the records to train the model and the remaining 30% to evaluate it. The Machine Learning service computes a couple of interesting performance metrics to inspect. Go to the Dashboard page and select the evaluation in the list of objects. The first metric shown is the RMSE, Root Mean Square Error, of the evaluation data. The RMSE should be as low as possible, meaning the mean error is small and the predicted output is close to the actual target value. Amazon computes a baseline RMSE based on the model training data and compares the RMSE of the evaluation data to that. I achieved an RMSE of about 2.5 in my first tests and close to 2.0 after refining the dataset a bit. The biggest optimization I did was to change the value of invalid weather observations from the default of -999 or -9999 to empty. That way the range of values for each field got much closer to reality and no longer included those very low numbers.
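That clean-up can be done in a few lines with pandas before uploading the CSV; a minimal sketch, assuming the dataset file from earlier:

import pandas as pd

df = pd.read_csv('dataset.csv')

# Wunderground uses -999/-9999 (sometimes as strings) as "no data" markers.
# Replacing them with empty values keeps each field's range realistic.
df = df.replace([-999, -9999, '-999', '-9999'], '')

df.to_csv('dataset.csv', index=False)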

By selecting “Explore model performance”, you get access to a histogram showing the residuals of the model. Residuals are the differences between the predicted and actual target values. Here’s the plot for my model:

ML model performance histogram
Distribution of residuals

The histogram tells us that my model has a tendency to over-predict the temperature, and that a residual of 1 to 2 degrees Celsius is the most likely outcome. This is called a positive bias. To lower the bias, it is possible to re-train the ML model with more data.

Before we use the model to predict temperatures, I’d like to show some of the interesting parts of the results of model training. Click the datasource for the training dataset on the Dashboard page to load the Data Report. Select Categorical under Attributes in the left menu. Sort the table by “Correlations to target” by clicking on that column. You’ll get a view that looks like this:

Datasource - Data Report Categorical

This table tells you how much weight each field has in determining the target value, so it is a good source of information on which weather observation data matter most. Of the categorical attributes, the wind direction in Stavanger is the most important for how the temperature is going to be in Oslo the next day. That makes sense, since Stavanger is west of Oslo, so weather that hits Stavanger first is likely to arrive in Oslo later. Wind direction in Kristiansand is also important, and in third place in this ranking we find the conditions in Trondheim. The most commonly observed values are shown along with a small view of the distribution for each variable. Click Numeric to see a similar ranking for those variables, revealing that the dew point temperature for Trondheim and the atmospheric pressure in Stavanger are the two most important numeric variables. For numeric variables, the range and mean of observed values are shown. It’s interesting to see that none of these important variables are weather observations from Oslo.

Use the model to predict the weather

The last step is to actually use the Amazon Machine Learning service to predict tomorrow’s temperature, by feeding the testset to the model. The testset contains one day of observations and an empty target value field.

Go back to the Dashboard and select the model in the object list. Click “Generate batch predictions” at the bottom of the model info page. The first step is to locate the testset datasource, which you’ve already created. When you click Verify, Amazon checks that the schema of the testset matches the training dataset, which it should, given that all variables have the same classification in both datasources. It is possible to create the testset datasource in this step instead of choosing an existing datasource. However, when I tried that, the schema of the testset was auto-generated with no option for customizing field classifications, so it failed verification since the schema didn’t match the training dataset. That’s why I recommended you create a datasource for the testset too, not just for the training dataset.

After selecting the testset datasource, the last step in the wizard before review is the choice of an output S3 location. Amazon Machine Learning will create a subfolder in the location you supply. Review and click Finish to launch the prediction process. Just like some of the previous steps, this may take some minutes to finish.
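Again for reference, the equivalent boto3 call is roughly the following sketch, with the same placeholder IDs and bucket as before:

import boto3

ml = boto3.client('machinelearning', region_name='us-east-1')

ml.create_batch_prediction(
    BatchPredictionId='oslo-temperature-batch',
    BatchPredictionName='Oslo temperature batch prediction',
    MLModelId='oslo-temperature-model',
    BatchPredictionDataSourceId='weather-testset',
    OutputUri='s3://my-ml-weather-bucket/predictions/',   # subfolder created here
)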

The results from the prediction process are saved to S3 as a gzip file. To see the results you need to locate the file in the S3 Console, download it and unpack it. The unpacked file is a CSV file, but it lacks the “.csv” suffix, so you might need to add that for the OS to recognize it properly. The results look like this:

tag,trueLabel,score
20150530-00:00,,1.503819E1
20150530-00:50,,1.342798E1
20150530-01:00,,1.297405E1
20150530-01:50,,1.321695E1
20150530-02:00,,1.270639E1
20150530-02:50,,1.271092E1
20150530-03:00,,1.309844E1
20150530-03:50,,1.444643E1
20150530-04:00,,1.373403E1

The “score” field is the predicted value, in this case the predicted temperature in Celsius. The tag reveals what observations resulted in the prediction, so for instance the observations from the five places for timestamp 02:00 resulted in a predicted temperature in Oslo the next day at 12:00 UTC of 12.7 degrees Celsius.

As you might have noticed, we’ve now got a bunch of predictions for the same temperature, not just one prediction. The advantage is that we can inspect the distribution of predicted values and even how the predicted value changes with observation timestamp (for example to check if weather observations from daytime give better predictions than those from the night).
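A few lines of pandas are enough for that kind of inspection of the downloaded results; a sketch, assuming the unpacked file has been renamed to “batch-prediction-result.csv”:

import pandas as pd
import matplotlib.pyplot as plt

# Columns as in the output above: tag, trueLabel (empty) and score
predictions = pd.read_csv('batch-prediction-result.csv')

print('Mean predicted temperature: {:.1f} degrees Celsius'.format(
    predictions['score'].mean()))

# Distribution of the individual predictions
predictions['score'].hist(bins=10)
plt.xlabel('Predicted temperature (degrees Celsius)')
plt.ylabel('Number of predictions')
plt.show()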

Distribution of predicted temperature values

Predicted Temperature Distribution

Even though the individual predictions differ, there is a strong indication that the value is between 13.0 and 14.5 degrees Celsius. Computing the mean over all predicted temperatures gives 13.6 degrees Celsius.

Development of predicted temperature values

Predicted Temperature Development

This plot shows the development in the predicted value as the observation time progresses. There does not seem to be any significant trend in how the different observation times perform.

The actual temperature – and some closing remarks

At this point I’m sure you wonder what the real temperature value ended up being. The value I tried to predict using Amazon Machine Learning turned out to be:

12:00 UTC on May 31, 2015:  12 degrees Celsius

Taking into account the positive bias of 1 to 2 degrees and a prediction mean value of 13.6 degrees Celsius, I am very satisfied with these results.

To improve the model performance further, I could try to reduce the bias. To do that I’d have to re-train the model with more training data, since two weeks of data isn’t much. To get a model that could be used all year round, I’d have to include training data from a relevant subset of days throughout the year (cold days, snowy days, heavy rain, hot summer days and so on). Another possible optimization is to include more places in the dataset. The dataset lacks places to the east of Oslo, which is an obvious flaw. In addition to more data, I could explore whether the dataset should be organized differently. It is for instance possible to append all observations from an entire day into one record, instead of creating a separate record for each observation timestamp. That would give the model more variables to use for prediction, but only one record per day to predict from.

It has been very interesting to get started with Amazon Machine Learning and test it with a real-life dataset. The power of the tool combined with the ease of use that Amazon has built into the service makes it a great offering. I’m sure I’ll use the service for new experiments in the future!

~ Arne ~