December 12, 2018

Streaming timeseries with Flask and Plotly

This post describes a simple app that streams CPU utilization to a web page. It uses Flask as the WebSocket server (via the Flask-SocketIO extension), socket.io as the client library, and plotly.js for visualization.

demo

Flask app

Follow the Flask-SocketIO documentation to create a Flask app. SocketIO uses Redis as its message queue because a separate process pushes messages to clients; the Flask WebSocket server and that process communicate through Redis.

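import os

from flask import Flask
from flask_socketio import SocketIO

from streaming import bootstrap_on_connect
# The module name "plotting" is an assumption; import the blueprint from wherever you define it.
from plotting import plotting_blueprint


def create_app(register_blueprint=True):
    app = Flask(__name__)
    app.secret_key = os.urandom(42)
    if register_blueprint:
        app.register_blueprint(plotting_blueprint)

    # Redis is the message queue shared with the background process.
    socketio = SocketIO(app, message_queue='redis://localhost:6379/')
    socketio.on_event('connect', bootstrap_on_connect) # (1)
    return socketio, app


socketio, application = create_app()


if __name__ == '__main__':
    socketio.run(application)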

(1) This registers the first hook: when a client connects, we send it some historical data. That is the job of the bootstrap_on_connect function.

The plotting_blueprint blueprint is a simple one with a single route that serves a static page, which in turn loads the client-side JavaScript. Its only parameter is the window size: I'd like to see only the last 100 seconds of data.

from flask import Blueprint, render_template

plotting_blueprint = Blueprint('plotting', __name__)


@plotting_blueprint.route('/')
def index():
    return render_template('index.html', x_window=100)

Template and client code

In the Jinja2 HTML template, I put the window size inside a meta tag so that the JavaScript can read it. I also load all libraries at the end of the body:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="x_window" content="{{ x_window }}">
    <title>Flask data viz</title>
    <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<div id="plot"></div>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.6/socket.io.min.js"></script>
<script src="{{ url_for('static', filename='plot.js') }}"></script>
</body>
</html>

plot.js will contain all client-side logic.

var url = 'http://' + document.domain + ':' + location.port;
var socket = io.connect(url);
var plot_start; // timestamp of the first point, set on bootstrap

socket.on('connect', function () { // (1)
    console.log('connected to websocket on ' + url);
});

socket.on('bootstrap', function (msg) { // (2)
    plot_start = msg.x[0];
    makePlotly(msg.x, msg.y);
});

socket.on('update', function (msg) { // (3)
    streamPlotly(msg.x, msg.y);
});

(1) First we connect to the Socket.IO server. The URL uses the http scheme, not ws: socket.io starts with HTTP long-polling and upgrades to WebSocket when it can.

(2) Once the client is connected, Flask sends a bootstrap message with the initial data. The handler calls makePlotly, which initializes the plot.

(3) When an update message arrives, streamPlotly uses Plotly.extendTraces to append the new data to the existing traces. It also updates the layout, which gives us a nice sliding window.
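The sliding window in (3) comes down to simple date arithmetic: the x-axis range ends at the newest timestamp and starts x_window seconds before it. The client does this in JavaScript; the sketch below shows one plausible version of the same calculation in Python, assuming timestamps use the "%Y-%m-%d %H:%M:%S" format that the server sends. The function name window_range is hypothetical and not part of the original code.

```python
from datetime import datetime, timedelta

DATE_FMT = "%Y-%m-%d %H:%M:%S"  # same format the server uses for timestamps


def window_range(latest, x_window):
    """Return [start, end] strings covering the last x_window seconds."""
    end = datetime.strptime(latest, DATE_FMT)
    start = end - timedelta(seconds=x_window)
    return [start.strftime(DATE_FMT), end.strftime(DATE_FMT)]


print(window_range("2018-12-12 10:05:00", 100))
# prints ['2018-12-12 10:03:20', '2018-12-12 10:05:00']
```

On the client, a pair like this would typically be assigned to the x-axis range of the layout on every update.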

Running the app

To run the app, use uwsgi with gevent. The uwsgi config looks like this:

[uwsgi]
module = app:application
uid = www-data

http = 127.0.0.1:5000

gevent-monkey-patch = true
http-websockets = true
gevent = 1000

WebSocket traffic is carried over HTTP, so uwsgi listens for HTTP on port 5000 instead of talking to nginx through a UNIX socket.

The nginx config should define a separate location for the websocket URL:

server {
    listen       80 default_server;
    listen       [::]:80 default_server;
    server_name  localhost;
    root         /var/www/html;
    client_max_body_size 16M;

    location / {
        include proxy_params;
        proxy_pass http://127.0.0.1:5000;
    }

    location /socket.io {
        include proxy_params;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_set_header Host $host;
        proxy_pass http://127.0.0.1:5000/socket.io;
    }
}

Background job

A separate process reads CPU utilization with psutil and pushes messages to clients. The same file also defines the bootstrap function, which for now sends only a single placeholder point instead of real history.

streaming.py:

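#!/usr/bin/env python
import time
from datetime import datetime
from psutil import cpu_percent
from flask_socketio import emit, SocketIO

DATE_FMT = "%Y-%m-%d %H:%M:%S"


def bootstrap_on_connect():
    # Runs inside the Flask app when a client connects; emit() replies to that client.
    emit('bootstrap', {'x': [datetime.now().strftime(DATE_FMT)], 'y': [0]})


def update_plot():
    # No app is passed here: this SocketIO instance only publishes to the Redis queue.
    socketio = SocketIO(message_queue='redis://localhost:6379/')
    while True:
        datetime_now = datetime.now().strftime(DATE_FMT)
        # cpu_percent(interval=1) blocks for one second while it measures.
        cpu_percent_second = cpu_percent(interval=1)
        socketio.emit('update', {'x': [datetime_now], 'y': [cpu_percent_second]})
        # Together with the measurement above, updates go out roughly every two seconds.
        time.sleep(1)


if __name__ == '__main__':
    update_plot()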
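Separating payload building from the sampling and emit calls makes the loop easy to try out without psutil or Redis. The sketch below is one possible way to do that, not the code from the repo: make_update and stream are hypothetical names, and the sample and emit callables stand in for cpu_percent and socketio.emit.

```python
from datetime import datetime

DATE_FMT = "%Y-%m-%d %H:%M:%S"


def make_update(value, now=None):
    """Build an 'update' payload: one timestamp and one reading, each in a list."""
    now = now or datetime.now()
    return {'x': [now.strftime(DATE_FMT)], 'y': [value]}


def stream(sample, emit, count):
    """Take `count` readings with sample() and hand each payload to emit()."""
    for _ in range(count):
        value = sample()  # in the real script this would be cpu_percent(interval=1)
        emit('update', make_update(value))


# Usage with stand-ins for psutil and SocketIO.emit (both are assumptions):
sent = []
stream(sample=lambda: 12.5,
       emit=lambda event, data: sent.append((event, data)),
       count=3)
print(len(sent), sent[0][0], sent[0][1]['y'])
```

The payload keeps the same {'x': [...], 'y': [...]} shape that the client handlers read from msg.x and msg.y.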

Running it all

  1. Install redis server and nginx
  2. Install all dependencies from requirements.txt into a virtual environment
  3. Run uwsgi with uwsgi --ini uwsgi.ini
  4. Run streaming.py (chmod +x streaming.py && ./streaming.py)
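With several services involved, it can help to confirm that each one is listening before opening the page. The following is a minimal sketch using only the standard library; the helper name port_open is hypothetical, and the ports are the ones used in the configs in this post (6379 for Redis, 5000 for uwsgi, 80 for nginx).

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports taken from the configs in this post: Redis, uwsgi, nginx.
for name, port in [('redis', 6379), ('uwsgi', 5000), ('nginx', 80)]:
    status = 'up' if port_open('127.0.0.1', port) else 'down'
    print(f'{name:6} port {port}: {status}')
```

This only checks that something accepts TCP connections on each port, not that the service behind it is configured correctly.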

Further thoughts

The data is not persisted, so a newly connected client cannot see any history. Fixing that requires some kind of storage. An alternative approach is to poll Flask periodically from JavaScript through AJAX calls. I find the websocket solution more interesting, though more complex: it needs a separate process to update clients, but that process also takes some work off the web server.
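As a rough illustration of the storage idea, a bounded buffer is enough to hold the last N points and return them in the shape the bootstrap message uses. This is an in-memory sketch only, and the History class is hypothetical: because the web server and the streaming process are separate, a real version would have to keep the buffer somewhere both can reach, for example in the Redis instance that is already running.

```python
from collections import deque


class History:
    """Keep the most recent `maxlen` (timestamp, value) points."""

    def __init__(self, maxlen=100):
        self._points = deque(maxlen=maxlen)  # oldest points are dropped automatically

    def add(self, timestamp, value):
        self._points.append((timestamp, value))

    def as_payload(self):
        """Return the buffer in the {'x': [...], 'y': [...]} shape the client expects."""
        return {
            'x': [t for t, _ in self._points],
            'y': [v for _, v in self._points],
        }


history = History(maxlen=3)
for second, value in enumerate([10.0, 20.0, 30.0, 40.0]):
    history.add(f'2018-12-12 10:00:0{second}', value)

print(history.as_payload())
```

With maxlen=3, the first of the four points is discarded, so the payload holds only the three most recent readings.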

Full code is available from this repo

Do you use websockets for streaming live data? Connect with me on LinkedIn to discuss it.

© Alexey Smirnov 2023