edq.net.exchangeserver

  1import glob
  2import http.server
  3import logging
  4import os
  5import threading
  6import time
  7import typing
  8
  9import edq.net.exchange
 10import edq.util.dirent
 11
 12_logger = logging.getLogger(__name__)
 13
 14SERVER_THREAD_START_WAIT_SEC: float = 0.02
 15SERVER_THREAD_REAP_WAIT_SEC: float = 0.15
 16
 17class HTTPExchangeServer():
 18    """
 19    An HTTP server meant for testing.
 20    This server is generally meant to already know about all the requests that will be made to it,
 21    and all the responses it should make in reaction to those respective requests.
 22    This allows the server to respond very quickly, and makes it ideal for testing.
 23    This makes it easy to mock external services for testing.
 24
 25    If a request is not found in the predefined requests,
 26    then missing_request() will be called.
 27    If a response is still not available (indicated by a None return from missing_request()),
 28    then an error will be raised.
 29    """
 30
 31    def __init__(self,
 32            port: typing.Union[int, None] = None,
 33            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 34            default_match_options: bool = True,
 35            verbose: bool = False,
 36            raise_on_404: bool = False,
 37            **kwargs: typing.Any) -> None:
 38        self.port: typing.Union[int, None] = port
 39        """
 40        The active port this server is using.
 41        If None, then a random port will be chosen when the server is started and this field will be populated.
 42        """
 43
 44        self._http_server: typing.Union[http.server.HTTPServer, None] = None
 45        """ The HTTP server listening for connections. """
 46
 47        self._thread: typing.Union[threading.Thread, None] = None
 48        """ The thread running the HTTP server. """
 49
 50        self._run_lock: threading.Lock = threading.Lock()
 51        """ A lock that the server holds while running. """
 52
 53        self.verbose: bool = verbose
 54        """ Log more information. """
 55
 56        self.raise_on_404: bool = raise_on_404
 57        """ Raise an exception when no exchange is matched (instead of a 404 error). """
 58
 59        self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
 60        """
 61        The HTTP exchanges (requests+responses) that this server knows about.
 62        Exchanges are stored in layers to help make errors for missing requests easier.
 63        Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
 64        """
 65
 66        if (match_options is None):
 67            match_options = {}
 68
 69        if (default_match_options):
 70            if ('headers_to_skip' not in match_options):
 71                match_options['headers_to_skip'] = []
 72
 73            match_options['headers_to_skip'] += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
 74
 75        self.match_options: typing.Dict[str, typing.Any] = match_options.copy()
 76        """ Options to use when matching HTTP exchanges. """
 77
 78    def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
 79        """
 80        Get a shallow list of all the exchanges in this server.
 81        Ordering is not guaranteed.
 82        """
 83
 84        exchanges = []
 85
 86        for url_exchanges in self._exchanges.values():
 87            for anchor_exchanges in url_exchanges.values():
 88                for method_exchanges in anchor_exchanges.values():
 89                    exchanges += method_exchanges
 90
 91        return exchanges
 92
 93    def start(self) -> None:
 94        """ Start this server in a thread and set the active port. """
 95
 96        class NestedHTTPHandler(_TestHTTPHandler):
 97            """ An HTTP handler as a nested class to bind this server object to the handler. """
 98
 99            _server = self
