Twisted Example ServerΒΆ
This example is a basic HTTP/2 server written for the Twisted asynchronous networking framework. This is a relatively fleshed out example, and in particular it makes sure to obey HTTP/2 flow control rules.
This server differs from some of the other example servers by serving files, rather than simply sending JSON responses. This makes the example lengthier, but also brings it closer to a real-world use-case.
1# -*- coding: utf-8 -*-
2"""
3twisted-server.py
4~~~~~~~~~~~~~~~~~
5
6A fully-functional HTTP/2 server written for Twisted.
7"""
8import functools
9import mimetypes
10import os
11import os.path
12import sys
13
14from OpenSSL import crypto
15from twisted.internet.defer import Deferred, inlineCallbacks
16from twisted.internet.protocol import Protocol, Factory
17from twisted.internet import endpoints, reactor, ssl
18from h2.config import H2Configuration
19from h2.connection import H2Connection
20from h2.events import (
21 RequestReceived, DataReceived, WindowUpdated
22)
23from h2.exceptions import ProtocolError
24
25
26def close_file(file, d):
27 file.close()
28
29
30READ_CHUNK_SIZE = 8192
31
32
33class H2Protocol(Protocol):
34 def __init__(self, root):
35 config = H2Configuration(client_side=False)
36 self.conn = H2Connection(config=config)
37 self.known_proto = None
38 self.root = root
39
40 self._flow_control_deferreds = {}
41
42 def connectionMade(self):
43 self.conn.initiate_connection()
44 self.transport.write(self.conn.data_to_send())
45
46 def dataReceived(self, data):
47 if not self.known_proto:
48 self.known_proto = True
49
50 try:
51 events = self.conn.receive_data(data)
52 except ProtocolError:
53 if self.conn.data_to_send:
54 self.transport.write(self.conn.data_to_send())
55 self.transport.loseConnection()
56 else:
57 for event in events:
58 if isinstance(event, RequestReceived):
59 self.requestReceived(event.headers, event.stream_id)
60 elif isinstance(event, DataReceived):
61 self.dataFrameReceived(event.stream_id)
62 elif isinstance(event, WindowUpdated):
63 self.windowUpdated(event)
64
65 if self.conn.data_to_send:
66 self.transport.write(self.conn.data_to_send())
67
68 def requestReceived(self, headers, stream_id):
69 headers = dict(headers) # Invalid conversion, fix later.
70 assert headers[b':method'] == b'GET'
71
72 path = headers[b':path'].lstrip(b'/')
73 full_path = os.path.join(self.root, path)
74
75 if not os.path.exists(full_path):
76 response_headers = (
77 (':status', '404'),
78 ('content-length', '0'),
79 ('server', 'twisted-h2'),
80 )
81 self.conn.send_headers(
82 stream_id, response_headers, end_stream=True
83 )
84 self.transport.write(self.conn.data_to_send())
85 else:
86 self.sendFile(full_path, stream_id)
87
88 return
89
90 def dataFrameReceived(self, stream_id):
91 self.conn.reset_stream(stream_id)
92 self.transport.write(self.conn.data_to_send())
93
94 def sendFile(self, file_path, stream_id):
95 filesize = os.stat(file_path).st_size
96 content_type, content_encoding = mimetypes.guess_type(
97 file_path.decode('utf-8')
98 )
99 response_headers = [
100 (':status', '200'),
101 ('content-length', str(filesize)),
102 ('server', 'twisted-h2'),
103 ]
104 if content_type:
105 response_headers.append(('content-type', content_type))
106 if content_encoding:
107 response_headers.append(('content-encoding', content_encoding))
108
109 self.conn.send_headers(stream_id, response_headers)
110 self.transport.write(self.conn.data_to_send())
111
112 f = open(file_path, 'rb')
113 d = self._send_file(f, stream_id)
114 d.addErrback(functools.partial(close_file, f))
115
116 def windowUpdated(self, event):
117 """
118 Handle a WindowUpdated event by firing any waiting data sending
119 callbacks.
120 """
121 stream_id = event.stream_id
122
123 if stream_id and stream_id in self._flow_control_deferreds:
124 d = self._flow_control_deferreds.pop(stream_id)
125 d.callback(event.delta)
126 elif not stream_id:
127 for d in self._flow_control_deferreds.values():
128 d.callback(event.delta)
129
130 self._flow_control_deferreds = {}
131
132 return
133
134 @inlineCallbacks
135 def _send_file(self, file, stream_id):
136 """
137 This callback sends more data for a given file on the stream.
138 """
139 keep_reading = True
140 while keep_reading:
141 while not self.conn.remote_flow_control_window(stream_id):
142 yield self.wait_for_flow_control(stream_id)
143
144 chunk_size = min(
145 self.conn.remote_flow_control_window(stream_id), READ_CHUNK_SIZE
146 )
147 data = file.read(chunk_size)
148 keep_reading = len(data) == chunk_size
149 self.conn.send_data(stream_id, data, not keep_reading)
150 self.transport.write(self.conn.data_to_send())
151
152 if not keep_reading:
153 break
154
155 file.close()
156
157 def wait_for_flow_control(self, stream_id):
158 """
159 Returns a Deferred that fires when the flow control window is opened.
160 """
161 d = Deferred()
162 self._flow_control_deferreds[stream_id] = d
163 return d
164
165
166class H2Factory(Factory):
167 def __init__(self, root):
168 self.root = root
169
170 def buildProtocol(self, addr):
171 print(H2Protocol)
172 return H2Protocol(self.root)
173
174
175root = sys.argv[1].encode('utf-8')
176
177with open('server.crt', 'r') as f:
178 cert_data = f.read()
179with open('server.key', 'r') as f:
180 key_data = f.read()
181
182cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
183key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_data)
184options = ssl.CertificateOptions(
185 privateKey=key,
186 certificate=cert,
187 acceptableProtocols=[b'h2'],
188)
189
190endpoint = endpoints.SSL4ServerEndpoint(reactor, 8080, options, backlog=128)
191endpoint.listen(H2Factory(root))
192reactor.run()