Token authenticated aiohttp server#
In the previous blog post, a simple aiohttp server was introduced. In the following post we improve the previous server by introducing a simple token authentication mechanism.
Why authenticate?#
Authentication is a crucial part when dealing with server-client connections. If your resources are limited and your server has no filtering mechanism of whom to serve, an overload of requests would break your server. This the one of the simplest yet efficient Denial of Service attacks (DOS). Hence an authentication mechanism will help filter unrecognized requests and will spare you the computational power and attention.
Why use tokens?#
Token based authentication is one of the cheapest and the simplest approaches. It is an efficient filtering approach if the data communicated is not that important. Depending on how you generate your valid tokens, how you communicate/ store them you can improve your security but this will not be discussed here.
Implementation#
To improve the previous implementation, we will push all the config variables into one JSON file and we will name it config.json. This will include all the configuration parameters for our script. It will look like the following.
1{
2 "server": {
3 "http": {
4 "host": "127.0.0.1",
5 "port": 8000,
6 "request_max_size": 1048576,
7 "allowed_tokens": ["token1","token2"]
8 }
9 },
10 "log": {
11 "level": [{
12 "logger": "server_logger",
13 "level": "DEBUG"
14 }]
15 }
16}
As you see in the config, we are now able to configure the host and the port to run the server on. We are also able to constraint the size of the accepted requests along with providing a list of accepted valid tokens. Furthermore, we added a logger and a debug level to investigate possible errors. The following section will detail the work.
First we configure the logger.
1import logging 2 3# init loggging 4logger = logging.getLogger(__name__) 5logging.basicConfig(format="[%(asctime)s.%(msecs)03d] p%(process)s {%(pathname)s: %(funcName)s: %(lineno)d}: %(levelname)s: %(message)s", datefmt="%Y-%m-%d %p %I:%M:%S") 6logger.setLevel(10)
Load the configuration infos.
1import json 2 3# parse conf 4with open("config.json", "rb") as config_file: 5 conf = json.loads(config_file.read())
Write a request handler. Requests in aiohttp are processed using coroutines that will serve as the actual functions called by the client.
1import asyncio 2from aiohttp import web 3 4@asyncio.coroutine 5def ping(request): 6 return web.json_response({"text": "pong", "status": "success"})
Write the authentication and token handler
1import asyncio 2from aiohttp import web 3from typing import Callable, Coroutine, Tuple 4 5def token_auth_middleware(user_loader: Callable, 6 request_property: str = 'user', 7 auth_scheme: str = 'Token', 8 exclude_routes: Tuple = tuple(), 9 exclude_methods: Tuple = tuple()) -> Coroutine: 10 """ 11 Checks a auth token and adds a user from user_loader in request. 12 """ 13 @web.middleware 14 async def middleware(request, handler): 15 try : scheme, token = request.headers['Authorization'].strip().split(' ') 16 except KeyError : raise web.HTTPUnauthorized(reason='Missing authorization header',) 17 except ValueError : raise web.HTTPForbidden(reason='Invalid authorization header',) 18 19 if auth_scheme.lower() != scheme.lower(): 20 raise web.HTTPForbidden(reason='Invalid token scheme',) 21 22 user = await user_loader(token) 23 if user : request[request_property] = user 24 else : raise web.HTTPForbidden(reason='Token doesn\'t exist') 25 return await handler(request) 26 return middleware
Write the server init function and wrap it all.
1import asyncio 2from aiohttp import web 3 4async def init(): 5 """ 6 Init web application. 7 """ 8 async def user_loader(token: str): 9 user = {'uuid': 'fake-uuid'} if token in conf["server"]["http"]["allowed_tokens"] else None 10 return user 11 12 app = web.Application(client_max_size=conf["server"]["http"]["request_max_size"], 13 middlewares=[token_auth_middleware(user_loader)]) 14 app.router.add_route('GET', '/ping', ping) 15 return app 16 17 18web.run_app(init(),)
Code#
The previously listed steps, should look together in Python as follows:
1import json
2import asyncio
3import logging
4from aiohttp import web
5from typing import Callable, Coroutine, Tuple
6
7
8# init loggging
9logger = logging.getLogger(__name__)
10logging.basicConfig(format="[%(asctime)s.%(msecs)03d] p%(process)s {%(pathname)s: %(funcName)s: %(lineno)d}: %(levelname)s: %(message)s", datefmt="%Y-%m-%d %p %I:%M:%S")
11logger.setLevel(10)
12
13# parse conf
14with open("config.json", "rb") as config_file:
15 conf = json.loads(config_file.read())
16
17@asyncio.coroutine
18def ping(request):
19 logger.debug("-> Received PING")
20 response = web.json_response({"text": "pong", "status": "success"})
21 return response
22
23def token_auth_middleware(user_loader: Callable,
24 request_property: str = 'user',
25 auth_scheme: str = 'Token',
26 exclude_routes: Tuple = tuple(),
27 exclude_methods: Tuple = tuple()) -> Coroutine:
28 """
29 Checks a auth token and adds a user from user_loader in request.
30 """
31 @web.middleware
32 async def middleware(request, handler):
33 try : scheme, token = request.headers['Authorization'].strip().split(' ')
34 except KeyError : raise web.HTTPUnauthorized(reason='Missing authorization header',)
35 except ValueError : raise web.HTTPForbidden(reason='Invalid authorization header',)
36
37 if auth_scheme.lower() != scheme.lower():
38 raise web.HTTPForbidden(reason='Invalid token scheme',)
39
40 user = await user_loader(token)
41 if user : request[request_property] = user
42 else : raise web.HTTPForbidden(reason='Token doesn\'t exist')
43 return await handler(request)
44 return middleware
45
46async def init():
47 """
48 Init web application.
49 """
50 async def user_loader(token: str):
51 user = {'uuid': 'fake-uuid'} if token in conf["server"]["http"]["allowed_tokens"] else None
52 return user
53
54 app = web.Application(client_max_size=conf["server"]["http"]["request_max_size"],
55 middlewares=[token_auth_middleware(user_loader)])
56 app.router.add_route('GET', '/ping', ping)
57 return app
58
59if __name__ == '__main__':
60 web.run_app(init(),)
Testing#
The previous code when executed will start a server running on http://localhost:8080/
.
To test your server either type in your browser http://localhost:8080/ping
or simply curl it while passing your token using curl -H 'Authorization: Token token1' http://127.0.0.1:8080/ping
.
The response should be a json including pong and a success status.
Conclusion#
This blog presented an improved aiohttp server that implements token based authentication to filter out unwanted traffic and protect itself against possible spamming attacks that could be part of DOS attacks.