Stream API responses directly to disk with Python

Python is the de facto language for Data Science work. It’s very convenient to rapidly prototype a solution in Python and see if it works. However, when faced with setting up the same solution in production, there are new space and time constraints to take into account. You’ll likely find that memory usage is the number one resource constraint you need to pay attention to. So let me share a useful trick to reduce memory usage in the early stages of a Python job.

To get your hands on training data, a very common step is to call an API and get data back in JSON format, for example when fetching tweets from Twitter. The simplest approach when using Python is to send an API call using Requests, store the response in a variable, decode the JSON and save it to disk for later processing.
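To make the buffering problem concrete, here is a sketch of that pattern. A JSON string built in memory stands in for the HTTP response body (in real code it would be the response from Requests), so the snippet is self-contained:

```python
import json

# Simulated response body; in real code this would be response.text.
raw_body = json.dumps({"statuses": [{"id": i} for i in range(1000)]})

data = json.loads(raw_body)  # the entire payload is decoded in memory
with open("tweets.json", "w", encoding="utf-8") as f:
    json.dump(data, f)       # ...and only then written to disk
```

For a small payload this is fine, but both `raw_body` and `data` live in memory at the same time, which is exactly what hurts with multi-GB responses.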

This works well for small datasets, but when you try this on bigger JSON-based datasets, it results in very high memory usage. The problem is that you buffer data in memory and decode it, before saving to disk. Luckily, there is a better way.

Streaming API responses directly to disk

The Requests library has support for streaming. This enables you to iterate over the results you get as they arrive, instead of waiting for the entire response and buffering it in memory. Here’s a code snippet showing how this can be done:

import gzip
import requests

with requests.post(url, data=obj, headers=headers, stream=True) as response:
  with gzip.open('out.gz', mode='wt', encoding='utf-8') as f:
    for chunk in response.iter_content(10240, decode_unicode=True):
      f.write(chunk)

With this code pattern the JSON is saved directly to disk in a compressed format as soon as it arrives over the network. The iterator ensures that you process chunks of 10240 bytes at a time, meaning that this is the maximum number of bytes your Python job has in memory at any time. The number of bytes is something you can tune and experiment with to figure out what works best in each case.
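To see the chunking at work without a live API, here is a self-contained sketch where an in-memory buffer stands in for the streaming response body:

```python
import gzip
import io

# io.StringIO stands in for the streaming HTTP response body.
body = io.StringIO('{"data": "' + 'x' * 50000 + '"}')

with gzip.open('out.gz', mode='wt', encoding='utf-8') as f:
    while True:
        chunk = body.read(10240)   # at most 10240 characters in memory at a time
        if not chunk:
            break
        f.write(chunk)
```

Only one chunk exists in memory at any time, no matter how large the full body is.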

The impact of streaming responses directly to disk is potentially huge. I’ve seen memory usage drop from tens of GB to almost nothing for jobs that migrated to this optimized approach.

One caveat of this approach is that it becomes a bit harder to extract parts of the JSON response if you need to do so. One example is APIs that use pagination, where the start and stop markers are part of the JSON response. A way to handle this is to resort to a simple text search of each chunk before the write call. This can work if what you’re trying to find fits nicely into one chunk. For instance if the very last chunk contains the necessary pagination values.
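A sketch of that text-search idea, using a hypothetical `next_cursor` pagination key and a simulated two-chunk stream:

```python
import re

# Simulated stream; the pagination key "next_cursor" is a made-up example.
chunks = ['{"items": [1, 2, 3], ', '"next_cursor": "abc123"}']

next_cursor = None
with open('page.json', 'w', encoding='utf-8') as f:
    for chunk in chunks:
        match = re.search(r'"next_cursor":\s*"([^"]+)"', chunk)
        if match:
            next_cursor = match.group(1)
        f.write(chunk)
```

As noted, this only works when the marker fits inside a single chunk; a marker split across a chunk boundary would be missed by this simple search.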

I believe this optimisation is important to use whenever possible, even if JSON parsing becomes a bit harder. It’s a good way to control the memory usage of a Python job that is fetching big datasets from an API. Using this approach the job should be able to stay within the memory constraints in production while fetching data.

~ Arne ~

How I Became a Bitcoin Trader


Running a trading bot for fun and profit

Bitcoin and cryptocurrencies have been a wild ride this fall. After observing the phenomenon from the sidelines for quite some time, I did what many others have done and bought some BTC with the intention to hold for a long time. Seeing the exchange rate go up every week was fun! However, an idea started to form. The Bitcoin market has very high volatility, easily outpacing the stock markets by a lot. It must be possible to make a decent profit just by buying low and selling high. This should be a perfect task for a bot.

Enter CryptoTrader

After first considering building a bot using Bitcoin exchange APIs directly, I learned from a friend that there are purpose-built platforms and frameworks for this already. CryptoTrader is a big platform for trading bots, offering both the tools to build your own bots and a marketplace for renting bots others have written. Best of all, the site takes care of running the bots 24/7, and all you need to get started is an API key from your favorite exchange.

Getting started

I use Coinbase to buy and hold, so using their GDAX exchange was the quickest way to get started. You get trading accounts at GDAX by completing a short signup process, which contains an additional ID verification complementing the one you had to go through when signing up for Coinbase. Transferring funds between Coinbase and GDAX accounts is instant and free. The main downside to GDAX is that they have a limited number of currency pairs compared to other exchanges. The pairs are based on Bitcoin, Ethereum and Litecoin only. That’s not a big problem when getting started in this field, but as you get more experienced you’ll want to explore currency pairs offered by other exchanges too. CryptoTrader supports a long list of exchanges and currency pairs.

Before using CryptoTrader for real, it is highly recommended to backtest bots on historical data. Although writing my own bot is a long-term goal, I chose to browse the bot marketplace for a bot to rent to get started. CryptoTrader has a slightly strange convention for displaying bot popularity: the most popular bot has a “100%” beside it and other bots have lower percentages, instead of a simple count of running instances. For simplicity I chose to focus my initial testing on the most popular bot, Blade Runner.

Blade Runner is built on a compelling “always sell higher” logic. The bot automatically classifies the market conditions as rising, falling or sideways. Based on the current price it calculates the price levels where it wants to buy and sell, and executes orders based on that. Whenever it buys, it immediately registers a sell order at the exchange at a price point about 3% higher than the buy price (a “limit sell” order, i.e. an order with a requested price for the sale to happen). The sell order remains in the order book of the exchange until that price point is reached and the order is executed by the exchange. Whenever one of the limit sell orders is traded at the exchange, the bot has made a profit. This is the “always sell higher” logic, ensuring a profit on every sell.
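In numbers, the “always sell higher” rule amounts to the following (the prices are invented for illustration):

```python
buy_price = 0.0500   # hypothetical ETH-BTC fill price
margin = 0.03        # the bot's roughly 3% profit target

# The limit sell order is placed immediately after the buy executes.
sell_price = round(buy_price * (1 + margin), 8)
```

If the market later trades at `sell_price`, the exchange fills the order and the 3% margin is locked in.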

There is one catch with this logic: The sell price has to be reached for the profit to be realized. If the bot buys and the market starts falling for a long time, the sell orders will just stay in the exchange order book. The bot does not seem to adjust the sell orders to the new market conditions, so you simply have to wait for the market to rise again to the predefined sell prices even though that might take days (or weeks). This means you need a medium- to long-term horizon for your trading bot and cannot expect phenomenal returns in hours. It is best to view the trading bot as a way of consistently increasing the amount of cryptocurrency you hold.

Testing, testing, testing

All the getting started guides for trading bots stress that it’s important to backtest. This means running a lot of simulations on historical data to get to know the behavior of the bot, tweak the settings and evaluate if it’s able to give consistent profits. So I started backtesting Blade Runner on the BTC-USD currency pair. I ran multiple simulations of different time periods, and in at least two of them there was a significant profit. Eager to get started for real, I purchased the entry plan of CryptoTrader and rented the bot for three months. Buying access and renting the bot was a smooth process where the CryptoTrader platform generated invoices in BTC and I just had to send the BTC from my wallet to the address in the invoice.

With access, I continued to backtest the bot on BTC-USD. And then I realized, to my dismay, that I was unable to find a combination of bot settings that made the bot give positive returns and at the same time beat the “B&H” strategy. B&H is Buy & Hold, which all backtests and live bot instances are automatically compared to. The B&H percentage is the profit or loss you would get by just buying at the start of the time period and holding the assets until the end of it. When backtesting on BTC-USD over the long period of rising BTC rates this fall, the B&H percentages were usually double-digit profits. The bot was unable to match that. However, for certain short time periods of corrections in the BTC rate against USD, the bot was giving net profits while B&H was negative.
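The B&H percentage is simple to compute yourself when evaluating a backtest (the rates below are invented):

```python
def buy_and_hold_pct(start_rate, end_rate):
    # Profit (or loss) in percent from buying at the start of the
    # period and holding until the end.
    return (end_rate / start_rate - 1) * 100

# E.g. BTC-USD going from 4000 to 4500 over the backtest period:
profit = buy_and_hold_pct(4000, 4500)
```

A bot only adds value over doing nothing if its net return beats this number for the same period.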

Based on these findings I concluded that the Blade Runner bot performs best in a sideways market. Reviewing the various currency pairs on GDAX, I found that ETH-BTC is usually a sideways market and decided to go with that pair. By being crypto-crypto this pair has additional advantages as I see it. First, this means my assets are in cryptocurrency all the time. This conforms nicely with my initial goal of buying and holding cryptocurrency. Second, I would not consider following a B&H strategy for the ETH-BTC pair (buying ETH at the start and calculating profits based on the ETH-BTC rate). So I can safely ignore any B&H percentages the bot reports as irrelevant for my case. Instead, the goal is to get net profits so that the bot contributes to an increase in my cryptocurrency assets. Any increase is a win, although the first goal is to earn back what I paid for access to CryptoTrader and the Blade Runner bot.

Running the bot

Blade Runner bot running
Screenshot of live bot instance running on the CryptoTrader platform

The bot has a set of parameters for tuning the behavior, along with six main modes of operation: Optimal, Automatic, Rising, Falling, Sideways and Very Low Capital. I started out with a small amount of BTC for trading and chose the Very Low Capital mode. In all other modes the bot uses a varying percentage of the total funds when placing each order. In the Very Low Capital setting this percentage is always 50%. The main reason for this is to ensure that the amount in each order is enough to avoid being rejected as too small by the exchange. This setting did however result in very few trades per day and low profits. So I quickly switched to Optimal mode instead.

Optimal mode automatically tunes the various parameters based on the market conditions. One example is that in a rising market the bot increases the amount it buys compared to a sideways or falling market. This is done to buy bigger amounts as early as possible to capture more of the rising trend.

There is one important setting to tweak in Optimal mode: “Activation of the above Percent Price Fall or Rise”. By default it is set to “Only for Price Fall”, but I got much better results by choosing “For Both Cases”. The default setting only buys when the price is falling below a calculated threshold of about 3% below the price of the last buy or sell order. To capture rising trends it’s important to buy when the price is going up too (betting on the price to increase further).

Adjusting the funds

When I found the settings that worked best and produced a steady profit, I increased the amount of BTC the bot could trade with. Before depositing funds to the exchange account the bot is using, it is important to activate the “Reset button” in the bot settings (you can keep the bot running). Deposit funds, then wait for a minute or two before you deactivate the “Reset button” in bot settings. This way your deposit won’t interfere with how the bot calculates profit.

There are other ways of manual interference too, for example a “Pause button” to stop the bot from making any new buy orders for a period of time. You can also press Buy or Sell buttons to instruct the bot to place orders. But that will influence the bot profits as the price point is probably not optimal.

It’s also worth mentioning that you can stop the bot and start it again later. When the bot stops, all open sell orders are cancelled, leaving the funds in the trading account in the state they were in when the bot stopped (some in ETH and some in BTC, in this case). Before starting the bot again you need to make sure all funds are in the “base currency”, meaning the last one in the trading pair (for ETH-BTC that would be BTC).

One situation you need to be on the lookout for is the bot stopping due to the total amount of funds exceeding the CryptoTrader plan limit. The various plans you can choose from include limits on the amount of funds the bot can use for trading, measured in USD. I started with the Basic plan, and when the funds in the trading account exceeded the limit, the bot was stopped. There was no email alert about this, so I discovered it when checking in on the bot the next day. I paid for an upgrade to the Regular plan and started up the bot again. This means you cannot just start the bot and forget about it, as it might stop at any time without notice. All it takes is for one of the cryptocurrencies in your trading account to quickly rise in value compared to USD.

Preliminary results

The bot has managed to generate enough profits in three weeks to cover the amount paid for three months of access to CryptoTrader and the bot itself. This was my initial goal. I’ll be running the bot for the remaining time period it has been rented for to generate net profits. But I’m sure it is possible to do better. There are lots of other promising bots in the CryptoTrader strategy marketplace, so I’ll spend time backtesting other bots and other cryptocurrency trade pairs to find even better combinations. I might even start writing my own bot.

This has been a very exciting experiment and I’ve learned a lot about cryptocurrencies, trading, signals and bots. Generating a net profit is also nice, even though it’s not very big. The biggest advantage is all the new knowledge.

Remember that the information in this blog post should not be relied upon as advice or construed as providing recommendations of any kind. Any trading activity is inherently risky and it’s important to keep in mind that all money used for trading could get lost.

Did you find this useful? Consider donating some BTC to 13sTihkkJtP8UjuWFgFpEiFuiheCb9fQNE if you want.

~ Arne ~

Using Facebook Prophet Forecasting Library to Predict the Weather

Facebook recently released a forecasting library for Python and R, called Prophet. It’s designed for forecasting future values of time series of any kind, and is remarkably easy to get started with. Temperature time series are among my favorite datasets, so here I’ll explore how good Prophet is at predicting future temperatures based on past weather observations.

The dataset consists of temperature readings every 10 minutes from my Netatmo Weather Station, stored in InfluxDB. I extracted the mean temperature per hour for the last year, resulting in approx. 9000 hourly temperature observations. The timestamps are in ISO8601 format: “2016-02-11T08:00:00Z”. The dataset also has gaps due to several shorter periods of malfunctioning data collection and system maintenance. Prophet claims to handle such gaps without issues, so let’s see if it does.

Update: If you want to try this out yourself, here’s the dataset!

Installing Prophet on Ubuntu Linux

Installing Prophet for Python is done using pip. Since Prophet depends on the Stan statistical library and is optimized for speed using C, it needs Cython and PyStan. In addition it depends on NumPy and Pandas, so make sure you have those installed too.

$ sudo -H pip install cython pystan numpy pandas
$ sudo -H pip install fbprophet

These can also be installed without sudo if you don’t have administrative privileges on the system.

Producing the first temperature predictions

To produce the initial predictions, we simply run through the following steps without changing default parameters.

Import packages and prepare input data

import pandas as pd
import numpy as np
from fbprophet import Prophet
df = pd.read_csv('outdoor-temperature-hourly.csv')
df = df[df.temperature != 'DIFF']

Preparing the dataset consists of loading it as a DataFrame using Pandas. The input dataset is a merge of two time series and some of the values are invalid. They are filtered out by excluding all rows with the value DIFF.

df['ds'] = df['time']
df['y'] = df['temperature']
df = df.drop(['name', 'time', 'temperature', 'seriesA', 'seriesB'], axis=1)

Prophet requires one column named “ds” with dates or datetimes, and one column named “y” with numeric values. All other columns are ignored. The two required columns are created by duplicating the existing columns “time” and “temperature”, before all irrelevant columns are dropped from the dataframe. The values are degrees Celsius and the timestamps are UTC. The resulting dataframe, used as input to Prophet, looks like this:

2017-02-26 Prophet - Input dataframe start.png

Fit model and use it to make predictions


Fitting a model to the input data is as simple as calling “model.fit(df)” on a freshly created “Prophet()” instance. To make predictions, you first need a DataFrame with datestamps to predict for. Prophet has a useful make_future_dataframe() method to do just that. By default it generates one row per day, but by setting the frequency parameter to “H” we get hours instead. In this example I generated a dataframe with 50 days of hourly timestamps, starting right after the most recent timestamp in the input dataset.

To make predictions based on the model, all you need to do is call “model.predict(future)”. Using the model and dataframe of future datetimes, Prophet predicts values for each future datetime.

Initial results

Prophet includes built-in plotting of the results using Matplotlib. Here’s the prediction for the hourly temperatures two months into the future, plotted as a continuation of the existing input data:


Pretty impressive output for so little work required! Prophet is fast too – these results were computed in only seconds. The temperature trends are clearly visible, with higher hourly temperatures during the summer months than in the winter. The forecast for the next two months says that temperatures are about to rise from around zero degrees Celsius currently to around 5 degrees by the start of April. However, the uncertainty intervals are quite large in this forecast (around 10 degrees). The somewhat large variations between day and night temperatures might cause this.

Before trying to reduce the uncertainty interval, let’s look at another output of the prediction model: The components of the model.


These subplots show several interesting patterns. The first subplot, “Trend”, shows a slight temperature rise on the large scale, from year to year. As the input data only covers one year, this does not qualify as a generic trend. However, it does say that the recorded 2017 temperatures are slightly warmer than the 2016 temperatures.

The weekly trends are a bit strange. According to these results, the start of the week is colder than the rest of the week. As the weather doesn’t care about what day it is, this remains a curiosity.

The last plot shows the seasonal distribution of temperatures during the year of input data. Since I only had one year of input data, this plot follows the data as seen in the main plot pretty closely. Like in the trend subplot, the seasonal distributions would benefit from a lot more input data.

Tuning the model to only cover 2017

The initial results used the entire dataset, but how will Prophet behave if it doesn’t have input data from the same season last year to base the predictions on? Let’s investigate the results of using a smaller time period.

recent = df[df.ds > '2017-01-01']

Here I’m making a new input dataframe by selecting only the rows that have timestamps in 2017. Let’s make a new model and some new predictions too:

model_recent = Prophet()
future_recent = model_recent.make_future_dataframe(periods=24*10, freq='H')
forecast_recent = model_recent.predict(future_recent)

I set the period to make predictions for to 10 days into the future. Since the input data doesn’t cover nearly as much as in the initial results, it makes sense to reduce the number of days to predict for. The results are shown in the following graph.

2017-02-26 Prophet - Hourly Temperatures for 2017 only.png

Since this plot covers a smaller time range, we can see more clearly the daily variations between day and night. Interestingly, Prophet is making the same type of prediction for the coming days as in the previous model: the temperature is going to rise. In this plot we can see why too. Around 2017-02-14 the temperature began to rise, and the last days of input data show temperatures well into the positive Celsius range. Prophet has successfully picked up this trend change and is using it to predict the future.

The Yearly trend subplot confirms that Prophet picked up on the trend change:

2017-02-26 Prophet - Hourly 2017 Temperatures - Yearly component.png

Tuning the model to reduce uncertainty intervals

In the initial results above, the uncertainty intervals were as big as 10 degrees Celsius. This is a bit too much to be useful in a weather forecasting system. To reduce the uncertainty, let’s make the input data a bit more uniform. To avoid having Prophet deal with day/night temperature differences, I filtered out all temperature measurements except the one at 12:00 UTC each day. The theory is that these values, one per day, will be more uniform and lead to less variance in the model output.

Filtering the measurements could certainly be done using Pandas, but I chose to use the good old shell tools:

$ head -1 outdoor-temperature-hourly.csv > outdoor-temperature-12UTC.csv
$ fgrep "T12" outdoor-temperature-hourly.csv >> outdoor-temperature-12UTC.csv

Here I generate a new CSV file with only the temperature values for timestamps that contain “T12”. In the ISO8601 time format “T” is the date-time separator, so this selects all measurements having 12 as the hour component. I first save line 1 from the original file as line 1 in the new file, so I don’t lose the column headings and have to tell Pandas about them manually.
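For completeness, the same filtering in pure Pandas could look like the sketch below, with a small inline sample standing in for the real CSV file:

```python
import io

import pandas as pd

# Inline sample standing in for outdoor-temperature-hourly.csv.
csv_text = """time,temperature
2016-02-11T11:00:00Z,1.5
2016-02-11T12:00:00Z,2.0
2016-02-12T12:00:00Z,3.1
"""

df = pd.read_csv(io.StringIO(csv_text))
# Keep only rows whose timestamp has 12 as the hour component.
noon = df[df['time'].str.contains('T12', regex=False)]
```

The column headings survive automatically here, since Pandas keeps them when subsetting rows.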

Back in my Jupyter Notebook, the new input data is loaded into a dataframe like this:

df2 = pd.read_csv('outdoor-temperature-12UTC.csv', na_values = 'DIFF', usecols = ['time', 'temperature'])
df2['ds'] = df2['time']
df2['y'] = df2['temperature']
df2 = df2.drop('time', axis=1)
df2 = df2.drop('temperature', axis=1)

You might notice slight variations compared to the first example above. Here I added arguments to Pandas read_csv() to ignore lines with value DIFF and ignore all columns except the two with datetimes and values. Either way works, but I think this variant is a bit cleaner. To make sure we only got one temperature value per day, let’s have a look at the dataframe:

2017-02-26 Prophet - Daily Temperatures.png

Looks good. By now you know the drill for making Prophet generate predictions:

model2 = Prophet()
future2 = model2.make_future_dataframe(periods=30)
forecast2 = model2.predict(future2)

The results


This is better. The uncertainty interval is down by approx. 30% to about 7 degrees. That means filtering the input data to one value per day increased the usefulness of the predictions, even though Prophet had less data to base predictions on.

The components of the model look similar to the first example:


Note that the first subplot, “Trend”, has higher values on the Y-axis than in the first example. That might be because I filtered out a lot of colder temperatures. 12 UTC is not the median temperature value during a typical day, it’s closer to the maximum (which is in the afternoon / early evening).

In addition to the subplots, we can inspect the predictions and components in depth by looking at the dataframe with predictions. The most interesting part is the timestamps in the future, so filter for them first:

import datetime
forecast2[forecast2.ds > datetime.datetime.now()]

This returns a dataframe with details about all predictions for all timestamps from now on. Here’s a screenshot of some of the contents (certain columns removed for brevity):

2017-02-26 Prophet - Inspecting Prediction Component Values.png

For each timestamp, you get the predicted value (“yhat”) in the rightmost column and all the components making up the prediction. For example we can see that the yearly components are well below zero degrees Celsius. This makes sense in the winter season. In addition, “yhat_lower” and “yhat_upper” show the exact range of the uncertainty interval.
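With invented numbers mimicking Prophet’s output columns, computing the width of the uncertainty interval from those fields looks like this:

```python
import pandas as pd

# Toy forecast rows with invented values, shaped like Prophet's output.
forecast = pd.DataFrame({
    'ds': pd.to_datetime(['2017-03-01', '2017-03-02']),
    'yhat': [1.2, 1.5],
    'yhat_lower': [-2.3, -2.0],
    'yhat_upper': [4.7, 5.0],
})
forecast['interval_width'] = forecast['yhat_upper'] - forecast['yhat_lower']
```

This is a handy column to track when tuning the model, since the goal in this section is precisely to shrink it.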

Pro tip: Plot components with uncertainty intervals too

In the previous examples all the component subplots lack uncertainty intervals. Prophet can generate such intervals too, at the cost of longer computation time. This is done by adding a parameter to the model creation:

model2 = Prophet(mcmc_samples=500)

This gives you full Bayesian sampling and takes longer to complete, but it still finishes in a matter of minutes.

With this activated, we get component subplots with uncertainty intervals:

2017-02-26 Prophet - Uncertainty intervals for components.png

These plots reuse the filtered input data from the previous example, with one temperature value per day. We can now reconsider the strange weekly trend in the middle subplot, where the start of the week seems to be colder than the rest. The uncertainty is quite big in that plot. In addition the zero-degrees line is well within the uncertainty interval, for all days of the week. This means it’s not possible to say that there is any difference between the various weekdays.

Closing remarks

Using Prophet to generate predictions turned out to be very easy, and there are several ways to adjust the predictions and inspect the results. While there are other libraries that have more functionality and flexibility, Prophet hits the sweet spot of predictive power versus ease of use. No more looking at weird plots of predicted values because you chose the wrong algorithm for your use case. And no more spending hours to fix input data that has gaps or timestamps in the wrong format.

The Prophet forecasting library seems very well designed. It’s now my new favorite tool for ad-hoc trend analysis and forecasting!

~ Arne ~

Visualize Your Netatmo Data with Grafana

Netatmo Weather Station is the ultimate personal weather station and gives you access to your measurements both through apps and a slick web interface. In addition to that, Netatmo has Developer APIs for access to raw data. Owning a Netatmo station, I just had to try to set up a Grafana dashboard with my weather data. Grafana is both flexible and has great-looking dashboards, so it should be a good fit for weather measurements. Keep on reading for tips on how to do this yourself too!

If you just want to see your Netatmo data in a beautiful dashboard, go to my free web service to register for an account where the dashboard is already set up for you. And if you don’t have a Netatmo Weather Station yet but want to try anyway, create a Netatmo account and go to the publicly available Netatmo HQ station in Paris to associate the station with your account.

Netatmo Grafana dashboard big

To get started with Grafana on your own server or laptop, the first thing to do is to get access to measurement data from your Netatmo station. Register for developer access at Netatmo to get that. After creating an account, continue to “Create an App” to get the necessary client ID and client secret.

Authentication and tokens

Using the client credentials, there are basically two modes of authentication, an easy one and a more complicated one. The easy one is called Client Credentials in the Netatmo docs and gives you an OAuth2 access token in just one step. Just send a request to the Netatmo API with your client credentials and your Netatmo username+password to get a token. The access token is then used in API calls to get measurement data for your station. The more complicated method is called Authorization Code and makes it possible for your application (“client”) to request access to other Netatmo weather stations than your own by redirecting the owner of that station to an OAuth2 authorization webpage.

Using Python, the following code issues a POST request for an access token the easy way. Before running it, replace the NETATMO_* constants with your values for client ID, client secret, username and password.

import time

import requests

data = dict(grant_type='password', client_id=NETATMO_CLIENT_ID,
            client_secret=NETATMO_CLIENT_SECRET, username=NETATMO_USERNAME,
            password=NETATMO_PASSWORD, scope='read_station')

resp = requests.post('https://api.netatmo.com/oauth2/token', data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Now you have a “token” variable as a dictionary with several fields.

The returned token data comes with an expires_in field that says how many seconds the token is valid. In the last line above I transform that into an expiry field containing the unixtime of expiry. That’s necessary to be able to check periodically if the token needs to be refreshed. Here is an example of a token dictionary including the additional expiry field:

{u'access_token': u'abcdefgh|1234567890',
 u'expire_in': 10800,
 u'expires_in': 10800,
 u'expiry': 1468168541,
 u'refresh_token': u'ababababab|2727272727',
 u'scope': [u'read_station']}
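Given the expiry field, a small helper (my own convention, not part of the Netatmo API) can decide when it is time to refresh:

```python
import time

def needs_refresh(token, margin=60):
    # Renew a bit before the actual expiry to stay on the safe side.
    return int(time.time()) >= token['expiry'] - margin
```

Call this before each batch of API requests and refresh the token whenever it returns True.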

When the expiry time gets close, the refresh_token is used to renew the access_token by issuing another POST request:

data = dict(grant_type='refresh_token', refresh_token=token['refresh_token'],
            client_id=NETATMO_CLIENT_ID, client_secret=NETATMO_CLIENT_SECRET)
resp = requests.post('https://api.netatmo.com/oauth2/token', data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Measurement JSON data format

With a valid access_token we can fetch the interesting measurements from Netatmo. The APIs contain several methods for different Netatmo products, but for the Weather Station only /api/getstationsdata is needed. Issue a GET request to see all the available measurements:

resp = requests.get('https://api.netatmo.com/api/getstationsdata?access_token=' + token['access_token'])
if resp.status_code == 200:
    data = resp.json()

The data structure returned has a lot of fields and varies by the number of extra modules attached to your Netatmo station (and even more if you’ve connected more than one station to your Netatmo account, like the public Netatmo HQ station in Paris). Here’s an excerpt of data returned in JSON format:

    [{u'_id': u'70:ee:aa:bb:cc:dd',
      u'co2_calibrating': False,
      u'dashboard_data':
        {u'AbsolutePressure': 985,
         u'CO2': 431,
         u'Humidity': 46,
         u'Noise': 37,
         u'Pressure': 1001.9,
         u'Temperature': 26.3,
         u'date_max_temp': 1468101837,
         u'date_min_temp': 1468125907,
         u'max_temp': 26.7,
         u'min_temp': 24.8,
         u'pressure_trend': u'stable',
         u'temp_trend': u'stable',
         u'time_utc': 1468157806},
      u'modules':
        [{u'_id': u'02:00:aa:bb:cc:dd',
          u'dashboard_data':
            {u'Humidity': 52,
             u'Temperature': 22.8,
             u'date_max_temp': 1468127398,
             u'date_min_temp': 1468115964,
             u'max_temp': 26,
             u'min_temp': 9.9,
             u'temp_trend': u'down',
             u'time_utc': 1468157799},

The dashboard_data section has the actual readings, while data_type informs us of the measurement types that this station reports. Values are reported in the unit the user selected on setup, meaning they could be Fahrenheit instead of Celsius, for instance. A separate user part of the returned JSON has details about which units are used.

In addition to the data from the indoor Weather Station, stations also have a modules parameter which holds measurements from all connected modules (outdoor module, rain gauge, wind gauge and so on). As seen above, for each module the JSON fields are the same as for the station, with the measurements in dashboard_data and reported measurements in data_type. This greatly simplifies parsing of the JSON response, as you can use the same code for parsing the devices list as for each entry in the modules list.
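A sketch of that shared parsing logic, applied to a dict shaped like the excerpt above:

```python
def extract_measurements(entry):
    # Works for the station itself and for each entry in its "modules" list,
    # since both carry "data_type" and "dashboard_data".
    return {name: entry['dashboard_data'][name] for name in entry['data_type']}

# Toy module entry mirroring the JSON excerpt above.
module = {
    'data_type': ['Temperature', 'Humidity'],
    'dashboard_data': {'Temperature': 22.8, 'Humidity': 52, 'time_utc': 1468157799},
}
readings = extract_measurements(module)
```

The same function can then be mapped over the devices list and every modules list without special cases.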

Storing data in InfluxDB

InfluxDB is a time series database with high performance, good compression and an easy-to-use write API and query language. After installing and starting it up with default config options, it’s ready to use as a data store for time-series data like weather measurements. The write API is available through HTTP. To write to InfluxDB, issue POST requests with the actual data as newline-delimited strings in the body of the request. InfluxDB documentation refers to this as the line protocol. An example write request can look like this:

import requests

payload = """
Humidity,station_name=MyStation,module_name=Outdoors value=52 1468157799
Temperature,station_name=MyStation,module_name=Outdoors value=22.8 1468157799
Rain,station_name=MyStation,module_name=Rain_Gauge value=0 1468157799
"""

resp = requests.post('http://localhost:8086/write?precision=s&db=netatmo', data=payload)

This will save three measurements into time series named Humidity, Temperature and Rain in database netatmo. The value field is the actual measurement and the timestamp is from the time_utc field alongside the measurements. It’s trivial to convert the returned JSON into the line format that InfluxDB expects.

The station_name and module_name are custom tags attached to the time series to make it possible to distinguish between different stations and modules. The tags are available for filtering in Grafana using WHERE statements. Station names and module names defined when setting up the Netatmo Weather Station are available in the returned JSON from the Netatmo API.
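Converting parsed readings into line protocol is a few lines of string formatting. A sketch (the helper name is mine; note that tag values must not contain spaces, hence module names like Rain_Gauge):

```python
def to_lines(readings, station_name, module_name, time_utc):
    """Format a dict of readings as InfluxDB line-protocol strings,
    one measurement per line, tagged with station and module name."""
    return '\n'.join(
        '{0},station_name={1},module_name={2} value={3} {4}'.format(
            name, station_name, module_name, value, time_utc)
        for name, value in sorted(readings.items()))

payload = to_lines({'Humidity': 52, 'Temperature': 22.8},
                   'MyStation', 'Outdoors', 1468157799)
print(payload)
# Humidity,station_name=MyStation,module_name=Outdoors value=52 1468157799
# Temperature,station_name=MyStation,module_name=Outdoors value=22.8 1468157799
```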

Setting up a Grafana dashboard

After downloading and installing Grafana, go to the Datasource part of the web UI and create a new data source with the following settings:

Grafana - Data source setup

The User and Password under InfluxDB Details are root/root, but are not really used unless InfluxDB authentication was configured with non-default settings before starting up the database.

With a data source in place, the next step is to create a dashboard. There are many ways to visualize weather data, but at least add graphs for each time series you’ve stored in InfluxDB. That way you get a feel for how the metric changes over time. For some metrics the trends are most interesting, for other metrics only the current value is necessary to display. If everything works as expected, you should get suggestions when you set up the metric queries in Grafana, like this:

Create graph - Suggestions

Under the WHERE section you can filter on tags associated with each time series, like for example the module name to only get outdoor temperatures instead of both indoor and outdoor.

Awesome visualizations

With Grafana and InfluxDB set up to store data from your Netatmo Weather Station, you can create some pretty awesome visualizations, like this outdoor temperature graph over several months with a moving_average() layered on top:

Outdoor Temperature with Moving Average.png

This makes it easy to see that the temperature moved in the right direction during these months, and that there were some periods with higher average temperature than others (the two first weeks of May in particular).

If you’re interested in more awesome visualizations of Netatmo Weather Station data, head over to my web service to get your own personalized weather dashboard!

~ Arne ~

Docker + OpenStack = True

OpenStack and Docker

OpenStack has the potential to revolutionize how enterprises do virtualization, so lots of people are currently busy setting up OpenStack private clouds. I’ve been part of such a project too. In this post, I’ll explain why I believe Docker makes a lot of sense to use for deploying the services that together make up an OpenStack cloud, with benefits like flexibility, orchestration, clustering and more.

All about microservices

OpenStack is commonly viewed as a platform for virtualization. However, under the hood OpenStack is nothing more than a bunch of microservices which end-users can interact with, either directly or through the Horizon UI. Most of the microservices are REST APIs for controlling different aspects of the platform (block storage, authentication, telemetry and so on). API nodes for an OpenStack cloud end up running dozens of small processes, each with their own distinct purpose. In such a setting, Dockerizing all these microservices makes a lot of sense.

Flexibility and speed

Of the several improvements Dockerizing brings, the most important is flexibility. Running a Docker Swarm for OpenStack API services makes it possible to easily scale out by adding more swarm nodes and launching additional copies of the containers on them. Coupled with HAProxy, the new containers can join the backend pool for an API service in seconds and help alleviate high load. Sure, the same can be accomplished by adding physical API nodes, provisioning them with Chef/Puppet/Salt/Ansible and reconfiguring HAProxy to include them in the backend pool for each service, but that takes considerably longer than just launching more pre-built containers.

Versioning and ease of debugging

Since Docker images are versioned and pushed to a central registry, it’s trivial to ensure that all instances of a service run with identical configs, packages and libraries. Furthermore, even though a service like Nova typically consists of 4-5 different sub-services, all of them can share the same config and therefore use the same container image. The only difference is which command the container runs when started. Being able to easily check that all backend instances are identical (use the same version of the image) is important when debugging issues. Also, Docker Compose has built-in support for viewing logs from several containers sorted chronologically, no matter which physical node they run on. That also includes the option to follow logs in real-time from several containers at once.

Orchestration and clustering

Using Docker Compose to orchestrate microservices is important. Compose provides an interface to scale the number of containers for each service and supports constraints and affinities on where each container should run. For example if you run a clustered MySQL instance for the internal OpenStack databases, Compose can ensure that each database container runs on a different physical host than the others in the MySQL cluster. When creating container images for each service, an entrypoint shell script can be included and used to detect if there is an existing cluster to add itself to, or if this is the first instance of the service. Clustering the services that OpenStack APIs rely on (notably MySQL and RabbitMQ) becomes easier with this type of pattern.

Service Discovery

A solution for Service Discovery is a requirement when operating dozens of microservices that expect to be able to find each other and talk together. In the Docker ecosystem, Consul is a great option for service discovery. Coupled with the Registrator container deployed on each swarm node, all microservices that listen on a TCP/UDP port are automatically added as services in Consul. It’s easy to query Consul for the IP addresses of a particular service using either the HTTP or DNS interface. With the right Consul setup, each Dockerized service can reference other services by their Consul DNS name in config files and so on. This way, no server names or IP addresses need to be hard coded in the config of each service, which is a great plus.
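As an illustration of how simple the HTTP interface is, here is a hedged sketch of querying Consul's catalog API for a service (the helper name and the 'mysql' service are examples, not from the original setup):

```python
def endpoints(catalog_entries):
    """Turn entries from Consul's /v1/catalog/service/<name> response
    into (address, port) pairs, preferring the service-specific
    address when one was registered."""
    return [(e['ServiceAddress'] or e['Address'], e['ServicePort'])
            for e in catalog_entries]

# Against a running Consul agent (default HTTP port 8500):
#   import requests
#   entries = requests.get('http://localhost:8500/v1/catalog/service/mysql').json()
#   endpoints(entries)

# The same parsing, with a canned catalog response:
sample = [{'Address': '', 'ServiceAddress': '',
           'ServicePort': 3306}]
print(endpoints(sample))  # [('', 3306)]
```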

There are more desirable effects of Dockerizing OpenStack microservices, but the most important ones in my opinion are flexibility, ease of debugging, orchestration and service discovery. If you wonder why Docker doesn’t just replace OpenStack entirely, I recommend reading this TechRepublic article, where Matt Asay points out that a common enterprise pattern is to utilize OpenStack for its strong multi-tenancy model. Applications can in that case be deployed with Docker on top of VMs provisioned using OpenStack, which I think will be a very useful way of combining OpenStack and Docker for big enterprises with a diverse set of applications, departments and users.

~ Arne ~

Spark cluster on OpenStack with multi-user Jupyter Notebook

Spark on OpenStack with Jupyter

Apache Spark is gaining traction as the de facto analysis suite for big data, especially for those using Python. Spark has a rich API for Python and several very useful built-in libraries, like MLlib for machine learning and Spark Streaming for real-time analysis. Jupyter (formerly IPython Notebook) is a convenient interface for exploratory data analysis and all kinds of other analytic tasks in Python. In this post I’ll show step by step how to set up a Spark cluster on OpenStack and configure Jupyter with multi-user access and an easy-to-use PySpark profile.

Setting up a Spark cluster on OpenStack VMs

To get a Spark standalone cluster up and running, all you need to do is spawn some VMs and start Spark as master on one of them and slave on the others. They will automatically form a cluster that you can connect to from Python, Java and Scala applications using the IP address of the master node. Sounds easy enough, right? But there are some pitfalls, so read on for tips on how to avoid them.

To create the first VM to be used as master node, I use the OpenStack command line client. We’ll get that node up and running first. The distribution is Ubuntu 14.04 Trusty. Cloud-init is an easy way to perform bootstrapping of nodes to get the necessary software installed and set up. To install and start a Spark master node I use the following Cloud-init script:

#!/bin/bash
# Cloud-init script to get Spark Master up and running

# Customize these before use (the subnet is the one used in this post;
# the Spark version and mirror are examples)
SPARK_VERSION=1.4.1
APACHE_MIRROR=archive.apache.org/dist

# Firewall setup
ufw allow from $LOCALNET
ufw allow 80/tcp
ufw allow 443/tcp
ufw allow 4040:4050/tcp
ufw allow 7077/tcp
ufw allow 8080/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Configure Spark master
cp /opt/spark/conf/ /opt/spark/conf/
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' /opt/spark/conf/

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark Master with IP address of eth0 as the address to use
/opt/spark/sbin/ -h $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/)

Save this script (for example as for use with the OpenStack command line client. The script first adds some firewall rules to allow access to the different components and installs the OpenJDK dependency. Next, a Spark tarball is downloaded, unpacked and made available under /opt/spark on the host. The tarball is prepackaged with Hadoop v1 libraries (note the “hadoop1.tgz” suffix), so adjust this if you need Hadoop v2 instead.

The only configuration of Spark we need at this point is to set the options “spark.deploy.defaultCores” and “spark.executor.memory”. They are used to configure how much resources each application will get when it starts. Since the goal is to set up a multi-user environment with Jupyter notebooks, we need to limit the total amount of CPU cores and RAM that each notebook will use. Each notebook is an “application” on the cluster for as long as the notebook is active (i.e until it is shutdown by the user). If we don’t limit the resource allocation, the first notebook created will allocate all available CPU cores on each worker, leaving no CPU cores free for the next user. In addition, the default RAM allocation for each app is only 512 MB on each worker node, which might be a bit too small, so we bump that up to 2 GB.

The echo line adds “spark-master” to /etc/hosts with a reference to the IP address of the VM. Spark tries to resolve the local hostname on startup. Without a resolvable hostname you might encounter “Name or service not known” errors, resulting in Java exceptions and exits.

On the last line the Spark master process is started. The master process is given the IP address of the local host as an argument to make sure it binds to the correct interface. The IP address is extracted from the output of the “ip addr” command.

One way to launch the master VM with the Cloud-init script is like this:

# Install OpenStack client if not present already
sudo apt-get -y install python-openstackclient

# Customize these values to match your OpenStack cluster
OS_IMAGE=<image-id>        # ID of an Ubuntu 14.04 image
OS_NETID=<network-id>      # ID of the network to attach the VM to
OS_KEYNAME=<keypair-name>  # name of your SSH keypair

# Create Spark master VM
openstack server create --flavor m1.medium --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-master

If you prefer using the web UI (Horizon), you can just as easily paste the Cloud-init script into the text box on the Post-Creation tab of the Launch Instance dialog and achieve the same result.

It will take some minutes for the Spark master VM to finish bootstrapping. When it’s done the Spark master UI will be available on port 8080. Remember to associate a floating IP to the VM to be able to access it from outside the OpenStack project:

openstack ip floating add <floating-ip> spark-master

Verify that the Spark master UI is reachable and displays metadata about the cluster. If the UI is not reachable, first check that your Security Group rules allow port 8080 to the Spark master VM. Second, check the Cloud-init logs on the VM to ensure all parts of the initialization succeeded. You’ll find the Cloud-init log file on the VM as /var/log/cloud-init.log and the output from the Cloud-init script in /var/log/cloud-init-output.log. You can also try to re-run parts of the Cloud-init script with sudo to narrow down any issues with the initialization. When initialization succeeds the Spark master UI will look like this:

Spark master UI with no workers

As expected there are no workers alive yet, so let’s initialize some. To do so we use a slightly modified version of the Cloud-init script above. The main difference is the startup command, which is now /opt/spark/sbin/ with the address of the master as its only argument. Remember to adjust the variables below to your IP range and master IP address.

#!/bin/bash
# Cloud-init script to get Spark Worker up and running

# Customize these before use (subnet from this post; version and mirror are examples)
SPARK_MASTER_IP=<master-ip>
SPARK_VERSION=1.4.1
APACHE_MIRROR=archive.apache.org/dist

# Firewall setup
ufw allow from $LOCALNET
ufw allow 8081/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root.root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark worker with address of Spark master to join cluster
/opt/spark/sbin/ spark://$SPARK_MASTER_IP:7077

Save this script (for example as Notice how the last line starts a worker with the address of the cluster master on port 7077 as its only argument. Since the Cloud-init script has no worker-specific details, it is easy to expand the cluster just by creating more worker VMs initialized with the same Cloud-init script. Let’s create the first worker:

# Customize these values to match your OpenStack cluster
OS_IMAGE=<image-id>
OS_NETID=<network-id>
OS_KEYNAME=<keypair-name>

# Create first Spark worker VM, this time with flavor m1.large
openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker1

Pause for a moment to let the worker creation process finish to ensure that Cloud-init does the necessary work without errors. There is no point in initializing more workers until the process is proven to work as expected. Again, it can be useful to check /var/log/cloud-init.log and /var/log/cloud-init-output.log on the new VM to verify that Cloud-init does what it’s supposed to do. On success you’ll see the worker in the Spark master UI:

Spark master UI with one worker registered

Create some more worker nodes to scale the cluster to handle more parallel tasks:

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker2

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker3

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker4

Verify that the new worker nodes show up in the Spark master UI before continuing.

Installing Jupyter and JupyterHub

A shiny new Spark cluster is fine, but we also need interfaces to be able to use it. Spark comes prepackaged with shells for Scala and Python where the connection to a cluster is already set up. The same level of usability is possible with Jupyter (formerly IPython Notebook), so that when you open a new notebook, a connection to the Spark cluster (a SparkContext) is established for you. The SparkContext is available through the variable “sc” in the notebook, ready to use by calling sc.textFile() to create an RDD, for instance.

JupyterHub is a multi-user server for Jupyter notebooks. That makes it possible for several users to use Jupyter independently and have their own notebooks and files in their home directory instead of a shared storage directory for all notebooks. However, this requires that each user has a user account on the VM where JupyterHub is running. Add user accounts for relevant users now if needed. JupyterHub uses unix authentication, meaning that it relays the username and password to the underlying authentication system on the VM for credential check.

In this deployment JupyterHub is installed on the Spark master VM and launched there. It could run on a separate VM, but there is normally no need for that since the Spark master process does not require that many resources. The VM where the Jupyter notebooks execute is called the “driver” in Spark, and it will need some processing power and memory, depending on the use case.

SSH into the Spark master VM and run the following set of commands:

# Install pip3 and other dependencies
sudo apt-get -y install python3-pip npm nodejs-legacy
sudo npm install -g configurable-http-proxy

# Install JupyterHub and Jupyter
sudo pip3 install jupyterhub
sudo pip3 install "ipython[notebook]"

pip3 is used instead of pip because JupyterHub depends on Python >= 3.3. After installing all software and dependencies, start the JupyterHub service:

sudo jupyterhub --port 80

The benefit of having JupyterHub listen on port 80 instead of the default port 8000 should be obvious, but it requires that you start the service as root. In addition you might want to look into securing JupyterHub with an SSL certificate and have it listen on port 443, since it asks for passwords when users log in. When you have the necessary certificate and keys on the VM, the service can be started like this instead:

sudo jupyterhub --port 443 --ssl-key hub.key --ssl-cert hub.pem

Now try to open the JupyterHub login page on the floating IP address of the VM and log in. After login you should be greeted with an empty home directory with no notebooks. A new notebook can be created by clicking “New” on the right above the notebook list.

Jupyter - notebook list empty

If you create a new notebook, you’ll notice that the only supported kernel is Python3 at the moment. We need to add PySpark to that list to be able to use the Spark cluster from Jupyter.

Configuring Jupyter for PySpark

Jupyter relies on kernels to execute code. The default kernel is Python, but many other languages can be added. To use the Spark cluster from Jupyter we add a separate kernel called PySpark. In addition, kernels can run specific commands on startup, which in this case is used to initialize the SparkContext. First, some dependencies need to be installed:

sudo apt-get -y install python-dev python-setuptools
sudo easy_install pip
sudo pip install py4j
sudo pip install "ipython[notebook]"

It might seem odd to install ipython[notebook] as a dependency, but the reason is that IPython/Jupyter contains a number of Python support modules that kernels rely on. Previously when we installed using pip3, we got the Python3 versions of those modules. When installing again with pip, we get Python2 versions. PySpark depends on Python2.

To add PySpark as a kernel, a file containing a kernel definition must be created. Kernel definitions are JSON files in a specific directory. Kernels can either be enabled globally for all users or for one user only, depending on where the definition file is placed. We want the PySpark kernel to be available for all users, so we’ll add it under /usr/local/share/jupyter/kernels/ like this:

sudo mkdir -p /usr/local/share/jupyter/kernels/pyspark/
cat <<EOF | sudo tee /usr/local/share/jupyter/kernels/pyspark/kernel.json
{
 "display_name": "PySpark",
 "language": "python",
 "argv": [
  "/usr/bin/python2",
  "-m", "ipykernel",
  "-f", "{connection_file}"
 ],
 "env": {
  "SPARK_HOME": "/opt/spark/",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/",
  "PYSPARK_SUBMIT_ARGS": "--master spark:// pyspark-shell"
 }
}
EOF
This kernel definition ensures that the Spark built-in “pyspark-shell” is started under the hood as the process where our code will be executed. Notice how the address to the Spark cluster, “spark://”, is sent as an argument. Remember to customize that address to your specific environment. The address references the Spark master VM (the same host as Jupyter runs on), but could just as easily reference an external host, for instance if you wanted to set up Jupyter on a separate OpenStack VM, or if you already have a Spark cluster running somewhere else that you want to connect to. The Spark master UI shows the right URL to use to connect, right below the Spark logo. Note that the Spark workers depend on being able to establish connections back to the host where the driver process runs (the Jupyter notebook), which may not be possible depending on the firewall setup when connecting to a remote Spark cluster. This is the reason a firewall rule allowing all traffic on the local network ( in my case) is added by Cloud-init on all the Spark VMs.

After adding the kernel definition file for PySpark you’ll have to refresh the Jupyter homepage to see the new kernel in the list. No need to restart JupyterHub.

Debugging PySpark startup errors

If you get an error message about “Dead kernel” when creating new notebooks with the PySpark kernel, there might be several causes. For instance the VM running Jupyter might not be able to connect to the Spark cluster. Or it might lack some dependencies (packages/modules) to initialize the SparkContext. To debug kernel startup issues, first check the output from JupyterHub in the terminal where you started it (might be smart to keep that terminal open until everything works as expected). JupyterHub will for example output log lines like this when a Python module is missing:

[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
[W 2015-09-20 20:31:39.295 ubuntu kernelmanager:52] Kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f died, removing from map.
ERROR:root:kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f restarted failed!
[W 2015-09-20 20:31:39.406 ubuntu handlers:441] Kernel deleted before session

This error occurred because I hadn’t installed ipython[notebook] using pip yet, so the Python2 modules needed by PySpark were not available. Notice how the error message states that it is /usr/bin/python2 that reports the error. Jupyter tries to restart the kernel a total of five times, hits the same error every time and eventually gives up. In the notebook UI this is shown as a “Dead kernel” message.

Other errors can pop up in the Spark logs on master or worker nodes. Spark logs to /opt/spark/logs, so have a look there if anything is malfunctioning. The Spark master node logs every new application that is started on the Spark cluster, so if you don’t see output there when opening a new notebook with the PySpark profile, something is not right.

One last debugging tip is to try to start the PySpark shell from Bash on the VM where Jupyter runs. It is useful to inspect what happens when the PySpark shell starts. Here is an example of output when a dependency is missing:

$ python2 /opt/spark/python/pyspark/
Traceback (most recent call last):
 File "/opt/spark/python/pyspark/", line 28, in <module>
 import py4j
ImportError: No module named py4j

Remember to use Python2 when starting the shell. The above command mimics what Jupyter does behind the scenes when a new notebook is created.

Ready to use PySpark in Jupyter

If everything went according to plan, you now have a Spark cluster which you can easily use from Jupyter notebooks just by creating them with the PySpark profile 🙂 The variable “sc” is initialized as a SparkContext connected to the Spark cluster and you can start exploring the rich Spark API for data transformation and analysis. Here’s a screenshot from a notebook where I extracted response-time numbers from Varnish NCSA logs (web cache server logs) and computed common statistics like mean and standard deviation for the response time of each backend in use by the cache server:

Example use of PySpark in Jupyter

~ Arne ~

Spark – How to fix “WARN TaskSchedulerImpl: Initial job has not accepted any resources”

Apache Spark and Firewalls

When setting up Apache Spark on your own cluster, in my case on OpenStack VMs, a common pitfall is the following error message:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

This error can pop up in the log output of the interactive Python Spark shell or Jupyter (formerly IPython Notebook) after starting a PySpark session and trying to perform any kind of Spark action (like .count() or .take() on an RDD), rendering PySpark unusable.

As the error message suggests, I investigated resource shortages first. The Spark Master UI reported that my PySpark shell had allocated all the available CPU cores and a small portion of the available memory. I therefore lowered the number of CPU cores each Spark application on the cluster gets, by adding the following line to on the master node and restarting the master:

SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
After this change my PySpark shell was limited to 4 CPU cores of the 16 CPU cores in my cluster at that time, instead of reserving all available cores (the default setting). However, even though the Spark UI now reported there would be enough free CPU cores and memory to actually run some Spark actions, the error message still popped up and no Spark actions would execute.

While debugging this issue, I came across a Spark-user mailing list post by Marcelo Vanzin of Cloudera where he outlines two possible causes for this particular error:

- You're requesting more resources than the master has available, so
your executors are not starting. Given your explanation this doesn't
seem to be the case.

- The executors are starting, but are having problems connecting 
back to the driver. In this case, you should be able to see 
errors in each executor's log file.

The second of these was causing this error in my case. The host firewall on the host where I ran my PySpark shell rejected the connection attempts back from the worker nodes. After allowing all traffic between all nodes involved, the problem was resolved! The driver host was another VM in the same OpenStack project, so allowing all traffic between the VMs in the same project was OK to do security-wise.

The error message is not particularly useful in the case where executors are unable to connect back to the driver. If you encounter the same error message, remember to check firewall logs from all involved firewalls (host and/or network firewalls).

On a side note, this requirement of Spark to connect back from executors to the driver makes it harder to set up a Spark cluster in a secure way. Unless the driver is in the same security zone as the Spark cluster, it may not be possible to allow the Spark cluster workers to establish connections to the driver host on arbitrary ports. Hopefully the Apache Spark project will address this limitation in a future release, by making sure all necessary connections are established by the driver (client host) only.

~ Arne ~

Analyzing Popular Topics In My Twitter Timeline using Apache Spark

Word cloud of Twitter hashtags
Most popular Twitter topics, generated using Apache Spark

Over the last weeks I’ve dived into data analysis using Apache Spark. Spark is a framework for efficient, distributed analysis of data, built on the Hadoop platform but with much more flexibility than classic Hadoop MapReduce. To showcase some of the functionality I’ll walk you through an analysis of Twitter data. The code is available as an IPython Notebook on Github.

The question I want to answer using Spark is: What topics are people currently tweeting about? The people are in this case the ones I follow on Twitter, at the moment approx. 600 Twitter users. They represent a diverse set of interests mirroring the topics I’m interested in, such as data analysis, machine learning, networking technology, infrastructure, motor sports and so on. By extracting the hashtags they’ve used in tweets the last week and do a standard word count I’ll generate a list of the most popular topics right now.

The amount of functionality for data analysis in Spark is impressive. Spark features a long list of available transformations and actions, such as map, filter, reduce, several types of joins, cogroup, sum, union, intersect and so on. In addition, Spark has a machine learning library with a growing number of models and algorithms. For instance, Spark MLlib includes everything needed to do the Linear Regression example I did on AWS in my previous blog post. Spark also comes with a Streaming component where batch analysis pipelines easily can be set up to run as real-time analysis jobs instead. Compared to classic Hadoop MapReduce, Spark is not only more flexible, but also much faster thanks to in-memory analysis.

To do the Twitter analysis, I first fetched about 24000 tweets from the Twitter API using a Python module called Tweepy:

# Get details about own user
me = api.me()
friends = api.friends_ids(

# Initialize data structure
tweets = {}

# Fetch lists of recent tweets for each of the user IDs in the list 'friends'
for user in friends:
    # Only query Twitter for data not already cached
    if db.tweets.find({'user_id': user}).count() == 0:
        print('Get recent tweets for user {}…'.format(user))
        tweets[user] = []
        # Query Twitter API for 2 pages (= 40 tweets)
        for page in tweepy.Cursor(api.user_timeline, id=user).pages(2):
            tweets[user].extend(page)
            print(' Got {} tweets so far…'.format(len(tweets[user])))
            # API is rate limited (5 sec sleep = 180 reqs in 15 min)
            time.sleep(5)
        # Save each tweet to database
        for tweet in tweets[user]:
            db.tweets.insert_one({'user_id': user, 'tweet': tweet._json})


Each tweet was saved to a local MongoDB instance for persistence. The loop first checks the database to see if that user has been processed already, to save time if the loop has to be run several times. Due to rate limiting of the Twitter API it took about 2 hours to download the dataset. By the way, the term “friends” is the word Twitter uses to reference the list of users that a user follows.

The code snippet above depends on a valid, authorized API session with the Twitter API and an established connection to MongoDB. See the IPython Notebook for the necessary code to establish those connections. Of the dependencies the Python modules “tweepy” and “pymongo” need to be installed, preferably using pip to get the latest versions.

With the tweets saved in MongoDB, we are ready to start doing some filtering and analysis on them. First, the set of tweets need to be loaded into Spark and filtered:

# Extract tweets from MongoDB
allTweets = []
for doc in db.tweets.find():
    allTweets.append(doc['tweet'])

# Load tweets into Spark for analysis
allTweetsRDD = sc.parallelize(allTweets, 8)

# Set up filter to only get tweets from the last week
limit = datetime.datetime.now() - datetime.timedelta(days=DAYS_LIMIT)
limit_unixtime = time.mktime(limit.timetuple())

# Filter tweets to get rid of those that either have no hashtags or are too old
tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)
filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)

In the code snippet above I use sc.parallelize() to load a Python list into Spark, but I could just as easily have used sc.textFile() to load data from a file on disk or sc.newAPIHadoopFile() to load a file from HDFS. Spark also supports the use of Hadoop connectors to load data directly from other systems such as MongoDB, but that connector unfortunately does not support PySpark yet. In this case the dataset fits in the memory of the Python process, so I can use sc.parallelize() to load it into Spark, but if I wanted to run the analysis on a longer timespan than one week that would not be feasible. To see how the MongoDB connector can be used with Python, check out this example code by @notdirkhesse which he demonstrated as part of his excellent Spark talk in June.

“sc” is the SparkContext object, which is the object used to communicate with the Spark API from Python. I’m using a Vagrant box with Spark set up and “sc” initialized automatically, which was provided as part of the very interesting Spark MOOCs CS100 and CS190 (BerkeleyX Big Data X-Series). The SparkContext can be initialized to use remote clusters running on EC2 or Databricks instead of a local Vagrant box, which is how you’d scale out the computations.

Spark has a concept of RDDs, Resilient Distributed Datasets. An RDD represents an entire dataset regardless of how it is distributed across the cluster of nodes. RDDs are immutable, so a transformation on an RDD returns a new RDD with the results. The last two lines of the code snippet above are transformations to filter() the dataset. An important point to note about Spark is that all transformations are lazily evaluated, meaning they are not computed until an action is called on the resulting RDD. The two filter statements are only recorded by Spark so that it knows how to generate the resulting RDDs when needed.
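The laziness can be mimicked in plain Python with generators, which also defer work until results are demanded (a local illustration of the principle, not Spark itself):

```python
# Generators are lazy, just like Spark transformations: nothing in the
# generator body runs until an "action" (here: list()) demands results.
log = []

def tag_filter(tweets):
    for t in tweets:
        log.append('evaluated')  # record when work actually happens
        if t['hashtags']:
            yield t

tweets = [{'hashtags': ['bigdata']}, {'hashtags': []}]
filtered = tag_filter(tweets)   # "transformation": nothing evaluated yet
assert log == []

result = list(filtered)         # "action": evaluation happens now
assert len(result) == 1
assert log == ['evaluated', 'evaluated']
```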

Let’s inspect the filters in a bit more detail:

tweetsWithTagsRDD = allTweetsRDD.filter(lambda t: len(t['entities']['hashtags']) > 0)

The first filter transformation is called on allTweetsRDD, which is the RDD that represents the entire dataset of tweets. For each of the tweets in allTweetsRDD, the lambda expression is evaluated. Only those tweets where the expression evaluates to True are returned to be included in tweetsWithTagsRDD. All other tweets are silently discarded.

filteredTweetsRDD = tweetsWithTagsRDD.filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime)

The second filter transformation is a bit more complex due to the datetime calculations, but follows the same pattern as the first. It is called on tweetsWithTagsRDD, the results of the first transformation, and checks if the tweet timestamp in the “created_at” field is recent enough to be within the time window I defined (one week). The tweet timestamp is parsed using python-dateutil, converted to unixtime and compared to the precomputed limit.
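For illustration, the same timestamp check can be sketched with only the standard library, assuming Twitter's fixed "created_at" format (using strptime instead of python-dateutil; the helper name is hypothetical):

```python
from datetime import datetime, timedelta

# Twitter's 'created_at' format, e.g. 'Sun Jul 12 19:29:09 +0000 2015'
FMT = '%a %b %d %H:%M:%S +0000 %Y'

def is_recent(created_at, days_limit=7, now=None):
    """Return True if the tweet timestamp falls within the time window."""
    tweet_time = datetime.strptime(created_at, FMT)
    now = now or datetime.utcnow()
    return tweet_time > now - timedelta(days=days_limit)

# A tweet from July 12th is inside a one-week window ending July 15th
assert is_recent('Sun Jul 12 19:29:09 +0000 2015', now=datetime(2015, 7, 15))
# ...but outside a window ending August 1st
assert not is_recent('Sun Jul 12 19:29:09 +0000 2015', now=datetime(2015, 8, 1))
```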

For those of you who are already acquainted with Spark, the following syntax might make more sense:

filteredTweetsRDD = (allTweetsRDD
                     .filter(lambda t: len(t['entities']['hashtags']) > 0)
                     .filter(lambda t: time.mktime(parser.parse(t['created_at']).timetuple()) > limit_unixtime))
The inspiration from Functional Programming in Spark's programming model is apparent here, with enclosing parentheses around the entire statement in addition to the use of lambda functions. The resulting filteredTweetsRDD is the same as before. However, by assigning a variable name to the results of each filter transformation, it's easy to compute counts:

tweetCount = allTweetsRDD.count()
withTagsCount = tweetsWithTagsRDD.count()
filteredCount = filteredTweetsRDD.count()

count() is an example of an action in Spark, so when I execute these statements the filter transformations above are also computed. The counts revealed the following about my dataset:

  • Total number of tweets: 24271
  • Tweets filtered away due to no hashtags: 17150
  • Of the tweets that had hashtags, 4665 were too old
  • Resulting set of tweets to analyze: 2456

Now we’re ready to do the data analysis part! With a filtered set of 2456 tweets in filteredTweetsRDD, I proceed to extract all hashtags and do a word count to find the most popular tags:

# Count the number of occurrences for each hashtag,
# by first extracting the hashtag and lowercasing it,
# then do a standard word count with map and reduceByKey
countsRDD = (filteredTweetsRDD
.flatMap(lambda tweet: [hashtag['text'].lower() for hashtag in tweet['entities']['hashtags']])
.map(lambda tag: (tag, 1))
.reduceByKey(lambda a, b: a + b)
# Get the most used hashtags (order countsRDD descending by count)
countsRDD.takeOrdered(20, lambda (key, value): value)

What’s happening here is that I’m creating a new Pair RDD consisting of tuples of (hashtag, count). The first step is to extract all hashtags with a flatMap(), and remember that every tweet can contain a list of multiple tags.

filteredTweetsRDD.flatMap(lambda tweet: [hashtag['text'].lower() \
    for hashtag in tweet['entities']['hashtags']])

A flatMap() transformation is similar to a map(), which passes each element of an RDD through a user-supplied function. In contrast to map(), flatMap() ensures that the result is a flat list instead of a nested data structure – like a list of lists, for instance. Since the analysis I'm doing doesn't care which tweet has which hashtags, a simple list is sufficient. The lambda function does a list comprehension to extract the "text" field of each hashtag in the data structure and lowercase it. The data structure for tweets looks like this:

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Sun Jul 12 19:29:09 +0000 2015',
 u'entities': {u'hashtags': [{u'indices': [75, 83], 
                              u'text': u'TurnAMC'},
                             {u'indices': [139, 140], 
                              u'text': u'RenewTURN'}],
               u'symbols': [],
               u'urls': [],

So the result of the lambda function on this tweet would be:

['turnamc', 'renewturn']
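The contrast between map() and flatMap() can be illustrated with plain Python lists (a local sketch, not Spark):

```python
tweets = [
    {'entities': {'hashtags': [{'text': 'TurnAMC'}, {'text': 'RenewTURN'}]}},
    {'entities': {'hashtags': [{'text': 'BigData'}]}},
]

def extract(t):
    return [h['text'].lower() for h in t['entities']['hashtags']]

# map-like: one result per tweet -> a nested list of lists
mapped = [extract(t) for t in tweets]
assert mapped == [['turnamc', 'renewturn'], ['bigdata']]

# flatMap-like: the per-tweet lists are concatenated into one flat list
flat = [tag for t in tweets for tag in extract(t)]
assert flat == ['turnamc', 'renewturn', 'bigdata']
```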

After the flatMap(), a standard word count using map() and reduceByKey() follows:

.map(lambda tag: (tag, 1))
.reduceByKey(lambda a, b: a + b)

A word count is the “Hello World”-equivalent for Spark. First, each hashtag is transformed to a key-value tuple of (hashtag, 1). Second, all tuples with the same key are reduced using the lambda function, which takes two counts and returns the sum. Spark runs both map() and reduceByKey() in parallel on the data partition residing on each worker node in a cluster, before the results of the local reduceByKey() are shuffled so that all values belonging to a key are processed by one worker. This behaviour mimics the use of a Combiner in classic Hadoop MapReduce. Since both map() and reduceByKey() are transformations, the result is a new RDD.
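Locally, the same word count can be expressed with collections.Counter, which is a useful mental model for what map() + reduceByKey() computes (minus the distributed shuffle):

```python
from collections import Counter

hashtags = ['turnamc', 'renewturn', 'turnamc', 'bigdata', 'turnamc']

# Equivalent of .map(lambda tag: (tag, 1)) followed by
# .reduceByKey(lambda a, b: a + b): a per-key sum of ones
counts = Counter(hashtags)
assert counts['turnamc'] == 3

# Descending order by count, as takeOrdered with a negated value would give
top = sorted(counts.items(), key=lambda kv: -kv[1])
assert top[0] == ('turnamc', 3)
```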

To actually perform the computations and get results, I call the action takeOrdered() with a custom sort function to get the top 20 hashtags by count. The sort function simply orders key-value pairs descending by value.

The 20 most used hashtags in my dataset turned out to be:

[(u'bigdata', 114),
 (u'openstack', 92),
 (u'gophercon', 71),
 (u'machinelearning', 68),
 (u'sdn', 66),
 (u'datascience', 58),
 (u'docker', 56),
 (u'dtm', 46),
 (u'audisport', 44),
 (u'dtmzandvoort', 42),
 (u'hpc', 40),
 (u'welcomechallenges', 38),
 (u'devops', 37),
 (u'analytics', 36),
 (u'awssummit', 36),
 (u'infosec', 33),
 (u'security', 32),
 (u'openstacknow', 29),
 (u'renewturn', 29),
 (u'mobil1scgp', 28)]

In this list it’s easy to recognize several of the interests I mentioned earlier. Big Data is the top hashtag, which together with Machine Learning and Data Science make up a significant portion of the interesting tweets I see in my Twitter timeline. OpenStack is another top hashtag, which is a natural topic given my current job in infrastructure. SDN is a closely related topic and an integrated part of the OpenStack scene. Docker is taking over in the infrastructure world and the DevOps mindset that follows with it is also a popular topic.

What’s interesting to see is that conferences spark a lot of engagement on Twitter. Both GopherCon and AWS Summit turn up in the top 20 list since they took place during the week of tweets I analyzed. The same goes for motor sports (hashtags DTM Zandvoort, Audi Sport, Welcome Challenges), although in that case it’s the professional teams, in contrast to conference goers, that make sure their Twitter followers are constantly updated on standings and news.

As I’m sure you’ve noticed, the word cloud at the beginning of this blog post is generated from the list of hashtags in the dataset and their counts. To finish off the analysis I also computed the average number of hashtags per tweet that had at least one hashtag:

# Count the number of hashtags used
totalHashtags = countsRDD.map(lambda (key, value): value) \
                         .reduce(lambda a, b: a + b)

# Compute average number of hashtags per tweet
print('A total of {} hashtags gives an average number of '
      'tags per tweet at {}.'.format(totalHashtags,
      round(totalHashtags / float(filteredTweetsRDD.count()), 2)))

Here I do another map + reduce, but this time the map function extracts the count for each hashtag and the reduce function sums it all up. It is very easy to build such pipelines of transformations to get the desired results. The speed and flexibility of Spark lowers the entry point and invites the user to do more such computations. In closing, here are the numbers:

  • A total of 4030 hashtags analyzed
  • An average number of tags per tweet at 1.64

~ Arne ~

Using Amazon Machine Learning to Predict the Weather

Amazon recently launched their Machine Learning service, so I thought I’d take it for a spin. Machine Learning (ML) is all about predicting future data based on patterns in existing data. As an experiment I wanted to see if machine learning would be able to predict the weather of tomorrow based on weather observations. Weather systems travel large distances on a time scale of hours and days, so recent weather observations from around the country can be used to predict the future weather of one specific site. Meteorological institutes do this every day by running complex weather models on hundreds of nodes in large HPC clusters. I don’t expect machine learning to produce quite as good results as those models do, but thought it would be fun to see how close ML could get.

Weather Map
Weather forecast delivered by the Norwegian Meteorological Institute and the NRK

The Amazon Machine Learning service makes it easy to get started and reduces the time it takes to get actionable insights from data. The service comes with tutorials, developer guides, a very useful explanation of machine learning concepts and enough tips to guide anyone through their first steps in the world of machine learning. There is a sample dataset you can use to create your first prediction model but if you want, you can follow along my journey in this post with my dataset instead. The source code to generate it is on Github, and all you need to generate CSV files with weather observations is a free API key from the weather data provider.

Defining the use case and dataset

Before diving into coding and machine learning, it’s important to define the use case as clearly as possible. To test whether Machine Learning is a viable approach to weather forecasting is the overall goal. To test this, I choose to predict the temperature tomorrow at 12:00 UTC in Oslo, the capital of Norway. The dataset I’ve chosen is weather observations from five cities in Norway, scattered around the southern half of the country. The weather in Oslo usually comes from the west, so I include observations from cities like Stavanger and Bergen in the dataset.

The layout of the dataset is important. Amazon Machine Learning treats every line in the dataset (CSV file) as a separate record and processes them randomly. For each record, it tries to predict the target value (temperature in Oslo the next day) from the variables present in that record. This means that you can’t rely on any connections between records, for example that the temperature measured at 10 AM may be similar to the temperature at 11 AM.

To create a dataset with enough data in each record to be able to predict the target value, I append all weather observations with the same timestamp, regardless of location, to the same record. This means that for any given timestamp there will for instance be five temperatures, five wind measurements and so on. To be able to distinguish between different cities in the dataset I named each column (each variable) with the first letter of the city name, forming variable names like “o_tempm” when the original observational data had a variable “tempm” containing the temperature for Oslo.
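The column naming scheme can be sketched like this (the helper and field names are illustrative, not the actual script):

```python
def build_record(observations):
    """Merge per-city observations for one timestamp into a flat record,
    prefixing each field with the first letter of the city name."""
    record = {}
    for city, obs in observations.items():
        prefix = city[0].lower()
        for field, value in obs.items():
            record['{}_{}'.format(prefix, field)] = value
    return record

obs = {'Oslo': {'tempm': '12', 'hum': '36'},
       'Bergen': {'tempm': '10', 'hum': '80'}}
record = build_record(obs)
assert record['o_tempm'] == '12'   # temperature for Oslo
assert record['b_tempm'] == '10'   # temperature for Bergen
```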

Creating the training dataset

Machine Learning works by creating a model from a training dataset where the target value to predict is already known. Since I want to predict a numerical value, Amazon ML defaults to a linear regression model. That is, it tries to build a formula which can output the target value, using individual weights for each variable in a record that tells the model how that variable is related to the target value. Some variables get weight zero, meaning they are not related to the target value at all, and others get positive weights between 0 and 1. To be able to determine weights for variables, there must be a sufficient amount of training data.
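Conceptually, the trained model's prediction is a weighted sum of the input variables (a simplified sketch; real linear regression models also learn an intercept term, and the weight values here are made up):

```python
def predict(weights, record, intercept=0.0):
    """Linear regression: target = intercept + sum of weight * value.
    Variables with weight zero do not influence the prediction at all."""
    return intercept + sum(w * record.get(name, 0.0)
                           for name, w in weights.items())

weights = {'s_tempm': 0.6, 's_pressurem': 0.02, 'o_hum': 0.0}
record = {'s_tempm': 10.0, 's_pressurem': 1000.0, 'o_hum': 90.0}
# 1.0 + 0.6*10 + 0.02*1000 + 0.0*90 = 27.0
assert abs(predict(weights, record, intercept=1.0) - 27.0) < 1e-9
```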

To create a sufficiently large training dataset, I needed weather observations covering some time, at least 14 days. Fortunately, the weather data provider has a JSON API that is really easy to use, and its history endpoint can provide weather observations for both the current date and dates back in time. I use that to collect observations for the last two weeks for all five cities. Since the free tier of the API restricts usage to 500 calls a day and a maximum of 10 calls per minute, the script I made to generate the dataset has to wait some seconds between each API call. To limit the API usage and be able to rerun the script, I cache weather observations on disk, because I don't expect past weather observations to change.
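The caching and throttling logic can be sketched as follows (hypothetical function and file layout; the real script differs):

```python
import json
import os
import time

def fetch_observations(city, date, fetch, cache_dir='cache', delay=6):
    """Return observations for one city and date, using a disk cache to
    avoid repeated API calls. 'fetch' performs the actual HTTP request.
    Past observations never change, so cached files are reused as-is."""
    path = os.path.join(cache_dir, '{}_{}.json'.format(city, date))
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    data = fetch(city, date)
    time.sleep(delay)  # stay below 10 calls per minute on the free tier
    if not os.path.isdir(cache_dir):
        os.makedirs(cache_dir)
    with open(path, 'w') as f:
        json.dump(data, f)
    return data
```

On a rerun, every cached timestamp is served from disk, so only new dates cost API calls.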

A weather observation returned by their API has the following syntax:

{'conds': 'Light Rain Showers',
 'date': {'hour': '14',
 'mday': '29',
 'min': '00',
 'mon': '05',
 'pretty': '2:00 PM CEST on May 29, 2015',
 'tzname': 'Europe/Oslo',
 'year': '2015'},
 'dewpti': '35',
 'dewptm': '2',
 'fog': '0',
 'hail': '0',
 'heatindexi': '-9999',
 'heatindexm': '-9999',
 'hum': '36',
 'icon': 'rain',
 'metar': 'AAXX 29121 01384 11786 52108 10121 20015 51007 60001 78082 84260',
 'precipi': '',
 'precipm': '',
 'pressurei': '',
 'pressurem': '',
 'rain': '1',
 'snow': '0',
 'tempi': '54',
 'tempm': '12',
 'thunder': '0',
 'tornado': '0',
 'utcdate': {'hour': '12',
 'mday': '29',
 'min': '00',
 'mon': '05',
 'pretty': '12:00 PM GMT on May 29, 2015',
 'tzname': 'UTC',
 'year': '2015'},
 'visi': '37',
 'vism': '60',
 'wdird': '210',
 'wdire': 'SSW',
 'wgusti': '',
 'wgustm': '',
 'windchilli': '-999',
 'windchillm': '-999',
 'wspdi': '17.9',
 'wspdm': '28.8'}

As you can see, this is the observation for May 29th at 12:00 UTC and the temperature was 12 degrees Celsius (“tempm”, where m stands for the metric system). The API returns a list of observations for a given place and date. The list contains observation data for at least every hour, some places even three times per hour. For each date and time, I combine the observations from all five places into one long record. If there isn’t data from all five places for that timestamp, I skip it, to make sure that all machine learning records have a sufficient amount of data. Lastly, for all records belonging to the training set, I append the target value (the next day’s temperature) as the last field. That way, I get a file “dataset.csv” with known target values that can be used to train the model. Here is an example record with the last number (8) being the target to predict:


This way, I get a dataset where each timestamp is a separate record and all timestamps belonging to one date gets the next day’s temperature in Oslo at 12 UTC as the target value. In total 844 records for 14 days of observations.

In addition, the script outputs a file “testset.csv” with the last day of observations where the target value is unknown and should be predicted by the model.

Upload both CSV files to Amazon S3 before continuing, as Amazon Machine Learning is only able to use input data from S3 (or Redshift, but in that case Amazon exports it to S3 before using it). Be sure to select the “US Standard” region of S3, as the Machine Learning service is only available in their North Virginia location at the moment. To reduce costs it is important that the S3 datasets are in the same region as the Machine Learning service.

Create Amazon Machine Learning datasources

The datasource is an object used by the Machine Learning model to access the data in S3. Datasource objects contain a schema that tells the model what type of field each variable is (numeric, binary, categorical, text). This schema is autodetected when you create a datasource, but may need some review before continuing to make sure all field types were classified correctly.

To create a datasource, first log into AWS to get access to Amazon Machine Learning. Select Create New -> Datasource. You are then asked for the path to the dataset in S3 and to give it a name, for instance “Weather observations”:

Create datasource

The dataset is verified and the schema is auto-generated. The next step is to make any adjustments to the schema if needed. Remember to say Yes to “Does the first line in your CSV contain the column names?”. I found that most fields containing actual data were correctly categorized, but fields with little or no data were not. There are some observation fields, like for instance “tornado”, that are normally zero for all five places and all times in my dataset. That field is binary but often autodetected as numeric (probably not an issue, since the field has no relevant data). The field “precipm” is numeric, but as it’s often blank (no precipitation detected) it can be mislabeled as categorical. Remember to go through all variables to check for misdetections like these; in my dataset there are 97 variables to check.

The third step is to define a target, which is the variable “target_o_tempm” in my dataset. When selected, Amazon Machine Learning informs you that “ML models trained from this datasource will use Numerical regression.” The last step in datasource creation is to define what field will be used as row identifier, in this case “datetime_utc”. The row identifier will be used to label output from the machine learning model, so it’s handy to use a value that’s unique to each record. The field selected as row identifier will be classified as Categorical. Review the settings and click Finish. It may take some minutes for the datasource to get status Completed, since Amazon does quite a bit of data analysis in the background. For each variable, the range of values is detected and statistics like mean and median are computed.

At this point, I suggest you go ahead and create another datasource for the testset while you wait. The testset needs to have the same schema as the dataset used to train the model, which means that every field must have the same classification in both datasources. Therefore it’s smart to create the testset datasource right away when you still remember how each variable should be classified.

In my case I got 30 binary attributes (variables), 11 categorical attributes and 56 numeric attributes. The review page for datasource creation lists that information. Make sure the numbers match for the dataset and testset:

Create datasource - review page

Creating the Machine Learning model

So, now you’ve got two datasources and can initialize model training. Most of the hard work in creating a dataset and the necessary datasources is already done at this point, so it’s time to put all this data to work and initialize creation of the model that is going to generate predictions.

Select Create New -> ML Model on the Machine Learning Dashboard page. The first step is to select the datasource you created for the dataset (use the “I already created a datasource pointing to my S3 data” option). The second step is the model settings, where the defaults are just fine for our use. Amazon splits the training dataset into two parts (a 70-30 split) to be able to both train the model and evaluate its performance. Evaluation is done using the last 30% of the dataset. The last step is to review the settings, and again, the defaults are fine. The advanced settings include options like how many passes ML should do over the dataset (default 10) and how it should do regularization. More on that below, just click Finish now to create the model. This will again take some time, as Amazon performs a lot of computations behind the scenes to train the model and evaluate its performance.

Regularization is a technique used to avoid overfitting of the model to the training dataset. Machine learning models are prone to both underfitting and overfitting problems. Underfitting means the model has failed at capturing the relation between input variables and target variable, so it is poor at predicting the target value. Overfitting also gives poor predictions and is a state where the model follows the training dataset too closely. The model remembers the training data instead of capturing the generic relations between variables. If for instance the target value in the training data fluctuates back and forth, the model output also fluctuates in the same manner. The result is errors and noise in the model output. When used on real data where the target value is unknown, the model will not be able to predict the value in a consistent way. This image helps explain the issue in a beautiful way:

Machine learning - overfitting
Underfitting vs overfitting a machine learning model. Image source:

So to avoid overfitting, regularization is used. Regularization can be performed in multiple ways. Common techniques involve penalizing extreme variable weights and setting small weights to zero, to make the model less dependent on the training data and better at predicting unknown target values.
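As an illustration, L2 regularization adds a penalty proportional to the squared weights to the training loss, so extreme weights become expensive (a sketch of the idea, not Amazon's actual implementation):

```python
def regularized_loss(errors, weights, lam):
    """Mean squared error plus an L2 penalty on the weights.
    A larger lam means a stronger preference for small weights."""
    mse = sum(e * e for e in errors) / float(len(errors))
    penalty = lam * sum(w * w for w in weights)
    return mse + penalty

errors = [1.0, -2.0, 0.5]
small = regularized_loss(errors, [0.1, 0.2], lam=0.5)
large = regularized_loss(errors, [3.0, 4.0], lam=0.5)
assert large > small  # extreme weights are penalized
```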

Exploring model performance

Remember that Amazon did an automatic split of the training data, using 70% of the records to train the model and the remaining 30% to evaluate it. The Machine Learning service computes a couple of interesting performance metrics to inspect. Go to the Dashboard page and select the evaluation in the list of objects. The first metric shown is the RMSE, Root Mean Square Error, of the evaluation data. The RMSE should be as low as possible, meaning the mean error is small and the predicted output is close to the actual target value. Amazon computes a baseline RMSE from the model training data and compares the RMSE of the evaluation data to that. I achieved an RMSE of about 2.5 in my first tests and near 2.0 after refining the dataset a bit. The biggest optimization I did was to change the value of invalid weather observations from the default of -999 or -9999 to empty. That way the range of values for each field got closer to reality and did not include those very low numbers.
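For reference, RMSE is straightforward to compute from predictions and actual values (a minimal sketch):

```python
from math import sqrt

def rmse(predictions, actuals):
    """Root Mean Square Error: sqrt of the mean squared residual."""
    residuals = [p - a for p, a in zip(predictions, actuals)]
    return sqrt(sum(r * r for r in residuals) / float(len(residuals)))

# Predictions that are consistently 2 degrees off give an RMSE of 2
assert rmse([14.0, 12.0, 10.0], [12.0, 10.0, 8.0]) == 2.0
```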

By selecting “Explore model performance”, you get access to a histogram showing the residuals of the model. Residuals are differences between the predicted target and the actual value. Here’s the plot for my model:

ML model performance histogram
Distribution of residuals

The histogram tells us that my model has a tendency to over-predict the temperature, and that a residual of 1 to 2 degrees Celsius is the most likely outcome. This is called a positive bias. To lower the bias it is possible to re-train the ML model with more data.
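The bias is simply the mean of the residuals: a positive mean means the model over-predicts on average (a minimal sketch):

```python
def bias(predictions, actuals):
    """Mean residual: positive -> the model tends to over-predict."""
    residuals = [p - a for p, a in zip(predictions, actuals)]
    return sum(residuals) / float(len(residuals))

# Predictions running 1 to 2 degrees above the actual temperatures
assert bias([13.5, 14.0, 13.0], [12.0, 12.0, 12.0]) == 1.5
```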

Before we use the model to predict temperatures, I’d like to show some of the interesting parts of the results of model training. Click the datasource for the training dataset on the Dashboard page to load the Data Report. Select Categorical under Attributes in the left menu. Sort the table by “Correlations to target” by clicking on that column. You’ll get a view that looks like this:

Datasource - Data Report Categorical

This table tells you how much weight each field has in determining the target value, so it is a good source of information on what the most important weather observation data are. Of the categorical attributes, the wind direction in Stavanger is the most important attribute for how the temperature is going to be in Oslo the next day. That makes sense since Stavanger is west of Oslo, so weather that hits Stavanger first is likely to arrive in Oslo later. Wind direction in Kristiansand is also important, and in third place on this ranking we find the conditions in Trondheim. The most commonly observed values are shown along with a small view of the distribution for each variable. Click Numeric to see a similar ranking for those variables, revealing that dew point temperature for Trondheim and atmospheric pressure in Stavanger are the two most important numeric variables. For numeric variables, the range and mean of observed values are shown. It’s interesting to see that none of the mentioned important variables are weather observations from Oslo.

Use the model to predict the weather

The last step is to actually use the Amazon Machine Learning service to predict tomorrow’s temperature, by feeding the testset to the model. The testset contains one day of observations and an empty target value field.

Go back to the Dashboard and select the model in the object list. Click “Generate batch predictions” at the bottom of the model info page. The first step is to locate the testset datasource, which you’ve already created. When you click Verify, Amazon checks that the schema of the testset matches the training dataset, which it should given that all variables have the same classification in both datasources. It is possible to create the testset datasource in this step instead of choosing an existing datasource. However, when I tried that, the schema of the testset was auto-generated with no option for customizing field classifications, and it therefore failed verification since the schema didn’t match the training dataset. That’s why I recommended you create a datasource for the testset too, not just for the training dataset.

After selecting the testset datasource, the last step in the wizard before review is choice of output S3 location. Amazon Machine Learning will create a subfolder in the location you supply. Review and Finish to launch the prediction process. Just like some of the previous steps, this may take some minutes to finish.

The results from the prediction process are saved to S3 as a gzip file. To see the results you need to locate the file in the S3 Console, download it and unpack it. The unpacked file is a CSV file, but lacks the “.csv” suffix, so you might need to add that to get the OS to recognize it properly. Results look like this:


The “score” field is the predicted value, in this case the predicted temperature in Celsius. The tag reveals what observations resulted in the prediction, so for instance the observations from the five places for timestamp 02:00 resulted in a predicted temperature in Oslo the next day at 12:00 UTC of 12.7 degrees Celsius.

As you might have noticed, we’ve now got a bunch of predictions for the same temperature, not just one prediction. The strength in that is that we can inspect the distribution of predicted values and even how the predicted value changes according to observation timestamp (for example to check if weather observations from daytime give better predictions than those from the night).

Distribution of predicted temperature values

Predicted Temperature Distribution

Even though the individual predictions differ, there seem to be strong indications that the value is between 13.0 and 14.5 degrees Celsius. Computing the mean over all predicted temperatures gives 13.6 degrees Celsius.

Development of predicted temperature values

Predicted Temperature Development

This plot shows the development in the predicted value as the observation time progresses. There does not seem to be any significant trend in how the different observation times perform.

The actual temperature – and some closing remarks

At this point I’m sure you wonder what the real temperature value ended up being. The value I tried to predict using Amazon Machine Learning turned out to be:

12:00 UTC on May 31, 2015:  12 degrees Celsius

Taking into account the positive bias of 1 to 2 degrees and a prediction mean value of 13.6 degrees Celsius, I am very satisfied with these results.

To improve the model performance further I could try to reduce the bias. To do that I’d have to re-train the model with more training data, since two weeks of data isn’t much. To get a model which could be used all year around, I’d have to include training data from a relevant subset of days throughout the year (cold days, snowy days, heavy rain, hot summer days and so on). Another possible optimization is to include more places in the dataset. The dataset lacks places to the east of Oslo, which is an obvious flaw. In addition to more data, I could explore if the dataset might need to be organized differently. It is for instance possible to append all observations from an entire day into one record, instead of creating a separate record for each observation timestamp. That would give the model more variables to use for prediction but then only one record to predict the value from.

It has been very interesting to get started with Amazon Machine Learning and test it with a real-life dataset. The power of the tool combined with the ease of use that Amazon has built into the service makes it a great offering. I’m sure I’ll use the service for new experiments in the future!

~ Arne ~

Making the Network Ready for 40GbE to the Server

In today’s server networks, 10GbE has become commonplace and has taken over for multiple 1GbE links to each server. However, for some workloads, 10GbE might not be enough. One such case is OpenStack network nodes which potentially handle a big part of the traffic in and out of an OpenStack cloud (depending on how it’s configured). When faced with such use cases, how should the network prepare for delivering 40GbE* to servers?

The issue here is the number of available 40GbE ports on data center switches and the cost of cabling. The most cost-effective cabling for both 10GbE and 40GbE is the Direct Attached Cable (DAC) type based on Twinaxial cabling. Such cables are based on copper and have transceivers directly connected to each end of the cable. For 10GbE the SFP+ standard is commonplace in server NICs and switches. 40GbE uses the slightly larger QSFP transceivers, which internally are made up of four 10Gbit/s lanes (an important feature which we’ll come back to). DAC cables exist in lengths up to 10 metres (33 feet), but the price increases substantially when the cables get longer than 3 to 5 metres. When runs of 10GbE or 40GbE longer than 10 metres are needed, fiber cabling and separate transceivers are the only option. The cost of each transceiver is usually several times that of one DAC cable. Constraints like that are important to take into account when designing a data center network.

In an end-of-row design with chassis-based switches it’s relatively easy to provide the needed 40GbE ports by adding line cards with the right port type, but the cabling cost will be an issue. Servers in adjacent racks may use DAC cabling, but as the distance to each server NIC increases so does the cost of each DAC cable. In addition, fiber connections have to be used for everything longer than 10 metres, which further adds to the cabling costs.

In contrast, with a top-of-rack switch design where all the cabling is inside each rack, the cable lengths are limited to 3 metres at the most. That makes DAC cables a perfect fit. However, the issue then becomes how to make 40GbE ports available to servers from top-of-rack switches. Typical 10GbE datacenter switches are equipped with some 40GbE ports for uplinks to the rest of the network (commonly a spine layer in bigger designs), so it may be tempting to use some of those for the few servers requiring 40GbE in the beginning. Doing so is not advisable, since it decreases the available uplink bandwidth and limits the scalability of the design. With a leaf-spine design with four spine switches the requirement is four uplinks from each leaf. When bandwidth usage increases, you may want to increase that by either growing the number of spines to eight or doubling the bandwidth from each leaf to each spine. Both of these scalability options require eight 40GbE ports for spine uplinks, so if you’ve used some of those 40GbE ports to connect servers, you don’t have the necessary ports to scale.

If 10GbE switches are not an option in a mixed 10/40GbE environment where only a handful of servers need 40GbE, then what about pure 40GbE top-of-rack switches? Several vendors supply 1 RU 40GbE data center switches with 24 to 32 QSFP ports. With a majority of servers still using 10GbE, this might not seem like a viable option. Enter the DAC breakout cable. Using a QSFP-to-SFP+ breakout cable, each 40GbE port can be split into four individual 10GbE ports and used to connect several servers to the switch. What makes this possible is the aforementioned four 10Gbit/s lanes that a 40GbE QSFP port is made up of internally.

Using breakout cables for all server cabling in a rack enables a smooth migration path to 40GbE, since the servers that need 40GbE can get their own QSFP port on the switch. In a leaf-spine design with four spines, even a 24-port 40GbE top-of-rack switch is a viable leaf option. Using four ports for spine uplinks leaves 20 ports available for servers, which equals a total of 80 10GbE connections using breakout cables. That’s enough for 40 servers with dual 10GbE. When a few servers migrate to 40GbE NICs, you re-cable them using QSFP DAC cables to the top-of-rack switch. If you need to scale out to eight spines, you take four more 40GbE ports for spine uplinks and adjust down the number of possible server 10GbE links accordingly. This design provides flexibility both in terms of spine uplink count and gradual 10-to-40GbE migration for servers. And then, in the near future when a majority of servers use 40GbE, you can add another leaf switch to increase the available port count (or replace the one in the rack with a model with a higher port count).
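The port arithmetic above can be captured in a short sketch. The function name and signature are my own; the numbers (a 24-port QSFP leaf, four or eight spines, four 10GbE ports per breakout) come from the design described here:

```python
def leaf_port_budget(total_qsfp_ports, spine_count, servers_on_40g=0):
    """Number of 10GbE server connections a 40GbE leaf switch can
    offer via QSFP-to-SFP+ breakout cables.

    Each spine uplink and each 40GbE-attached server consumes one
    QSFP port; every remaining QSFP port breaks out into four
    individual 10GbE ports.
    """
    free_qsfp = total_qsfp_ports - spine_count - servers_on_40g
    if free_qsfp < 0:
        raise ValueError("not enough QSFP ports on this leaf")
    return free_qsfp * 4

# 24-port leaf, four spines: 20 QSFP ports left for servers,
# i.e. 80 x 10GbE, enough for 40 servers with dual 10GbE NICs.
print(leaf_port_budget(24, 4))   # 80

# Scaling out to eight spines costs four more QSFP ports.
print(leaf_port_budget(24, 8))   # 64

# Migrating two servers to dedicated 40GbE QSFP ports.
print(leaf_port_budget(24, 4, servers_on_40g=2))   # 72
```

Running the numbers like this makes it easy to check, before buying switches, that a given leaf model leaves room for both the spine uplinks you plan to scale to and the 40GbE servers you expect to migrate.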

~ Arne ~

* I’m aware of the 25/50GbE initiative, and it will be very interesting to see which of the 40GbE and 25GbE technologies will prevail in the future. 25GbE might pan out as the most cost-efficient alternative. However, 40GbE is already used in today’s networks as the preferred spine uplink technology, which makes it relatively easy to create a migration path to 40GbE for servers too.