Asyncio Example ServerΒΆ
This example is a basic HTTP/2 server written using asyncio, using some functionality that was introduced in Python 3.5. This server represents basically just the same JSON-headers-returning server that was built in the Getting Started: Writing Your Own HTTP/2 Server document.
This example demonstrates some basic asyncio techniques.
1# -*- coding: utf-8 -*-
2"""
3asyncio-server.py
4~~~~~~~~~~~~~~~~~
5
6A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.
7
8This example demonstrates handling requests with bodies, as well as handling
9those without. In particular, it demonstrates the fact that DataReceived may
10be called multiple times, and that applications must handle that possibility.
11"""
12import asyncio
13import io
14import json
15import ssl
16import collections
17from typing import List, Tuple
18
19from h2.config import H2Configuration
20from h2.connection import H2Connection
21from h2.events import (
22 ConnectionTerminated, DataReceived, RemoteSettingsChanged,
23 RequestReceived, StreamEnded, StreamReset, WindowUpdated
24)
25from h2.errors import ErrorCodes
26from h2.exceptions import ProtocolError, StreamClosedError
27from h2.settings import SettingCodes
28
29
30RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
31
32
33class H2Protocol(asyncio.Protocol):
34 def __init__(self):
35 config = H2Configuration(client_side=False, header_encoding='utf-8')
36 self.conn = H2Connection(config=config)
37 self.transport = None
38 self.stream_data = {}
39 self.flow_control_futures = {}
40
41 def connection_made(self, transport: asyncio.Transport):
42 self.transport = transport
43 self.conn.initiate_connection()
44 self.transport.write(self.conn.data_to_send())
45
46 def connection_lost(self, exc):
47 for future in self.flow_control_futures.values():
48 future.cancel()
49 self.flow_control_futures = {}
50
51 def data_received(self, data: bytes):
52 try:
53 events = self.conn.receive_data(data)
54 except ProtocolError as e:
55 self.transport.write(self.conn.data_to_send())
56 self.transport.close()
57 else:
58 self.transport.write(self.conn.data_to_send())
59 for event in events:
60 if isinstance(event, RequestReceived):
61 self.request_received(event.headers, event.stream_id)
62 elif isinstance(event, DataReceived):
63 self.receive_data(event.data, event.stream_id)
64 elif isinstance(event, StreamEnded):
65 self.stream_complete(event.stream_id)
66 elif isinstance(event, ConnectionTerminated):
67 self.transport.close()
68 elif isinstance(event, StreamReset):
69 self.stream_reset(event.stream_id)
70 elif isinstance(event, WindowUpdated):
71 self.window_updated(event.stream_id, event.delta)
72 elif isinstance(event, RemoteSettingsChanged):
73 if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
74 self.window_updated(None, 0)
75
76 self.transport.write(self.conn.data_to_send())
77
78 def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
79 headers = collections.OrderedDict(headers)
80 method = headers[':method']
81
82 # Store off the request data.
83 request_data = RequestData(headers, io.BytesIO())
84 self.stream_data[stream_id] = request_data
85
86 def stream_complete(self, stream_id: int):
87 """
88 When a stream is complete, we can send our response.
89 """
90 try:
91 request_data = self.stream_data[stream_id]
92 except KeyError:
93 # Just return, we probably 405'd this already
94 return
95
96 headers = request_data.headers
97 body = request_data.data.getvalue().decode('utf-8')
98
99 data = json.dumps(
100 {"headers": headers, "body": body}, indent=4
101 ).encode("utf8")
102
103 response_headers = (
104 (':status', '200'),
105 ('content-type', 'application/json'),
106 ('content-length', str(len(data))),
107 ('server', 'asyncio-h2'),
108 )
109 self.conn.send_headers(stream_id, response_headers)
110 asyncio.ensure_future(self.send_data(data, stream_id))
111
112 def receive_data(self, data: bytes, stream_id: int):
113 """
114 We've received some data on a stream. If that stream is one we're
115 expecting data on, save it off. Otherwise, reset the stream.
116 """
117 try:
118 stream_data = self.stream_data[stream_id]
119 except KeyError:
120 self.conn.reset_stream(
121 stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
122 )
123 else:
124 stream_data.data.write(data)
125
126 def stream_reset(self, stream_id):
127 """
128 A stream reset was sent. Stop sending data.
129 """
130 if stream_id in self.flow_control_futures:
131 future = self.flow_control_futures.pop(stream_id)
132 future.cancel()
133
134 async def send_data(self, data, stream_id):
135 """
136 Send data according to the flow control rules.
137 """
138 while data:
139 while self.conn.local_flow_control_window(stream_id) < 1:
140 try:
141 await self.wait_for_flow_control(stream_id)
142 except asyncio.CancelledError:
143 return
144
145 chunk_size = min(
146 self.conn.local_flow_control_window(stream_id),
147 len(data),
148 self.conn.max_outbound_frame_size,
149 )
150
151 try:
152 self.conn.send_data(
153 stream_id,
154 data[:chunk_size],
155 end_stream=(chunk_size == len(data))
156 )
157 except (StreamClosedError, ProtocolError):
158 # The stream got closed and we didn't get told. We're done
159 # here.
160 break
161
162 self.transport.write(self.conn.data_to_send())
163 data = data[chunk_size:]
164
165 async def wait_for_flow_control(self, stream_id):
166 """
167 Waits for a Future that fires when the flow control window is opened.
168 """
169 f = asyncio.Future()
170 self.flow_control_futures[stream_id] = f
171 await f
172
173 def window_updated(self, stream_id, delta):
174 """
175 A window update frame was received. Unblock some number of flow control
176 Futures.
177 """
178 if stream_id and stream_id in self.flow_control_futures:
179 f = self.flow_control_futures.pop(stream_id)
180 f.set_result(delta)
181 elif not stream_id:
182 for f in self.flow_control_futures.values():
183 f.set_result(delta)
184
185 self.flow_control_futures = {}
186
187
188ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
189ssl_context.options |= (
190 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
191)
192ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
193ssl_context.set_alpn_protocols(["h2"])
194
195loop = asyncio.get_event_loop()
196# Each client connection will create a new protocol instance
197coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
198server = loop.run_until_complete(coro)
199
200# Serve requests until Ctrl+C is pressed
201print('Serving on {}'.format(server.sockets[0].getsockname()))
202try:
203 loop.run_forever()
204except KeyboardInterrupt:
205 pass
206
207# Close the server
208server.close()
209loop.run_until_complete(server.wait_closed())
210loop.close()