Stream API responses directly to disk with Python

Python is the de facto language for data science work. It’s very convenient to rapidly prototype a solution in Python and see if it works. However, when faced with setting up the same solution in production, there are new space and time constraints to take into account. You’ll likely find that memory usage is the number one resource constraint you need to pay attention to. So let me share a useful trick to reduce memory usage in the early stages of a Python job.

To get your hands on training data, a very common step is to call an API and get data back in JSON format, for example when fetching tweets from Twitter. The simplest approach when using Python is to send an API call using Requests, store the response in a variable, decode the JSON and save it to disk for later processing.

This works well for small datasets, but when you try this on bigger JSON-based datasets, it results in very high memory usage. The problem is that you buffer data in memory and decode it, before saving to disk. Luckily, there is a better way.

Streaming API responses directly to disk

The Requests library has support for streaming. This enables you to iterate over the results you get as they arrive, instead of waiting for the entire response and buffering it in memory. Here’s a code snippet showing how this can be done:

import gzip
import requests
with requests.post(url, data=obj, headers=headers, stream=True) as response:
  with gzip.open('out.gz', mode='wt', encoding='utf-8') as f:
    for chunk in response.iter_content(10240, decode_unicode=True):
      f.write(chunk)

With this code pattern the JSON is saved directly to disk in a compressed format as soon as it arrives over the network. The iterator ensures that you process chunks of 10240 bytes at a time, meaning that this is roughly the maximum amount of response data your Python job holds in memory at any time. The number of bytes is something you can tune and experiment with to figure out what works best in each case.
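To make the pattern concrete, here is a minimal, self-contained sketch of the same idea wrapped in a function. The names stream_to_gzip and fake_chunks are my own illustration and not part of Requests. In a real job the chunks would come from response.iter_content(10240, decode_unicode=True) instead of the stand-in generator.

```python
import gzip


def stream_to_gzip(chunks, path):
    """Write text chunks to a gzip file one at a time; return characters written."""
    written = 0
    with gzip.open(path, mode='wt', encoding='utf-8') as f:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                f.write(chunk)
                written += len(chunk)
    return written


def fake_chunks():
    # Stand-in for response.iter_content(...): yields a JSON document in pieces.
    yield '{"tweets": ['
    yield '{"id": 1}, {"id": 2}'
    yield ']}'
```

Calling stream_to_gzip(fake_chunks(), 'out.gz') writes the three pieces to a compressed file without ever holding the full document in one variable.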

The impact of streaming responses directly to disk is potentially huge. I’ve seen reductions in memory usage from tens of GB to almost nothing for jobs that migrated to this optimized approach.

One caveat of this approach is that it becomes a bit harder to extract parts of the JSON response if you need to do so. One example is APIs that use pagination, where the start and stop markers are part of the JSON response. A way to handle this is to resort to a simple text search of each chunk before the write call. This can work if what you’re trying to find fits nicely into one chunk, for instance if the very last chunk contains the necessary pagination values.
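A small variation on the text search is sketched below. Instead of searching each chunk separately, it keeps only the last few characters of the stream and searches them once at the end, which also covers the case where the marker is split across two chunks. The function name write_and_find and the "next_cursor" field used in the usage note are hypothetical; adapt the pattern to whatever your API returns.

```python
import re


def write_and_find(chunks, out, pattern, tail_size=1024):
    """Write every chunk to `out` and search the end of the stream for `pattern`.

    Only the last `tail_size` characters are kept around for searching, so a
    match is found even when it is split across two chunks, as long as it
    lies within that tail.
    """
    tail = ''
    for chunk in chunks:
        out.write(chunk)
        tail = (tail + chunk)[-tail_size:]
    match = re.search(pattern, tail)
    return match.group(1) if match else None
```

With a pattern such as r'"next_cursor":\s*"([^"]*)"' the function returns the cursor value, or None when the response has no such field.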

I believe this optimisation is important to use whenever possible, even if JSON parsing becomes a bit harder. It’s a good way to control the memory usage of a Python job that is fetching big datasets from an API. Using this approach the job should be able to stay within the memory constraints in production while fetching data.

~ Arne ~


How I Became a Bitcoin Trader


Running a trading bot for fun and profit

Bitcoin and cryptocurrencies have been a wild ride this fall. After observing the phenomenon from the sidelines for quite some time, I did as many others did and bought some BTC with the intention of holding for a long time. Seeing the exchange rate go up every week was fun! However, an idea started to form. The Bitcoin market has very high volatility and easily outpaces the stock markets. It must be possible to make a decent profit just by buying low and selling high. This should be a perfect task for a bot.

Enter CryptoTrader

After first considering building a bot using Bitcoin exchange APIs directly, I heard from a friend that there are purpose-built platforms and frameworks for this already. CryptoTrader is a big platform for trading bots, offering both the tools to build your own bots and a market for renting bots others have written. Best of all, the site takes care of running the bots 24/7 and all you need to get started is an API key from your favorite exchange.

Getting started

I use Coinbase to buy and hold, so using their GDAX exchange was the quickest way to get started. You get trading accounts at GDAX by completing a short signup process, which contains an additional ID verification complementing the one you had to go through when signing up for Coinbase. Transferring funds between Coinbase and GDAX accounts is instant and free. The main downside to GDAX is that they have a limited number of currency pairs compared to other exchanges. The pairs are based on Bitcoin, Ethereum and Litecoin only. That’s not a big problem when getting started in this field, but as you get more experienced you’ll want to explore currency pairs offered by other exchanges too. CryptoTrader supports a long list of exchanges and currency pairs.

When considering CryptoTrader, it is highly recommended to backtest bots on historical data. Although writing my own bot is a long-term goal, I chose to browse the bot marketplace for a bot to rent to get started. CryptoTrader has a slightly strange convention for displaying bot popularity. The most popular bot has a “100%” beside it and other bots have lower percentages, instead of using a simple count of the number of running instances. For simplicity I chose to focus my initial testing on the most popular bot, Blade Runner.

Blade Runner is built on a compelling “always sell higher” logic. The bot automatically detects the market conditions as one of rising, falling or sideways market. Based on the current price it calculates the price levels where it wants to buy and sell, and executes orders based on that. Whenever it buys, it immediately registers a sell order at the exchange at a price point which is about 3% higher than the buy price (a “limit sell” order, i.e. an order with a requested price for the sale to happen). The sell order remains in the order book of the exchange until that price point is reached and the order is executed by the exchange. Whenever one of the limit sell orders is traded at the exchange, the bot has made a profit. This is the “always sell higher” logic, ensuring a profit at every sell.
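The arithmetic behind “always sell higher” can be sketched in a few lines. This is only my own illustration of the description above, not the bot's actual code: the function names are made up, the 3% markup is the approximate figure mentioned, and exchange fees are ignored.

```python
def limit_sell_price(buy_price, markup_pct=3.0):
    """Price of the limit sell order placed right after a buy."""
    return buy_price * (1 + markup_pct / 100)


def expected_profit(buy_price, amount, markup_pct=3.0):
    """Profit in the base currency once the limit sell order is filled."""
    return (limit_sell_price(buy_price, markup_pct) - buy_price) * amount
```

For example, buying 2 units at a price of 100 leads to a limit sell order at about 103 and a profit of about 6 when that order is filled.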

There is one catch with this logic: The sell price has to be reached for the profit to be realized. If the bot buys and the market starts falling for a long time, the sell orders will just stay in the exchange order book. The bot does not seem to adjust the sell orders to the new market conditions, so you simply have to wait for the market to rise again to the predefined sell prices even though that might take days (or weeks). This means you need a medium- to long-term horizon for your trading bot and cannot expect phenomenal returns in hours. It is best to view the trading bot as a way of consistently increasing the amount of cryptocurrency you hold.

Testing, testing, testing

All the getting started guides for trading bots stress that it’s important to backtest. This means running a lot of simulations on historical data to get to know the behavior of the bot, tweak the settings and evaluate if it’s able to give consistent profits. So I started backtesting Blade Runner on the BTC-USD currency pair. I ran multiple simulations of different time periods, and in at least two of them there was a significant profit. Eager to get started for real, I purchased the entry plan of CryptoTrader and rented the bot for three months. Buying access and renting the bot was a smooth process where the CryptoTrader platform generated invoices in BTC and I just had to send the BTC from my wallet to the address in the invoice.

With access, I continued to backtest the bot on BTC-USD. And then I realized, to my dismay, that I was unable to find a combination of bot settings that made the bot give positive returns and at the same time beat the “B&H” strategy. B&H is Buy & Hold, which all backtests and live bot instances are automatically compared to. The B&H percentage is the percentage of profit or loss you would get by just buying at the start of the time period and holding the assets until the end of the time period. When backtesting on BTC-USD over the long period of rising BTC rates this fall, the B&H percentages were usually double-digit profits. The bot was unable to match that. However, for certain short periods of corrections in the BTC rate against USD the bot was giving net profits while B&H was negative.
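As a quick illustration of the comparison, here is a minimal sketch of how a B&H percentage can be computed and compared with a bot result. The function names and the prices in the usage note are hypothetical; CryptoTrader calculates this for you.

```python
def buy_and_hold_pct(start_price, end_price):
    """Percentage gain or loss from buying at the start and holding to the end."""
    return (end_price - start_price) / start_price * 100.0


def beats_buy_and_hold(bot_pct, start_price, end_price):
    """True if the bot's return is positive and higher than buy and hold."""
    return bot_pct > 0 and bot_pct > buy_and_hold_pct(start_price, end_price)
```

If the rate rises from 4000 to 5000 during the period, B&H gives 25%, so a bot return of 8% does not beat it. If the rate falls from 5000 to 4500, B&H is about -10% and even a 2% bot return comes out ahead.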

Based on these findings I concluded that the Blade Runner bot performs best in a sideways market. Reviewing the various currency pairs on GDAX, I found that ETH-BTC is usually a sideways market and decided to go with that pair. By being crypto-crypto this pair has additional advantages as I see it. First, this means my assets are in cryptocurrency all the time. This conforms nicely with my initial goal of buying and holding cryptocurrency. Second, I would not consider following a B&H strategy for the ETH-BTC pair (buying ETH at the start and calculating profits based on the ETH-BTC rate). So I can safely ignore any B&H percentages the bot reports as irrelevant for my case. Instead, the goal is to get net profits so that the bot contributes to an increase in my cryptocurrency assets. Any increase is a win, although the first goal is to earn back what I paid for access to CryptoTrader and the Blade Runner bot.

Running the bot

[Screenshot: Blade Runner bot running as a live instance on the CryptoTrader platform]

The bot has a set of parameters for tuning the behavior, along with six main modes of operation: Optimal, Automatic, Rising, Falling, Sideways and Very Low Capital. I started out with a small amount of BTC for trading and chose the Very Low Capital mode. In all other modes the bot uses a varying percentage of the total funds when placing each order. In the Very Low Capital setting this percentage is always 50%. The main reason for this is to ensure that the amount in each order is enough to avoid being rejected as too small by the exchange. This setting did however result in very few trades per day and low profits. So I quickly switched to Optimal mode instead.

Optimal mode automatically tunes the various parameters based on the market conditions. One example is that in a rising market the bot increases the amount it buys compared to a sideways or falling market. This is done to buy bigger amounts as early as possible to capture more of the rising trend.

There is one important setting to tweak in Optimal mode: “Activation of the above Percent Price Fall or Rise”. By default it is set to “Only for Price Fall”, but I got much better results by choosing “For Both Cases”. The default setting only buys when the price is falling below a calculated threshold of about 3% below the price of the last buy or sell order. To capture rising trends it’s important to buy when the price is going up too (betting on the price to increase further).

Adjusting the funds

When I found the settings that worked best and produced a steady profit, I increased the amount of BTC the bot could trade with. Before depositing funds to the exchange account the bot is using, it is important to activate the “Reset button” in the bot settings (you can keep the bot running). Deposit funds, then wait for a minute or two before you deactivate the “Reset button” in bot settings. This way your deposit won’t interfere with how the bot calculates profit.

There are other ways of manual interference too, for example a “Pause button” to stop the bot from making any new buy orders for a period of time. You can also press Buy or Sell buttons to instruct the bot to place orders. But that will influence the bot profits as the price point is probably not optimal.

It’s also worth mentioning that you can stop the bot and start it again later. When the bot stops, all open sell orders are cancelled, leaving the funds in the trading account in the state they were in when the bot stopped (some in ETH and some in BTC, in this case). Before starting the bot again you need to make sure all funds are in the “base currency”, meaning the last one in the trading pair (for ETH-BTC that would be BTC).

One situation you need to be on the lookout for is the bot stopping due to the total amount of funds exceeding the CryptoTrader plan limit. The various plans you can choose from include limits on the amount of funds the bot can use for trading. The limits are measured in USD. I started with the Basic plan, and when the funds in the trading account exceeded the limit, the bot was stopped. There was no email alert about this, so I discovered it when checking in on the bot the next day. I paid for an upgrade to the Regular plan and started up the bot again. This means you cannot just start the bot and forget about it, as it might stop at any time without notice. All it takes is for one of the cryptocurrencies in your trading account to quickly rise in value compared to USD.

Preliminary results

The bot has managed to generate enough profits in three weeks to cover the amount paid for three months of access to CryptoTrader and the bot itself. This was my initial goal. I’ll be running the bot for the remaining time period it has been rented for to generate net profits. But I’m sure it is possible to do better. There are lots of other promising bots in the CryptoTrader strategy marketplace, so I’ll spend time backtesting other bots and other cryptocurrency trade pairs to find even better combinations. I might even start writing my own bot.

This has been a very exciting experiment and I’ve learned a lot about cryptocurrencies, trading, signals and bots. Generating a net profit is also nice, even though it’s not very big. The biggest advantage is all the new knowledge.

Remember that the information in this blog post should not be relied upon as advice or construed as providing recommendations of any kind. Any trading activity is inherently risky and it’s important to keep in mind that all money used for trading could get lost.

Did you find this useful? Consider donating some BTC to 13sTihkkJtP8UjuWFgFpEiFuiheCb9fQNE if you want.

~ Arne ~

Using Facebook Prophet Forecasting Library to Predict the Weather

Facebook recently released a forecasting library for Python and R, called Prophet. It’s designed for forecasting future values of time series of any kind, and is remarkably easy to get started with. One of my favorite kinds of data set is temperature time series, so here I’ll explore how good Prophet is at predicting future temperatures based on past weather observations.

The dataset consists of temperature readings every 10 minutes from my Netatmo Weather Station, stored in InfluxDB. I extracted the mean temperature per hour for the last year, resulting in approx. 9000 hourly temperature observations. The timestamps are in ISO 8601 format: “2016-02-11T08:00:00Z”. The dataset also has gaps due to several shorter periods of malfunctioning data collection and system maintenance. Prophet claims to handle such gaps without issues, so let’s see if it does.

Update: If you want to try this out yourself, here’s the dataset!

Installing Prophet on Ubuntu Linux

Installing Prophet for Python is done using pip. Since Prophet depends on the Stan statistical library and is optimized for speed using C, it needs Cython and PyStan. In addition it depends on NumPy and Pandas, so make sure you have those installed too.

$ sudo -H pip install cython pystan numpy pandas
$ sudo -H pip install fbprophet

These can also be installed without sudo if you don’t have administrative privileges on the system.

Producing the first temperature predictions

To produce the initial predictions, we simply run through the following steps without changing default parameters.

Import packages and prepare input data

import pandas as pd
import numpy as np
from fbprophet import Prophet
df = pd.read_csv('outdoor-temperature-hourly.csv')
df = df[df.temperature != 'DIFF']

Preparing the dataset consists of loading it as a DataFrame using Pandas. The input dataset is a merge of two time series and some of the values are invalid. They are filtered out by excluding all rows with the value DIFF.

df['ds'] = df['time']
df['y'] = df['temperature']
df = df.drop(['name', 'time', 'temperature', 'seriesA', 'seriesB'], axis=1)

Prophet requires one column named “ds” with dates or datetimes, and one column named “y” with numeric values. All other columns are ignored. The two required columns are created by duplicating two existing columns “time” and “temperature”, before all irrelevant columns are dropped from the dataframe. The preview shows the resulting dataframe which is used as input to Prophet. The values are degrees Celsius and timestamps are UTC. The input looks like this:

[Image: Prophet - Input dataframe start]

Fit model and use it to make predictions


Fitting a model to the input data is as simple as “model.fit(df)”. To make predictions, you first need a DataFrame with datestamps to predict for. Prophet has a useful make_future_dataframe() method to do just that. By default it generates one row per day, but by setting the frequency parameter to “H” we get hours instead. In this example I generated a dataframe with 50 days of hourly timestamps, starting right after the most recent timestamp in the input dataset.

To make predictions based on the model, all you need to do is call “model.predict(future)”. Using the model and dataframe of future datetimes, Prophet predicts values for each future datetime.
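Put together, the fit and predict steps described here can be sketched roughly as follows. The function names hourly_periods and fit_and_forecast are my own wrappers for illustration; the Prophet calls inside (fit, make_future_dataframe and predict) are the ones mentioned in the text, and df is assumed to be the input dataframe with the “ds” and “y” columns prepared earlier.

```python
def hourly_periods(days):
    """Number of hourly rows needed to cover `days` days."""
    return days * 24


def fit_and_forecast(df, days=50):
    """Fit Prophet on a dataframe with 'ds' and 'y' columns and forecast ahead."""
    # Imported here so the helper above can be used without Prophet installed.
    from fbprophet import Prophet

    model = Prophet()
    model.fit(df)
    # One row per hour for the requested number of days into the future.
    future = model.make_future_dataframe(periods=hourly_periods(days), freq='H')
    forecast = model.predict(future)
    return model, forecast
```

The returned model and forecast can then be passed to model.plot(forecast) and model.plot_components(forecast) to produce plots like the ones discussed next.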

Initial results

Prophet includes built-in plotting of the results using Matplotlib. Here’s the prediction for the hourly temperatures two months into the future, plotted as a continuation of the existing input data:


Pretty impressive output for so little work required! Prophet is fast too: these results were computed in only seconds. The temperature trends are clearly visible with higher hourly temperatures during the summer months than in the winter. The forecast for the next two months says that the temperatures are about to rise from around zero degrees Celsius currently to around 5 degrees at the start of April. However, the uncertainty intervals are quite large in this forecast (around 10 degrees). The somewhat large variations between day and night temperatures might cause this.

Before trying to reduce the uncertainty interval, let’s look at another output of the prediction model: The components of the model.


These subplots show several interesting patterns. The first subplot, “Trend” shows a slight temperature rise on the large scale, from year to year. As the input data only covers one year, this does not qualify as a generic trend. However, it does say that the recorded 2017-temperatures are slightly warmer than the 2016-temperatures.

The weekly trends are a bit strange. According to these results, the start of the week is colder than the rest of the week. As the weather doesn’t care about what day it is, this remains a curiosity.

The last plot shows the seasonal distribution of temperatures during the year of input data. Since I only had one year of input data, this plot follows the data as seen in the main plot pretty closely. Like in the trend subplot, the seasonal distributions would benefit from a lot more input data.

Tuning the model to only cover 2017

The initial results used the entire dataset, but how will Prophet behave if it doesn’t have input data from the same season last year to base the predictions on? Let’s investigate the results of using a smaller time period.

recent = df[df.ds > '2017-01-01']

Here I’m making a new input dataframe by selecting only the rows that have timestamps in 2017. Let’s make a new model and some new predictions too:

model_recent = Prophet()
model_recent.fit(recent)  # the model must be fitted before making a future dataframe
future_recent = model_recent.make_future_dataframe(periods=24*10, freq='H')
forecast_recent = model_recent.predict(future_recent)

I set the period to make predictions for to 10 days into the future. Since the input data doesn’t cover nearly as much as in the initial results, it makes sense to reduce the number of days to predict for. The results are shown in the following graph.

[Image: Prophet - Hourly Temperatures for 2017 only]

Since this plot covers a smaller time range, we can see more clearly the daily variations between day and night. Interestingly, Prophet is making the same type of prediction for the coming days as in the previous model; the temperature is going to rise. In this plot we can see why too. At around 2017-02-14 the temperature began to rise and the last days of input data show temperatures well into the positive Celsius range. Prophet has successfully picked up this trend change and is using that to predict the future.

The Yearly trend subplot confirms that Prophet picked up on the trend change:

[Image: Prophet - Hourly 2017 Temperatures - Yearly component]

Tuning the model to reduce uncertainty intervals

In the initial results above, the uncertainty intervals were as big as 10 degrees Celsius. This is a bit too much to be useful in a weather forecasting system. To reduce the uncertainty, let’s make the input data a bit more uniform. To avoid having Prophet deal with day/night temperature differences, I filtered out all temperature measurements except for the one at 12:00 UTC each day. The theory is that these values, one per day, will be more uniform and lead to less variance in the model output.

Filtering the measurements could certainly be done using Pandas, but I chose to use the good old shell tools:

$ head -1 outdoor-temperature-hourly.csv > outdoor-temperature-12UTC.csv
$ fgrep "T12" outdoor-temperature-hourly.csv >> outdoor-temperature-12UTC.csv

Here I generate a new CSV file with only the temperature values for timestamps that contain “T12”. In the ISO 8601 time format “T” is the date-time separator, so this selects all measurements having 12 as the hour component. I first save line 1 from the original file as line 1 in the new file, so that I don’t lose the column headings and have to tell Pandas about them manually.
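For comparison, the same filtering can be sketched in plain Python using only the standard library. The function name keep_noon_rows is my own, and the sketch assumes the timestamp column is called “time” as in the dataframe code earlier. Unlike fgrep, it only looks at the timestamp column instead of matching “T12” anywhere on the line.

```python
import csv


def keep_noon_rows(src_path, dst_path, time_column='time'):
    """Copy the header plus all rows whose timestamp has 12 as the hour."""
    kept = 0
    with open(src_path, newline='') as src, open(dst_path, 'w', newline='') as dst:
        reader = csv.DictReader(src)
        writer = csv.DictWriter(dst, fieldnames=reader.fieldnames)
        writer.writeheader()
        for row in reader:
            # ISO 8601: in "2016-02-11T12:00:00Z" the hour follows the "T".
            if row[time_column].partition('T')[2][:2] == '12':
                writer.writerow(row)
                kept += 1
    return kept
```

Calling keep_noon_rows('outdoor-temperature-hourly.csv', 'outdoor-temperature-12UTC.csv') would produce the same kind of file as the two shell commands.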

Back in my Jupyter Notebook, the new input data is loaded into a dataframe like this:

df2 = pd.read_csv('outdoor-temperature-12UTC.csv', na_values = 'DIFF', usecols = ['time', 'temperature'])
df2['ds'] = df2['time']
df2['y'] = df2['temperature']
df2 = df2.drop('time', axis=1)
df2 = df2.drop('temperature', axis=1)

You might notice slight variations compared to the first example above. Here I added arguments to Pandas read_csv() to ignore lines with value DIFF and ignore all columns except the two with datetimes and values. Either way works, but I think this variant is a bit cleaner. To make sure we only got one temperature value per day, let’s have a look at the dataframe:

[Image: Prophet - Daily Temperatures]

Looks good. By now you know the drill for making Prophet generate predictions:

model2 = Prophet()
model2.fit(df2)  # the model must be fitted before making a future dataframe
future2 = model2.make_future_dataframe(periods=30)
forecast2 = model2.predict(future2)

The results


This is better. The uncertainty interval is down by approx. 30% to about 7 degrees. That means filtering the input data to one value per day increased the usefulness of the predictions, even though Prophet had less data to base predictions on.

The components of the model look similar to the first example:


Note that the first subplot, “Trend”, has higher values on the Y-axis than in the first example. That might be because I filtered out a lot of colder temperatures. 12 UTC is not the median temperature value during a typical day, it’s closer to the maximum (which is in the afternoon / early evening).

In addition to the subplots, we can inspect the predictions and components in depth by looking at the dataframe with predictions. The most interesting part is the timestamps in the future, so filter for them first:

import datetime
forecast2[forecast2.ds > datetime.datetime.now()]

This returns a dataframe with details about all predictions for all timestamps from now on. Here’s a screenshot of some of the contents (certain columns removed for brevity):

[Image: Prophet - Inspecting Prediction Component Values]

For each timestamp, you get the predicted value (“yhat”) in the rightmost column and all the components making up the prediction. For example we can see that the yearly components are well below zero degrees Celsius. This makes sense in the winter season. In addition, “yhat_lower” and “yhat_upper” show the exact range of the uncertainty interval.

Pro tip: Plot components with uncertainty intervals too

In the previous examples all the component subplots lack uncertainty intervals. Prophet can generate such intervals too, at the cost of longer computation time. This is done by adding a parameter to the model creation:

model2 = Prophet(mcmc_samples=500)

This gives you full Bayesian sampling and takes longer to complete, but it’s still only a matter of minutes.

With this activated, we get component subplots with uncertainty intervals:

[Image: Prophet - Uncertainty intervals for components]

These plots reuse the filtered input data from the previous example, with one temperature value per day. We can now reconsider the strange weekly trend in the middle subplot, where the start of the week seems to be colder than the rest. The uncertainty is quite big in that plot. In addition the zero-degrees line is well within the uncertainty interval, for all days of the week. This means it’s not possible to say that there is any difference between the various weekdays.

Closing remarks

Using Prophet to generate predictions turned out to be very easy, and there are several ways to adjust the predictions and inspect the results. While there are other libraries that have more functionality and flexibility, Prophet hits the sweet spot of predictive power versus ease of use. No more looking at weird plots of predicted values because you chose the wrong algorithm for your use case. And no more spending hours to fix input data that has gaps or timestamps in the wrong format.

The Prophet forecasting library seems very well designed. It’s now my new favorite tool for ad-hoc trend analysis and forecasting!

~ Arne ~

Visualize Your Netatmo Data with Grafana

Netatmo Weather Station is the ultimate personal weather station and gives you access to your measurements both through apps and a slick web interface. In addition to that, Netatmo has Developer APIs for access to raw data. Owning a Netatmo station, I just had to try to set up a Grafana dashboard with my weather data. Grafana is both flexible and has great-looking dashboards, so it should be a good fit for weather measurements. Keep on reading for tips on how to do this yourself too!

If you just want to see your Netatmo data in a beautiful dashboard, go to my free web service to register for an account where the dashboard is already set up for you. And if you don’t have a Netatmo Weather Station yet but want to try anyway, create a Netatmo account and go to the publicly available Netatmo HQ station in Paris to associate the station with your account.

[Image: Netatmo Grafana dashboard]

To get started with Grafana on your own server or laptop, the first thing to do is to get access to measurement data from your Netatmo station. Register for developer access at Netatmo to get that. After creating an account, continue to “Create an App” to get the necessary client ID and client secret.

Authentication and tokens

Using the client credentials, there are basically two modes of authentication, an easy one and a more complicated one. The easy one is called Client Credentials in the Netatmo docs and gives you an OAuth2 access token in just one step. Just send a request to the Netatmo API with your client credentials and your Netatmo username+password to get a token. The access token is then used in API calls to get measurement data for your station. The more complicated method is called Authorization Code and makes it possible for your application (“client”) to request access to other Netatmo weather stations than your own by redirecting the owner of that station to an OAuth2 authorization webpage.

Using Python, the following code issues a POST request for an access token the easy way. Before running it, replace the NETATMO_* constants with your values for client ID, client secret, username and password.

import time
import requests

# TOKEN_URL is a placeholder: the token endpoint URL is missing from the original post.
data = dict(grant_type='password', client_id=NETATMO_CLIENT_ID,
        client_secret=NETATMO_CLIENT_SECRET, username=NETATMO_USERNAME,
        password=NETATMO_PASSWORD, scope='read_station')

resp = requests.post(TOKEN_URL, data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']

Now you have a “token” variable as a dictionary with several fields.

The returned token data comes with an expires_in field that says how many seconds the token is valid. In the last line above I transform that into an expiry field containing the unixtime of expiry. That’s necessary to be able to check periodically if the token needs to be refreshed. Here is an example of a token dictionary including the additional expiry field:

{u'access_token': u'abcdefgh|1234567890',
 u'expire_in': 10800,
 u'expires_in': 10800,
 u'expiry': 1468168541,
 u'refresh_token': u'ababababab|2727272727',
 u'scope': [u'read_station']}

When the expiry time gets close, the refresh_token is used to renew the access_token by issuing another POST request:

data = dict(grant_type='refresh_token', refresh_token=token['refresh_token'],
        client_id=NETATMO_CLIENT_ID, client_secret=NETATMO_CLIENT_SECRET)
# TOKEN_URL is a placeholder for the token endpoint (the URL is missing from the original post).
resp = requests.post(TOKEN_URL, data=data)
if resp.status_code == 200:
    token = resp.json()
    token['expiry'] = int(time.time()) + token['expires_in']
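The periodic check itself is not shown above, so here is a minimal sketch of how it could look. The function name needs_refresh and the five-minute safety margin are my own assumptions; the only thing taken from the code above is the expiry field holding the unixtime of expiry.

```python
import time


def needs_refresh(token, margin=300, now=None):
    """True when the token expires within `margin` seconds (or already has)."""
    if now is None:
        now = int(time.time())
    return token['expiry'] - now <= margin
```

A job that polls the API can call needs_refresh(token) before each request and run the refresh request above whenever it returns True.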

Measurement JSON data format

With a valid access_token we can fetch the interesting measurements from Netatmo. The APIs contain several methods for different Netatmo products, but for the Weather Station only /api/getstationdata is needed. Issue a GET request to see all the available measurements:

resp = requests.get('' + token['access_token'])
if resp.status_code == 200:
    data = resp.json()

The data structure returned has a lot of fields and varies by the number of extra modules attached to your Netatmo station (and even more if you’ve connected more than one station to your Netatmo account, like the public Netatmo HQ station in Paris). Here’s an excerpt of data returned in JSON format:

    [{u'_id': u'70:ee:aa:bb:cc:dd',
      u'co2_calibrating': False,
      u'dashboard_data':
        {u'AbsolutePressure': 985,
        u'CO2': 431,
        u'Humidity': 46,
        u'Noise': 37,
        u'Pressure': 1001.9,
        u'Temperature': 26.3,
        u'date_max_temp': 1468101837,
        u'date_min_temp': 1468125907,
        u'max_temp': 26.7,
        u'min_temp': 24.8,
        u'pressure_trend': u'stable',
        u'temp_trend': u'stable',
        u'time_utc': 1468157806},
      u'modules':
        [{u'_id': u'02:00:aa:bb:cc:dd',
          u'dashboard_data':
            {u'Humidity': 52,
            u'Temperature': 22.8,
            u'date_max_temp': 1468127398,
            u'date_min_temp': 1468115964,
            u'max_temp': 26,
            u'min_temp': 9.9,
            u'temp_trend': u'down',
            u'time_utc': 1468157799},

The dashboard_data section has the actual readings, while data_type informs us of the measurement types that this station reports. Values are reported in the unit the user selected on setup, meaning they could be Fahrenheit instead of Celsius for instance. A separate user part of the returned JSON has details about which units are used.

In addition to the data from the indoor Weather Station, stations also have a modules parameter which holds measurements from all connected modules (outdoor module, rain gauge, wind gauge and so on). As seen above, for each module the JSON fields are the same as for the station, with the measurements in dashboard_data and reported measurements in data_type. This greatly simplifies parsing of the JSON response, as you can use the same code for parsing the devices list as for each entry in the modules list.
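To show what “the same code” can look like, here is a small sketch that walks the devices list and each modules list in one loop. The function name extract_readings is my own, and I am assuming the name fields are called station_name and module_name in the JSON (falling back to _id when they are absent), so check the actual response from your station.

```python
def extract_readings(devices):
    """Flatten stations and their modules into (station, module, dashboard_data) tuples."""
    readings = []
    for device in devices:
        station = device.get('station_name', device['_id'])
        # The station itself and each module share the same layout,
        # so both go through the same loop.
        for unit in [device] + device.get('modules', []):
            data = unit.get('dashboard_data')
            if data:  # skip units without readings
                readings.append((station, unit.get('module_name', unit['_id']), data))
    return readings
```

Each tuple holds the station name, the module name and the dashboard_data dictionary for that unit, which is all that is needed for storing the measurements.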

Storing data in InfluxDB

InfluxDB is a time series database with high performance, good compression and an easy-to-use write API and query language. After installing and starting it up with default config options, it’s ready to use as a data store for time-series data like weather measurements. The write API is available through HTTP. To write to InfluxDB, issue POST requests with the actual data as newline-delimited strings in the body of the request. InfluxDB documentation refers to this as the line protocol. An example write request can look like this:

import requests

payload = """
Humidity,station_name=MyStation,module_name=Outdoors value=52 1468157799
Temperature,station_name=MyStation,module_name=Outdoors value=22.8 1468157799
Rain,station_name=MyStation,module_name=Rain_Gauge value=0 1468157799
"""

resp = requests.post('http://localhost:8086/write?precision=s&db=netatmo', data=payload)

This will save three measurements into time series named Humidity, Temperature and Rain in database netatmo. The value field is the actual measurement and the timestamp is from the time_utc field alongside the measurements. It’s trivial to convert the returned JSON into the line format that InfluxDB expects.

The station_name and module_name are custom tags attached to the time series to make it possible to distinguish between different stations and modules. The tags are available for filtering in Grafana using WHERE statements. Station names and module names defined when setting up the Netatmo Weather Station are available in the returned JSON from the Netatmo API.
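As a rough sketch of that conversion, the function below turns one dashboard_data dict into line protocol strings like the ones in the write request above. The function and parameter names are my own for this example. Tag values are escaped because commas, equals signs and spaces are special characters in the line protocol; measurement names are assumed to contain none of those.

```python
def escape_tag(value):
    """Escape the characters that are special inside line protocol tag values."""
    return str(value).replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def to_line_protocol(station_name, module_name, data_types, dashboard_data):
    """Build one line per reported measurement, timestamped with time_utc."""
    timestamp = dashboard_data["time_utc"]
    lines = []
    for measurement in data_types:
        if measurement not in dashboard_data:
            # A type can be listed in data_type without a current reading.
            continue
        lines.append(
            "%s,station_name=%s,module_name=%s value=%s %d"
            % (
                measurement,
                escape_tag(station_name),
                escape_tag(module_name),
                dashboard_data[measurement],
                timestamp,
            )
        )
    return "\n".join(lines)


payload = to_line_protocol(
    "MyStation",
    "Outdoors",
    ["Temperature", "Humidity"],
    {"Humidity": 52, "Temperature": 22.8, "time_utc": 1468157799},
)
print(payload)
```

The resulting string can be sent as the body of the write request, with precision=s since time_utc is in seconds.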

Setting up a Grafana dashboard

After downloading and installing Grafana, go to the Datasource part of the web UI and create a new data source with the following settings:

Grafana - Data source setup

The User and Password under InfluxDB Details are root/root, but are not really used unless InfluxDB authentication was configured with non-default settings before starting up the database.

With a data source in place, the next step is to create a dashboard. There are many ways to visualize weather data, but at least add graphs for each time series you’ve stored in InfluxDB. That way you get a feel for how the metric changes over time. For some metrics the trends are most interesting, for other metrics only the current value is necessary to display. If everything works as expected, you should get suggestions when you set up the metric queries in Grafana, like this:

Create graph - Suggestions

Under the WHERE section you can filter on the tags associated with each time series, for example on the module name to get only outdoor temperatures instead of both indoor and outdoor.

Awesome visualizations

With Grafana and InfluxDB set up to store data from your Netatmo Weather Station, you can create some pretty awesome visualizations, like this outdoor temperature graph covering several months, with a moving_average() layered on top:

Outdoor Temperature with Moving Average.png

This makes it easy to see that the temperature moved in the right direction during these months, and that there were some periods with higher average temperature than others (the two first weeks of May in particular).
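Grafana builds the query for you, but it can be handy to run something similar directly against InfluxDB when debugging a graph. The sketch below only constructs the URL for InfluxDB's HTTP query endpoint. The 7-point window, the daily grouping and the 90-day range are assumptions chosen for illustration, not the exact settings behind the graph above.

```python
from urllib.parse import urlencode


def moving_average_query_url(base_url, database, measurement, module_name,
                             window=7, days=90):
    """Build a /query URL for a moving average over daily mean values."""
    query = (
        'SELECT MOVING_AVERAGE(MEAN("value"), {window}) FROM "{measurement}" '
        "WHERE \"module_name\" = '{module}' AND time > now() - {days}d "
        "GROUP BY time(1d)"
    ).format(window=window, measurement=measurement, module=module_name, days=days)
    # urlencode takes care of escaping quotes, spaces and parentheses.
    return base_url + "/query?" + urlencode({"db": database, "q": query})


url = moving_average_query_url(
    "http://localhost:8086", "netatmo", "Temperature", "Outdoors"
)
print(url)
```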

If you’re interested in more awesome visualizations of Netatmo Weather Station data, head over to my web service to get your own personalized weather dashboard!

~ Arne ~

Docker + OpenStack = True

OpenStack and Docker

OpenStack has the potential to revolutionize how enterprises do virtualization, so lots of people are currently busy setting up OpenStack private clouds. I’ve been part of such a project too. In this post, I’ll explain why I believe Docker makes a lot of sense to use for deploying the services that together make up an OpenStack cloud. Docker features flexibility, orchestration, clustering and more.

All about microservices

OpenStack is commonly viewed as a platform for virtualization. However, under the hood OpenStack is nothing more than a bunch of microservices which end-users can interact with, either directly or through the Horizon UI. Most of the microservices are REST APIs for controlling different aspects of the platform (block storage, authentication, telemetry and so on). API nodes for an OpenStack cloud end up running dozens of small processes, each with their own distinct purpose. In such a setting, Dockerizing all these microservices makes a lot of sense.

Flexibility and speed

Of the several improvements Dockerizing brings, the most important is flexibility. Running a Docker Swarm for OpenStack API services makes it possible to easily scale out by adding more swarm nodes and launching additional copies of the containers on them. Coupled with HAProxy, the new containers can join the backend pool for an API service in seconds and help alleviate high load. Sure, the same can be accomplished by adding physical API nodes, provisioning them with Chef/Puppet/Salt/Ansible and reconfiguring HAProxy to include them in the backend pool for each service, but that takes considerably longer than just launching more pre-built containers.

Versioning and ease of debugging

Since Docker images are versioned and pushed to a central registry, it’s trivial to ensure that all instances of a service run with identical configs, packages and libraries. Furthermore, even though a service like Nova typically consists of 4-5 different sub-services, all of them can share the same config and therefore use the same container image. The only difference is which command the container runs when started. Being able to easily check that all backend instances are identical (use the same version of the image) is important when debugging issues. Also, Docker Compose has built-in support for viewing logs from several containers sorted chronologically, no matter which physical node they run on. That also includes the option to follow logs in real-time from several containers at once.

Orchestration and clustering

Using Docker Compose to orchestrate microservices is important. Compose provides an interface to scale the number of containers for each service and supports constraints and affinities on where each container should run. For example if you run a clustered MySQL instance for the internal OpenStack databases, Compose can ensure that each database container runs on a different physical host than the others in the MySQL cluster. When creating container images for each service, an entrypoint shell script can be included and used to detect if there is an existing cluster to add itself to, or if this is the first instance of the service. Clustering the services that OpenStack APIs rely on (notably MySQL and RabbitMQ) becomes easier with this type of pattern.

Service Discovery

A solution for Service Discovery is a requirement when operating dozens of microservices that expect to be able to find each other and talk together. In the Docker ecosystem, Consul is a great option for service discovery. Coupled with the Registrator container deployed on each swarm node, all microservices that listen on a TCP/UDP port are automatically added as services in Consul. It’s easy to query Consul for the IP addresses of a particular service using either the HTTP or DNS interface. With the right Consul setup, each Dockerized service can reference other services by their Consul DNS name in config files and so on. This way, no server names or IP addresses need to be hard coded in the config of each service, which is a great plus.
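To give an idea of what querying Consul over HTTP looks like, here is a small sketch that builds the catalog URL for a service and extracts address/port pairs from the JSON reply. The host name, the service name and the sample reply are hypothetical. The port 8500 default and the ServiceAddress, Address and ServicePort fields are how I understand Consul's catalog API, so verify them against your Consul version.

```python
import json


def catalog_url(consul_host, service, port=8500):
    """URL of Consul's catalog endpoint listing all instances of a service."""
    return "http://%s:%d/v1/catalog/service/%s" % (consul_host, port, service)


def endpoints_from_catalog(response_text):
    """Return (address, port) pairs from a catalog reply."""
    endpoints = []
    for entry in json.loads(response_text):
        # ServiceAddress may be empty, in which case the node address applies.
        address = entry.get("ServiceAddress") or entry["Address"]
        endpoints.append((address, entry["ServicePort"]))
    return endpoints


# Hypothetical reply for a service registered by Registrator.
sample_reply = json.dumps([
    {"Node": "swarm-node-1", "Address": "10.0.0.11",
     "ServiceName": "keystone", "ServiceAddress": "", "ServicePort": 5000},
    {"Node": "swarm-node-2", "Address": "10.0.0.12",
     "ServiceName": "keystone", "ServiceAddress": "10.0.1.12", "ServicePort": 5000},
])

print(catalog_url("consul.example.internal", "keystone"))
print(endpoints_from_catalog(sample_reply))
```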

There are more desirable effects of Dockerizing OpenStack microservices, but the most important ones in my opinion are flexibility, ease of debugging, orchestration and service discovery. If you wonder why Docker doesn’t just replace OpenStack entirely, I recommend reading this TechRepublic article. There Matt Asay points out that a common enterprise pattern is to utilize OpenStack for its strong multi-tenancy model. Applications can in that case be deployed with Docker on top of VMs provisioned using OpenStack, which I think will be a very useful way of utilizing OpenStack and Docker for big enterprises with a diverse set of applications, departments and users.

~ Arne ~

Spark cluster on OpenStack with multi-user Jupyter Notebook

Spark on OpenStack with Jupyter

Apache Spark is gaining traction as the de facto analysis suite for big data, especially for those using Python. Spark has a rich API for Python and several very useful built-in libraries like MLlib for machine learning and Spark Streaming for real-time analysis. Jupyter (formerly IPython Notebook) is a convenient interface to perform exploratory data analysis and all kinds of other analytic tasks using Python. In this post I’ll show step-by-step how to set up a Spark cluster on OpenStack and configure Jupyter with multi-user access and an easy-to-use PySpark profile.

Setting up a Spark cluster on OpenStack VMs

To get a Spark standalone cluster up and running, all you need to do is spawn some VMs and start Spark as master on one of them and slave on the others. They will automatically form a cluster that you can connect to from Python, Java and Scala applications using the IP address of the master node. Sounds easy enough, right? But there are some pitfalls, so read on for tips on how to avoid them.

To create the first VM to be used as master node, I use the OpenStack command line client. We’ll get that node up and running first. The distribution is Ubuntu 14.04 Trusty. Cloud-init is an easy way to perform bootstrapping of nodes to get the necessary software installed and set up. To install and start a Spark master node I use the following Cloud-init script:

# Cloud-init script to get Spark Master up and running

# Firewall setup
ufw allow from $LOCALNET
ufw allow 80/tcp
ufw allow 443/tcp
ufw allow 4040:4050/tcp
ufw allow 7077/tcp
ufw allow 8080/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root:root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Configure Spark master
cp /opt/spark/conf/ /opt/spark/conf/
sed -i 's/# - SPARK_MASTER_OPTS.*/SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.executor.memory=2G"/' /opt/spark/conf/

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark Master with IP address of eth0 as the address to use
/opt/spark/sbin/ -h $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/)

Save this script to a file for use with the OpenStack command line client. The script first adds some firewall rules to allow access to the different components and installs the OpenJDK dependency. Next, a Spark tarball is downloaded, unpacked and made available under /opt/spark on the host. The tarball is prepackaged with Hadoop v1 libraries (note the “hadoop1.tgz” suffix), so adjust this if you need Hadoop v2 instead.

The only configuration of Spark we need at this point is to set the options “spark.deploy.defaultCores” and “spark.executor.memory”. They are used to configure how many resources each application will get when it starts. Since the goal is to set up a multi-user environment with Jupyter notebooks, we need to limit the total amount of CPU cores and RAM that each notebook will use. Each notebook is an “application” on the cluster for as long as the notebook is active (i.e. until it is shut down by the user). If we don’t limit the resource allocation, the first notebook created will allocate all available CPU cores on each worker, leaving no CPU cores free for the next user. In addition, the default RAM allocation for each app is only 512 MB on each worker node, which might be a bit too small, so we bump that up to 2 GB.
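A quick back-of-the-envelope calculation shows what these two settings mean for a multi-user setup. The cluster size of 16 cores is an assumption for illustration only; plug in the numbers for your own worker flavors.

```python
def max_concurrent_notebooks(total_cores, cores_per_app):
    """How many notebooks can hold their full core allocation at the same time."""
    return total_cores // cores_per_app


def worst_case_memory_per_worker_gb(notebooks, executor_memory_gb):
    """Executor memory in use on one worker if every notebook has an executor there."""
    return notebooks * executor_memory_gb


total_cores = 16          # assumed total across all workers
cores_per_app = 4         # spark.deploy.defaultCores from the script above
executor_memory_gb = 2    # spark.executor.memory from the script above

notebooks = max_concurrent_notebooks(total_cores, cores_per_app)
print("Notebooks that can run at once:", notebooks)
print("Worst-case executor memory per worker (GB):",
      worst_case_memory_per_worker_gb(notebooks, executor_memory_gb))
```

Without the core limit the first notebook would take all 16 cores, so the number of concurrent notebooks would drop to one.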

The echo line adds “spark-master” to /etc/hosts with a reference to the IP address of the VM. Spark tries to resolve the local hostname on startup. Without a resolvable hostname you might encounter “Name or service not known”-errors, resulting in Java exceptions and exits.

On the last line the Spark master process is started. The master process is given the IP address of the local host as an argument to make sure it binds to the correct interface. The IP address is extracted from the output of the “ip addr” command.
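If the shell pipeline used for extracting the IP address looks cryptic, the same logic can be sketched in Python. The sample line below is a hypothetical example of what “ip -o addr show dev eth0” prints. The regular expression matches “inet ” followed by an IPv4 address, which skips the inet6 lines just like the fgrep in the script does.

```python
import re


def first_ipv4(ip_addr_output):
    """Return the first IPv4 address in the output of 'ip -o addr show', or None."""
    match = re.search(r"inet (\d+\.\d+\.\d+\.\d+)/\d+", ip_addr_output)
    return match.group(1) if match else None


# Hypothetical output for illustration only.
sample_output = (
    "2: eth0    inet 10.0.0.5/24 brd 10.0.0.255 scope global eth0\n"
    "2: eth0    inet6 fe80::f816:3eff:fe00:1/64 scope link\n"
)

print(first_ipv4(sample_output))
```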

One way to launch the master VM with the Cloud-init script is like this:

# Install OpenStack client if not present already
sudo apt-get -y install python-openstackclient

# Customize these values to match your OpenStack cluster

# Create Spark master VM
openstack server create --flavor m1.medium --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-master

If you prefer using the web UI (Horizon), you could just as easily paste the Cloud-init script into the text box on the Post-Creation tab of the Launch Instance dialog and achieve the same result.

It will take some minutes for the Spark master VM to finish bootstrapping. When it’s done the Spark master UI will be available on port 8080. Remember to associate a floating IP to the VM to be able to access it from outside the OpenStack project:

openstack ip floating add spark-master

Verify that the Spark master UI is reachable and displays metadata about the cluster. If the UI is not reachable, first check that your Security Group rules allow port 8080 to the Spark master VM. Second, check the Cloud-init logs on the VM to ensure all parts of the initialization succeeded. You’ll find the Cloud-init log file on the VM as /var/log/cloud-init.log and the output from the Cloud-init script in /var/log/cloud-init-output.log. You can also try to re-run parts of the Cloud-init script with sudo to narrow down any issues with the initialization. When initialization succeeds the Spark master UI will look like this:

Spark master UI with no workers

As expected there are no workers alive yet, so let’s initialize some. To do so we use a slightly modified version of the Cloud-init script above. The main difference is the startup command, which now runs a script under /opt/spark/sbin/ with the address of the master as the only argument. Remember to adjust the variables below to your IP range and master IP address.

# Cloud-init script to get Spark Worker up and running

# Firewall setup
ufw allow from $LOCALNET
ufw allow 8081/tcp

# Dependencies
apt-get -y update
apt-get -y install openjdk-7-jdk

# Download and unpack Spark
curl -o /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz http://$APACHE_MIRROR/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop1.tgz
tar xvz -C /opt -f /tmp/spark-$SPARK_VERSION-bin-hadoop1.tgz
ln -s /opt/spark-$SPARK_VERSION-bin-hadoop1/ /opt/spark
chown -R root:root /opt/spark-$SPARK_VERSION-bin-hadoop1/*

# Make sure our hostname is resolvable by adding it to /etc/hosts
echo $(ip -o addr show dev eth0 | fgrep "inet " | egrep -o '[0-9.]+/[0-9]+' | cut -f1 -d/) $HOSTNAME | sudo tee -a /etc/hosts

# Start Spark worker with address of Spark master to join cluster
/opt/spark/sbin/ spark://$SPARK_MASTER_IP:7077

Save this script to a file as well. Notice how the last line starts up a slave with the address to the cluster master on port 7077 as the only argument. Since the Cloud-init script has no worker-specific details, it is easy to expand the cluster just by creating more worker VMs initialized with the same Cloud-init script. Let’s create the first worker:

# Customize these values to match your OpenStack cluster

# Create first Spark worker VM, this time with flavor m1.large
openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker1

Pause for a moment to let the worker creation process finish to ensure that Cloud-init does the necessary work without errors. There is no point in initializing more workers until the process is proven to work as expected. Again, it can be useful to check /var/log/cloud-init.log and /var/log/cloud-init-output.log on the new VM to verify that Cloud-init does what it’s supposed to do. On success you’ll see the worker in the Spark master UI:

Spark master UI with one worker registered

Create some more worker nodes to scale the cluster to handle more parallel tasks:

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker2

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker3

openstack server create --flavor m1.large --image $OS_IMAGE --nic net-id=$OS_NETID --key-name $OS_KEYNAME --user-data spark-worker4

Verify that the new worker nodes show up in the Spark master UI before continuing.
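If you would rather check this from a script than by eye, the standalone master UI also serves its state as JSON (under /json on the UI port, as far as I know). The sketch below only parses such a reply. The field names “workers”, “host” and “state” and the sample reply are assumptions based on what my cluster returns, so inspect the output from your own master before relying on it.

```python
import json


def alive_workers(master_state_text):
    """Return the hosts of all workers the master reports as ALIVE."""
    state = json.loads(master_state_text)
    return [
        worker["host"]
        for worker in state.get("workers", [])
        if worker.get("state") == "ALIVE"
    ]


# Hypothetical, trimmed-down reply for illustration only. A real reply could
# be fetched with urllib.request.urlopen("http://<master address>:8080/json").
sample_state = json.dumps({
    "workers": [
        {"host": "10.0.0.21", "state": "ALIVE"},
        {"host": "10.0.0.22", "state": "ALIVE"},
        {"host": "10.0.0.23", "state": "DEAD"},
    ]
})

print(alive_workers(sample_state))
```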

Installing Jupyter and JupyterHub

A shiny new Spark cluster is fine, but we also need interfaces to be able to use it. Spark comes prepackaged with shells for Scala and Python where the connection to a cluster is already set up. Jupyter (formerly IPython Notebook) can offer the same level of usability, so that when you open a new notebook a connection to the Spark cluster (a SparkContext) is established for you. The SparkContext is available through the variable “sc” in the notebook, ready to use by calling sc.textFile() to create an RDD, for instance.

JupyterHub is a multi-user server for Jupyter notebooks. That makes it possible for several users to use Jupyter independently and have their own notebooks and files in their home directory instead of a shared storage directory for all notebooks. However, this requires that each user has a user account on the VM where JupyterHub is running. Add user accounts for relevant users now if needed. JupyterHub uses unix authentication, meaning that it relays the username and password to the underlying authentication system on the VM for credential check.

In this deployment JupyterHub is installed on the Spark master VM and launched there. It could run on a separate VM, but there is normally no need for that since the Spark master process does not require many resources. The VM where Jupyter notebooks are executed is called the “driver” in Spark, and it will require some processing power and memory, depending on the use case.

SSH into the Spark master VM and run the following set of commands:

# Install pip3 and other dependencies
sudo apt-get -y install python3-pip npm nodejs-legacy
sudo npm install -g configurable-http-proxy

# Install JupyterHub and Jupyter
sudo pip3 install jupyterhub
sudo pip3 install "ipython[notebook]"

pip3 is used instead of pip because JupyterHub depends on Python >= 3.3. After installing all software and dependencies, start the JupyterHub service:

sudo jupyterhub --port 80

Having JupyterHub listen on port 80 instead of the default port 8000 means users don’t have to include a port number in the URL, but it requires that you start the service as root. In addition you might want to look into securing JupyterHub with an SSL certificate and have it listen on port 443, since it asks for passwords when users log in. When you have the necessary certificate and keys on the VM, the service can be started like this instead:

sudo jupyterhub --port 443 --ssl-key hub.key --ssl-cert hub.pem

Now try to open the JupyterHub login page on the floating IP address of the VM and log in. After login you should be greeted with an empty home directory with no notebooks. A new notebook can be created by clicking “New” on the right above the notebook list.

Jupyter - notebook list empty

If you create a new notebook, you’ll notice that the only supported kernel is Python3 at the moment. We need to add PySpark to that list to be able to use the Spark cluster from Jupyter.

Configuring Jupyter for PySpark

Jupyter relies on kernels to execute code. The default kernel is Python, but many other languages can be added. To use the Spark cluster from Jupyter we add a separate kernel called PySpark. In addition, kernels can run specific commands on startup, which in this case is used to initialize the SparkContext. First, some dependencies need to be installed:

sudo apt-get -y install python-dev python-setuptools
sudo easy_install pip
sudo pip install py4j
sudo pip install "ipython[notebook]"

It might seem odd to install ipython[notebook] as a dependency, but the reason is that IPython/Jupyter contains a number of Python support modules that kernels rely on. Previously when we installed using pip3, we got the Python3 versions of those modules. When installing again with pip, we get Python2 versions. PySpark depends on Python2.

To add PySpark as a kernel, a file containing a kernel definition must be created. Kernel definitions are JSON files in a specific directory. Kernels can either be enabled globally for all users or for one user only, depending on where the definition file is placed. We want the PySpark kernel to be available for all users, so we’ll add it under /usr/local/share/jupyter/kernels/ like this:

sudo mkdir -p /usr/local/share/jupyter/kernels/pyspark/
cat <<EOF | sudo tee /usr/local/share/jupyter/kernels/pyspark/kernel.json
 "display_name": "PySpark",
 "language": "python",
 "argv": [
 "env": {
  "SPARK_HOME": "/opt/spark/",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/",
  "PYSPARK_SUBMIT_ARGS": "--master spark:// pyspark-shell"

This kernel definition ensures that the Spark built-in “pyspark-shell” is started under the hood as the process where our code will be executed. Notice how the address to the Spark cluster, “spark://”, is sent as an argument. Remember to customize that address to your specific environment. The address references the Spark master VM (the same host as Jupyter runs on), but could just as easily reference an external host, for instance if you wanted to set up Jupyter on a separate OpenStack VM, or if you already have a Spark cluster running somewhere else that you want to connect to. The Spark master UI shows the right URL to use to connect, right below the Spark logo. Note that the Spark workers depend on being able to establish connections back to the host where the driver process runs (the Jupyter notebook), which may not be possible depending on the firewall setup when connecting to a remote Spark cluster. This is the reason a firewall rule allowing all traffic on the local network ( in my case) is added by Cloud-init on all the Spark VMs.
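If you prefer generating the kernel definition from a script, here is a sketch that writes a kernel.json with the same keys as above using Python's json module. Treat every concrete value as an assumption: the argv list follows the usual Jupyter convention of launching ipykernel with a “{connection_file}” placeholder, and the paths and master address are placeholders you must replace with the values for your own installation. The example writes to a temporary directory rather than /usr/local/share/jupyter/kernels/.

```python
import json
import os
import tempfile


def build_pyspark_kernel_spec(python_exe, spark_home, pythonpath_entries,
                              startup_script, master_url):
    """Return a kernel definition dict for a PySpark kernel."""
    return {
        "display_name": "PySpark",
        "language": "python",
        # Assumed launch command; Jupyter substitutes {connection_file} itself.
        "argv": [python_exe, "-m", "ipykernel", "-f", "{connection_file}"],
        "env": {
            "SPARK_HOME": spark_home,
            "PYTHONPATH": ":".join(pythonpath_entries),
            "PYTHONSTARTUP": startup_script,
            "PYSPARK_SUBMIT_ARGS": "--master %s pyspark-shell" % master_url,
        },
    }


def write_kernel_spec(kernels_dir, spec, name="pyspark"):
    """Write spec as <kernels_dir>/<name>/kernel.json and return the file path."""
    kernel_dir = os.path.join(kernels_dir, name)
    os.makedirs(kernel_dir, exist_ok=True)
    path = os.path.join(kernel_dir, "kernel.json")
    with open(path, "w") as f:
        json.dump(spec, f, indent=1)
    return path


spec = build_pyspark_kernel_spec(
    python_exe="/usr/bin/python2",
    spark_home="/opt/spark/",
    # Placeholder entries: point these at your Spark Python directories.
    pythonpath_entries=["/opt/spark/python/", "/path/to/py4j/library"],
    # Placeholder: the PySpark startup script shipped with your Spark version.
    startup_script="/path/to/pyspark/startup/script",
    # Placeholder: the URL shown in the Spark master UI.
    master_url="spark://SPARK_MASTER_IP:7077",
)
path = write_kernel_spec(tempfile.mkdtemp(), spec)
print(path)
```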

After adding the kernel definition file for PySpark you’ll have to refresh the Jupyter homepage to see the new kernel in the list. No need to restart JupyterHub.

Debugging PySpark startup errors

If you get an error message about “Dead kernel” when creating new notebooks with the PySpark kernel, there might be several causes. For instance the VM running Jupyter might not be able to connect to the Spark cluster. Or it might lack some dependencies (packages/modules) to initialize the SparkContext. To debug kernel startup issues, first check the output from JupyterHub in the terminal where you started it (might be smart to keep that terminal open until everything works as expected). JupyterHub will for example output log lines like this when a Python module is missing:

[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
[W 2015-09-20 20:31:39.295 ubuntu kernelmanager:52] Kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f died, removing from map.
ERROR:root:kernel 8a0b760d-357a-4507-a18b-da4bebd09e3f restarted failed!
[W 2015-09-20 20:31:39.406 ubuntu handlers:441] Kernel deleted before session

This error occurred because I hadn’t installed ipython[notebook] using pip yet, so the Python2 modules needed by PySpark were not available. Notice how the error message states that it is /usr/bin/python2 that reports the error. Jupyter tries to restart the kernel a total of five times, but hits the same error every time and eventually gives up. In the notebook UI this is shown as a “Dead kernel” message.
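When the JupyterHub output gets long, a few lines of Python can pick out the missing modules for you. This is just a sketch that looks for lines of the form shown above, where an interpreter path is followed by “No module named”. The function name is my own.

```python
import re

# Matches lines like "/usr/bin/python2: No module named ipykernel".
MISSING_MODULE = re.compile(
    r"^(?P<interpreter>/\S+): No module named (?P<module>\S+)$"
)


def missing_modules(log_lines):
    """Return sorted, de-duplicated (interpreter, module) pairs found in the log."""
    found = set()
    for line in log_lines:
        match = MISSING_MODULE.match(line.strip())
        if match:
            found.add((match.group("interpreter"), match.group("module")))
    return sorted(found)


log = """\
[I 2015-09-20 20:31:24.276 ubuntu kernelmanager:85] Kernel started: 8a0b760d-357a-4507-a18b-da4bebd09e3f
/usr/bin/python2: No module named ipykernel
[I 2015-09-20 20:31:27.278 ubuntu restarter:103] KernelRestarter: restarting kernel (1/5)
/usr/bin/python2: No module named ipykernel
[W 2015-09-20 20:31:39.295 ubuntu restarter:95] KernelRestarter: restart failed
""".splitlines()

for interpreter, module in missing_modules(log):
    print("%s is missing the module %s" % (interpreter, module))
```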

Other errors can pop up in the Spark logs on master or worker nodes. Spark logs to /opt/spark/logs, so have a look there if anything is malfunctioning. The Spark master node logs every new application that is started on the Spark cluster, so if you don’t see output there when opening a new notebook with the PySpark profile, something is not right.

One last debugging tip is to try to start the PySpark shell from Bash on the VM where Jupyter runs. It is useful to inspect what happens when the PySpark shell starts. Here is an example of output when a dependency is missing:

$ python2 /opt/spark/python/pyspark/
Traceback (most recent call last):
 File "/opt/spark/python/pyspark/", line 28, in <module>
 import py4j
ImportError: No module named py4j

Remember to use Python2 when starting the shell. The above command mimics what Jupyter does behind the scenes when a new notebook is created.

Ready to use PySpark in Jupyter

If everything went according to plan, you now have a Spark cluster which you can easily use from Jupyter notebooks just by creating them with the PySpark profile 🙂 The variable “sc” is initialized as a SparkContext connected to the Spark cluster and you can start exploring the rich Spark API for data transformation and analysis. Here’s a screenshot from a notebook where I extracted response time numbers from Varnish NCSA logs (web cache server logs) and computed common statistics like mean and standard deviation for the response time of each backend in use by the cache server:

Example use of PySpark in Jupyter

~ Arne ~

Spark – How to fix “WARN TaskSchedulerImpl: Initial job has not accepted any resources”

Apache Spark and Firewalls

When setting up Apache Spark on your own cluster, in my case on OpenStack VMs, a common pitfall is the following error message:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

This error can pop up in the log output of the interactive Python Spark shell or Jupyter (formerly IPython Notebook) after starting a PySpark session and trying to perform any kind of Spark action (like .count() or .take() on an RDD), rendering PySpark unusable.

As the error message suggests, I investigated resource shortages first. The Spark Master UI reported that my PySpark shell had allocated all the available CPU cores and a small portion of the available memory. I therefore lowered the number of CPU cores for each Spark application on the cluster, by adding the following line to the Spark configuration on the master node and restarting the master:


After this change my PySpark shell was limited to 4 CPU cores of the 16 CPU cores in my cluster at that time, instead of reserving all available cores (the default setting). However, even though the Spark UI now reported there would be enough free CPU cores and memory to actually run some Spark actions, the error message still popped up and no Spark actions would execute.

While debugging this issue, I came across a Spark-user mailing list post by Marcelo Vanzin of Cloudera where he outlines two possible causes for this particular error:

- You're requesting more resources than the master has available, so
your executors are not starting. Given your explanation this doesn't
seem to be the case.

- The executors are starting, but are having problems connecting 
back to the driver. In this case, you should be able to see 
errors in each executor's log file.

The second of these was causing this error in my case. The host firewall on the host where I ran my PySpark shell rejected the connection attempts back from the worker nodes. After allowing all traffic between all nodes involved, the problem was resolved! The driver host was another VM in the same OpenStack project, so allowing all traffic between the VMs in the same project was OK to do security-wise.

The error message is not particularly useful in the case where executors are unable to connect back to the driver. If you encounter the same error message, remember to check firewall logs from all involved firewalls (host and/or network firewalls).
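A quick way to test for this kind of firewall problem is to try opening a TCP connection from a worker node to the driver host yourself. The sketch below does that with Python's socket module. The function name is my own, and you need to supply the driver host and a port the driver is actually listening on (the executor logs usually show which address they tried to reach).

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (dropped packets) and
        # name resolution failures.
        return False


# Example usage from a worker node (hypothetical host and port):
# print(can_connect("driver-host.example.internal", 45678))
```

A False result after the full timeout typically points to a firewall silently dropping packets, while an immediate False usually means the connection was rejected or nothing is listening on that port.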

On a side note, this requirement of Spark to connect back from executors to the driver makes it harder to set up a Spark cluster in a secure way. Unless the driver is in the same security zone as the Spark cluster, it may not be possible to allow the Spark cluster workers to establish connections to the driver host on arbitrary ports. Hopefully the Apache Spark project will address this limitation in a future release, by making sure all necessary connections are established by the driver (client host) only.

~ Arne ~