100            _verbose = self.verbose
101            _raise_on_404 = self.raise_on_404
102            _missing_request_func = self.missing_request
103
104        if (self.port is None):
105            self.port = edq.net.util.find_open_port()
106
107        self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)
108
109        if (self.verbose):
110            _logger.info("Starting test server on port %d.", self.port)
111
112        # Use a barrier to ensure that the server thread has started.
113        server_startup_barrier = threading.Barrier(2)
114
115        def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
116            server_startup_barrier.wait()
117
118            if (server._http_server is None):
119                raise ValueError('Server was not initialized.')
120
121            # Run the server within the run lock context.
122            with server._run_lock:
123                server._http_server.serve_forever(poll_interval = 0.01)
124                server._http_server.server_close()
125
126            if (self.verbose):
127                _logger.info("Stopping test server.")
128
129        self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
130        self._thread.start()
131
132        # Wait for the server to startup.
133        server_startup_barrier.wait()
134        time.sleep(SERVER_THREAD_START_WAIT_SEC)
135
136    def wait_for_completion(self) -> None:
137        """
138        Block until the server is not running.
139        If called while the server is not running, this will return immediately.
140        This function will handle keyboard interrupts (Ctrl-C).
141        """
142
143        try:
144            with self._run_lock:
145                pass
146        except KeyboardInterrupt:
147            self.stop()
148            self.wait_for_completion()
149
150    def start_and_wait(self) -> None:
151        """ Start the server and block until it is done. """
152
153        self.start()
154        self.wait_for_completion()
155
156    def stop(self) -> None:
157        """ Stop this server. """
158
159        self.port = None
160
161        if (self._http_server is not None):
162            self._http_server.shutdown()
163            self._http_server = None
164
165        if (self._thread is not None):
166            if (self._thread.is_alive()):
167                self._thread.join(SERVER_THREAD_REAP_WAIT_SEC)
168
169            self._thread = None
170
171    def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
172        """
173        Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request
174        before the server raises an exception.
175        Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.
176
177        Exchanges returned from this method are not cached/saved.
178        """
179
180        return None
181
182    def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
183        """
184        Modify any exchanges before they are saved into this server's cache.
185        The returned exchanges will be saved in this server's cache.
186
187        This method may be called multiple times with different collections of exchanges.
188        """
189
190        return exchanges
191
192    def lookup_exchange(self,
193            query: edq.net.exchange.HTTPExchange,
194            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
195            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
196        """
197        Lookup the query exchange to see if it exists in this server.
198        If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
199        If a match does not exist, a None will be returned along with a message indicating how the query missed
200        (e.g., the URL was matched, but the method was not).
201        """
202
203        if (match_options is None):
204            match_options = {}
205
206        hint_display = query.url_path
207        target: typing.Any = self._exchanges
208
209        if (query.url_path not in target):
210            return None, f"Could not find matching URL path for '{hint_display}'."
211
212        hint_display = query.url_path
213        if (query.url_anchor is not None):
214            hint_display = f"{query.url_path}#{query.url_anchor}"
215
216        target = target[query.url_path]
217
218        if (query.url_anchor not in target):
219            return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."
220
221        hint_display = f"{hint_display} ({query.method})"
222        target = target[query.url_anchor]
223
224        if (query.method not in target):
225            return None, f"Found URL, but could not find matching method for '{hint_display}'."
226
227        params = list(sorted(query.parameters.keys()))
228        hint_display = f"{hint_display}, (param keys = {params})"
229        target = target[query.method]
230
231        full_match_options = self.match_options.copy()
232        full_match_options.update(match_options)
233
234        hints = []
235        matches = []
236
237        for (i, exchange) in enumerate(target):
238            match, hint = exchange.match(query, **full_match_options)
239            if (match):
240                matches.append(exchange)
241                continue
242
243            # Collect hints for non-matches.
244            label = exchange.source_path
245            if (label is None):
246                label = str(i)
247
248            hints.append(f"{label}: {hint}")
249
250        if (len(matches) == 1):
251            # Found exactly one match.
252            return matches[0], None
253
254        if (len(matches) > 1):
255            # Found multiple matches.
256            match_paths = list(sorted([match.source_path for match in matches]))
257            return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."
258
259        # Found no matches.
260        return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."
261
262    def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
263        """ Load an exchange into this server. """
264
265        if (exchange is None):
266            raise ValueError("Cannot load a None exchange.")
267
268        target: typing.Any = self._exchanges
269        if (exchange.url_path not in target):
270            target[exchange.url_path] = {}
271
272        target = target[exchange.url_path]
273        if (exchange.url_anchor not in target):
274            target[exchange.url_anchor] = {}
275
276        target = target[exchange.url_anchor]
277        if (exchange.method not in target):
278            target[exchange.method] = []
279
280        target = target[exchange.method]
281        target.append(exchange)
282
283    def load_exchange_file(self, path: str) -> None:
284        """
285        Load an exchange from a file.
286        This will also handle setting the exchanges source path and resolving the exchange's paths.
287        """
288
289        self.load_exchange(edq.net.exchange.HTTPExchange.from_path(path))
290
291    def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
292        """ Load all exchanges found (recursively) within a directory. """
293
294        paths = list(sorted(glob.glob(os.path.join(base_dir, "**", f"*{extension}"), recursive = True)))
295        for path in paths:
296            self.load_exchange_file(path)
297
298@typing.runtime_checkable
299class MissingRequestFunction(typing.Protocol):
300    """
301    A function that can be used to create an exchange when none was found.
302    This is often the last resort when a test server cannot find an exchange matching a request.
303    """
304
305    def __call__(self,
306            query: edq.net.exchange.HTTPExchange,
307            ) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
308        """
309        Create an exchange for the given query or return None.
310        """
311
312class _TestHTTPHandler(http.server.BaseHTTPRequestHandler):
313    _server: typing.Union[HTTPExchangeServer, None] = None
314    """ The test server this handler is being used for. """
315
316    _verbose: bool = False
317    """ Log more interactions. """
318
319    _raise_on_404: bool = True
320    """ Raise an exception when no exchange is matched (instead of a 404 error). """
321
322    _missing_request_func: typing.Union[MissingRequestFunction, None] = None
323    """ A fallback to get an exchange before resulting in a 404. """
324
325    # Quiet logs.
326    def log_message(self, format: str, *args: typing.Any) -> None:  # pylint: disable=redefined-builtin
327        pass
328
329    def do_DELETE(self) -> None:  # pylint: disable=invalid-name
330        """ A handler for DELETE requests. """
331
332        self._do_request('DELETE')
333
334    def do_GET(self) -> None:  # pylint: disable=invalid-name
335        """ A handler for GET requests. """
336
337        self._do_request('GET')
338
339    def do_HEAD(self) -> None:  # pylint: disable=invalid-name
340        """ A handler for HEAD requests. """
341
342        self._do_request('HEAD')
343
344    def do_OPTIONS(self) -> None:  # pylint: disable=invalid-name
345        """ A handler for OPTIONS requests. """
346
347        self._do_request('OPTIONS')
348
349    def do_PATCH(self) -> None:  # pylint: disable=invalid-name
350        """ A handler for PATCH requests. """
351
352        self._do_request('PATCH')
353
354    def do_POST(self) -> None:  # pylint: disable=invalid-name
355        """ A handler for POST requests. """
356
357        self._do_request('POST')
358
359    def do_PUT(self) -> None:  # pylint: disable=invalid-name
360        """ A handler for PUT requests. """
361
362        self._do_request('PUT')
363
364    def _do_request(self, method: str) -> None:
365        """ A common handler for multiple types of requests. """
366
367        if (self._server is None):
368            raise ValueError("Server has not been initialized.")
369
370        if (self._verbose):
371            _logger.debug("Incoming %s request: '%s'.", method, self.path)
372
373        # Parse data from the request url and body.
374        request_data, request_files = edq.net.util.parse_request_data(self.path, self.headers, self.rfile)
375
376        # Construct file info objects from the raw files.
377        files = [edq.net.exchange.FileInfo(name = name, content = content) for (name, content) in request_files.items()]
378
379        exchange, hint = self._get_exchange(method, parameters = request_data, files = files)  # type: ignore[arg-type]
380
381        if (exchange is None):
382            code = http.HTTPStatus.NOT_FOUND.value
383            headers = {}
384            payload = hint
385        else:
386            code = exchange.response_code
387            headers = exchange.response_headers
388            payload = exchange.response_body
389
390        if (payload is None):
391            payload = ''
392
393        self.send_response(code)
394        for (key, value) in headers.items():
395            self.send_header(key, value)
396        self.end_headers()
397
398        self.wfile.write(payload.encode(edq.util.dirent.DEFAULT_ENCODING))
399
400    def _get_exchange(self, method: str,
401            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
402            files: typing.Union[typing.List[typing.Union[edq.net.exchange.FileInfo, typing.Dict[str, str]]], None] = None,
403            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
404        """ Get the matching exchange or raise an error. """
405
406        if (self._server is None):
407            raise ValueError("Server has not been initialized.")
408
409        query = edq.net.exchange.HTTPExchange(method = method,
410                url = self.path,
411                url_anchor = self.headers.get(edq.net.exchange.ANCHOR_HEADER_KEY, None),
412                headers = self.headers,  # type: ignore[arg-type]
413                parameters = parameters, files = files)
414
415        exchange, hint = self._server.lookup_exchange(query)
416
417        if ((exchange is None) and (self._missing_request_func is not None)):
418            exchange = self._missing_request_func(query)  # pylint: disable=not-callable
419
420        if ((exchange is None) and self._raise_on_404):
421            raise ValueError(f"Failed to lookup exchange: '{hint}'.")
422
423        return exchange, hint
SERVER_THREAD_START_WAIT_SEC: float = 0.02
SERVER_THREAD_REAP_WAIT_SEC: float = 0.15
class HTTPExchangeServer:
 18class HTTPExchangeServer():
 19    """
 20    An HTTP server meant for testing.
 21    This server is generally meant to already know about all the requests that will be made to it,
 22    and all the responses it should make in reaction to those respective requests.
 23    This allows the server to respond very quickly, and makes it ideal for testing.
 24    This makes it easy to mock external services for testing.
 25
 26    If a request is not found in the predefined requests,
 27    then missing_request() will be called.
 28    If a response is still not available (indicated by a None return from missing_request()),
 29    then an error will be raised.
 30    """
 31
 32    def __init__(self,
 33            port: typing.Union[int, None] = None,
 34            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 35            default_match_options: bool = True,
 36            verbose: bool = False,
 37            raise_on_404: bool = False,
 38            **kwargs: typing.Any) -> None:
 39        self.port: typing.Union[int, None] = port
 40        """
 41        The active port this server is using.
 42        If None, then a random port will be chosen when the server is started and this field will be populated.
 43        """
 44
 45        self._http_server: typing.Union[http.server.HTTPServer, None] = None
 46        """ The HTTP server listening for connections. """
 47
 48        self._thread: typing.Union[threading.Thread, None] = None
 49        """ The thread running the HTTP server. """
 50
 51        self._run_lock: threading.Lock = threading.Lock()
 52        """ A lock that the server holds while running. """
 53
 54        self.verbose: bool = verbose
 55        """ Log more information. """
 56
 57        self.raise_on_404: bool = raise_on_404
 58        """ Raise an exception when no exchange is matched (instead of a 404 error). """
 59
 60        self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
 61        """
 62        The HTTP exchanges (requests+responses) that this server knows about.
 63        Exchanges are stored in layers to help make errors for missing requests easier.
 64        Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
 65        """
 66
 67        if (match_options is None):
 68            match_options = {}
 69
 70        if (default_match_options):
 71            if ('headers_to_skip' not in match_options):
 72                match_options['headers_to_skip'] = []
 73
 74            match_options['headers_to_skip'] += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
 75
 76        self.match_options: typing.Dict[str, typing.Any] = match_options.copy()
 77        """ Options to use when matching HTTP exchanges. """
 78
 79    def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
 80        """
 81        Get a shallow list of all the exchanges in this server.
 82        Ordering is not guaranteed.
 83        """
 84
 85        exchanges = []
 86
 87        for url_exchanges in self._exchanges.values():
 88            for anchor_exchanges in url_exchanges.values():
 89                for method_exchanges in anchor_exchanges.values():
 90                    exchanges += method_exchanges
 91
 92        return exchanges
 93
 94    def start(self) -> None:
 95        """ Start this server in a thread and set the active port. """
 96
 97        class NestedHTTPHandler(_TestHTTPHandler):
 98            """ An HTTP handler as a nested class to bind this server object to the handler. """
 99
100            _server = self
101            _verbose = self.verbose
102            _raise_on_404 = self.raise_on_404
103            _missing_request_func = self.missing_request
104
105        if (self.port is None):
106            self.port = edq.net.util.find_open_port()
107
108        self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)
109
110        if (self.verbose):
111            _logger.info("Starting test server on port %d.", self.port)
112
113        # Use a barrier to ensure that the server thread has started.
114        server_startup_barrier = threading.Barrier(2)
115
116        def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
117            server_startup_barrier.wait()
118
119            if (server._http_server is None):
120                raise ValueError('Server was not initialized.')
121
122            # Run the server within the run lock context.
123            with server._run_lock:
124                server._http_server.serve_forever(poll_interval = 0.01)
125                server._http_server.server_close()
126
127            if (self.verbose):
128                _logger.info("Stopping test server.")
129
130        self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
131        self._thread.start()
132
133        # Wait for the server to startup.
134        server_startup_barrier.wait()
135        time.sleep(SERVER_THREAD_START_WAIT_SEC)
136
137    def wait_for_completion(self) -> None:
138        """
139        Block until the server is not running.
140        If called while the server is not running, this will return immediately.
141        This function will handle keyboard interrupts (Ctrl-C).
142        """
143
144        try:
145            with self._run_lock:
146                pass
147        except KeyboardInterrupt:
148            self.stop()
149            self.wait_for_completion()
150
151    def start_and_wait(self) -> None:
152        """ Start the server and block until it is done. """
153
154        self.start()
155        self.wait_for_completion()
156
157    def stop(self) -> None:
158        """ Stop this server. """
159
160        self.port = None
161
162        if (self._http_server is not None):
163            self._http_server.shutdown()
164            self._http_server = None
165
166        if (self._thread is not None):
167            if (self._thread.is_alive()):
168                self._thread.join(SERVER_THREAD_REAP_WAIT_SEC)
169
170            self._thread = None
171
172    def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
173        """
174        Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request
175        before the server raises an exception.
176        Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.
177
178        Exchanges returned from this method are not cached/saved.
179        """
180
181        return None
182
183    def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
184        """
185        Modify any exchanges before they are saved into this server's cache.
186        The returned exchanges will be saved in this server's cache.
187
188        This method may be called multiple times with different collections of exchanges.
189        """
190
191        return exchanges
192
193    def lookup_exchange(self,
194            query: edq.net.exchange.HTTPExchange,
195            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
196            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
197        """
198        Lookup the query exchange to see if it exists in this server.
199        If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
200        If a match does not exist, a None will be returned along with a message indicating how the query missed
201        (e.g., the URL was matched, but the method was not).
202        """
203
204        if (match_options is None):
205            match_options = {}
206
207        hint_display = query.url_path
208        target: typing.Any = self._exchanges
209
210        if (query.url_path not in target):
211            return None, f"Could not find matching URL path for '{hint_display}'."
212
213        hint_display = query.url_path
214        if (query.url_anchor is not None):
215            hint_display = f"{query.url_path}#{query.url_anchor}"
216
217        target = target[query.url_path]
218
219        if (query.url_anchor not in target):
220            return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."
221
222        hint_display = f"{hint_display} ({query.method})"
223        target = target[query.url_anchor]
224
225        if (query.method not in target):
226            return None, f"Found URL, but could not find matching method for '{hint_display}'."
227
228        params = list(sorted(query.parameters.keys()))
229        hint_display = f"{hint_display}, (param keys = {params})"
230        target = target[query.method]
231
232        full_match_options = self.match_options.copy()
233        full_match_options.update(match_options)
234
235        hints = []
236        matches = []
237
238        for (i, exchange) in enumerate(target):
239            match, hint = exchange.match(query, **full_match_options)
240            if (match):
241                matches.append(exchange)
242                continue
243
244            # Collect hints for non-matches.
245            label = exchange.source_path
246            if (label is None):
247                label = str(i)
248
249            hints.append(f"{label}: {hint}")
250
251        if (len(matches) == 1):
252            # Found exactly one match.
253            return matches[0], None
254
255        if (len(matches) > 1):
256            # Found multiple matches.
257            match_paths = list(sorted([match.source_path for match in matches]))
258            return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."
259
260        # Found no matches.
261        return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."
262
263    def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
264        """ Load an exchange into this server. """
265
266        if (exchange is None):
267            raise ValueError("Cannot load a None exchange.")
268
269        target: typing.Any = self._exchanges
270        if (exchange.url_path not in target):
271            target[exchange.url_path] = {}
272
273        target = target[exchange.url_path]
274        if (exchange.url_anchor not in target):
275            target[exchange.url_anchor] = {}
276
277        target = target[exchange.url_anchor]
278        if (exchange.method not in target):
279            target[exchange.method] = []
280
281        target = target[exchange.method]
282        target.append(exchange)
283
284    def load_exchange_file(self, path: str) -> None:
285        """
286        Load an exchange from a file.
287        This will also handle setting the exchanges source path and resolving the exchange's paths.
288        """
289
290        self.load_exchange(edq.net.exchange.HTTPExchange.from_path(path))
291
292    def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
293        """ Load all exchanges found (recursively) within a directory. """
294
295        paths = list(sorted(glob.glob(os.path.join(base_dir, "**", f"*{extension}"), recursive = True)))
296        for path in paths:
297            self.load_exchange_file(path)

An HTTP server meant for testing. This server is generally meant to already know about all the requests that will be made to it, and all the responses it should make in reaction to those respective requests. This allows the server to respond very quickly, and makes it ideal for testing. This makes it easy to mock external services for testing.

If a request is not found in the predefined requests, then missing_request() will be called. If a response is still not available (indicated by a None return from missing_request()), then an error will be raised.

HTTPExchangeServer( port: Optional[int] = None, match_options: Optional[Dict[str, Any]] = None, default_match_options: bool = True, verbose: bool = False, raise_on_404: bool = False, **kwargs: Any)
32    def __init__(self,
33            port: typing.Union[int, None] = None,
34            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
35            default_match_options: bool = True,
36            verbose: bool = False,
37            raise_on_404: bool = False,
38            **kwargs: typing.Any) -> None:
39        self.port: typing.Union[int, None] = port
40        """
41        The active port this server is using.
42        If None, then a random port will be chosen when the server is started and this field will be populated.
43        """
44
45        self._http_server: typing.Union[http.server.HTTPServer, None] = None
46        """ The HTTP server listening for connections. """
47
48        self._thread: typing.Union[threading.Thread, None] = None
49        """ The thread running the HTTP server. """
50
51        self._run_lock: threading.Lock = threading.Lock()
52        """ A lock that the server holds while running. """
53
54        self.verbose: bool = verbose
55        """ Log more information. """
56
57        self.raise_on_404: bool = raise_on_404
58        """ Raise an exception when no exchange is matched (instead of a 404 error). """
59
60        self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
61        """
62        The HTTP exchanges (requests+responses) that this server knows about.
63        Exchanges are stored in layers to help make errors for missing requests easier.
64        Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
65        """
66
67        if (match_options is None):
68            match_options = {}
69
70        if (default_match_options):
71            if ('headers_to_skip' not in match_options):
72                match_options['headers_to_skip'] = []
73
74            match_options['headers_to_skip'] += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
75
76        self.match_options: typing.Dict[str, typing.Any] = match_options.copy()
77        """ Options to use when matching HTTP exchanges. """
port: Optional[int]

The active port this server is using. If None, then a random port will be chosen when the server is started and this field will be populated.

verbose: bool

Log more information.

raise_on_404: bool

Raise an exception when no exchange is matched (instead of a 404 error).

match_options: Dict[str, Any]

Options to use when matching HTTP exchanges.

def get_exchanges(self) -> List[edq.net.exchange.HTTPExchange]:
79    def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
80        """
81        Get a shallow list of all the exchanges in this server.
82        Ordering is not guaranteed.
83        """
84
85        exchanges = []
86
87        for url_exchanges in self._exchanges.values():
88            for anchor_exchanges in url_exchanges.values():
89                for method_exchanges in anchor_exchanges.values():
90                    exchanges += method_exchanges
91
92        return exchanges

Get a shallow list of all the exchanges in this server. Ordering is not guaranteed.

def start(self) -> None:
 94    def start(self) -> None:
 95        """ Start this server in a thread and set the active port. """
 96
 97        class NestedHTTPHandler(_TestHTTPHandler):
 98            """ An HTTP handler as a nested class to bind this server object to the handler. """
 99
100            _server = self
101            _verbose = self.verbose
102            _raise_on_404 = self.raise_on_404
103            _missing_request_func = self.missing_request
104
105        if (self.port is None):
106            self.port = edq.net.util.find_open_port()
107
108        self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)
109
110        if (self.verbose):
111            _logger.info("Starting test server on port %d.", self.port)
112
113        # Use a barrier to ensure that the server thread has started.
114        server_startup_barrier = threading.Barrier(2)
115
116        def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
117            server_startup_barrier.wait()
118
119            if (server._http_server is None):
120                raise ValueError('Server was not initialized.')
121
122            # Run the server within the run lock context.
123            with server._run_lock:
124                server._http_server.serve_forever(poll_interval = 0.01)
125                server._http_server.server_close()
126
127            if (self.verbose):
128                _logger.info("Stopping test server.")
129
130        self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
131        self._thread.start()
132
133        # Wait for the server to startup.
134        server_startup_barrier.wait()
135        time.sleep(SERVER_THREAD_START_WAIT_SEC)

Start this server in a thread and set the active port.

def wait_for_completion(self) -> None:
137    def wait_for_completion(self) -> None:
138        """
139        Block until the server is not running.
140        If called while the server is not running, this will return immediately.
141        This function will handle keyboard interrupts (Ctrl-C).
142        """
143
144        try:
145            with self._run_lock:
146                pass
147        except KeyboardInterrupt:
148            self.stop()
149            self.wait_for_completion()

Block until the server is not running. If called while the server is not running, this will return immediately. This function will handle keyboard interrupts (Ctrl-C).

def start_and_wait(self) -> None:
151    def start_and_wait(self) -> None:
152        """ Start the server and block until it is done. """
153
154        self.start()
155        self.wait_for_completion()

Start the server and block until it is done.

def stop(self) -> None:
157    def stop(self) -> None:
158        """ Stop this server. """
159
160        self.port = None
161
162        if (self._http_server is not None):
163            self._http_server.shutdown()
164            self._http_server = None
165
166        if (self._thread is not None):
167            if (self._thread.is_alive()):
168                self._thread.join(SERVER_THREAD_REAP_WAIT_SEC)
169
170            self._thread = None

Stop this server.

def missing_request( self, query: edq.net.exchange.HTTPExchange) -> Optional[edq.net.exchange.HTTPExchange]:
172    def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
173        """
174        Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request
175        before the server raises an exception.
176        Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.
177
178        Exchanges returned from this method are not cached/saved.
179        """
180
181        return None

Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request before the server raises an exception. Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.

Exchanges returned from this method are not cached/saved.

def modify_exchanges( self, exchanges: List[edq.net.exchange.HTTPExchange]) -> List[edq.net.exchange.HTTPExchange]:
183    def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
184        """
185        Modify any exchanges before they are saved into this server's cache.
186        The returned exchanges will be saved in this server's cache.
187
188        This method may be called multiple times with different collections of exchanges.
189        """
190
191        return exchanges

Modify any exchanges before they are saved into this server's cache. The returned exchanges will be saved in this server's cache.

This method may be called multiple times with different collections of exchanges.

def lookup_exchange( self, query: edq.net.exchange.HTTPExchange, match_options: Optional[Dict[str, Any]] = None) -> Tuple[Optional[edq.net.exchange.HTTPExchange], Optional[str]]:
193    def lookup_exchange(self,
194            query: edq.net.exchange.HTTPExchange,
195            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
196            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
197        """
198        Lookup the query exchange to see if it exists in this server.
199        If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
200        If a match does not exist, a None will be returned along with a message indicating how the query missed
201        (e.g., the URL was matched, but the method was not).
202        """
203
204        if (match_options is None):
205            match_options = {}
206
207        hint_display = query.url_path
208        target: typing.Any = self._exchanges
209
210        if (query.url_path not in target):
211            return None, f"Could not find matching URL path for '{hint_display}'."
212
213        hint_display = query.url_path
214        if (query.url_anchor is not None):
215            hint_display = f"{query.url_path}#{query.url_anchor}"
216
217        target = target[query.url_path]
218
219        if (query.url_anchor not in target):
220            return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."
221
222        hint_display = f"{hint_display} ({query.method})"
223        target = target[query.url_anchor]
224
225        if (query.method not in target):
226            return None, f"Found URL, but could not find matching method for '{hint_display}'."
227
228        params = list(sorted(query.parameters.keys()))
229        hint_display = f"{hint_display}, (param keys = {params})"
230        target = target[query.method]
231
232        full_match_options = self.match_options.copy()
233        full_match_options.update(match_options)
234
235        hints = []
236        matches = []
237
238        for (i, exchange) in enumerate(target):
239            match, hint = exchange.match(query, **full_match_options)
240            if (match):
241                matches.append(exchange)
242                continue
243
244            # Collect hints for non-matches.
245            label = exchange.source_path
246            if (label is None):
247                label = str(i)
248
249            hints.append(f"{label}: {hint}")
250
251        if (len(matches) == 1):
252            # Found exactly one match.
253            return matches[0], None
254
255        if (len(matches) > 1):
256            # Found multiple matches.
257            match_paths = list(sorted([match.source_path for match in matches]))
258            return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."
259
260        # Found no matches.
261        return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."

Lookup the query exchange to see if it exists in this server. If a match exists, the matching exchange (likely a full version of the query) will be returned along with None. If a match does not exist, a None will be returned along with a message indicating how the query missed (e.g., the URL was matched, but the method was not).

def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
263    def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
264        """ Load an exchange into this server. """
265
266        if (exchange is None):
267            raise ValueError("Cannot load a None exchange.")
268
269        target: typing.Any = self._exchanges
270        if (exchange.url_path not in target):
271            target[exchange.url_path] = {}
272
273        target = target[exchange.url_path]
274        if (exchange.url_anchor not in target):
275            target[exchange.url_anchor] = {}
276
277        target = target[exchange.url_anchor]
278        if (exchange.method not in target):
279            target[exchange.method] = []
280
281        target = target[exchange.method]
282        target.append(exchange)

Load an exchange into this server.

def load_exchange_file(self, path: str) -> None:
284    def load_exchange_file(self, path: str) -> None:
285        """
286        Load an exchange from a file.
287        This will also handle setting the exchanges source path and resolving the exchange's paths.
288        """
289
290        self.load_exchange(edq.net.exchange.HTTPExchange.from_path(path))

Load an exchange from a file. This will also handle setting the exchanges source path and resolving the exchange's paths.

def load_exchanges_dir(self, base_dir: str, extension: str = '.httpex.json') -> None:
292    def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
293        """ Load all exchanges found (recursively) within a directory. """
294
295        paths = list(sorted(glob.glob(os.path.join(base_dir, "**", f"*{extension}"), recursive = True)))
296        for path in paths:
297            self.load_exchange_file(path)

Load all exchanges found (recursively) within a directory.

@typing.runtime_checkable
class MissingRequestFunction(typing.Protocol):
299@typing.runtime_checkable
300class MissingRequestFunction(typing.Protocol):
301    """
302    A function that can be used to create an exchange when none was found.
303    This is often the last resort when a test server cannot find an exchange matching a request.
304    """
305
306    def __call__(self,
307            query: edq.net.exchange.HTTPExchange,
308            ) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
309        """
310        Create an exchange for the given query or return None.
311        """

A function that can be used to create an exchange when none was found. This is often the last resort when a test server cannot find an exchange matching a request.

MissingRequestFunction(*args, **kwargs)
1953def _no_init_or_replace_init(self, *args, **kwargs):
1954    cls = type(self)
1955
1956    if cls._is_protocol:
1957        raise TypeError('Protocols cannot be instantiated')
1958
1959    # Already using a custom `__init__`. No need to calculate correct
1960    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1961    if cls.__init__ is not _no_init_or_replace_init:
1962        return
1963
1964    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1965    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1966    # searches for a proper new `__init__` in the MRO. The new `__init__`
1967    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1968    # instantiation of the protocol subclass will thus use the new
1969    # `__init__` and no longer call `_no_init_or_replace_init`.
1970    for base in cls.__mro__:
1971        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1972        if init is not _no_init_or_replace_init:
1973            cls.__init__ = init
1974            break
1975    else:
1976        # should not happen
1977        cls.__init__ = object.__init__
1978
1979    cls.__init__(self, *args, **kwargs)