Example HTTP/2-only WSGI Server

This example is a more complex HTTP/2 server that acts as a WSGI server, passing data to an arbitrary WSGI application. This example is written using asyncio. The server supports most of PEP-3333, and so could in principle be used as a production WSGI server: however, that’s not recommended as certain shortcuts have been taken to ensure ease of implementation and understanding.

The main advantages of this example are:

  1. It properly demonstrates HTTP/2 flow control management.

  2. It demonstrates how to plug hyper-h2 into a larger, more complex application (a minimal sketch of the cross-thread handshake it relies on follows this list).
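
Before reading the full listing, it may help to see that handshake in isolation: a worker thread schedules an action on the event loop with ``call_soon_threadsafe`` and then blocks on a ``threading.Event`` until the loop thread has actually performed it. The sketch below is illustrative only; the names ``request_action`` and ``main`` are invented here and do not appear in the server.

import asyncio
import threading


def request_action(loop, action):
    # Runs on a worker thread: ask the event loop thread to perform
    # ``action``, then block until it has actually been performed.
    done = threading.Event()

    def _on_loop():
        action()    # executed on the event loop thread
        done.set()  # release the waiting worker thread

    loop.call_soon_threadsafe(_on_loop)
    done.wait()     # the worker cannot race ahead of the loop


async def main():
    loop = asyncio.get_running_loop()
    # Hand the blocking worker function to a thread pool, just as the
    # server below hands each WSGI request to one.
    await loop.run_in_executor(
        None, request_action, loop,
        lambda: print("performed on the loop thread"),
    )


asyncio.run(main())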

# -*- coding: utf-8 -*-
"""
asyncio-server.py
~~~~~~~~~~~~~~~~~

A fully-functional WSGI server, written using hyper-h2. Requires asyncio.

To test it, try installing httpbin from pip (``pip install httpbin``) and then
running the server (``python asyncio-server.py httpbin:app``).

This server does not support HTTP/1.1: it is an HTTP/2-only WSGI server. The
purpose of this code is to demonstrate how to integrate hyper-h2 into a more
complex application, and to demonstrate several principles of concurrent
programming.

The architecture looks like this:

+---------------------------------+
|     1x HTTP/2 Server Thread     |
|        (running asyncio)        |
+---------------------------------+
+---------------------------------+
|    N WSGI Application Threads   |
|           (no asyncio)          |
+---------------------------------+

Essentially, we spin up an asyncio-based event loop in the main thread. This
launches one HTTP/2 Protocol instance for each inbound connection, all of which
will read and write data from within the main thread in an asynchronous manner.

When each HTTP request comes in, the server will build the WSGI environment
dictionary and create a ``Stream`` object. This object will hold the relevant
state for the request/response pair and will act as the WSGI side of the logic.
That object will then be passed to a background thread pool, and when a worker
is available the WSGI logic will begin to be executed. This model ensures that
the asyncio web server itself is never blocked by the WSGI application.

The WSGI application and the HTTP/2 server communicate via an asyncio queue,
together with locks and threading events. The locks themselves are implicit in
asyncio's "call_soon_threadsafe", which allows for a background thread to
register an action with the main asyncio thread. When the asyncio thread
eventually takes the action in question it sets as threading event, signaling
to the background thread that it is free to continue its work.

To make the WSGI application work with flow control, there is a very important
invariant that must be observed. Any WSGI action that would cause data to be
emitted to the network MUST be accompanied by a threading Event that is not
set until that data has been written to the transport. This ensures that the
WSGI application *blocks* until the data is actually sent. The reason we
require this invariant is that the HTTP/2 server may choose to re-order some
data chunks for flow control reasons: that is, the application for stream X may
have actually written its data first, but the server may elect to send the data
for stream Y first. This means that it's vital that there not be *two* writes
for stream X active at any one point or they may get reordered, which would be
particularly terrible.

Thus, the server must cooperate to ensure that each threading event only fires
when the *complete* data for that event has been written to the asyncio
transport. Any earlier will cause untold craziness.
"""
import asyncio
import importlib
import queue
import ssl
import sys
import threading

from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
    DataReceived, RequestReceived, WindowUpdated, StreamEnded, StreamReset
)


# Used to signal that a request has completed.
#
# This is a convenient way to do "in-band" signaling of stream completion
# without doing anything so heavyweight as using a class. Essentially, we can
# test identity against this empty object. In fact, this is so convenient that
# we use this object for all streams, for data in both directions: in and out.
END_DATA_SENTINEL = object()

# The WSGI callable. Stored here so that the protocol instances can get hold
# of the data.
APPLICATION = None


class H2Protocol(asyncio.Protocol):
    def __init__(self):
        config = H2Configuration(client_side=False, header_encoding='utf-8')

        # Our server-side state machine.
        self.conn = H2Connection(config=config)

        # The backing transport.
        self.transport = None

        # A dictionary of ``Stream`` objects, keyed by their stream ID. This
        # makes it easy to route data to the correct WSGI application instance.
        self.streams = {}

        # A queue of data emitted by WSGI applications that has not yet been
        # sent. Each stream may only have one chunk of data in either this
        # queue or the flow_controlled_data dictionary at any one time.
        self._stream_data = asyncio.Queue()

        # Data that has been pulled off the queue that is for a stream blocked
        # behind flow control limitations. This is used to avoid spinning on
        # _stream_data queue when a stream cannot have its data sent. Data that
        # cannot be sent on the connection when it is popped off the queue gets
        # placed here until the stream flow control window opens up again.
        self._flow_controlled_data = {}

        # A reference to the loop in which this protocol runs. This is needed
        # to synchronise up with background threads.
        self._loop = asyncio.get_event_loop()

        # Any streams that have been remotely reset. We keep track of these to
        # ensure that we don't emit data from a WSGI application whose stream
        # has been cancelled.
        self._reset_streams = set()

        # Keep track of the loop sending task so we can kill it when the
        # connection goes away.
        self._send_loop_task = None

    def connection_made(self, transport):
        """
        The connection has been made. Here we need to save off our transport,
        do basic HTTP/2 connection setup, and then start our data writing
        coroutine.
        """
        self.transport = transport
        self.conn.initiate_connection()
        self.transport.write(self.conn.data_to_send())
        self._send_loop_task = self._loop.create_task(self.sending_loop())

    def connection_lost(self, exc):
        """
        With the end of the connection, we just want to cancel our data sending
        coroutine.
        """
        self._send_loop_task.cancel()

    def data_received(self, data):
        """
        Process inbound data.
        """
        events = self.conn.receive_data(data)

        for event in events:
            if isinstance(event, RequestReceived):
                self.request_received(event)
            elif isinstance(event, DataReceived):
                self.data_frame_received(event)
            elif isinstance(event, WindowUpdated):
                self.window_opened(event)
            elif isinstance(event, StreamEnded):
                self.end_stream(event)
            elif isinstance(event, StreamReset):
                self.reset_stream(event)

        outbound_data = self.conn.data_to_send()
        if outbound_data:
            self.transport.write(outbound_data)

    def window_opened(self, event):
        """
        The flow control window got opened.

        This is important because it's possible that we were unable to send
        some WSGI data because the flow control window was too small. If that
        happens, the sending_loop coroutine starts buffering data.

        As the window gets opened, we need to unbuffer the data. We do that by
        placing the data chunks back on the back of the send queue and letting
        the sending loop take another shot at sending them.

        This system only works because we require that each stream only have
        *one* data chunk in the sending queue at any time. The threading events
        force this invariant to remain true.
        """
        if event.stream_id:
            # This is specific to a single stream.
            if event.stream_id in self._flow_controlled_data:
                self._stream_data.put_nowait(
                    self._flow_controlled_data.pop(event.stream_id)
                )
        else:
            # This event is specific to the connection. Free up *all* the
            # streams. This is a bit tricky, but we *must not* yield the flow
            # of control here or it all goes wrong.
            for data in self._flow_controlled_data.values():
                self._stream_data.put_nowait(data)

            self._flow_controlled_data = {}

    async def sending_loop(self):
        """
        A call that loops forever, attempting to send data. This sending loop
        contains most of the flow-control smarts of this class: it pulls data
        off of the asyncio queue and then attempts to send it.

        The difficulties here are all around flow control. Specifically, a
        chunk of data may be too large to send. In this case, what will happen
        is that this coroutine will attempt to send what it can and will then
        store the unsent data locally. When a flow control event comes in that
        data will be freed up and placed back onto the asyncio queue, causing
        it to pop back up into the sending logic of this coroutine.

        This method explicitly *does not* handle HTTP/2 priority. That adds an
        extra layer of complexity to what is already a fairly complex method,
        and we'll look at how to do it another time.

        This coroutine explicitly *does not end*.
        """
        while True:
            stream_id, data, event = await self._stream_data.get()

            # If this stream got reset, just drop the data on the floor. Note
            # that we need to set the event here to make sure that the
            # application doesn't lock up.
            if stream_id in self._reset_streams:
                event.set()
                continue

            # Check if the body is done. If it is, this is really easy! Again,
            # we *must* set the event here or the application will lock up.
            if data is END_DATA_SENTINEL:
                self.conn.end_stream(stream_id)
                self.transport.write(self.conn.data_to_send())
                event.set()
                continue

            # We need to send data, but not to exceed the flow control window.
            # For that reason, grab only the data that fits: we'll buffer the
            # rest.
            window_size = self.conn.local_flow_control_window(stream_id)
            chunk_size = min(window_size, len(data))
            data_to_send = data[:chunk_size]
            data_to_buffer = data[chunk_size:]

            if data_to_send:
                # There's a maximum frame size we have to respect. Because we
                # aren't paying any attention to priority here, we can quite
                # safely just split this string up into chunks of max frame
                # size and blast them out.
                #
                # In a *real* application you'd want to consider priority here.
                max_size = self.conn.max_outbound_frame_size
                chunks = (
                    data_to_send[x:x+max_size]
                    for x in range(0, len(data_to_send), max_size)
                )
                for chunk in chunks:
                    self.conn.send_data(stream_id, chunk)
                self.transport.write(self.conn.data_to_send())

            # If there's data left to buffer, we should do that. Put it in a
            # dictionary and *don't set the event*: the app must not generate
            # any more data until we got rid of all of this data.
            if data_to_buffer:
                self._flow_controlled_data[stream_id] = (
                    stream_id, data_to_buffer, event
                )
            else:
                # We sent everything. We can let the WSGI app progress.
                event.set()

    def request_received(self, event):
        """
        An HTTP/2 request has been received. We need to invoke the WSGI
        application in a background thread to handle it.
        """
        # First, we are going to want an object to hold all the relevant state
        # for this request/response. For that, we have a stream object. We
        # need to store the stream object somewhere reachable for when data
        # arrives later.
        s = Stream(event.stream_id, self)
        self.streams[event.stream_id] = s

        # Next, we need to build the WSGI environ dictionary.
        environ = _build_environ_dict(event.headers, s)

        # Finally, we want to throw these arguments out to a threadpool and
        # let it run.
        self._loop.run_in_executor(
            None,
            s.run_in_threadpool,
            APPLICATION,
            environ,
        )

    def data_frame_received(self, event):
        """
        Data has been received by the WSGI server and needs to be dispatched to a
        running application.

        Note that the flow control window is not modified here. That's
        deliberate: see Stream.__next__ for a longer discussion of why.
        """
        # Grab the stream in question from our dictionary and pass it on.
        stream = self.streams[event.stream_id]
        stream.receive_data(event.data, event.flow_controlled_length)

    def end_stream(self, event):
        """
        The stream data is complete.
        """
        stream = self.streams[event.stream_id]
        stream.request_complete()

    def reset_stream(self, event):
        """
        A stream got forcefully reset.

        This is a tricky thing to deal with because WSGI doesn't really have a
        good notion for it. Essentially, you have to let the application run
        until completion, but not actually let it send any data.

        We do that by discarding any data we currently have for it, and then
        marking the stream as reset to allow us to spot when that stream is
        trying to send data and drop that data on the floor.

        We then *also* signal the WSGI application that no more data is
        incoming, to ensure that it does not attempt to do further reads of the
        data.
        """
        if event.stream_id in self._flow_controlled_data:
            # Release the WSGI thread blocked behind this buffered chunk
            # before dropping the data: its event would otherwise never be
            # set and that thread would wait forever.
            _, _, pending_event = self._flow_controlled_data.pop(
                event.stream_id
            )
            pending_event.set()

        self._reset_streams.add(event.stream_id)
        self.end_stream(event)

    def data_for_stream(self, stream_id, data):
        """
        Thread-safe method called from outside the main asyncio thread in order
        to send data on behalf of a WSGI application.

        Places data being written by a stream on an asyncio queue. Returns a
        threading event that will fire when that data is sent.
        """
        event = threading.Event()
        self._loop.call_soon_threadsafe(
            self._stream_data.put_nowait,
            (stream_id, data, event)
        )
        return event

    def send_response(self, stream_id, headers):
        """
        Thread-safe method called from outside the main asyncio thread in order
        to send the HTTP response headers on behalf of a WSGI application.

        Returns a threading event that will fire when the headers have been
        emitted to the network.
        """
        event = threading.Event()

        def _inner_send(stream_id, headers, event):
            self.conn.send_headers(stream_id, headers, end_stream=False)
            self.transport.write(self.conn.data_to_send())
            event.set()

        self._loop.call_soon_threadsafe(
            _inner_send,
            stream_id,
            headers,
            event
        )
        return event

    def open_flow_control_window(self, stream_id, increment):
        """
        Opens a flow control window for the given stream by the given amount.
        Called from a WSGI thread. Does not return an event because there's no
        need to block on this action, it may take place at any time.
        """
        def _inner_open(stream_id, increment):
            self.conn.increment_flow_control_window(increment, stream_id)
            self.conn.increment_flow_control_window(increment, None)
            self.transport.write(self.conn.data_to_send())

        self._loop.call_soon_threadsafe(
            _inner_open,
            stream_id,
            increment,
        )


class Stream:
    """
    This class holds all of the state for a single stream. It also provides
    several of the callables used by the WSGI application. Finally, it provides
    the logic for actually interfacing with the WSGI application.

    For these reasons, the object has *strict* requirements on thread-safety.
    While the object can be initialized in the main WSGI thread, the
    ``run_in_threadpool`` method *must* be called from outside that thread. At
    that point, the main WSGI thread may only call specific methods.
    """
    def __init__(self, stream_id, protocol):
        self.stream_id = stream_id
        self._protocol = protocol

        # Queue for data that has been received from the network. This is a
        # thread-safe queue, to allow both the WSGI application to block on
        # receiving more data and to allow the asyncio server to keep sending
        # more data.
        #
        # This queue is unbounded in size, but in practice it cannot contain
        # too much data because the flow control window doesn't get adjusted
        # unless data is removed from it.
        self._received_data = queue.Queue()

        # This buffer is used to hold partial chunks of data from
        # _received_data that were not returned out of ``read`` and friends.
        self._temp_buffer = b''

        # Temporary variables that allow us to keep hold of the headers and
        # response status until such time as the application needs us to send
        # them.
        self._response_status = b''
        self._response_headers = []
        self._headers_emitted = False

        # Whether the application has received all the data from the network
        # or not. This allows us to short-circuit some reads.
        self._complete = False

    def receive_data(self, data, flow_controlled_size):
        """
        Called by the H2Protocol when more data has been received from the
        network.

        Places the data directly on the queue in a thread-safe manner without
        blocking. Does not introspect or process the data.
        """
        self._received_data.put_nowait((data, flow_controlled_size))

    def request_complete(self):
        """
        Called by the H2Protocol when all the request data has been received.

        This works by placing the ``END_DATA_SENTINEL`` on the queue. The
        reading code knows, when it sees the ``END_DATA_SENTINEL``, to expect
        no more data from the network. This ensures that the state of the
        application only changes when it has finished processing the data from
        the network, even though the server may have long-since finished
        receiving all the data for this request.
        """
        self._received_data.put_nowait((END_DATA_SENTINEL, None))

    def run_in_threadpool(self, wsgi_application, environ):
        """
        This method should be invoked in a threadpool. At the point this method
        is invoked, the only safe methods to call from the original thread are
        ``receive_data`` and ``request_complete``: any other method is unsafe.

        This method handles the WSGI logic. It invokes the application callable
        in this thread, passing control over to the WSGI application. It then
        ensures that the data makes it back to the HTTP/2 connection via
        the thread-safe APIs provided below.
        """
        result = wsgi_application(environ, self.start_response)

        try:
            for data in result:
                self.write(data)
        finally:
            # This signals that we're done with data. The server will know that
            # this allows it to clean up its state: we're done here.
            self.write(END_DATA_SENTINEL)

    # The next few methods are called by the WSGI application. Firstly, the
    # three methods provided by the input stream.
    def read(self, size=None):
        """
        Called by the WSGI application to read data.

        This method is one of the two that explicitly pump the input data
        queue (the other being ``__next__``), which means it deals with the
        ``_complete`` flag and the ``END_DATA_SENTINEL``.
        """
        # If we've already seen the END_DATA_SENTINEL, return immediately.
        if self._complete:
            return b''

        # If we've been asked to read everything, just iterate over ourselves.
        if size is None:
            return b''.join(self)

        # Otherwise, as long as we don't have enough data, spin looking for
        # another data chunk.
        data = b''
        while len(data) < size:
            try:
                chunk = next(self)
            except StopIteration:
                break

            # Concatenating strings this way is slow, but that's ok, this is
            # just a demo.
            data += chunk

        # We have *at least* enough data to return, but we may have too much.
        # If we do, throw it on a buffer: we'll use it later.
        to_return = data[:size]
        self._temp_buffer = data[size:]
        return to_return

    def readline(self, hint=None):
        """
        Called by the WSGI application to read a single line of data.

        This method rigorously observes the ``hint`` parameter: it will only
        ever read that much data. It then splits the data on a newline
        character and throws everything it doesn't need into a buffer.
        """
        data = self.read(hint)
        first_newline = data.find(b'\n')
        if first_newline == -1:
            # No newline, return all the data
            return data

        # We want to slice the data so that the head *includes* the first
        # newline. Then, any data left in this line we don't care about should
        # be prepended to the internal buffer.
        head, tail = data[:first_newline + 1], data[first_newline + 1:]
        self._temp_buffer = tail + self._temp_buffer

        return head

    def readlines(self, hint=None):
        """
        Called by the WSGI application to read several lines of data.

        This method is really pretty stupid. It rigorously observes the
        ``hint`` parameter, and quite happily returns the input split into
        lines.
        """
        # This method is *crazy inefficient*, but it's also a pretty stupid
        # method to call.
        data = self.read(hint)
        lines = data.split(b'\n')

        # Split removes the newline character, but we want it, so put it back.
        lines = [line + b'\n' for line in lines]

        # Except if the last character was a newline character we now have an
        # extra line that is just a newline: pull that out.
        if lines[-1] == b'\n':
            lines = lines[:-1]
        return lines

    def start_response(self, status, response_headers, exc_info=None):
        """
        This is the PEP-3333 mandated start_response callable.

        All it does is store the headers for later sending, and return our
        ``write`` callable.
        """
        if self._headers_emitted and exc_info is not None:
            raise exc_info[1].with_traceback(exc_info[2])

        assert not self._response_status or exc_info is not None
        self._response_status = status
        self._response_headers = response_headers

        return self.write

    def write(self, data):
        """
        Provides some data to write.

        This function *blocks* until such time as the data is allowed by
        HTTP/2 flow control. This allows a client to slow or pause the response
        as needed.

        PEP-3333 discourages applications from using this function (it exists
        for backwards compatibility), but once it exists it is quite
        convenient, so this app actually runs all writes through it.
        """
        if not self._headers_emitted:
            self._emit_headers()
        event = self._protocol.data_for_stream(self.stream_id, data)
        event.wait()
        return

    def _emit_headers(self):
        """
        Sends the response headers.

        This is only called from the write callable and should only ever be
        called once. It does some minor processing (converts the status line
        into a status code because reason phrases are evil) and then passes
        the headers on to the server. This call explicitly blocks until the
        server notifies us that the headers have reached the network.
        """
        assert self._response_status and self._response_headers
        assert not self._headers_emitted
        self._headers_emitted = True

        # We only need the status code
        status = self._response_status.split(" ", 1)[0]
        headers = [(":status", status)]
        headers.extend(self._response_headers)
        event = self._protocol.send_response(self.stream_id, headers)
        event.wait()
        return

    # These two methods implement the iterator protocol. This allows a WSGI
    # application to iterate over this Stream object to get the data.
    def __iter__(self):
        return self

    def __next__(self):
        # If the complete request has been read, abort immediately.
        if self._complete:
            raise StopIteration()

        # If we have data stored in a temporary buffer for any reason, return
        # that and clear the buffer.
        #
        # This can actually only happen when the application uses one of the
        # read* callables, but that's fine.
        if self._temp_buffer:
            buffered_data = self._temp_buffer
            self._temp_buffer = b''
            return buffered_data

        # Otherwise, pull data off the queue (blocking as needed). If this is
        # the end of the request, we're done here: mark ourselves as complete
        # and call it time. Otherwise, open the flow control window an
        # appropriate amount and hand the chunk off.
        chunk, chunk_size = self._received_data.get()
        if chunk is END_DATA_SENTINEL:
            self._complete = True
            raise StopIteration()

        # Let's talk a little bit about why we're opening the flow control
        # window *here*, and not in the server thread.
        #
        # The purpose of HTTP/2 flow control is to allow for servers and
        # clients to avoid needing to buffer data indefinitely because their
        # peer is producing data faster than they can consume it. As a result,
        # it's important that the flow control window be opened as late in the
        # processing as possible. In this case, we open the flow control window
        # exactly when the server hands the data to the application. This means
        # that the flow control window essentially signals to the remote peer
        # how much data hasn't even been *seen* by the application yet.
        #
        # If you wanted to be really clever you could consider not opening the
        # flow control window until the application asks for the *next* chunk
        # of data. That means that any buffers at the application level are now
        # included in the flow control window processing. In my opinion, the
        # advantage of that process does not outweigh the extra logical
        # complexity involved in doing it, so we don't bother here.
        #
        # Another note: you'll notice that we don't include the _temp_buffer in
        # our flow control considerations. This means you could in principle
        # lead us to buffer slightly more than one connection flow control
        # window's worth of data. That risk is considered acceptable for the
        # much simpler logic available here.
        #
        # Finally, this is a pretty dumb flow control window management scheme:
        # it causes us to emit a *lot* of window updates. A smarter server
        # would want to use the content-length header to determine whether
        # flow control window updates need to be emitted at all, and then to be
        # more efficient about emitting them to avoid firing them off really
        # frequently. For an example like this, there's very little gained by
        # worrying about that.
        self._protocol.open_flow_control_window(self.stream_id, chunk_size)

        return chunk


def _build_environ_dict(headers, stream):
    """
    Build the WSGI environ dictionary for a given request. To do that, we'll
    temporarily create a dictionary for the headers. While this isn't actually
    a valid way to represent headers, we know that the special headers we need
    can only have one appearance in the block.

    This code is arguably somewhat incautious: the conversion to dictionary
    should only happen in a way that allows us to correctly join headers that
    appear multiple times. That's acceptable in a demo app: in a productised
    version you'd want to fix it.
    """
    header_dict = dict(headers)
    path = header_dict.pop(u':path')
    try:
        path, query = path.split(u'?', 1)
    except ValueError:
        query = u""
    server_name = header_dict.pop(u':authority')
    try:
        server_name, port = server_name.split(u':', 1)
    except ValueError:
        port = "8443"

    environ = {
        u'REQUEST_METHOD': header_dict.pop(u':method'),
        u'SCRIPT_NAME': u'',
        u'PATH_INFO': path,
        u'QUERY_STRING': query,
        u'SERVER_NAME': server_name,
        u'SERVER_PORT': port,
        u'SERVER_PROTOCOL': u'HTTP/2',
        u'HTTPS': u"on",
        u'SSL_PROTOCOL': u'TLSv1.2',
        u'wsgi.version': (1, 0),
        u'wsgi.url_scheme': header_dict.pop(u':scheme'),
        u'wsgi.input': stream,
        u'wsgi.errors': sys.stderr,
        u'wsgi.multithread': True,
        u'wsgi.multiprocess': False,
        u'wsgi.run_once': False,
    }
    if u'content-type' in header_dict:
        environ[u'CONTENT_TYPE'] = header_dict[u'content-type']
    if u'content-length' in header_dict:
        environ[u'CONTENT_LENGTH'] = header_dict[u'content-length']
    for name, value in header_dict.items():
        # Per PEP-3333, hyphens in header names become underscores in the
        # corresponding CGI-style environ keys.
        environ[u'HTTP_' + name.upper().replace(u'-', u'_')] = value
    return environ


# Set up the WSGI app.
application_string = sys.argv[1]
path, func = application_string.split(':', 1)
module = importlib.import_module(path)
APPLICATION = getattr(module, func)

# Set up TLS
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
)
ssl_context.set_ciphers("ECDHE+AESGCM")
ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
ssl_context.set_alpn_protocols(["h2"])

# Do the asyncio bits
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
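
The server expects a certificate and key in ``cert.crt`` and ``cert.key``. Any self-signed certificate will do for local testing; one possible way to generate a pair (an assumption, not part of the example) is the third-party ``trustme`` package:

import trustme

# Create a throwaway CA and issue a leaf certificate for local testing.
ca = trustme.CA()
cert = ca.issue_cert("localhost", "127.0.0.1")
cert.private_key_pem.write_to_path("cert.key")
cert.cert_chain_pems[0].write_to_path("cert.crt")

To exercise the running server you need an HTTP/2-capable client. One option, again an assumption rather than part of the example, is the third-party ``httpx`` package installed with HTTP/2 support (``pip install 'httpx[http2]'``). The ``/get`` path assumes you are serving the ``httpbin`` app suggested in the module docstring:

import httpx

# The server listens on https://127.0.0.1:8443 and speaks only HTTP/2,
# so the client must negotiate h2 via ALPN. verify=False is needed
# because the certificate generated above is self-signed.
with httpx.Client(http2=True, verify=False) as client:
    response = client.get("https://127.0.0.1:8443/get")
    print(response.http_version)  # expect "HTTP/2"
    print(response.text)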