Curio Example Server

This example is a basic HTTP/2 server written using curio, David Beazley’s example of how to build a concurrent networking framework using Python 3.5’s new async/await syntax.

This example is notable for demonstrating the correct use of HTTP/2 flow control with h2. It is also a good example of the brand new syntax.

  1#!/usr/bin/env python3.5
  2# -*- coding: utf-8 -*-
  3"""
  4curio-server.py
  5~~~~~~~~~~~~~~~
  6
  7A fully-functional HTTP/2 server written for curio.
  8
  9Requires Python 3.5+.
 10"""
 11import mimetypes
 12import os
 13import sys
 14
 15from curio import Event, spawn, socket, ssl, run
 16
 17import h2.config
 18import h2.connection
 19import h2.events
 20
 21
 22# The maximum amount of a file we'll send in a single DATA frame.
 23READ_CHUNK_SIZE = 8192
 24
 25
 26async def create_listening_ssl_socket(address, certfile, keyfile):
 27    """
 28    Create and return a listening TLS socket on a given address.
 29    """
 30    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
 31    ssl_context.options |= (
 32        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
 33    )
 34    ssl_context.set_ciphers("ECDHE+AESGCM")
 35    ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
 36    ssl_context.set_alpn_protocols(["h2"])
 37
 38    sock = socket.socket()
 39    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 40    sock = await ssl_context.wrap_socket(sock)
 41    sock.bind(address)
 42    sock.listen()
 43
 44    return sock
 45
 46
 47async def h2_server(address, root, certfile, keyfile):
 48    """
 49    Create an HTTP/2 server at the given address.
 50    """
 51    sock = await create_listening_ssl_socket(address, certfile, keyfile)
 52    print("Now listening on %s:%d" % address)
 53
 54    async with sock:
 55        while True:
 56            client, _ = await sock.accept()
 57            server = H2Server(client, root)
 58            await spawn(server.run())
 59
 60
 61class H2Server:
 62    """
 63    A basic HTTP/2 file server. This is essentially very similar to
 64    SimpleHTTPServer from the standard library, but uses HTTP/2 instead of
 65    HTTP/1.1.
 66    """
 67    def __init__(self, sock, root):
 68        config = h2.config.H2Configuration(
 69            client_side=False, header_encoding='utf-8'
 70        )
 71        self.sock = sock
 72        self.conn = h2.connection.H2Connection(config=config)
 73        self.root = root
 74        self.flow_control_events = {}
 75
 76    async def run(self):
 77        """
 78        Loop over the connection, managing it appropriately.
 79        """
 80        self.conn.initiate_connection()
 81        await self.sock.sendall(self.conn.data_to_send())
 82
 83        while True:
 84            # 65535 is basically arbitrary here: this amounts to "give me
 85            # whatever data you have".
 86            data = await self.sock.recv(65535)
 87            if not data:
 88                break
 89
 90            events = self.conn.receive_data(data)
 91            for event in events:
 92                if isinstance(event, h2.events.RequestReceived):
 93                    await spawn(
 94                        self.request_received(event.headers, event.stream_id)
 95                    )
 96                elif isinstance(event, h2.events.DataReceived):
 97                    self.conn.reset_stream(event.stream_id)
 98                elif isinstance(event, h2.events.WindowUpdated):
 99                    await self.window_updated(event)
100
101            await self.sock.sendall(self.conn.data_to_send())
102
103    async def request_received(self, headers, stream_id):
104        """
105        Handle a request by attempting to serve a suitable file.
106        """
107        headers = dict(headers)
108        assert headers[':method'] == 'GET'
109
110        path = headers[':path'].lstrip('/')
111        full_path = os.path.join(self.root, path)
112
113        if not os.path.exists(full_path):
114            response_headers = (
115                (':status', '404'),
116                ('content-length', '0'),
117                ('server', 'curio-h2'),
118            )
119            self.conn.send_headers(
120                stream_id, response_headers, end_stream=True
121            )
122            await self.sock.sendall(self.conn.data_to_send())
123        else:
124            await self.send_file(full_path, stream_id)
125
126    async def send_file(self, file_path, stream_id):
127        """
128        Send a file, obeying the rules of HTTP/2 flow control.
129        """
130        filesize = os.stat(file_path).st_size
131        content_type, content_encoding = mimetypes.guess_type(file_path)
132        response_headers = [
133            (':status', '200'),
134            ('content-length', str(filesize)),
135            ('server', 'curio-h2'),
136        ]
137        if content_type:
138            response_headers.append(('content-type', content_type))
139        if content_encoding:
140            response_headers.append(('content-encoding', content_encoding))
141
142        self.conn.send_headers(stream_id, response_headers)
143        await self.sock.sendall(self.conn.data_to_send())
144
145        with open(file_path, 'rb', buffering=0) as f:
146            await self._send_file_data(f, stream_id)
147
148    async def _send_file_data(self, fileobj, stream_id):
149        """
150        Send the data portion of a file. Handles flow control rules.
151        """
152        while True:
153            while self.conn.local_flow_control_window(stream_id) < 1:
154                await self.wait_for_flow_control(stream_id)
155
156            chunk_size = min(
157                self.conn.local_flow_control_window(stream_id),
158                READ_CHUNK_SIZE,
159            )
160
161            data = fileobj.read(chunk_size)
162            keep_reading = (len(data) == chunk_size)
163
164            self.conn.send_data(stream_id, data, not keep_reading)
165            await self.sock.sendall(self.conn.data_to_send())
166
167            if not keep_reading:
168                break
169
170    async def wait_for_flow_control(self, stream_id):
171        """
172        Blocks until the flow control window for a given stream is opened.
173        """
174        evt = Event()
175        self.flow_control_events[stream_id] = evt
176        await evt.wait()
177
178    async def window_updated(self, event):
179        """
180        Unblock streams waiting on flow control, if needed.
181        """
182        stream_id = event.stream_id
183
184        if stream_id and stream_id in self.flow_control_events:
185            evt = self.flow_control_events.pop(stream_id)
186            await evt.set()
187        elif not stream_id:
188            # Need to keep a real list here to use only the events present at
189            # this time.
190            blocked_streams = list(self.flow_control_events.keys())
191            for stream_id in blocked_streams:
192                event = self.flow_control_events.pop(stream_id)
193                await event.set()
194        return
195
196
197if __name__ == '__main__':
198    host = sys.argv[2] if len(sys.argv) > 2 else "localhost"
199    print("Try GETting:")
200    print("    On OSX after 'brew install curl --with-c-ares --with-libidn --with-nghttp2 --with-openssl':")
201    print("/usr/local/opt/curl/bin/curl --tlsv1.2 --http2 -k https://localhost:5000/bundle.js")
202    print("Or open a browser to: https://localhost:5000/")
203    print("   (Accept all the warnings)")
204    run(h2_server((host, 5000), sys.argv[1],
205                  "{}.crt.pem".format(host),
206                  "{}.key".format(host)), with_monitor=True)