edq.net.exchangeserver
import glob
import http.server
import logging
import os
import threading
import time
import typing

import edq.net.exchange
import edq.net.util
import edq.util.dirent

_logger = logging.getLogger(__name__)

SERVER_THREAD_START_WAIT_SEC: float = 0.02
SERVER_THREAD_REAP_WAIT_SEC: float = 0.15

class HTTPExchangeServer():
    """
    An HTTP server meant for testing.
    This server is generally meant to already know about all the requests that will be made to it,
    and all the responses it should make in reaction to those respective requests.
    This allows the server to respond very quickly, and makes it easy to mock external services in tests.

    If a request is not found in the predefined requests,
    then missing_request() will be called.
    If a response is still not available (indicated by a None return from missing_request()),
    then the request is answered with a 404 (or an error is raised when raise_on_404 is set).
    """

    def __init__(self,
            port: typing.Union[int, None] = None,
            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            default_match_options: bool = True,
            verbose: bool = False,
            raise_on_404: bool = False,
            **kwargs: typing.Any) -> None:
        """
        Create a server; nothing listens until start() is called.

        When default_match_options is true, edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
        is appended to the 'headers_to_skip' match option.
        The match_options dict passed by the caller is never modified.
        Extra keyword arguments are accepted and ignored.
        """

        self.port: typing.Union[int, None] = port
        """
        The active port this server is using.
        If None, then a random port will be chosen when the server is started and this field will be populated.
        """

        self._http_server: typing.Union[http.server.HTTPServer, None] = None
        """ The HTTP server listening for connections. """

        self._thread: typing.Union[threading.Thread, None] = None
        """ The thread running the HTTP server. """

        self._run_lock: threading.Lock = threading.Lock()
        """ A lock that the server holds while running. """

        self.verbose: bool = verbose
        """ Log more information. """

        self.raise_on_404: bool = raise_on_404
        """ Raise an exception when no exchange is matched (instead of a 404 error). """

        self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
        """
        The HTTP exchanges (requests+responses) that this server knows about.
        Exchanges are stored in layers to help make errors for missing requests easier.
        Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
        """

        # Work on a private copy so the caller's dict (and the list inside it) are never mutated.
        if (match_options is None):
            match_options = {}
        else:
            match_options = dict(match_options)

        if (default_match_options):
            headers_to_skip = list(match_options.get('headers_to_skip', []))
            headers_to_skip += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
            match_options['headers_to_skip'] = headers_to_skip

        self.match_options: typing.Dict[str, typing.Any] = match_options
        """ Options to use when matching HTTP exchanges. """

    def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
        """
        Get a shallow list of all the exchanges in this server.
        Ordering is not guaranteed.
        """

        exchanges = []

        for url_exchanges in self._exchanges.values():
            for anchor_exchanges in url_exchanges.values():
                for method_exchanges in anchor_exchanges.values():
                    exchanges += method_exchanges

        return exchanges

    def start(self) -> None:
        """
        Start this server in a thread and set the active port.
        If no port was set, one is requested from edq.net.util.find_open_port().
        """

        class NestedHTTPHandler(_TestHTTPHandler):
            """ An HTTP handler as a nested class to bind this server object to the handler. """

            _server = self
            _verbose = self.verbose
            _raise_on_404 = self.raise_on_404
            _missing_request_func = self.missing_request

        if (self.port is None):
            self.port = edq.net.util.find_open_port()

        self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)

        if (self.verbose):
            _logger.info("Starting test server on port %d.", self.port)

        # Use a barrier to ensure that the server thread has started.
        server_startup_barrier = threading.Barrier(2)

        def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
            server_startup_barrier.wait()

            # Hold a local reference: stop() clears server._http_server as soon as shutdown() returns,
            # which can happen before this thread reaches the cleanup below.
            http_server = server._http_server
            if (http_server is None):
                raise ValueError('Server was not initialized.')

            # Run the server within the run lock context.
            with server._run_lock:
                try:
                    http_server.serve_forever(poll_interval = 0.01)
                finally:
                    # Always release the listening socket, even if serving failed.
                    http_server.server_close()

            if (server.verbose):
                _logger.info("Stopping test server.")

        self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
        self._thread.start()

        # Wait for the server to startup.
        server_startup_barrier.wait()
        # Give the server thread a moment to take the run lock after passing the barrier.
        time.sleep(SERVER_THREAD_START_WAIT_SEC)

    def wait_for_completion(self) -> None:
        """
        Block until the server is not running.
        If called while the server is not running, this will return immediately.
        This function will handle keyboard interrupts (Ctrl-C) by stopping the server and waiting again.
        """

        try:
            # The server thread holds this lock while serving, so acquiring it means the server is done.
            with self._run_lock:
                pass
        except KeyboardInterrupt:
            self.stop()
            self.wait_for_completion()

    def start_and_wait(self) -> None:
        """ Start the server and block until it is done. """

        self.start()
        self.wait_for_completion()

    def stop(self) -> None:
        """
        Stop this server.
        The active port is cleared, the HTTP server (if any) is shut down,
        and the server thread is given SERVER_THREAD_REAP_WAIT_SEC seconds to finish.
        """

        self.port = None

        if (self._http_server is not None):
            self._http_server.shutdown()
            self._http_server = None

        if (self._thread is not None):
            if (self._thread.is_alive()):
                self._thread.join(SERVER_THREAD_REAP_WAIT_SEC)

            self._thread = None

    def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
        """
        Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request
        before the request is treated as unmatched.
        Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.

        Exchanges returned from this method are not cached/saved.
        The base implementation always returns None.
        """

        return None

    def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
        """
        Modify any exchanges before they are saved into this server's cache.
        The returned exchanges will be saved in this server's cache.

        This method may be called multiple times with different collections of exchanges.
        The base implementation returns the exchanges unchanged.
        """

        return exchanges

    def lookup_exchange(self,
            query: edq.net.exchange.HTTPExchange,
            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
        """
        Lookup the query exchange to see if it exists in this server.
        If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
        If a match does not exist, a None will be returned along with a message indicating how the query missed
        (e.g., the URL was matched, but the method was not).
        Multiple matches are also reported as a miss.

        Options in match_options override this server's own match options for this lookup only.
        """

        if (match_options is None):
            match_options = {}

        hint_display = query.url_path
        target: typing.Any = self._exchanges

        if (query.url_path not in target):
            return None, f"Could not find matching URL path for '{hint_display}'."

        if (query.url_anchor is not None):
            hint_display = f"{query.url_path}#{query.url_anchor}"

        target = target[query.url_path]

        if (query.url_anchor not in target):
            return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."

        hint_display = f"{hint_display} ({query.method})"
        target = target[query.url_anchor]

        if (query.method not in target):
            return None, f"Found URL, but could not find matching method for '{hint_display}'."

        params = list(sorted(query.parameters.keys()))
        hint_display = f"{hint_display}, (param keys = {params})"
        target = target[query.method]

        full_match_options = self.match_options.copy()
        full_match_options.update(match_options)

        hints = []
        matches = []
        match_labels = []

        for (i, exchange) in enumerate(target):
            # Exchanges not loaded from a file have no source path, fall back to their index.
            label = exchange.source_path
            if (label is None):
                label = str(i)

            match, hint = exchange.match(query, **full_match_options)
            if (match):
                matches.append(exchange)
                match_labels.append(label)
                continue

            # Collect hints for non-matches.
            hints.append(f"{label}: {hint}")

        if (len(matches) == 1):
            # Found exactly one match.
            return matches[0], None

        if (len(matches) > 1):
            # Found multiple matches.
            match_paths = list(sorted(match_labels))
            return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."

        # Found no matches.
        return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."

    def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
        """
        Load an exchange into this server.
        A ValueError is raised if the exchange is None.
        """

        if (exchange is None):
            raise ValueError("Cannot load a None exchange.")

        target: typing.Any = self._exchanges
        if (exchange.url_path not in target):
            target[exchange.url_path] = {}

        target = target[exchange.url_path]
        if (exchange.url_anchor not in target):
            target[exchange.url_anchor] = {}

        target = target[exchange.url_anchor]
        if (exchange.method not in target):
            target[exchange.method] = []

        target = target[exchange.method]
        target.append(exchange)

    def load_exchange_file(self, path: str) -> None:
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path and resolving the exchange's paths.
        """

        self.load_exchange(edq.net.exchange.HTTPExchange.from_path(path))

    def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
        """ Load all exchanges found (recursively) within a directory, in sorted path order. """

        paths = list(sorted(glob.glob(os.path.join(base_dir, "**", f"*{extension}"), recursive = True)))
        for path in paths:
            self.load_exchange_file(path)

@typing.runtime_checkable
class MissingRequestFunction(typing.Protocol):
    """
    A function that can be used to create an exchange when none was found.
    This is often the last resort when a test server cannot find an exchange matching a request.
    """

    def __call__(self,
            query: edq.net.exchange.HTTPExchange,
            ) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
        """
        Create an exchange for the given query or return None.
        """

class _TestHTTPHandler(http.server.BaseHTTPRequestHandler):
    """
    The request handler used by HTTPExchangeServer.
    Every supported HTTP method is answered by looking up a matching exchange in the bound server.
    """

    _server: typing.Union[HTTPExchangeServer, None] = None
    """ The test server this handler is being used for. """

    _verbose: bool = False
    """ Log more interactions. """

    _raise_on_404: bool = True
    """ Raise an exception when no exchange is matched (instead of a 404 error). """

    _missing_request_func: typing.Union[MissingRequestFunction, None] = None
    """ A fallback to get an exchange before resulting in a 404. """

    # Quiet logs.
    def log_message(self, format: str, *args: typing.Any) -> None:  # pylint: disable=redefined-builtin
        pass

    def do_DELETE(self) -> None:  # pylint: disable=invalid-name
        """ A handler for DELETE requests. """

        self._do_request('DELETE')

    def do_GET(self) -> None:  # pylint: disable=invalid-name
        """ A handler for GET requests. """

        self._do_request('GET')

    def do_HEAD(self) -> None:  # pylint: disable=invalid-name
        """ A handler for HEAD requests. """

        self._do_request('HEAD')

    def do_OPTIONS(self) -> None:  # pylint: disable=invalid-name
        """ A handler for OPTIONS requests. """

        self._do_request('OPTIONS')

    def do_PATCH(self) -> None:  # pylint: disable=invalid-name
        """ A handler for PATCH requests. """

        self._do_request('PATCH')

    def do_POST(self) -> None:  # pylint: disable=invalid-name
        """ A handler for POST requests. """

        self._do_request('POST')

    def do_PUT(self) -> None:  # pylint: disable=invalid-name
        """ A handler for PUT requests. """

        self._do_request('PUT')

    def _do_request(self, method: str) -> None:
        """
        A common handler for multiple types of requests.
        The matched exchange's response code, headers, and body are sent back.
        When nothing matches, a 404 is sent with the lookup hint as the body.
        """

        if (self._server is None):
            raise ValueError("Server has not been initialized.")

        if (self._verbose):
            _logger.debug("Incoming %s request: '%s'.", method, self.path)

        # Parse data from the request url and body.
        request_data, request_files = edq.net.util.parse_request_data(self.path, self.headers, self.rfile)

        # Construct file info objects from the raw files.
        files = [edq.net.exchange.FileInfo(name = name, content = content) for (name, content) in request_files.items()]

        exchange, hint = self._get_exchange(method, parameters = request_data, files = files)  # type: ignore[arg-type]

        if (exchange is None):
            code = http.HTTPStatus.NOT_FOUND.value
            headers = {}
            payload = hint
        else:
            code = exchange.response_code
            headers = exchange.response_headers
            payload = exchange.response_body

        if (payload is None):
            payload = ''

        self.send_response(code)
        for (key, value) in headers.items():
            self.send_header(key, value)
        self.end_headers()

        self.wfile.write(payload.encode(edq.util.dirent.DEFAULT_ENCODING))

    def _get_exchange(self, method: str,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            files: typing.Union[typing.List[typing.Union[edq.net.exchange.FileInfo, typing.Dict[str, str]]], None] = None,
            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
        """
        Get the matching exchange along with the lookup hint.
        The server's stored exchanges are tried first, then the missing-request fallback.
        If there is still no exchange, a ValueError is raised when _raise_on_404 is set,
        otherwise None is returned with the hint.
        """

        if (self._server is None):
            raise ValueError("Server has not been initialized.")

        query = edq.net.exchange.HTTPExchange(method = method,
                url = self.path,
                url_anchor = self.headers.get(edq.net.exchange.ANCHOR_HEADER_KEY, None),
                headers = self.headers,  # type: ignore[arg-type]
                parameters = parameters, files = files)

        exchange, hint = self._server.lookup_exchange(query)

        if ((exchange is None) and (self._missing_request_func is not None)):
            exchange = self._missing_request_func(query)  # pylint: disable=not-callable

        if ((exchange is None) and self._raise_on_404):
            raise ValueError(f"Failed to lookup exchange: '{hint}'.")

        return exchange, hint
class HTTPExchangeServer():
    """
    An HTTP server meant for testing.
    This server is generally meant to already know about all the requests that will be made to it,
    and all the responses it should make in reaction to those respective requests.
    This allows the server to respond very quickly, and makes it easy to mock external services in tests.

    If a request is not found in the predefined requests,
    then missing_request() will be called.
    If a response is still not available (indicated by a None return from missing_request()),
    then the request is answered with a 404 (or an error is raised when raise_on_404 is set).
    """

    def __init__(self,
            port: typing.Union[int, None] = None,
            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            default_match_options: bool = True,
            verbose: bool = False,
            raise_on_404: bool = False,
            **kwargs: typing.Any) -> None:
        """
        Create a server; nothing listens until start() is called.

        When default_match_options is true, edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
        is appended to the 'headers_to_skip' match option.
        The match_options dict passed by the caller is never modified.
        Extra keyword arguments are accepted and ignored.
        """

        self.port: typing.Union[int, None] = port
        """
        The active port this server is using.
        If None, then a random port will be chosen when the server is started and this field will be populated.
        """

        self._http_server: typing.Union[http.server.HTTPServer, None] = None
        """ The HTTP server listening for connections. """

        self._thread: typing.Union[threading.Thread, None] = None
        """ The thread running the HTTP server. """

        self._run_lock: threading.Lock = threading.Lock()
        """ A lock that the server holds while running. """

        self.verbose: bool = verbose
        """ Log more information. """

        self.raise_on_404: bool = raise_on_404
        """ Raise an exception when no exchange is matched (instead of a 404 error). """

        self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
        """
        The HTTP exchanges (requests+responses) that this server knows about.
        Exchanges are stored in layers to help make errors for missing requests easier.
        Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
        """

        # Work on a private copy so the caller's dict (and the list inside it) are never mutated.
        if (match_options is None):
            match_options = {}
        else:
            match_options = dict(match_options)

        if (default_match_options):
            headers_to_skip = list(match_options.get('headers_to_skip', []))
            headers_to_skip += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
            match_options['headers_to_skip'] = headers_to_skip

        self.match_options: typing.Dict[str, typing.Any] = match_options
        """ Options to use when matching HTTP exchanges. """

    def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
        """
        Get a shallow list of all the exchanges in this server.
        Ordering is not guaranteed.
        """

        exchanges = []

        for url_exchanges in self._exchanges.values():
            for anchor_exchanges in url_exchanges.values():
                for method_exchanges in anchor_exchanges.values():
                    exchanges += method_exchanges

        return exchanges

    def start(self) -> None:
        """
        Start this server in a thread and set the active port.
        If no port was set, one is requested from edq.net.util.find_open_port().
        """

        class NestedHTTPHandler(_TestHTTPHandler):
            """ An HTTP handler as a nested class to bind this server object to the handler. """

            _server = self
            _verbose = self.verbose
            _raise_on_404 = self.raise_on_404
            _missing_request_func = self.missing_request

        if (self.port is None):
            self.port = edq.net.util.find_open_port()

        self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)

        if (self.verbose):
            _logger.info("Starting test server on port %d.", self.port)

        # Use a barrier to ensure that the server thread has started.
        server_startup_barrier = threading.Barrier(2)

        def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
            server_startup_barrier.wait()

            # Hold a local reference: stop() clears server._http_server as soon as shutdown() returns,
            # which can happen before this thread reaches the cleanup below.
            http_server = server._http_server
            if (http_server is None):
                raise ValueError('Server was not initialized.')

            # Run the server within the run lock context.
            with server._run_lock:
                try:
                    http_server.serve_forever(poll_interval = 0.01)
                finally:
                    # Always release the listening socket, even if serving failed.
                    http_server.server_close()

            if (server.verbose):
                _logger.info("Stopping test server.")

        self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
        self._thread.start()

        # Wait for the server to startup.
        server_startup_barrier.wait()
        # Give the server thread a moment to take the run lock after passing the barrier.
        time.sleep(SERVER_THREAD_START_WAIT_SEC)

    def wait_for_completion(self) -> None:
        """
        Block until the server is not running.
        If called while the server is not running, this will return immediately.
        This function will handle keyboard interrupts (Ctrl-C) by stopping the server and waiting again.
        """

        try:
            # The server thread holds this lock while serving, so acquiring it means the server is done.
            with self._run_lock:
                pass
        except KeyboardInterrupt:
            self.stop()
            self.wait_for_completion()

    def start_and_wait(self) -> None:
        """ Start the server and block until it is done. """

        self.start()
        self.wait_for_completion()

    def stop(self) -> None:
        """
        Stop this server.
        The active port is cleared, the HTTP server (if any) is shut down,
        and the server thread is given SERVER_THREAD_REAP_WAIT_SEC seconds to finish.
        """

        self.port = None

        if (self._http_server is not None):
            self._http_server.shutdown()
            self._http_server = None

        if (self._thread is not None):
            if (self._thread.is_alive()):
                self._thread.join(SERVER_THREAD_REAP_WAIT_SEC)

            self._thread = None

    def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
        """
        Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request
        before the request is treated as unmatched.
        Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.

        Exchanges returned from this method are not cached/saved.
        The base implementation always returns None.
        """

        return None

    def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
        """
        Modify any exchanges before they are saved into this server's cache.
        The returned exchanges will be saved in this server's cache.

        This method may be called multiple times with different collections of exchanges.
        The base implementation returns the exchanges unchanged.
        """

        return exchanges

    def lookup_exchange(self,
            query: edq.net.exchange.HTTPExchange,
            match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
        """
        Lookup the query exchange to see if it exists in this server.
        If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
        If a match does not exist, a None will be returned along with a message indicating how the query missed
        (e.g., the URL was matched, but the method was not).
        Multiple matches are also reported as a miss.

        Options in match_options override this server's own match options for this lookup only.
        """

        if (match_options is None):
            match_options = {}

        hint_display = query.url_path
        target: typing.Any = self._exchanges

        if (query.url_path not in target):
            return None, f"Could not find matching URL path for '{hint_display}'."

        if (query.url_anchor is not None):
            hint_display = f"{query.url_path}#{query.url_anchor}"

        target = target[query.url_path]

        if (query.url_anchor not in target):
            return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."

        hint_display = f"{hint_display} ({query.method})"
        target = target[query.url_anchor]

        if (query.method not in target):
            return None, f"Found URL, but could not find matching method for '{hint_display}'."

        params = list(sorted(query.parameters.keys()))
        hint_display = f"{hint_display}, (param keys = {params})"
        target = target[query.method]

        full_match_options = self.match_options.copy()
        full_match_options.update(match_options)

        hints = []
        matches = []
        match_labels = []

        for (i, exchange) in enumerate(target):
            # Exchanges not loaded from a file have no source path, fall back to their index.
            label = exchange.source_path
            if (label is None):
                label = str(i)

            match, hint = exchange.match(query, **full_match_options)
            if (match):
                matches.append(exchange)
                match_labels.append(label)
                continue

            # Collect hints for non-matches.
            hints.append(f"{label}: {hint}")

        if (len(matches) == 1):
            # Found exactly one match.
            return matches[0], None

        if (len(matches) > 1):
            # Found multiple matches.
            match_paths = list(sorted(match_labels))
            return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."

        # Found no matches.
        return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."

    def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
        """
        Load an exchange into this server.
        A ValueError is raised if the exchange is None.
        """

        if (exchange is None):
            raise ValueError("Cannot load a None exchange.")

        target: typing.Any = self._exchanges
        if (exchange.url_path not in target):
            target[exchange.url_path] = {}

        target = target[exchange.url_path]
        if (exchange.url_anchor not in target):
            target[exchange.url_anchor] = {}

        target = target[exchange.url_anchor]
        if (exchange.method not in target):
            target[exchange.method] = []

        target = target[exchange.method]
        target.append(exchange)

    def load_exchange_file(self, path: str) -> None:
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path and resolving the exchange's paths.
        """

        self.load_exchange(edq.net.exchange.HTTPExchange.from_path(path))

    def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
        """ Load all exchanges found (recursively) within a directory, in sorted path order. """

        paths = list(sorted(glob.glob(os.path.join(base_dir, "**", f"*{extension}"), recursive = True)))
        for path in paths:
            self.load_exchange_file(path)
An HTTP server meant for testing. This server is generally meant to already know about all the requests that will be made to it, and all the responses it should make in reaction to those respective requests. This allows the server to respond very quickly, and makes it ideal for testing. This makes it easy to mock external services for testing.
If a request is not found in the predefined requests, then missing_request() will be called. If a response is still not available (indicated by a None return from missing_request()), then the server responds with a 404 whose body explains the miss, or raises an error if raise_on_404 is set.
def __init__(self,
        port: typing.Union[int, None] = None,
        match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        default_match_options: bool = True,
        verbose: bool = False,
        raise_on_404: bool = False,
        **kwargs: typing.Any) -> None:
    """
    Create a server; nothing listens until start() is called.

    When default_match_options is true, edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
    is appended to the 'headers_to_skip' match option.
    The match_options dict passed by the caller is never modified.
    Extra keyword arguments are accepted and ignored.
    """

    self.port: typing.Union[int, None] = port
    """
    The active port this server is using.
    If None, then a random port will be chosen when the server is started and this field will be populated.
    """

    self._http_server: typing.Union[http.server.HTTPServer, None] = None
    """ The HTTP server listening for connections. """

    self._thread: typing.Union[threading.Thread, None] = None
    """ The thread running the HTTP server. """

    self._run_lock: threading.Lock = threading.Lock()
    """ A lock that the server holds while running. """

    self.verbose: bool = verbose
    """ Log more information. """

    self.raise_on_404: bool = raise_on_404
    """ Raise an exception when no exchange is matched (instead of a 404 error). """

    self._exchanges: typing.Dict[str, typing.Dict[typing.Union[str, None], typing.Dict[str, typing.List[edq.net.exchange.HTTPExchange]]]] = {}
    """
    The HTTP exchanges (requests+responses) that this server knows about.
    Exchanges are stored in layers to help make errors for missing requests easier.
    Exchanges are stored as: {url_path: {anchor: {method: [exchange, ...]}, ...}, ...}.
    """

    # Work on a private copy so the caller's dict (and the list inside it) are never mutated.
    if (match_options is None):
        match_options = {}
    else:
        match_options = dict(match_options)

    if (default_match_options):
        headers_to_skip = list(match_options.get('headers_to_skip', []))
        headers_to_skip += edq.net.exchange.DEFAULT_EXCHANGE_IGNORE_HEADERS
        match_options['headers_to_skip'] = headers_to_skip

    self.match_options: typing.Dict[str, typing.Any] = match_options
    """ Options to use when matching HTTP exchanges. """
The active port this server is using. If None, then a random port will be chosen when the server is started and this field will be populated.
def get_exchanges(self) -> typing.List[edq.net.exchange.HTTPExchange]:
    """
    Collect every exchange held by this server into one flat list.
    The list is new, but it holds the same exchange objects the server stores.
    No particular ordering is promised.
    """

    # Walk the three storage layers: URL path -> anchor -> method -> [exchange, ...].
    return [
        exchange
        for by_anchor in self._exchanges.values()
        for by_method in by_anchor.values()
        for stored in by_method.values()
        for exchange in stored
    ]
Get a shallow list of all the exchanges in this server. Ordering is not guaranteed.
def start(self) -> None:
    """
    Start this server in a thread and set the active port.
    If no port was set, one is requested from edq.net.util.find_open_port().
    """

    class NestedHTTPHandler(_TestHTTPHandler):
        """ An HTTP handler as a nested class to bind this server object to the handler. """

        _server = self
        _verbose = self.verbose
        _raise_on_404 = self.raise_on_404
        _missing_request_func = self.missing_request

    if (self.port is None):
        self.port = edq.net.util.find_open_port()

    self._http_server = http.server.HTTPServer(('', self.port), NestedHTTPHandler)

    if (self.verbose):
        _logger.info("Starting test server on port %d.", self.port)

    # Use a barrier to ensure that the server thread has started.
    server_startup_barrier = threading.Barrier(2)

    def _run_server(server: 'HTTPExchangeServer', server_startup_barrier: threading.Barrier) -> None:
        server_startup_barrier.wait()

        # Hold a local reference: stop() clears server._http_server as soon as shutdown() returns,
        # which can happen before this thread reaches the cleanup below.
        http_server = server._http_server
        if (http_server is None):
            raise ValueError('Server was not initialized.')

        # Run the server within the run lock context.
        with server._run_lock:
            try:
                http_server.serve_forever(poll_interval = 0.01)
            finally:
                # Always release the listening socket, even if serving failed.
                http_server.server_close()

        if (server.verbose):
            _logger.info("Stopping test server.")

    self._thread = threading.Thread(target = _run_server, args = (self, server_startup_barrier))
    self._thread.start()

    # Wait for the server to startup.
    server_startup_barrier.wait()
    # Give the server thread a moment to take the run lock after passing the barrier.
    time.sleep(SERVER_THREAD_START_WAIT_SEC)
Start this server in a thread and set the active port.
def wait_for_completion(self) -> None:
    """
    Wait here until the server is no longer running.
    Returns right away when the server is not running.
    A keyboard interrupt (Ctrl-C) received while waiting stops the server, and the wait is then retried.
    """

    while (True):
        try:
            # The server thread holds the run lock while it serves,
            # so being able to take the lock means the server has finished.
            with self._run_lock:
                return
        except KeyboardInterrupt:
            self.stop()
Block until the server is not running. If called while the server is not running, this will return immediately. This function will handle keyboard interrupts (Ctrl-C).
def start_and_wait(self) -> None:
    """
    Run the server to completion:
    start it, then block until it is no longer running.
    """

    # Launch the server thread first, then block on it.
    self.start()
    self.wait_for_completion()
Start the server and block until it is done.
def stop(self) -> None:
    """
    Shut this server down.
    The active port is cleared, the HTTP server (if any) is shut down,
    and a still-running server thread is given SERVER_THREAD_REAP_WAIT_SEC seconds to finish.
    """

    self.port = None

    http_server = self._http_server
    if (http_server is not None):
        http_server.shutdown()
        self._http_server = None

    worker = self._thread
    if (worker is None):
        return

    # Only wait on a thread that is still running, and never wait indefinitely.
    if (worker.is_alive()):
        worker.join(SERVER_THREAD_REAP_WAIT_SEC)

    self._thread = None
Stop this server.
def missing_request(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
    """
    A hook (intended for child classes) that gets a final chance to answer an incoming HTTP request
    that matched none of the stored exchanges.
    Exchanges normally come from disk, but a server could answer every request through this hook.

    Whatever this hook returns is not cached/saved.
    The base implementation resolves nothing.
    """

    unresolved = None
    return unresolved
Provide the server (specifically, child classes) one last chance to resolve an incoming HTTP request before the server raises an exception. Usually exchanges are loaded from disk, but technically a server can resolve all requests with this method.
Exchanges returned from this method are not cached/saved.
def modify_exchanges(self, exchanges: typing.List[edq.net.exchange.HTTPExchange]) -> typing.List[edq.net.exchange.HTTPExchange]:
    """
    A hook for adjusting exchanges before they go into this server's cache.
    Whatever is returned is what gets saved in the cache.

    The hook may run several times, each time with a different collection of exchanges.
    The base implementation hands the same list back untouched.
    """

    unchanged = exchanges
    return unchanged
Modify any exchanges before they are saved into this server's cache. The returned exchanges will be saved in this server's cache.
This method may be called multiple times with different collections of exchanges.
def lookup_exchange(self,
        query: edq.net.exchange.HTTPExchange,
        match_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        ) -> typing.Tuple[typing.Union[edq.net.exchange.HTTPExchange, None], typing.Union[str, None]]:
    """
    Lookup the query exchange to see if it exists in this server.
    If a match exists, the matching exchange (likely a full version of the query) will be returned along with None.
    If a match does not exist, a None will be returned along with a message indicating how the query missed
    (e.g., the URL was matched, but the method was not).
    Multiple matches are also reported as a miss.

    Options in match_options override this server's own match options for this lookup only.
    """

    if (match_options is None):
        match_options = {}

    hint_display = query.url_path
    target: typing.Any = self._exchanges

    if (query.url_path not in target):
        return None, f"Could not find matching URL path for '{hint_display}'."

    if (query.url_anchor is not None):
        hint_display = f"{query.url_path}#{query.url_anchor}"

    target = target[query.url_path]

    if (query.url_anchor not in target):
        return None, f"Found URL path, but could not find matching anchor for '{hint_display}'."

    hint_display = f"{hint_display} ({query.method})"
    target = target[query.url_anchor]

    if (query.method not in target):
        return None, f"Found URL, but could not find matching method for '{hint_display}'."

    params = list(sorted(query.parameters.keys()))
    hint_display = f"{hint_display}, (param keys = {params})"
    target = target[query.method]

    full_match_options = self.match_options.copy()
    full_match_options.update(match_options)

    hints = []
    matches = []
    match_labels = []

    for (i, exchange) in enumerate(target):
        # Exchanges not loaded from a file have no source path, fall back to their index.
        # Using a string label for every exchange keeps the labels sortable.
        label = exchange.source_path
        if (label is None):
            label = str(i)

        match, hint = exchange.match(query, **full_match_options)
        if (match):
            matches.append(exchange)
            match_labels.append(label)
            continue

        # Collect hints for non-matches.
        hints.append(f"{label}: {hint}")

    if (len(matches) == 1):
        # Found exactly one match.
        return matches[0], None

    if (len(matches) > 1):
        # Found multiple matches.
        match_paths = list(sorted(match_labels))
        return None, f"Found multiple matching exchanges for '{hint_display}': {match_paths}."

    # Found no matches.
    return None, f"Found matching URL and method, but could not find matching headers/params for '{hint_display}' (non-matching hints: {hints})."
Look up the query exchange to see if it exists in this server. If a match exists, the matching exchange (likely a full version of the query) will be returned along with None. If a match does not exist, None will be returned along with a message indicating how the query missed (e.g., the URL was matched, but the method was not).
def load_exchange(self, exchange: edq.net.exchange.HTTPExchange) -> None:
    """
    Register a single exchange with this server.

    The exchange is filed under its URL path, then its anchor, then its method,
    and is appended to any exchanges already stored at that location.
    A None exchange is rejected with a ValueError.
    """

    if (exchange is None):
        raise ValueError("Cannot load a None exchange.")

    # Walk down the layers ({url_path: {anchor: {method: [exchange, ...]}}}),
    # creating each missing layer along the way.
    by_anchor = self._exchanges.setdefault(exchange.url_path, {})
    by_method = by_anchor.setdefault(exchange.url_anchor, {})
    by_method.setdefault(exchange.method, []).append(exchange)
Load an exchange into this server.
def load_exchange_file(self, path: str) -> None:
    """
    Read one exchange from the file at the given path and load it into this server.
    Parsing is delegated to HTTPExchange.from_path(),
    which is expected to also set the exchange's source path and resolve the exchange's paths.
    """

    exchange = edq.net.exchange.HTTPExchange.from_path(path)
    self.load_exchange(exchange)
Load an exchange from a file. This will also handle setting the exchange's source path and resolving the exchange's paths.
def load_exchanges_dir(self, base_dir: str, extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION) -> None:
    """
    Load all exchanges found (recursively) within a directory.

    Every path under the base directory that ends with the given extension
    is loaded with load_exchange_file(), in sorted path order.
    The base directory is always treated literally (never as a glob pattern).
    """

    # Escape the directory so that glob metacharacters in its name (e.g., '[', '*', or '?')
    # are matched literally instead of silently changing (or emptying) the set of loaded files.
    pattern = os.path.join(glob.escape(base_dir), "**", f"*{extension}")

    for path in sorted(glob.glob(pattern, recursive = True)):
        self.load_exchange_file(path)
Load all exchanges found (recursively) within a directory.
@typing.runtime_checkable
class MissingRequestFunction(typing.Protocol):
    """
    The call signature for a fallback that builds an exchange when no stored exchange matched.
    A test server typically turns to such a function only as a last resort,
    after it has failed to find an exchange for an incoming request.
    """

    def __call__(self, query: edq.net.exchange.HTTPExchange) -> typing.Union[edq.net.exchange.HTTPExchange, None]:
        """
        Build and return an exchange that answers the given query,
        or return None when no exchange can be created for it.
        """
A function that can be used to create an exchange when none was found. This is often the last resort when a test server cannot find an exchange matching a request.
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that refuses to instantiate protocol classes,
    and otherwise swaps itself out for the first real `__init__` found in the MRO.

    Raises a TypeError when the instance's class has a truthy `_is_protocol`.
    If the class already has a different `__init__`, this returns without doing anything.
    Otherwise, the chosen `__init__` is stored on the class and then called with the given arguments.
    """

    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only look at what each base defines directly (not what it inherits).
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)