Token authenticated aiohttp server#


In the previous blog post, a simple aiohttp server was introduced. In the following post we improve the previous server by introducing a simple token authentication mechanism.

Why authenticate?#

Authentication is a crucial part when dealing with server-client connections. If your resources are limited and your server has no filtering mechanism of whom to serve, an overload of requests would break your server. This the one of the simplest yet efficient Denial of Service attacks (DOS). Hence an authentication mechanism will help filter unrecognized requests and will spare you the computational power and attention.

Why use tokens?#

Token based authentication is one of the cheapest and the simplest approaches. It is an efficient filtering approach if the data communicated is not that important. Depending on how you generate your valid tokens, how you communicate/ store them you can improve your security but this will not be discussed here.

Implementation#

To improve the previous implementation, we will push all the config variables into one JSON file and we will name it config.json. This will include all the configuration parameters for our script. It will look like the following.

config.json#
 1{
 2  "server": {
 3    "http": {
 4      "host": "127.0.0.1",
 5      "port": 8000,
 6      "request_max_size": 1048576,
 7      "allowed_tokens": ["token1","token2"]
 8    }
 9  },
10  "log": {
11    "level": [{
12      "logger": "server_logger",
13      "level": "DEBUG"
14    }]
15  }
16}

As you see in the config, we are now able to configure the host and the port to run the server on. We are also able to constraint the size of the accepted requests along with providing a list of accepted valid tokens. Furthermore, we added a logger and a debug level to investigate possible errors. The following section will detail the work.

  1. First we configure the logger.

    1import logging
    2
    3# init loggging
    4logger = logging.getLogger(__name__)
    5logging.basicConfig(format="[%(asctime)s.%(msecs)03d] p%(process)s {%(pathname)s: %(funcName)s: %(lineno)d}: %(levelname)s: %(message)s", datefmt="%Y-%m-%d %p %I:%M:%S")
    6logger.setLevel(10)
    
  2. Load the configuration infos.

    1import json
    2
    3# parse conf
    4with open("config.json", "rb") as config_file:
    5    conf = json.loads(config_file.read())
    
  3. Write a request handler. Requests in aiohttp are processed using coroutines that will serve as the actual functions called by the client.

    1import asyncio
    2from aiohttp import web
    3
    4@asyncio.coroutine
    5def ping(request):
    6    return web.json_response({"text": "pong", "status": "success"})
    
  4. Write the authentication and token handler

     1import asyncio
     2from aiohttp import web
     3from typing import Callable, Coroutine, Tuple
     4
     5def token_auth_middleware(user_loader: Callable,
     6                          request_property: str = 'user',
     7                          auth_scheme: str = 'Token',
     8                          exclude_routes: Tuple = tuple(),
     9                          exclude_methods: Tuple = tuple()) -> Coroutine:
    10    """
    11    Checks a auth token and adds a user from user_loader in request.
    12    """
    13    @web.middleware
    14    async def middleware(request, handler):
    15        try               : scheme, token = request.headers['Authorization'].strip().split(' ')
    16        except KeyError   : raise web.HTTPUnauthorized(reason='Missing authorization header',)
    17        except ValueError : raise web.HTTPForbidden(reason='Invalid authorization header',)
    18
    19        if auth_scheme.lower() != scheme.lower():
    20            raise web.HTTPForbidden(reason='Invalid token scheme',)
    21
    22        user = await user_loader(token)
    23        if user : request[request_property] = user
    24        else    : raise web.HTTPForbidden(reason='Token doesn\'t exist')
    25        return await handler(request)
    26    return middleware
    
  5. Write the server init function and wrap it all.

     1import asyncio
     2from aiohttp import web
     3
     4async def init():
     5   """
     6   Init web application.
     7   """
     8   async def user_loader(token: str):
     9       user = {'uuid': 'fake-uuid'} if token in conf["server"]["http"]["allowed_tokens"] else None
    10       return user
    11
    12   app = web.Application(client_max_size=conf["server"]["http"]["request_max_size"],
    13                         middlewares=[token_auth_middleware(user_loader)])
    14   app.router.add_route('GET', '/ping', ping)
    15   return app
    16
    17
    18web.run_app(init(),)
    

Code#

The previously listed steps, should look together in Python as follows:

aiohttp-server-with-token-auth#
 1import json
 2import asyncio
 3import logging
 4from aiohttp import web
 5from typing import Callable, Coroutine, Tuple
 6
 7
 8# init loggging
 9logger = logging.getLogger(__name__)
10logging.basicConfig(format="[%(asctime)s.%(msecs)03d] p%(process)s {%(pathname)s: %(funcName)s: %(lineno)d}: %(levelname)s: %(message)s", datefmt="%Y-%m-%d %p %I:%M:%S")
11logger.setLevel(10)
12
13# parse conf
14with open("config.json", "rb") as config_file:
15    conf = json.loads(config_file.read())
16
17@asyncio.coroutine
18def ping(request):
19    logger.debug("-> Received PING")
20    response = web.json_response({"text": "pong", "status": "success"})
21    return response
22
23def token_auth_middleware(user_loader: Callable,
24                          request_property: str = 'user',
25                          auth_scheme: str = 'Token',
26                          exclude_routes: Tuple = tuple(),
27                          exclude_methods: Tuple = tuple()) -> Coroutine:
28    """
29    Checks a auth token and adds a user from user_loader in request.
30    """
31    @web.middleware
32    async def middleware(request, handler):
33        try               : scheme, token = request.headers['Authorization'].strip().split(' ')
34        except KeyError   : raise web.HTTPUnauthorized(reason='Missing authorization header',)
35        except ValueError : raise web.HTTPForbidden(reason='Invalid authorization header',)
36
37        if auth_scheme.lower() != scheme.lower():
38            raise web.HTTPForbidden(reason='Invalid token scheme',)
39
40        user = await user_loader(token)
41        if user : request[request_property] = user
42        else    : raise web.HTTPForbidden(reason='Token doesn\'t exist')
43        return await handler(request)
44    return middleware
45
46async def init():
47    """
48    Init web application.
49    """
50    async def user_loader(token: str):
51        user = {'uuid': 'fake-uuid'} if token in conf["server"]["http"]["allowed_tokens"] else None
52        return user
53
54    app = web.Application(client_max_size=conf["server"]["http"]["request_max_size"],
55                          middlewares=[token_auth_middleware(user_loader)])
56    app.router.add_route('GET', '/ping', ping)
57    return app
58
59if __name__ == '__main__':
60    web.run_app(init(),)

Testing#

The previous code when executed will start a server running on http://localhost:8080/. To test your server either type in your browser http://localhost:8080/ping or simply curl it while passing your token using curl -H 'Authorization: Token token1' http://127.0.0.1:8080/ping. The response should be a json including pong and a success status.

Conclusion#

This blog presented an improved aiohttp server that implements token based authentication to filter out unwanted traffic and protect itself against possible spamming attacks that could be part of DOS attacks.

Share this blog#

Updated on 08 April 2022

👨‍💻 edited and review were on 08.04.2022