Gevent Example ServerΒΆ
This example is a basic HTTP/2 server written using gevent, a powerful coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop.
This example is inspired by the curio one and also demonstrates the correct use of HTTP/2 flow control with h2 and how gevent can be simple to use.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | # -*- coding: utf-8 -*- """ gevent-server.py ================ A simple HTTP/2 server written for gevent serving static files from a directory specified as input. If no directory is provided, the current directory will be used. """ import mimetypes import sys from functools import partial from pathlib import Path from typing import Tuple, Dict, Optional from gevent import socket, ssl from gevent.event import Event from gevent.server import StreamServer from h2 import events from h2.config import H2Configuration from h2.connection import H2Connection def get_http2_tls_context() -> ssl.SSLContext: ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) ctx.options |= ( ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 ) ctx.options |= ssl.OP_NO_COMPRESSION ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20') ctx.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key') ctx.set_alpn_protocols(['h2']) try: ctx.set_npn_protocols(['h2']) except NotImplementedError: pass return ctx class H2Worker: def __init__(self, sock: socket, address: Tuple[str, str], source_dir: str = None): self._sock = sock self._address = address self._flow_control_events: Dict[int, Event] = {} self._server_name = 'gevent-h2' self._connection: Optional[H2Connection] = None self._read_chunk_size = 8192 # The maximum amount of a file we'll send in a single DATA frame self._check_sources_dir(source_dir) self._sources_dir = source_dir self._run() def _initiate_connection(self): config = H2Configuration(client_side=False, header_encoding='utf-8') self._connection = H2Connection(config=config) self._connection.initiate_connection() self._sock.sendall(self._connection.data_to_send()) @staticmethod def _check_sources_dir(sources_dir: str) -> None: p = Path(sources_dir) if not p.is_dir(): raise NotADirectoryError(f'{sources_dir} does not exists') def _send_error_response(self, status_code: str, event: events.RequestReceived) -> None: self._connection.send_headers( stream_id=event.stream_id, headers=[ (':status', status_code), ('content-length', '0'), ('server', self._server_name), ], end_stream=True ) self._sock.sendall(self._connection.data_to_send()) def _handle_request(self, event: events.RequestReceived) -> None: headers = dict(event.headers) if headers[':method'] != 'GET': self._send_error_response('405', event) return file_path = Path(self._sources_dir) / headers[':path'].lstrip('/') if not file_path.is_file(): self._send_error_response('404', event) return self._send_file(file_path, event.stream_id) def _send_file(self, file_path: Path, stream_id: int) -> None: """ Send a file, obeying the rules of HTTP/2 flow control. """ file_size = file_path.stat().st_size content_type, content_encoding = mimetypes.guess_type(str(file_path)) response_headers = [ (':status', '200'), ('content-length', str(file_size)), ('server', self._server_name) ] if content_type: response_headers.append(('content-type', content_type)) if content_encoding: response_headers.append(('content-encoding', content_encoding)) self._connection.send_headers(stream_id, response_headers) self._sock.sendall(self._connection.data_to_send()) with file_path.open(mode='rb', buffering=0) as f: self._send_file_data(f, stream_id) def _send_file_data(self, file_obj, stream_id: int) -> None: """ Send the data portion of a file. Handles flow control rules. """ while True: while self._connection.local_flow_control_window(stream_id) < 1: self._wait_for_flow_control(stream_id) chunk_size = min(self._connection.local_flow_control_window(stream_id), self._read_chunk_size) data = file_obj.read(chunk_size) keep_reading = (len(data) == chunk_size) self._connection.send_data(stream_id, data, not keep_reading) self._sock.sendall(self._connection.data_to_send()) if not keep_reading: break def _wait_for_flow_control(self, stream_id: int) -> None: """ Blocks until the flow control window for a given stream is opened. """ event = Event() self._flow_control_events[stream_id] = event event.wait() def _handle_window_update(self, event: events.WindowUpdated) -> None: """ Unblock streams waiting on flow control, if needed. """ stream_id = event.stream_id if stream_id and stream_id in self._flow_control_events: g_event = self._flow_control_events.pop(stream_id) g_event.set() elif not stream_id: # Need to keep a real list here to use only the events present at this time. blocked_streams = list(self._flow_control_events.keys()) for stream_id in blocked_streams: g_event = self._flow_control_events.pop(stream_id) g_event.set() def _run(self) -> None: self._initiate_connection() while True: data = self._sock.recv(65535) if not data: break h2_events = self._connection.receive_data(data) for event in h2_events: if isinstance(event, events.RequestReceived): self._handle_request(event) elif isinstance(event, events.DataReceived): self._connection.reset_stream(event.stream_id) elif isinstance(event, events.WindowUpdated): self._handle_window_update(event) data_to_send = self._connection.data_to_send() if data_to_send: self._sock.sendall(data_to_send) if __name__ == '__main__': files_dir = sys.argv[1] if len(sys.argv) > 1 else f'{Path().cwd()}' server = StreamServer(('127.0.0.1', 8080), partial(H2Worker, source_dir=files_dir), ssl_context=get_http2_tls_context()) try: server.serve_forever() except KeyboardInterrupt: server.close() |