Asyncio Example Server

This example is a basic HTTP/2 server written using asyncio, using some functionality that was introduced in Python 3.5. It is essentially the same JSON-headers-returning server that was built in the Getting Started: Writing Your Own HTTP/2 Server document.

This example demonstrates some basic asyncio techniques.

# -*- coding: utf-8 -*-
"""
asyncio-server.py
~~~~~~~~~~~~~~~~~

A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.

This example demonstrates handling requests with bodies, as well as handling
those without. In particular, it demonstrates the fact that DataReceived
events may fire multiple times for a single request, and that applications
must handle that possibility.
"""
import asyncio
import collections
import io
import json
import ssl
from typing import List, Optional, Tuple

from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.errors import ErrorCodes
from h2.events import (
    ConnectionTerminated, DataReceived, RemoteSettingsChanged,
    RequestReceived, StreamEnded, StreamReset, WindowUpdated
)
from h2.exceptions import ProtocolError, StreamClosedError
from h2.settings import SettingCodes
 28
 29
 30RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
 31
 32
 33class H2Protocol(asyncio.Protocol):
 34    def __init__(self):
 35        config = H2Configuration(client_side=False, header_encoding='utf-8')
 36        self.conn = H2Connection(config=config)
 37        self.transport = None
 38        self.stream_data = {}
 39        self.flow_control_futures = {}
 40
 41    def connection_made(self, transport: asyncio.Transport):
 42        self.transport = transport
 43        self.conn.initiate_connection()
 44        self.transport.write(self.conn.data_to_send())
 45
 46    def connection_lost(self, exc):
 47        for future in self.flow_control_futures.values():
 48            future.cancel()
 49        self.flow_control_futures = {}
 50
 51    def data_received(self, data: bytes):
 52        try:
 53            events = self.conn.receive_data(data)
 54        except ProtocolError as e:
 55            self.transport.write(self.conn.data_to_send())
 56            self.transport.close()
 57        else:
 58            self.transport.write(self.conn.data_to_send())
 59            for event in events:
 60                if isinstance(event, RequestReceived):
 61                    self.request_received(event.headers, event.stream_id)
 62                elif isinstance(event, DataReceived):
 63                    self.receive_data(event.data, event.stream_id)
 64                elif isinstance(event, StreamEnded):
 65                    self.stream_complete(event.stream_id)
 66                elif isinstance(event, ConnectionTerminated):
 67                    self.transport.close()
 68                elif isinstance(event, StreamReset):
 69                    self.stream_reset(event.stream_id)
 70                elif isinstance(event, WindowUpdated):
 71                    self.window_updated(event.stream_id, event.delta)
 72                elif isinstance(event, RemoteSettingsChanged):
 73                    if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
 74                        self.window_updated(None, 0)
 75
 76                self.transport.write(self.conn.data_to_send())
 77
 78    def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
 79        headers = collections.OrderedDict(headers)
 80        method = headers[':method']
 81
 82        # Store off the request data.
 83        request_data = RequestData(headers, io.BytesIO())
 84        self.stream_data[stream_id] = request_data
 85
 86    def stream_complete(self, stream_id: int):
 87        """
 88        When a stream is complete, we can send our response.
 89        """
 90        try:
 91            request_data = self.stream_data[stream_id]
 92        except KeyError:
 93            # Just return, we probably 405'd this already
 94            return
 95
 96        headers = request_data.headers
 97        body = request_data.data.getvalue().decode('utf-8')
 98
 99        data = json.dumps(
100            {"headers": headers, "body": body}, indent=4
101        ).encode("utf8")
102
103        response_headers = (
104            (':status', '200'),
105            ('content-type', 'application/json'),
106            ('content-length', str(len(data))),
107            ('server', 'asyncio-h2'),
108        )
109        self.conn.send_headers(stream_id, response_headers)
110        asyncio.ensure_future(self.send_data(data, stream_id))
111
112    def receive_data(self, data: bytes, stream_id: int):
113        """
114        We've received some data on a stream. If that stream is one we're
115        expecting data on, save it off. Otherwise, reset the stream.
116        """
117        try:
118            stream_data = self.stream_data[stream_id]
119        except KeyError:
120            self.conn.reset_stream(
121                stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
122            )
123        else:
124            stream_data.data.write(data)
125
126    def stream_reset(self, stream_id):
127        """
128        A stream reset was sent. Stop sending data.
129        """
130        if stream_id in self.flow_control_futures:
131            future = self.flow_control_futures.pop(stream_id)
132            future.cancel()
133
134    async def send_data(self, data, stream_id):
135        """
136        Send data according to the flow control rules.
137        """
138        while data:
139            while self.conn.local_flow_control_window(stream_id) < 1:
140                try:
141                    await self.wait_for_flow_control(stream_id)
142                except asyncio.CancelledError:
143                    return
144
145            chunk_size = min(
146                self.conn.local_flow_control_window(stream_id),
147                len(data),
148                self.conn.max_outbound_frame_size,
149            )
150
151            try:
152                self.conn.send_data(
153                    stream_id,
154                    data[:chunk_size],
155                    end_stream=(chunk_size == len(data))
156                )
157            except (StreamClosedError, ProtocolError):
158                # The stream got closed and we didn't get told. We're done
159                # here.
160                break
161
162            self.transport.write(self.conn.data_to_send())
163            data = data[chunk_size:]
164
165    async def wait_for_flow_control(self, stream_id):
166        """
167        Waits for a Future that fires when the flow control window is opened.
168        """
169        f = asyncio.Future()
170        self.flow_control_futures[stream_id] = f
171        await f
172
173    def window_updated(self, stream_id, delta):
174        """
175        A window update frame was received. Unblock some number of flow control
176        Futures.
177        """
178        if stream_id and stream_id in self.flow_control_futures:
179            f = self.flow_control_futures.pop(stream_id)
180            f.set_result(delta)
181        elif not stream_id:
182            for f in self.flow_control_futures.values():
183                f.set_result(delta)
184
185            self.flow_control_futures = {}
186
187
# TLS configuration: disable TLS 1.0/1.1 and TLS compression, load the server
# certificate, and advertise "h2" via ALPN.
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
)
ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
ssl_context.set_alpn_protocols(["h2"])

# Create the loop explicitly: calling asyncio.get_event_loop() when no loop is
# running is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Each client connection will create a new protocol instance
coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    # Close the server, including when run_forever() exits with an error.
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()