Celery background task with notifications through socket.io

If you have some long background tasks, sometimes it’s useful to notify a user about the progress. In this article i put an example for flask, celery and socket.io. In short form it’s described in flask-socketio documentation.

Backend

Some python requirements:

flask
flask-socketio
eventlet
redis
celery

An app configuration

If eventlet is used, it should be monkey patched to avoid freezes.

# in the app bootstraping
eventlet.monkey_patch()

REDIS_URL = 'redis://redis_host:6379/0'
socketio = SocketIO()
socketio.init_app(app, message_queue=REDIS_URL)

# on socketio connect join to room for the user
# if user have several browsers, all of them will receive message
@socketio.on('connect')
def connect():
    user_id = session.get('user_id')
    if user_id:
        join_room(user_id)

Some task example

The solution is described in the flask-socketio documentation.

from flask_socketio import SocketIO
socketio = SocketIO(message_queue=REDIS_URL)

@celery.task()
def process_something_for_user(user_id):
    total = 30
    for i in range(total):
        time.sleep(1)
        socketio.emit(
            'event', 
            {
                'code': 'progress', 
                'task': 'process_something',
                'data': {
                    'processed': i,
                    'total': total
                }
            }, room=user_id)
    socketio.emit(
        'event', 
        {
            'code': 'finish', 
            'task': 'process_something'
        }, room=user_id)                

An example of nginx config for socketio

location /socket.io {
    proxy_pass http://app/socket.io;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    proxy_http_version 1.1;
    proxy_buffering off;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

Frontend

An example of use of socket.io in AngularJS controller.

vm.onEvent = function(msg) {
    $scope.$apply(function () {
        if (msg.code === 'progress') {
            vm.progress[msg.task] = msg.data;
        }
        if (msg.code === 'finish') {
            delete vm.progress[msg.task];
        }
    });
}

var socket = io.connect();
socket.on('event', vm.onEvent);
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/1.7.3/socket.io.min.js"></script> 

<div ng-repeat="(task, progress) in ctrl.progress" class="progress">
    <div class="progress-bar progress-bar-info"
         ng-class="{'progress-bar-striped active': !progress.total}"
         role="progressbar"
         aria-valuenow="{[ progress.processed ]}" aria-valuemin="0" aria-valuemax="{[ progress.total ]}"
         style="width: {[ progress.total ? progress.processed/progress.total*100 : 100  ]}%;">
        <span ng-if="progress.processed ">{[ progress.processed ]}</span><span ng-if="progress.total "> / {[ progress.total ]}</span> {[ task ]}
    </div>
</div>
comments powered by Disqus