Gevent Example Server

This example is a basic HTTP/2 server written using gevent, a powerful coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop.

This example is inspired by the curio example. It also demonstrates the correct use of HTTP/2 flow control with h2 and shows how simple gevent can be to use.

  1# -*- coding: utf-8 -*-
  2"""
  3gevent-server.py
  4================
  5
  6A simple HTTP/2 server written for gevent serving static files from a directory specified as input.
  7If no directory is provided, the current directory will be used.
  8"""
import mimetypes
import sys
from functools import partial
from pathlib import Path
from typing import Tuple, Dict, Optional
from urllib.parse import unquote

from gevent import socket, ssl
from gevent.event import Event
from gevent.lock import RLock
from gevent.pool import Group
from gevent.server import StreamServer
from h2 import events
from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.exceptions import StreamClosedError
 21
 22
 23def get_http2_tls_context() -> ssl.SSLContext:
 24    ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
 25    ctx.options |= (
 26            ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
 27    )
 28
 29    ctx.options |= ssl.OP_NO_COMPRESSION
 30    ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
 31    ctx.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key')
 32    ctx.set_alpn_protocols(['h2'])
 33    try:
 34        ctx.set_npn_protocols(['h2'])
 35    except NotImplementedError:
 36        pass
 37
 38    return ctx
 39
 40
 41class H2Worker:
 42
 43    def __init__(self, sock: socket, address: Tuple[str, str], source_dir: str = None):
 44        self._sock = sock
 45        self._address = address
 46        self._flow_control_events: Dict[int, Event] = {}
 47        self._server_name = 'gevent-h2'
 48        self._connection: Optional[H2Connection] = None
 49        self._read_chunk_size = 8192  # The maximum amount of a file we'll send in a single DATA frame
 50
 51        self._check_sources_dir(source_dir)
 52        self._sources_dir = source_dir
 53
 54        self._run()
 55
 56    def _initiate_connection(self):
 57        config = H2Configuration(client_side=False, header_encoding='utf-8')
 58        self._connection = H2Connection(config=config)
 59        self._connection.initiate_connection()
 60        self._sock.sendall(self._connection.data_to_send())
 61
 62    @staticmethod
 63    def _check_sources_dir(sources_dir: str) -> None:
 64        p = Path(sources_dir)
 65        if not p.is_dir():
 66            raise NotADirectoryError(f'{sources_dir} does not exists')
 67
 68    def _send_error_response(self, status_code: str, event: events.RequestReceived) -> None:
 69        self._connection.send_headers(
 70            stream_id=event.stream_id,
 71            headers=[
 72                (':status', status_code),
 73                ('content-length', '0'),
 74                ('server', self._server_name),
 75            ],
 76            end_stream=True
 77        )
 78        self._sock.sendall(self._connection.data_to_send())
 79
 80    def _handle_request(self, event: events.RequestReceived) -> None:
 81        headers = dict(event.headers)
 82        if headers[':method'] != 'GET':
 83            self._send_error_response('405', event)
 84            return
 85
 86        file_path = Path(self._sources_dir) / headers[':path'].lstrip('/')
 87        if not file_path.is_file():
 88            self._send_error_response('404', event)
 89            return
 90
 91        self._send_file(file_path, event.stream_id)
 92
 93    def _send_file(self, file_path: Path, stream_id: int) -> None:
 94        """
 95        Send a file, obeying the rules of HTTP/2 flow control.
 96        """
 97        file_size = file_path.stat().st_size
 98        content_type, content_encoding = mimetypes.guess_type(str(file_path))
 99        response_headers = [
100            (':status', '200'),
101            ('content-length', str(file_size)),
102            ('server', self._server_name)
103        ]
104        if content_type:
105            response_headers.append(('content-type', content_type))
106        if content_encoding:
107            response_headers.append(('content-encoding', content_encoding))
108
109        self._connection.send_headers(stream_id, response_headers)
110        self._sock.sendall(self._connection.data_to_send())
111
112        with file_path.open(mode='rb', buffering=0) as f:
113            self._send_file_data(f, stream_id)
114
115    def _send_file_data(self, file_obj, stream_id: int) -> None:
116        """
117        Send the data portion of a file. Handles flow control rules.
118        """
119        while True:
120            while self._connection.local_flow_control_window(stream_id) < 1:
121                self._wait_for_flow_control(stream_id)
122
123            chunk_size = min(self._connection.local_flow_control_window(stream_id), self._read_chunk_size)
124            data = file_obj.read(chunk_size)
125            keep_reading = (len(data) == chunk_size)
126
127            self._connection.send_data(stream_id, data, not keep_reading)
128            self._sock.sendall(self._connection.data_to_send())
129
130            if not keep_reading:
131                break
132
133    def _wait_for_flow_control(self, stream_id: int) -> None:
134        """
135        Blocks until the flow control window for a given stream is opened.
136        """
137        event = Event()
138        self._flow_control_events[stream_id] = event
139        event.wait()
140
141    def _handle_window_update(self, event: events.WindowUpdated) -> None:
142        """
143        Unblock streams waiting on flow control, if needed.
144        """
145        stream_id = event.stream_id
146
147        if stream_id and stream_id in self._flow_control_events:
148            g_event = self._flow_control_events.pop(stream_id)
149            g_event.set()
150        elif not stream_id:
151            # Need to keep a real list here to use only the events present at this time.
152            blocked_streams = list(self._flow_control_events.keys())
153            for stream_id in blocked_streams:
154                g_event = self._flow_control_events.pop(stream_id)
155                g_event.set()
156
157    def _run(self) -> None:
158        self._initiate_connection()
159
160        while True:
161            data = self._sock.recv(65535)
162            if not data:
163                break
164
165            h2_events = self._connection.receive_data(data)
166            for event in h2_events:
167                if isinstance(event, events.RequestReceived):
168                    self._handle_request(event)
169                elif isinstance(event, events.DataReceived):
170                    self._connection.reset_stream(event.stream_id)
171                elif isinstance(event, events.WindowUpdated):
172                    self._handle_window_update(event)
173
174            data_to_send = self._connection.data_to_send()
175            if data_to_send:
176                self._sock.sendall(data_to_send)
177
178
if __name__ == '__main__':
    # Serve the directory named on the command line, or the working directory when none is given.
    if len(sys.argv) > 1:
        files_dir = sys.argv[1]
    else:
        files_dir = str(Path.cwd())

    # StreamServer calls the handler with (socket, address); the directory is bound up front.
    connection_handler = partial(H2Worker, source_dir=files_dir)
    listen_address = ('127.0.0.1', 8080)
    server = StreamServer(listen_address, connection_handler, ssl_context=get_http2_tls_context())

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C: stop accepting connections and exit.
        server.close()