Mock aiohttp request in unittests


Here is an example for Python 3.7.3 and the following library versions:

aiohttp==3.5.4
asynctest==0.12.3

from typing import Dict
from aiohttp import ClientSession
from aiohttp.web_exceptions import HTTPInternalServerError


async def send_request() -> Dict:
    async with ClientSession() as session:
        async with session.get('http://localhost') as response:
            if response.status != 200:
                raise HTTPInternalServerError()
            return await response.json()


from asynctest import patch, CoroutineMock
from aiohttp import web
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop

from .main import send_request


class SendRequestTestCase(AioHTTPTestCase):

    def set_get_mock_response(self, mock_get, status, json_data):
        mock_get.return_value.__aenter__.return_value.status = status
        mock_get.return_value.__aenter__.return_value.json = CoroutineMock(return_value=json_data)

    async def get_application(self):
        return web.Application()

    @unittest_run_loop
    @patch('aiohttp.ClientSession.get')
    async def test_success(self, mock_get):
        self.set_get_mock_response(mock_get, 200, {'test': True})
        response = await send_request()
        assert response['test']

    @unittest_run_loop
    @patch('aiohttp.ClientSession.get')
    async def test_fail(self, mock_get):
        self.set_get_mock_response(mock_get, 500, {})
        with self.assertRaises(HTTPInternalServerError):
            await send_request()
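
On Python 3.8 and later the same mock chain can be built with the standard library alone: MagicMock supports async with, and AsyncMock plays the role of CoroutineMock. The sketch below is a variant under those assumptions, not the setup used above. make_get_mock and fetch are hypothetical names, and fetch takes the get callable as a parameter so the example runs without aiohttp installed.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_get_mock(status, json_data):
    """Build a stand-in for ClientSession.get (hypothetical helper)."""
    mock_get = MagicMock()
    context = mock_get.return_value             # object returned by get(...)
    context.__aexit__.return_value = False      # do not swallow exceptions
    response = context.__aenter__.return_value  # value bound by "async with ... as"
    response.status = status
    response.json = AsyncMock(return_value=json_data)
    return mock_get


async def fetch(get):
    """Same control flow as send_request(), with `get` injected."""
    async with get('http://localhost') as response:
        if response.status != 200:
            raise RuntimeError('unexpected status %s' % response.status)
        return await response.json()


print(asyncio.run(fetch(make_get_mock(200, {'test': True}))))
```

The attribute chain is the same as in set_get_mock_response: return_value is what get(...) returns, and __aenter__.return_value is the response object inside the async with block.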

A script to wait for a service running in docker swarm


docker stack deploy has no option to wait until all services are in the running state. docker service ps shows the desired state and the current state of each task of a service, along with the node it runs on.

$ docker service ps redis

ID            NAME         IMAGE        NODE      DESIRED STATE  CURRENT STATE                   ERROR  PORTS
50qe8lfnxaxk  redis.1      redis:3.0.6  manager1  Running        Running 6 seconds ago
ky2re9oz86r9   \_ redis.1  redis:3.0.5  manager1  Shutdown       Shutdown 8 seconds ago
3s46te2nzl4i  redis.2      redis:3.0.6  worker2   Running        Running less than a second ago
nvjljf7rmor4   \_ redis.2  redis:3.0.6  worker2   Shutdown       Rejected 23 seconds ago   

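To check that the service is running on all nodes, filter the tasks whose desired state is ready or running, then check that each of them has a current state of running. The command below prints only the tasks that are not running yet, so grep exits with status 1 once everything is up: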

docker service ps -f desired-state=running -f desired-state=ready --format "{{ .DesiredState }} {{.CurrentState }}" my_service | grep -v "Running Running"

To run this check in a loop:

check_running() {
   docker service ps -f desired-state=running -f desired-state=ready --format "{{ .DesiredState }} {{.CurrentState }}" my_service | grep -v "Running Running"
}
ATTEMPTS=0
RESULT=0
until [ $RESULT -eq 1 ] || [ $ATTEMPTS -eq 3 ]; do
  sleep 30
  check_running
  RESULT=$?
  ATTEMPTS=$((ATTEMPTS + 1))
done

set -e
test $RESULT -eq 1
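
The same polling logic can be sketched in Python, which makes the "all running" check easy to test without a swarm. This is only a sketch: all_tasks_running, docker_service_ps and wait_for_service are hypothetical names, and the ps and sleep parameters exist just so the loop can be exercised with fake data.

```python
import subprocess
import time


def all_tasks_running(ps_output):
    """True when no line lacks "Running Running", like `grep -v` finding nothing."""
    lines = [line for line in ps_output.splitlines() if line.strip()]
    return all('Running Running' in line for line in lines)


def docker_service_ps(service):
    """Run the same `docker service ps` command as the shell function."""
    result = subprocess.run(
        ['docker', 'service', 'ps',
         '-f', 'desired-state=running', '-f', 'desired-state=ready',
         '--format', '{{ .DesiredState }} {{.CurrentState }}', service],
        capture_output=True, text=True, check=True)
    return result.stdout


def wait_for_service(service, attempts=3, delay=30,
                     ps=docker_service_ps, sleep=time.sleep):
    """Poll until every task is running or the attempts run out."""
    for _ in range(attempts):
        sleep(delay)
        if all_tasks_running(ps(service)):
            return True
    return False
```

A deploy script could then exit with a non-zero status when wait_for_service('my_service') returns False, which matches the final test in the shell version.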

Nginx proxy with prefix


It can be done this way.

    location /my_app {
        rewrite ^/my_app/?(.*)$ /$1 break;
        proxy_pass http://my_app_upstream;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        ...
    }

Without the rewrite, nginx passes the /my_app prefix on to the upstream, so the prefix has to be handled in my_app itself.

    location /my_app {
        proxy_pass http://my_app_upstream;
    }

With a trailing / in proxy_pass, nginx replaces the matched /my_app prefix with /, so a request for /my_app/some-url reaches the proxied service as //some-url.

    location /my_app {
        proxy_pass http://my_app_upstream/;
    }
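
The three variants can be compared with a small Python model of the URI that ends up at the upstream. It is a simplified sketch, not nginx itself: upstream_uri is a hypothetical helper, the rewrite variant assumes the rule strips the /my_app prefix, and query strings are ignored.

```python
import re


def upstream_uri(request_uri, variant):
    """Model the URI nginx sends upstream for `location /my_app`."""
    if variant == 'rewrite':
        # rewrite ^/my_app/?(.*)$ /$1 break;  then proxy_pass without a URI part
        return re.sub(r'^/my_app/?(.*)$', r'/\1', request_uri)
    if variant == 'plain':
        # proxy_pass http://my_app_upstream;  the request URI is passed unchanged
        return request_uri
    if variant == 'slash':
        # proxy_pass http://my_app_upstream/;  the matched prefix is replaced by "/"
        return '/' + request_uri[len('/my_app'):]
    raise ValueError(variant)


for variant in ('rewrite', 'plain', 'slash'):
    print(variant, upstream_uri('/my_app/some-url', variant))
```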

How to connect to a service in docker swarm with docker


This is a rare case. Suppose there is a myapp swarm cluster with a myapp_mongo database that has no published port, and a command from some docker image needs a connection to this database.

By default docker stack deploy creates a non-attachable network, so docker run --network myapp_default fails with: Error response from daemon: Could not attach to network myapp_default: rpc error: code = PermissionDenied desc = network myapp_default not manually attachable.

A way to bypass it is to create a new attachable network and attach it to the service.

docker network create --driver overlay --attachable mongo_network
docker service update --network-add mongo_network myapp_mongo
docker run --rm --network mongo_network mongo:4.0.6 mongodump -h myapp_mongo ...
docker service update --network-rm mongo_network myapp_mongo
docker network rm mongo_network
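
If the command in the middle fails, the temporary network stays attached. One way to guarantee the cleanup is a context manager, sketched here in Python. run_docker and attached_network are hypothetical names, and the run parameter is there only so the order of commands can be checked without docker.

```python
import subprocess
from contextlib import contextmanager


def run_docker(*args):
    """Run one docker CLI command and fail loudly on a non-zero exit."""
    subprocess.run(['docker', *args], check=True)


@contextmanager
def attached_network(service, network, run=run_docker):
    """Temporarily attach `service` to a new attachable overlay network."""
    run('network', 'create', '--driver', 'overlay', '--attachable', network)
    try:
        run('service', 'update', '--network-add', network, service)
        try:
            yield network
        finally:
            # Detach the service even if the body of the with block raised.
            run('service', 'update', '--network-rm', network, service)
    finally:
        # Remove the network even if attaching it failed.
        run('network', 'rm', network)
```

Inside a with attached_network('myapp_mongo', 'mongo_network') block, the docker run --rm --network mongo_network command from above can be issued through run_docker.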

Get blinker signal's receivers names

from unittest import TestCase
from app.main import init_app
from app.signals import my_signal


class SignalTests(TestCase):

    def setUp(self):
        init_app()

    def test_signal_is_connected_to_my_receiver(self):
        # receivers holds weak references, so call each one to get the function
        receivers = [r().__name__ for r in my_signal.receivers.values()]
        self.assertIn('my_receiver', receivers)

MongoDB dump and restore with docker


Here are two commands that take a partial dump of a collection from the production database and restore it into a dev mongo instance running through docker-compose.

docker run -v `pwd`/:/dump mongo mongodump --gzip --archive=/dump/my_collection.agz --host <connection url> --ssl --username <username> --password <password> --authenticationDatabase admin --db <prod_db> --collection my_collection --query '{date: {$gte:  ISODate("2019-02-01T00:00:00.000+0000")}}'

docker-compose run -v `pwd`/my_collection.agz:/my_collection.agz mongo mongorestore --gzip --archive=/my_collection.agz --host mongo --nsFrom <prod_db>.my_collection --nsTo <dev_db>.my_collection
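
Newer mongodump releases (4.2 and later, as far as I know) expect --query in Extended JSON v2, with quoted keys and {"$date": ...} instead of ISODate(...). Here is a small Python sketch for building such a filter. build_since_query is a hypothetical helper name, and the date field is the one from the command above.

```python
import json
from datetime import datetime, timezone


def build_since_query(field, since):
    """Return an Extended JSON v2 filter matching documents with field >= since.

    `since` should be timezone-aware; it is converted to UTC.
    """
    utc = since.astimezone(timezone.utc)
    stamp = utc.strftime('%Y-%m-%dT%H:%M:%S') + '.%03dZ' % (utc.microsecond // 1000)
    return json.dumps({field: {'$gte': {'$date': stamp}}})


print(build_since_query('date', datetime(2019, 2, 1, tzinfo=timezone.utc)))
```

The printed string can be passed to --query in single quotes in place of the shell-style filter.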

Easy charts in python app with plotly


An example of how to add a chart to a Flask app with the plotly library:

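import plotly.graph_objs
import plotly.offline
from flask import render_template

# mongo_db is assumed to be an existing pymongo database handle.


def get_chart_data():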
    return mongo_db['item'].aggregate([
        {'$sort': {'date': 1}},
        {'$group': {
            '_id': {'year': {'$year': '$date'}, 'month': {'$month': '$date'}},
            'num': {'$sum': 1}
        }},
        {'$project': {
            'date': {'$dateFromParts' : {'year': '$_id.year', 'month': '$_id.month'}},
            'num': '$num'
        }},
        {'$sort': {'date': 1}}
    ])


def get_chart():
    data = list(get_chart_data())
    layout = plotly.graph_objs.Layout(title='Items by month')
    scatter_data = [
        plotly.graph_objs.Scatter(
            x=[d['date'] for d in data], 
            y=[d['num'] for d in data]
        )
    ]
    fig = plotly.graph_objs.Figure(data=scatter_data, layout=layout)
    return plotly.offline.plot(fig, include_plotlyjs=True, output_type='div')


@app.route('/chart')  # app is assumed to be the Flask application object
def chart():
    return render_template('chart.html', chart=get_chart())

Jinja template:

{{ chart|safe }}

For a Jupyter notebook it will be:

from plotly.offline import init_notebook_mode, iplot
init_notebook_mode(connected=True)
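# iplot() expects a figure, not the HTML div that get_chart() returns;
# fig here is a Figure built the same way as inside get_chart().
iplot(fig)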
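
To see what shape of data the chart code expects from get_chart_data(), the aggregation can be mimicked in plain Python. This is only an illustration of the grouping logic: count_by_month is a hypothetical helper, and unlike the real pipeline it leaves out the _id field.

```python
from collections import Counter
from datetime import datetime


def count_by_month(dates):
    """Mimic the pipeline: group by (year, month), count, sort by date."""
    counts = Counter((d.year, d.month) for d in dates)
    return [
        {'date': datetime(year, month, 1), 'num': num}
        for (year, month), num in sorted(counts.items())
    ]


sample = [datetime(2019, 1, 5), datetime(2019, 1, 20), datetime(2019, 2, 1)]
rows = count_by_month(sample)
print([row['date'] for row in rows], [row['num'] for row in rows])
```

The two printed lists correspond to the x and y values passed to Scatter.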

Mongoengine as_pymongo performance


When you need only a few fields from a list of complex objects, the query runs much faster with the as_pymongo function, which returns raw dicts instead of building document objects.

In my case it gave a 16x speedup:

%timeit list(SomeObject.objects.scalar('id')[0:500])
# 129 ms ± 11 ms per loop

%timeit list(o['_id'] for o in SomeObject.objects.scalar('id')[0:500].as_pymongo())
# 7.98 ms ± 849 µs per loop