edq.util.net
Utilities for network and HTTP.
1""" 2Utilities for network and HTTP. 3""" 4 5import argparse 6import email.message 7import errno 8import http.server 9import io 10import logging 11import os 12import pathlib 13import socket 14import time 15import typing 16import urllib.parse 17 18import requests 19import requests_toolbelt.multipart.decoder 20 21import edq.util.dirent 22import edq.util.encoding 23import edq.util.hash 24import edq.util.json 25import edq.util.pyimport 26 27DEFAULT_START_PORT: int = 30000 28DEFAULT_END_PORT: int = 40000 29DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01 30 31DEFAULT_REQUEST_TIMEOUT_SECS: float = 10.0 32 33DEFAULT_HTTP_EXCHANGE_EXTENSION: str= '.httpex.json' 34 35QUERY_CLIP_LENGTH: int = 100 36""" If the filename of an HTTPExhange being saved is longer than this, then clip it. """ 37 38ANCHOR_HEADER_KEY: str = 'edq-anchor' 39""" 40By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL. 41Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information. 42""" 43 44ALLOWED_METHODS: typing.List[str] = [ 45 'DELETE', 46 'GET', 47 'HEAD', 48 'OPTIONS', 49 'PATCH', 50 'POST', 51 'PUT', 52] 53""" Allowed HTTP methods for an HTTPExchange. 
""" 54 55DEFAULT_EXCHANGE_IGNORE_HEADERS: typing.List[str] = [ 56 'accept', 57 'accept-encoding', 58 'accept-language', 59 'cache-control', 60 'connection', 61 'content-length', 62 'content-security-policy', 63 'content-type', 64 'cookie', 65 'date', 66 'dnt', 67 'etag', 68 'host', 69 'link', 70 'location', 71 'priority', 72 'referrer-policy', 73 'sec-fetch-dest', 74 'sec-fetch-mode', 75 'sec-fetch-site', 76 'sec-fetch-user', 77 'sec-gpc', 78 'server', 79 'server-timing', 80 'set-cookie', 81 'upgrade-insecure-requests', 82 'user-agent', 83 'x-content-type-options', 84 'x-download-options', 85 'x-permitted-cross-domain-policies', 86 'x-rate-limit-remaining', 87 'x-request-context-id', 88 'x-request-cost', 89 'x-runtime', 90 'x-session-id', 91 'x-xss-protection', 92 ANCHOR_HEADER_KEY, 93] 94""" 95By default, ignore these headers during exchange matching. 96Some are sent automatically and we don't need to record (like content-length), 97and some are additional information we don't need. 98""" 99 100_exchanges_out_dir: typing.Union[str, None] = None 101""" If not None, all requests made via make_request() will be saved as an HTTPExchange in this directory. """ 102 103_exchanges_clean_func: typing.Union[str, None] = None 104""" If not None, all created exchanges (in HTTPExchange.make_request() and HTTPExchange.from_response()) will use this response modifier. """ 105 106@typing.runtime_checkable 107class ResponseModifierFunction(typing.Protocol): 108 """ 109 A function that can be used to modify an exchange's response. 110 Exchanges can use these functions to normalize their responses before saving. 111 """ 112 113 def __call__(self, 114 response: requests.Response, 115 body: str, 116 ) -> str: 117 """ 118 Modify the http response. 119 Headers may be modified in the response directly, 120 while the modified (or same) body must be returned. 121 """ 122 123class FileInfo(edq.util.json.DictConverter): 124 """ Store info about files used in HTTP exchanges. 
""" 125 126 def __init__(self, 127 path: typing.Union[str, None] = None, 128 name: typing.Union[str, None] = None, 129 content: typing.Union[str, bytes, None] = None, 130 b64_encoded: bool = False, 131 **kwargs: typing.Any) -> None: 132 # Normalize the path from POSIX-style to the system's style. 133 if (path is not None): 134 path = str(pathlib.PurePath(pathlib.PurePosixPath(path))) 135 136 self.path: typing.Union[str, None] = path 137 """ The on-disk path to a file. """ 138 139 if ((name is None) and (self.path is not None)): 140 name = os.path.basename(self.path) 141 142 if (name is None): 143 raise ValueError("No name was provided for file.") 144 145 self.name: str = name 146 """ The name for this file used in an HTTP request. """ 147 148 self.content: typing.Union[str, bytes, None] = content 149 """ The contents of this file. """ 150 151 self.b64_encoded: bool = b64_encoded 152 """ Whether the content is a string encoded in Base64. """ 153 154 if ((self.path is None) and (self.content is None)): 155 raise ValueError("File must have either path or content specified.") 156 157 def resolve_path(self, base_dir: str, load_file: bool = True) -> None: 158 """ Resolve this path relative to the given base dir. """ 159 160 if ((self.path is not None) and (not os.path.isabs(self.path))): 161 self.path = os.path.abspath(os.path.join(base_dir, self.path)) 162 163 if ((self.path is not None) and (self.content is None) and load_file): 164 self.content = edq.util.dirent.read_file_bytes(self.path) 165 166 def hash_content(self) -> str: 167 """ 168 Compute a hash for the content present. 169 If no content is provided, use the path. 
170 """ 171 172 hash_content = self.content 173 174 if (self.b64_encoded and isinstance(hash_content, str)): 175 hash_content = edq.util.encoding.from_base64(hash_content) 176 177 if (hash_content is None): 178 hash_content = self.path 179 180 return edq.util.hash.sha256_hex(hash_content) 181 182 def to_dict(self) -> typing.Dict[str, typing.Any]: 183 data = vars(self).copy() 184 185 # JSON does not support raw bytes, so we will need to base64 encode any binary content. 186 if (isinstance(self.content, bytes)): 187 data['content'] = edq.util.encoding.to_base64(self.content) 188 data['b64_encoded'] = True 189 190 return data 191 192 @classmethod 193 def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any: 194 return FileInfo(**data) 195 196class HTTPExchange(edq.util.json.DictConverter): 197 """ 198 The request and response making up a full HTTP exchange. 199 """ 200 201 def __init__(self, 202 method: str = 'GET', 203 url: typing.Union[str, None] = None, 204 url_path: typing.Union[str, None] = None, 205 url_anchor: typing.Union[str, None] = None, 206 parameters: typing.Union[typing.Dict[str, typing.Any], None] = None, 207 files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None, 208 headers: typing.Union[typing.Dict[str, typing.Any], None] = None, 209 allow_redirects: typing.Union[bool, None] = None, 210 response_code: int = http.HTTPStatus.OK, 211 response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None, 212 json_body: typing.Union[bool, None] = None, 213 response_body: typing.Union[str, dict, list, None] = None, 214 source_path: typing.Union[str, None] = None, 215 response_modifier: typing.Union[str, None] = None, 216 extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None, 217 **kwargs: typing.Any) -> None: 218 method = str(method).upper() 219 if (method not in ALLOWED_METHODS): 220 raise ValueError(f"Got unknown/disallowed method: '{method}'.") 221 222 self.method: str = 
method 223 """ The HTTP method for this exchange. """ 224 225 url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters) 226 227 self.url_path: str = url_path 228 """ 229 The path portion of the request URL. 230 Only the path (not domain, port, params, anchor, etc) should be included. 231 """ 232 233 self.url_anchor: typing.Union[str, None] = url_anchor 234 """ 235 The anchor portion of the request URL (if it exists). 236 """ 237 238 self.parameters: typing.Dict[str, typing.Any] = parameters 239 """ 240 The parameters/arguments for this request. 241 Parameters should be provided here and not encoded into URLs, 242 regardless of the request method. 243 With the exception of files, all parameters should be placed here. 244 """ 245 246 if (files is None): 247 files = [] 248 249 parsed_files = [] 250 for file in files: 251 if (isinstance(file, FileInfo)): 252 parsed_files.append(file) 253 else: 254 parsed_files.append(FileInfo(**file)) 255 256 self.files: typing.List[FileInfo] = parsed_files 257 """ 258 A list of files to include in the request. 259 The files are represented as dicts with a 260 "path" (path to the file on disk) and "name" (the filename to send in the request) field. 261 These paths must be POSIX-style paths, 262 they will be converted to system-specific paths. 263 Once this exchange is ready for use, these paths should be resolved (and probably absolute). 264 However, when serialized these paths should probably be relative. 265 To reconcile this, resolve_paths() should be called before using this exchange. 266 """ 267 268 if (headers is None): 269 headers = {} 270 271 self.headers: typing.Dict[str, typing.Any] = headers 272 """ Headers in the request. """ 273 274 if (allow_redirects is None): 275 allow_redirects = True 276 277 self.allow_redirects: bool = allow_redirects 278 """ Follow redirects. """ 279 280 self.response_code: int = response_code 281 """ The HTTP status code of the response. 
""" 282 283 if (response_headers is None): 284 response_headers = {} 285 286 self.response_headers: typing.Dict[str, typing.Any] = response_headers 287 """ Headers in the response. """ 288 289 if (json_body is None): 290 json_body = isinstance(response_body, (dict, list)) 291 292 self.json_body: bool = json_body 293 """ 294 Indicates that the response is JSON and should be converted to/from a string. 295 If the response body is passed in a dict/list and this is passed as None, 296 then this will be set as true. 297 """ 298 299 if (self.json_body and isinstance(response_body, (dict, list))): 300 response_body = edq.util.json.dumps(response_body) 301 302 self.response_body: typing.Union[str, None] = response_body # type: ignore[assignment] 303 """ 304 The response that should be sent in this exchange. 305 """ 306 307 self.response_modifier: typing.Union[str, None] = response_modifier 308 """ 309 This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response()) 310 before sent back to the caller. 311 This reference must be importable via edq.util.pyimport.fetch(). 312 """ 313 314 self.source_path: typing.Union[str, None] = source_path 315 """ 316 The path that this exchange was loaded from (if it was loaded from a file). 317 This value should never be serialized, but can be useful for testing. 318 """ 319 320 if (extra_options is None): 321 extra_options = {} 322 323 self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy() 324 """ 325 Additional options for this exchange. 326 This library will not use these options, but other's may. 327 kwargs will also be added to this. 
328 """ 329 330 self.extra_options.update(kwargs) 331 332 def _parse_url_components(self, 333 url: typing.Union[str, None] = None, 334 url_path: typing.Union[str, None] = None, 335 url_anchor: typing.Union[str, None] = None, 336 parameters: typing.Union[typing.Dict[str, typing.Any], None] = None, 337 ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]: 338 """ 339 Parse out all URL-based components from raw inputs. 340 The URL's path and anchor can either be supplied separately, or as part of the full given URL. 341 If content is present in both places, they much match (or an error will be raised). 342 Query parameters may be provided in the full URL, 343 but will be overwritten by any that are provided separately. 344 Any information from the URL aside from the path, anchor/fragment, and query will be ignored. 345 Note that path parameters (not query parameters) will be ignored. 346 The final url path, url anchor, and parameters will be returned. 347 """ 348 349 # Do base initialization and cleanup. 350 351 if (url_path is not None): 352 url_path = url_path.strip() 353 if (url_path == ''): 354 url_path = '' 355 else: 356 url_path = url_path.lstrip('/') 357 358 if (url_anchor is not None): 359 url_anchor = url_anchor.strip() 360 if (url_anchor == ''): 361 url_anchor = None 362 else: 363 url_anchor = url_anchor.lstrip('#') 364 365 if (parameters is None): 366 parameters = {} 367 368 # Parse the URL (if present). 369 370 if ((url is not None) and (url.strip() != '')): 371 parts = urllib.parse.urlparse(url) 372 373 # Handle the path. 374 375 path = parts.path.lstrip('/') 376 377 if ((url_path is not None) and (url_path != path)): 378 raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').") 379 380 url_path = path 381 382 # Check the optional anchor/fragment. 
383 384 if (parts.fragment != ''): 385 fragment = parts.fragment.lstrip('#') 386 387 if ((url_anchor is not None) and (url_anchor != fragment)): 388 raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').") 389 390 url_anchor = fragment 391 392 # Check for any parameters. 393 394 url_params = parse_query_string(parts.query) 395 for (key, value) in url_params.items(): 396 if (key not in parameters): 397 parameters[key] = value 398 399 if (url_path is None): 400 raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.') 401 402 # Sort parameter keys for consistency. 403 parameters = {key: parameters[key] for key in sorted(parameters.keys())} 404 405 return url_path, url_anchor, parameters 406 407 def resolve_paths(self, base_dir: str) -> None: 408 """ Resolve any paths relative to the given base dir. """ 409 410 for file_info in self.files: 411 file_info.resolve_path(base_dir) 412 413 def match(self, query: 'HTTPExchange', 414 match_headers: bool = True, 415 headers_to_skip: typing.Union[typing.List[str], None] = None, 416 params_to_skip: typing.Union[typing.List[str], None] = None, 417 **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]: 418 """ 419 Check if this exchange matches the query exchange. 420 If they match, `(True, None)` will be returned. 421 If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is. 422 423 Note that this is not an equality check, 424 as a query exchange is often missing the response components. 425 This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange. 426 """ 427 428 if (query.method != self.method): 429 return False, f"HTTP method does not match (query = {query.method}, target = {self.method})." 
430 431 if (query.url_path != self.url_path): 432 return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})." 433 434 if (query.url_anchor != self.url_anchor): 435 return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})." 436 437 if (headers_to_skip is None): 438 headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS 439 440 if (params_to_skip is None): 441 params_to_skip = [] 442 443 if (match_headers): 444 match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip) 445 if (not match): 446 return False, hint 447 448 match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip) 449 if (not match): 450 return False, hint 451 452 # Check file names and hash contents. 453 query_filenames = {(file.name, file.hash_content()) for file in query.files} 454 target_filenames = {(file.name, file.hash_content()) for file in self.files} 455 if (query_filenames != target_filenames): 456 return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})." 457 458 return True, None 459 460 def _match_dict(self, label: str, 461 query_dict: typing.Dict[str, typing.Any], 462 target_dict: typing.Dict[str, typing.Any], 463 keys_to_skip: typing.Union[typing.List[str], None] = None, 464 query_label: str = 'query', 465 target_label: str = 'target', 466 normalize_key_case: bool = True, 467 ) -> typing.Tuple[bool, typing.Union[str, None]]: 468 """ A subcheck in match(), specifically for a dictionary. 
""" 469 470 if (keys_to_skip is None): 471 keys_to_skip = [] 472 473 if (normalize_key_case): 474 keys_to_skip = [key.lower() for key in keys_to_skip] 475 query_dict = {key.lower(): value for (key, value) in query_dict.items()} 476 target_dict = {key.lower(): value for (key, value) in target_dict.items()} 477 478 query_keys = set(query_dict.keys()) - set(keys_to_skip) 479 target_keys = set(target_dict.keys()) - set(keys_to_skip) 480 481 if (query_keys != target_keys): 482 return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})." 483 484 for key in sorted(query_keys): 485 query_value = query_dict[key] 486 target_value = target_dict[key] 487 488 if (query_value != target_value): 489 comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'" 490 return False, f"{label.title()} '{key}' has a non-matching value ({comparison})." 491 492 return True, None 493 494 def get_url(self) -> str: 495 """ Get the URL path and anchor combined. """ 496 497 url = self.url_path 498 499 if (self.url_anchor is not None): 500 url += ('#' + self.url_anchor) 501 502 return url 503 504 def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]: 505 """ Perform the HTTP request described by this exchange. """ 506 507 files = [] 508 for file_info in self.files: 509 content = file_info.content 510 511 # Content is base64 encoded. 512 if (file_info.b64_encoded and isinstance(content, str)): 513 content = edq.util.encoding.from_base64(content) 514 515 # Content is missing and must be in a file. 
516 if (content is None): 517 content = open(file_info.path, 'rb') # type: ignore[assignment,arg-type] # pylint: disable=consider-using-with 518 519 files.append((file_info.name, content)) 520 521 url = f"{base_url}/{self.get_url()}" 522 523 response, body = make_request(self.method, url, 524 headers = self.headers, 525 data = self.parameters, 526 files = files, 527 raise_for_status = raise_for_status, 528 allow_redirects = self.allow_redirects, 529 **kwargs, 530 ) 531 532 if (self.response_modifier is not None): 533 modify_func = edq.util.pyimport.fetch(self.response_modifier) 534 body = modify_func(response, body) 535 536 return response, body 537 538 def match_response(self, response: requests.Response, 539 override_body: typing.Union[str, None] = None, 540 headers_to_skip: typing.Union[typing.List[str], None] = None, 541 **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]: 542 """ 543 Check if this exchange matches the given response. 544 If they match, `(True, None)` will be returned. 545 If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is. 546 """ 547 548 if (headers_to_skip is None): 549 headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS 550 551 response_body = override_body 552 if (response_body is None): 553 response_body = response.text 554 555 if (self.response_code != response.status_code): 556 return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})" 557 558 expected_body = self.response_body 559 actual_body = None 560 561 if (self.json_body): 562 actual_body = response.json() 563 564 # Normalize the actual and expected bodies. 
565 566 actual_body = edq.util.json.dumps(actual_body) 567 568 if (isinstance(expected_body, str)): 569 expected_body = edq.util.json.loads(expected_body) 570 571 expected_body = edq.util.json.dumps(expected_body) 572 else: 573 actual_body = response_body 574 575 if (self.response_body != actual_body): 576 body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'" 577 return False, f"body does not match ({body_hint})" 578 579 match, hint = self._match_dict('header', response.headers, self.response_headers, 580 keys_to_skip = headers_to_skip, 581 query_label = 'response', target_label = 'exchange') 582 583 if (not match): 584 return False, hint 585 586 return True, None 587 588 def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str: 589 """ Create a consistent, semi-unique, and relative path for this exchange. """ 590 591 url = self.get_url().strip() 592 parts = url.split('/') 593 594 595 if (url in ['', '/']): 596 filename = '_index_' 597 dirname = '' 598 else: 599 filename = parts[-1] 600 601 if (len(parts) > 1): 602 dirname = os.path.join(*parts[0:-1]) 603 else: 604 dirname = '' 605 606 parameters = {} 607 for key in sorted(self.parameters.keys()): 608 parameters[key] = self.parameters[key] 609 610 # Treat files as params as well. 611 for file_info in self.files: 612 parameters[f"file-{file_info.name}"] = file_info.hash_content() 613 614 query = urllib.parse.urlencode(parameters) 615 if (query != ''): 616 # The query can get very long, so we may have to clip it. 617 query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH) 618 619 # Note that the '?' is URL encoded. 
620 filename += f"%3F{query_text}" 621 622 filename += f"_{self.method}{http_exchange_extension}" 623 624 return os.path.join(dirname, filename) 625 626 def to_dict(self) -> typing.Dict[str, typing.Any]: 627 return vars(self) 628 629 @classmethod 630 def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any: 631 return HTTPExchange(**data) 632 633 @classmethod 634 def from_path(cls, path: str, 635 set_source_path: bool = True, 636 ) -> 'HTTPExchange': 637 """ 638 Load an exchange from a file. 639 This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths. 640 """ 641 642 exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange)) 643 644 if (set_source_path): 645 exchange.source_path = os.path.abspath(path) 646 647 exchange.resolve_paths(os.path.abspath(os.path.dirname(path))) 648 649 return exchange 650 651 @classmethod 652 def from_response(cls, 653 response: requests.Response, 654 headers_to_skip: typing.Union[typing.List[str], None] = None, 655 params_to_skip: typing.Union[typing.List[str], None] = None, 656 allow_redirects: typing.Union[bool, None] = None, 657 ) -> 'HTTPExchange': 658 """ Create a full excahnge from a response. """ 659 660 if (headers_to_skip is None): 661 headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS 662 663 if (params_to_skip is None): 664 params_to_skip = [] 665 666 body = response.text 667 668 # Use a clean function (if one exists). 669 if (_exchanges_clean_func is not None): 670 modify_func = edq.util.pyimport.fetch(_exchanges_clean_func) 671 body = modify_func(response, body) 672 673 request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()} 674 response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()} 675 676 # Clean headers. 
def find_open_port(
        start_port: int = DEFAULT_START_PORT, end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    Ports are tried in increasing order by binding a TCP socket on 127.0.0.1.
    After a successful bind the socket is closed and, if `wait_time` is positive,
    this function sleeps for that many seconds before returning the port.
    If no open port is found, a ValueError is raised.
    Any socket error other than "address in use" is raised to the caller.
    """

    for port in range(start_port, end_port + 1):
        # The context manager closes the socket on every path out of this block.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            try:
                sock.bind(('127.0.0.1', port))
            except OSError as ex:
                if (ex.errno == errno.EADDRINUSE):
                    continue

                # Unknown error.
                raise

        # The socket is closed now; wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        if (wait_time > 0):
            time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")
773 """ 774 775 if (add_http_prefix and (not url.lower().startswith('http'))): 776 url = 'http://' + url 777 778 if (output_dir is None): 779 output_dir = _exchanges_out_dir 780 781 if (headers is None): 782 headers = {} 783 784 if (data is None): 785 data = {} 786 787 if (files is None): 788 files = [] 789 790 if (additional_requests_options is None): 791 additional_requests_options = {} 792 793 # Add in the anchor as a header (since it is not traditionally sent in an HTTP request). 794 if (send_anchor_header): 795 headers = headers.copy() 796 797 parts = urllib.parse.urlparse(url) 798 headers[ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#') 799 800 options = additional_requests_options.copy() 801 options.update({ 802 'headers': headers, 803 'files': files, 804 'timeout': timeout_secs, 805 }) 806 807 if (allow_redirects is not None): 808 options['allow_redirects'] = allow_redirects 809 810 if (method == 'GET'): 811 options['params'] = data 812 else: 813 options['data'] = data 814 815 logging.debug("Making %s request: '%s' (options = %s).", method, url, options) 816 response = requests.request(method, url, **options) 817 818 body = response.text 819 logging.debug("Response:\n%s", body) 820 821 if (raise_for_status): 822 # Handle 404s a little special, as their body may contain useful information. 
def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Collect the parameters and files of an HTTP request.

    Parameters come from both the request body and the URL's query string,
    a query string value replaces a body value with the same key.
    Files only come from the body.
    Returns `(parameters, files)`.
    """

    # The body supplies the initial parameters and all of the files.
    parameters, uploads = parse_request_body_data(headers, body)

    # Layer the URL's query parameters on top.
    query_text = urllib.parse.urlparse(url).query
    for (key, value) in parse_query_string(query_text).items():
        parameters[key] = value

    return parameters, uploads
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If `replace_single_lists` is true, lists with a single value (as returned from urllib.parse.parse_qs())
    will be replaced with that single value.
    `keep_blank_values` is passed on to urllib.parse.parse_qs():
    when true, keys with an empty value are kept (with '' as the value), otherwise they are dropped.
    Any additional keyword arguments are accepted and ignored.
    """

    parsed = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values)

    if (not replace_single_lists):
        return parsed

    return {
        key: (values[0] if (len(values) == 1) else values)
        for (key, values) in parsed.items()
    }
If the filename of an HTTPExchange being saved is longer than this, then clip it.
By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL. Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.
Allowed HTTP methods for an HTTPExchange.
By default, ignore these headers during exchange matching. Some are sent automatically and we don't need to record (like content-length), and some are additional information we don't need.
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
    """
    The call signature for functions that modify an exchange's response.
    An exchange can run its response through one of these functions to normalize it before it is saved.
    """

    def __call__(self, response: requests.Response, body: str) -> str:
        """
        Modify the HTTP response.
        Any header changes should be made directly on the given response,
        and the body (modified or not) must be returned.
        """
A function that can be used to modify an exchange's response. Exchanges can use these functions to normalize their responses before saving.
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that swaps itself out for the first real `__init__` found in the class's MRO.

    Raises TypeError if the class is flagged as a protocol (`cls._is_protocol`).
    Otherwise, the located `__init__` (or `object.__init__` if none is found)
    is stored on the class and then called with the given arguments.
    """

    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only look at what each base defines itself (its own `__dict__`), not at inherited attributes.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Finish this (first) instantiation using the newly installed `__init__`.
    cls.__init__(self, *args, **kwargs)
class FileInfo(edq.util.json.DictConverter):
    """
    Store info about files used in HTTP exchanges.
    A file must have a path and/or content, and a name (which defaults to the basename of the path).
    """

    def __init__(self,
            path: typing.Union[str, None] = None,
            name: typing.Union[str, None] = None,
            content: typing.Union[str, bytes, None] = None,
            b64_encoded: bool = False,
            **kwargs: typing.Any) -> None:
        # Normalize the path from POSIX-style to the system's style.
        if (path is not None):
            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))

        self.path: typing.Union[str, None] = path
        """ The on-disk path to a file. """

        if ((name is None) and (self.path is not None)):
            name = os.path.basename(self.path)

        if (name is None):
            raise ValueError("No name was provided for file.")

        self.name: str = name
        """ The name for this file used in an HTTP request. """

        self.content: typing.Union[str, bytes, None] = content
        """ The contents of this file. """

        self.b64_encoded: bool = b64_encoded
        """ Whether the content is a string encoded in Base64. """

        if ((self.path is None) and (self.content is None)):
            raise ValueError("File must have either path or content specified.")

    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
        """
        Resolve this path relative to the given base dir.
        If requested (and no content is already present), the file's content will also be loaded from the resolved path.
        """

        if ((self.path is not None) and (not os.path.isabs(self.path))):
            self.path = os.path.abspath(os.path.join(base_dir, self.path))

        if ((self.path is not None) and (self.content is None) and load_file):
            self.content = edq.util.dirent.read_file_bytes(self.path)

    def hash_content(self) -> str:
        """
        Compute a hash for the content present.
        Base64 encoded content is decoded before hashing.
        If no content is provided, use the path.
        """

        hash_content = self.content

        if (self.b64_encoded and isinstance(hash_content, str)):
            hash_content = edq.util.encoding.from_base64(hash_content)

        if (hash_content is None):
            hash_content = self.path

        return edq.util.hash.sha256_hex(hash_content)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """
        Return a copy of this object's fields as a dict.
        Binary content is replaced with its Base64 encoding (and marked as such).
        """

        data = vars(self).copy()

        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
        if (isinstance(self.content, bytes)):
            data['content'] = edq.util.encoding.to_base64(self.content)
            data['b64_encoded'] = True

        return data

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Create an instance (of the class this is called on) using the given dict as keyword arguments. """

        # Use cls (not a hard-coded FileInfo) so that subclasses get instances of themselves.
        return cls(**data)
Store info about files used in HTTP exchanges.
def __init__(self,
        path: typing.Union[str, None] = None,
        name: typing.Union[str, None] = None,
        content: typing.Union[str, bytes, None] = None,
        b64_encoded: bool = False,
        **kwargs: typing.Any) -> None:
    # Incoming paths are POSIX-style, convert them to the style of the current system.
    system_path = path
    if (system_path is not None):
        posix_path = pathlib.PurePosixPath(system_path)
        system_path = str(pathlib.PurePath(posix_path))

    self.path: typing.Union[str, None] = system_path
    """ The on-disk path to a file. """

    # Without an explicit name, fall back to the last component of the path.
    if (name is None):
        if (self.path is None):
            raise ValueError("No name was provided for file.")

        name = os.path.basename(self.path)

    self.name: str = name
    """ The name for this file used in an HTTP request. """

    self.content: typing.Union[str, bytes, None] = content
    """ The contents of this file. """

    self.b64_encoded: bool = b64_encoded
    """ Whether the content is a string encoded in Base64. """

    has_source = ((self.path is not None) or (self.content is not None))
    if (not has_source):
        raise ValueError("File must have either path or content specified.")
def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
    """
    Make this file's path absolute (relative paths are anchored at the given base dir).
    When requested, and when no content is present yet, the content is read from the resolved path.
    """

    # There is nothing to resolve (or load from) without a path.
    if (self.path is None):
        return

    if (not os.path.isabs(self.path)):
        joined_path = os.path.join(base_dir, self.path)
        self.path = os.path.abspath(joined_path)

    if (load_file and (self.content is None)):
        self.content = edq.util.dirent.read_file_bytes(self.path)
Resolve this path relative to the given base dir.
def hash_content(self) -> str:
    """
    Hash whatever identifies this file.
    The content is hashed when it is present (Base64 strings are decoded first),
    otherwise the path is hashed.
    """

    if (self.content is None):
        to_hash = self.path
    elif (self.b64_encoded and isinstance(self.content, str)):
        to_hash = edq.util.encoding.from_base64(self.content)
    else:
        to_hash = self.content

    # A decoded value of None also falls back to the path.
    if (to_hash is None):
        to_hash = self.path

    return edq.util.hash.sha256_hex(to_hash)
Compute a hash for the content present. If no content is provided, use the path.
def to_dict(self) -> typing.Dict[str, typing.Any]:
    """
    Build a dict of this object's fields.
    Raw bytes cannot be stored in JSON, so binary content is swapped for a Base64 string and flagged as encoded.
    """

    result = dict(vars(self))

    if (not isinstance(self.content, bytes)):
        return result

    result['content'] = edq.util.encoding.to_base64(self.content)
    result['b64_encoded'] = True

    return result
Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
    """ Create an instance (of the class this is called on) using the given dict as keyword arguments. """

    # Use cls (not a hard-coded FileInfo) so that subclasses get instances of themselves.
    return cls(**data)
Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.
A general (but inefficient) implementation is provided by default.
class HTTPExchange(edq.util.json.DictConverter):
    """
    The request and response making up a full HTTP exchange.
    """

    def __init__(self,
            method: str = 'GET',
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            response_code: int = http.HTTPStatus.OK,
            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            json_body: typing.Union[bool, None] = None,
            response_body: typing.Union[str, dict, list, None] = None,
            source_path: typing.Union[str, None] = None,
            response_modifier: typing.Union[str, None] = None,
            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            **kwargs: typing.Any) -> None:
        method = str(method).upper()
        if (method not in ALLOWED_METHODS):
            raise ValueError(f"Got unknown/disallowed method: '{method}'.")

        self.method: str = method
        """ The HTTP method for this exchange. """

        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)

        self.url_path: str = url_path
        """
        The path portion of the request URL.
        Only the path (not domain, port, params, anchor, etc) should be included.
        """

        self.url_anchor: typing.Union[str, None] = url_anchor
        """
        The anchor portion of the request URL (if it exists).
        """

        self.parameters: typing.Dict[str, typing.Any] = parameters
        """
        The parameters/arguments for this request.
        Parameters should be provided here and not encoded into URLs,
        regardless of the request method.
        With the exception of files, all parameters should be placed here.
        """

        if (files is None):
            files = []

        parsed_files = []
        for file in files:
            if (isinstance(file, FileInfo)):
                parsed_files.append(file)
            else:
                parsed_files.append(FileInfo(**file))

        self.files: typing.List[FileInfo] = parsed_files
        """
        A list of files to include in the request.
        The files are represented as dicts with a
        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
        These paths must be POSIX-style paths,
        they will be converted to system-specific paths.
        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
        However, when serialized these paths should probably be relative.
        To reconcile this, resolve_paths() should be called before using this exchange.
        """

        if (headers is None):
            headers = {}

        self.headers: typing.Dict[str, typing.Any] = headers
        """ Headers in the request. """

        if (allow_redirects is None):
            allow_redirects = True

        self.allow_redirects: bool = allow_redirects
        """ Follow redirects. """

        self.response_code: int = response_code
        """ The HTTP status code of the response. """

        if (response_headers is None):
            response_headers = {}

        self.response_headers: typing.Dict[str, typing.Any] = response_headers
        """ Headers in the response. """

        if (json_body is None):
            json_body = isinstance(response_body, (dict, list))

        self.json_body: bool = json_body
        """
        Indicates that the response is JSON and should be converted to/from a string.
        If the response body is passed in as a dict/list and this is passed as None,
        then this will be set to true.
        """

        if (self.json_body and isinstance(response_body, (dict, list))):
            response_body = edq.util.json.dumps(response_body)

        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
        """
        The response that should be sent in this exchange.
        """

        self.response_modifier: typing.Union[str, None] = response_modifier
        """
        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
        before they are sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.source_path: typing.Union[str, None] = source_path
        """
        The path that this exchange was loaded from (if it was loaded from a file).
        This value should never be serialized, but can be useful for testing.
        """

        if (extra_options is None):
            extra_options = {}

        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
        """
        Additional options for this exchange.
        This library will not use these options, but others may.
        kwargs will also be added to this.
        """

        self.extra_options.update(kwargs)

    def _parse_url_components(self,
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
        """
        Parse out all URL-based components from raw inputs.
        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
        If content is present in both places, they must match (or an error will be raised).
        Query parameters may be provided in the full URL,
        but will be overwritten by any that are provided separately.
        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
        Note that path parameters (not query parameters) will be ignored.
        The final url path, url anchor, and parameters will be returned.
        The passed in parameters will not be modified.
        """

        # Do base initialization and cleanup.

        if (url_path is not None):
            # An explicit empty path is allowed (and kept as an empty string).
            url_path = url_path.strip().lstrip('/')

        if (url_anchor is not None):
            url_anchor = url_anchor.strip()
            if (url_anchor == ''):
                url_anchor = None
            else:
                url_anchor = url_anchor.lstrip('#')

        # Work on a copy so that the caller's dict is never changed.
        if (parameters is None):
            parameters = {}
        else:
            parameters = dict(parameters)

        # Parse the URL (if present).

        if ((url is not None) and (url.strip() != '')):
            parts = urllib.parse.urlparse(url)

            # Handle the path.

            path = parts.path.lstrip('/')

            if ((url_path is not None) and (url_path != path)):
                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")

            url_path = path

            # Check the optional anchor/fragment.

            if (parts.fragment != ''):
                fragment = parts.fragment.lstrip('#')

                if ((url_anchor is not None) and (url_anchor != fragment)):
                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")

                url_anchor = fragment

            # Check for any parameters.
            # Explicitly provided parameters take priority over ones in the URL.

            url_params = parse_query_string(parts.query)
            for (key, value) in url_params.items():
                if (key not in parameters):
                    parameters[key] = value

        if (url_path is None):
            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')

        # Sort parameter keys for consistency.
        parameters = {key: parameters[key] for key in sorted(parameters.keys())}

        return url_path, url_anchor, parameters

    def resolve_paths(self, base_dir: str) -> None:
        """ Resolve any paths relative to the given base dir. """

        for file_info in self.files:
            file_info.resolve_path(base_dir)

    def match(self, query: 'HTTPExchange',
            match_headers: bool = True,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the query exchange.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        Note that this is not an equality check,
        as a query exchange is often missing the response components.
        This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
        """

        if (query.method != self.method):
            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."

        if (query.url_path != self.url_path):
            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."

        if (query.url_anchor != self.url_anchor):
            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        if (match_headers):
            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
            if (not match):
                return False, hint

        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
        if (not match):
            return False, hint

        # Check file names and hash contents.
        query_filenames = {(file.name, file.hash_content()) for file in query.files}
        target_filenames = {(file.name, file.hash_content()) for file in self.files}
        if (query_filenames != target_filenames):
            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."

        return True, None

    def _match_dict(self, label: str,
            query_dict: typing.Dict[str, typing.Any],
            target_dict: typing.Dict[str, typing.Any],
            keys_to_skip: typing.Union[typing.List[str], None] = None,
            query_label: str = 'query',
            target_label: str = 'target',
            normalize_key_case: bool = True,
            ) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        A subcheck in match(), specifically for a dictionary.
        The two dicts match if (after removing the keys to skip) they have the same keys with equal values.
        By default, keys are compared without regard to case.
        The labels are only used when building the mismatch hint.
        """

        if (keys_to_skip is None):
            keys_to_skip = []

        if (normalize_key_case):
            keys_to_skip = [key.lower() for key in keys_to_skip]
            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
            target_dict = {key.lower(): value for (key, value) in target_dict.items()}

        query_keys = set(query_dict.keys()) - set(keys_to_skip)
        target_keys = set(target_dict.keys()) - set(keys_to_skip)

        if (query_keys != target_keys):
            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."

        # Sort the keys so that the first reported mismatch is consistent.
        for key in sorted(query_keys):
            query_value = query_dict[key]
            target_value = target_dict[key]

            if (query_value != target_value):
                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."

        return True, None

    def get_url(self) -> str:
        """ Get the URL path and anchor combined. """

        url = self.url_path

        if (self.url_anchor is not None):
            url += ('#' + self.url_anchor)

        return url

    def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
        """
        Perform the HTTP request described by this exchange.
        The request is sent to this exchange's URL under the given base URL,
        and any additional kwargs are passed on to make_request().
        If this exchange has a response modifier, it will be applied to the body before returning.
        The response and (possibly modified) body are returned.
        """

        files = []
        for file_info in self.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            # Read the file now (instead of passing along an open handle) so the handle is always closed.
            if (content is None):
                with open(file_info.path, 'rb') as file:  # type: ignore[arg-type]
                    content = file.read()

            files.append((file_info.name, content))

        url = f"{base_url}/{self.get_url()}"

        response, body = make_request(self.method, url,
                headers = self.headers,
                data = self.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = self.allow_redirects,
                **kwargs,
        )

        if (self.response_modifier is not None):
            modify_func = edq.util.pyimport.fetch(self.response_modifier)
            body = modify_func(response, body)

        return response, body

    def match_response(self, response: requests.Response,
            override_body: typing.Union[str, None] = None,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the given response.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        The status code, body, and headers (except for the headers to skip) are checked.
        For JSON exchanges, both bodies are normalized (parsed and re-serialized) before they are compared.
        For all other exchanges, the override body (if given) is compared instead of the response's text.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        response_body = override_body
        if (response_body is None):
            response_body = response.text

        if (self.response_code != response.status_code):
            return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

        expected_body = self.response_body
        actual_body = None

        if (self.json_body):
            # NOTE(review): The override body is not used for JSON exchanges, confirm that this is intended.
            actual_body = response.json()

            # Normalize the actual and expected bodies.

            actual_body = edq.util.json.dumps(actual_body)

            if (isinstance(expected_body, str)):
                expected_body = edq.util.json.loads(expected_body)

            expected_body = edq.util.json.dumps(expected_body)
        else:
            actual_body = response_body

        # Compare against the normalized expected body, so JSON formatting differences do not cause a mismatch.
        if (expected_body != actual_body):
            body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
            return False, f"body does not match ({body_hint})"

        match, hint = self._match_dict('header', response.headers, self.response_headers,
                keys_to_skip = headers_to_skip,
                query_label = 'response', target_label = 'exchange')

        if (not match):
            return False, hint

        return True, None

    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
        """
        Create a consistent, semi-unique, and relative path for this exchange.
        The path is built from the exchange's URL, its (possibly clipped) parameters and file hashes,
        its method, and the given extension.
        """

        url = self.get_url().strip()
        parts = url.split('/')

        if (url in ['', '/']):
            filename = '_index_'
            dirname = ''
        else:
            filename = parts[-1]

            if (len(parts) > 1):
                dirname = os.path.join(*parts[0:-1])
            else:
                dirname = ''

        parameters = {}
        for key in sorted(self.parameters.keys()):
            parameters[key] = self.parameters[key]

        # Treat files as params as well.
        for file_info in self.files:
            parameters[f"file-{file_info.name}"] = file_info.hash_content()

        query = urllib.parse.urlencode(parameters)
        if (query != ''):
            # The query can get very long, so we may have to clip it.
            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

            # Note that the '?' is URL encoded.
            filename += f"%3F{query_text}"

        filename += f"_{self.method}{http_exchange_extension}"

        return os.path.join(dirname, filename)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """ Return a (shallow) copy of this exchange's fields as a dict. """

        # Return a copy so that changes to the result do not change this exchange's attributes.
        return vars(self).copy()

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Create an exchange (of the class this is called on) using the given dict as keyword arguments. """

        return cls(**data)

    @classmethod
    def from_path(cls, path: str,
            set_source_path: bool = True,
            ) -> 'HTTPExchange':
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths
        (relative to the directory the file is in).
        """

        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, cls))

        if (set_source_path):
            exchange.source_path = os.path.abspath(path)

        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))

        return exchange

    @classmethod
    def from_response(cls,
            response: requests.Response,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            ) -> 'HTTPExchange':
        """
        Create a full exchange from a response.
        The request components are taken from the request attached to the response.
        The headers and parameters to skip will not be included in the exchange.
        If a module-level clean function is set, it is applied to the body and recorded as the exchange's response modifier.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        body = response.text

        # Use a clean function (if one exists).
        if (_exchanges_clean_func is not None):
            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
            body = modify_func(response, body)

        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}

        # Clean headers.
        for key in headers_to_skip:
            key = key.lower()

            request_headers.pop(key, None)
            response_headers.pop(key, None)

        request_data, request_files = parse_request_data(response.request.url, response.request.headers, response.request.body)

        # Clean parameters.
        for key in params_to_skip:
            request_data.pop(key, None)

        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]

        data = {
            'method': response.request.method,
            'url': response.request.url,
            # The anchor is not part of a normal request, it is read from the dedicated header.
            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
            'parameters': request_data,
            'files': files,
            'headers': request_headers,
            'response_code': response.status_code,
            'response_headers': response_headers,
            'response_body': body,
            'response_modifier': _exchanges_clean_func,
            'allow_redirects': allow_redirects,
        }

        return cls(**data)
The request and response making up a full HTTP exchange.
def __init__(self,
        method: str = 'GET',
        url: typing.Union[str, None] = None,
        url_path: typing.Union[str, None] = None,
        url_anchor: typing.Union[str, None] = None,
        parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
        files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        response_code: int = http.HTTPStatus.OK,
        response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        json_body: typing.Union[bool, None] = None,
        response_body: typing.Union[str, dict, list, None] = None,
        source_path: typing.Union[str, None] = None,
        response_modifier: typing.Union[str, None] = None,
        extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        **kwargs: typing.Any) -> None:
    clean_method = str(method).upper()
    if (clean_method not in ALLOWED_METHODS):
        raise ValueError(f"Got unknown/disallowed method: '{clean_method}'.")

    self.method: str = clean_method
    """ The HTTP method for this exchange. """

    # The path, anchor, and parameters may each come from the full URL or from their own argument.
    url_components = self._parse_url_components(url, url_path, url_anchor, parameters)

    self.url_path: str = url_components[0]
    """
    The path portion of the request URL.
    Only the path (not domain, port, params, anchor, etc) should be included.
    """

    self.url_anchor: typing.Union[str, None] = url_components[1]
    """
    The anchor portion of the request URL (if it exists).
    """

    self.parameters: typing.Dict[str, typing.Any] = url_components[2]
    """
    The parameters/arguments for this request.
    Parameters should be provided here and not encoded into URLs,
    regardless of the request method.
    With the exception of files, all parameters should be placed here.
    """

    # Files given as dicts are converted, files that are already FileInfo objects are used directly.
    raw_files = [] if (files is None) else files

    self.files: typing.List[FileInfo] = [
        (raw_file if isinstance(raw_file, FileInfo) else FileInfo(**raw_file))
        for raw_file in raw_files
    ]
    """
    A list of files to include in the request.
    The files are represented as dicts with a
    "path" (path to the file on disk) and "name" (the filename to send in the request) field.
    These paths must be POSIX-style paths,
    they will be converted to system-specific paths.
    Once this exchange is ready for use, these paths should be resolved (and probably absolute).
    However, when serialized these paths should probably be relative.
    To reconcile this, resolve_paths() should be called before using this exchange.
    """

    self.headers: typing.Dict[str, typing.Any] = {} if (headers is None) else headers
    """ Headers in the request. """

    self.allow_redirects: bool = True if (allow_redirects is None) else allow_redirects
    """ Follow redirects. """

    self.response_code: int = response_code
    """ The HTTP status code of the response. """

    self.response_headers: typing.Dict[str, typing.Any] = {} if (response_headers is None) else response_headers
    """ Headers in the response. """

    structured_body = isinstance(response_body, (dict, list))

    self.json_body: bool = structured_body if (json_body is None) else json_body
    """
    Indicates that the response is JSON and should be converted to/from a string.
    If the response body is passed in as a dict/list and this is passed as None,
    then this will be set to true.
    """

    # Structured JSON bodies are stored in their string form.
    if (self.json_body and structured_body):
        response_body = edq.util.json.dumps(response_body)

    self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
    """
    The response that should be sent in this exchange.
    """

    self.response_modifier: typing.Union[str, None] = response_modifier
    """
    This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
    before they are sent back to the caller.
    This reference must be importable via edq.util.pyimport.fetch().
    """

    self.source_path: typing.Union[str, None] = source_path
    """
    The path that this exchange was loaded from (if it was loaded from a file).
    This value should never be serialized, but can be useful for testing.
    """

    base_options = {} if (extra_options is None) else extra_options

    self.extra_options: typing.Dict[str, typing.Any] = base_options.copy()
    """
    Additional options for this exchange.
    This library will not use these options, but others may.
    kwargs will also be added to this.
    """

    self.extra_options.update(kwargs)
The path portion of the request URL. Only the path (not domain, port, params, anchor, etc) should be included.
The parameters/arguments for this request. Parameters should be provided here and not encoded into URLs, regardless of the request method. With the exception of files, all parameters should be placed here.
A list of files to include in the request. The files are represented as dicts with a "path" (path to the file on disk) and "name" (the filename to send in the request) field. These paths must be POSIX-style paths, they will be converted to system-specific paths. Once this exchange is ready for use, these paths should be resolved (and probably absolute). However, when serialized these paths should probably be relative. To reconcile this, resolve_paths() should be called before using this exchange.
Indicates that the response is JSON and should be converted to/from a string. If the response body is passed in a dict/list and this is passed as None, then this will be set as true.
This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response()) before sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().
The path that this exchange was loaded from (if it was loaded from a file). This value should never be serialized, but can be useful for testing.
Additional options for this exchange. This library will not use these options, but others may. kwargs will also be added to this.
def resolve_paths(self, base_dir: str) -> None:
    """ Resolve the path of every file attached to this exchange against the given base dir. """

    # Each file knows how to resolve its own path, so just delegate.
    for entry in self.files:
        entry.resolve_path(base_dir)
Resolve any paths relative to the given base dir.
def match(self, query: 'HTTPExchange',
        match_headers: bool = True,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
    """
    Decide whether the query exchange matches this (target) exchange.
    A match yields `(True, None)`.
    A mismatch yields `(False, <hint>)`, where `<hint>` describes the first component that differed.

    This is deliberately not an equality check: only request-side components
    (method, URL path, URL anchor, headers, parameters, and files) are compared,
    since a query exchange typically has no response components.
    """

    # Compare the simple scalar components one at a time, stopping at the first difference.
    scalar_components = [
        ('method', 'HTTP method'),
        ('url_path', 'URL path'),
        ('url_anchor', 'URL anchor'),
    ]

    for (attribute, label) in scalar_components:
        query_value = getattr(query, attribute)
        target_value = getattr(self, attribute)

        if (query_value != target_value):
            return False, f"{label} does not match (query = {query_value}, target = {target_value})."

    skipped_headers = DEFAULT_EXCHANGE_IGNORE_HEADERS if (headers_to_skip is None) else headers_to_skip
    skipped_params = [] if (params_to_skip is None) else params_to_skip

    if (match_headers):
        headers_ok, hint = self._match_dict('header', query.headers, self.headers, skipped_headers)
        if (not headers_ok):
            return False, hint

    params_ok, hint = self._match_dict('parameter', query.parameters, self.parameters, skipped_params)
    if (not params_ok):
        return False, hint

    def _file_signatures(file_infos: typing.Any) -> typing.Set[typing.Any]:
        """ Reduce files to a set of (name, content hash) pairs so order does not matter. """

        return {(info.name, info.hash_content()) for info in file_infos}

    # Files match when both sides have the same set of (name, content hash) pairs.
    query_signatures = _file_signatures(query.files)
    target_signatures = _file_signatures(self.files)

    if (query_signatures == target_signatures):
        return True, None

    return False, f"File names do not match (query = {query_signatures}, target = {target_signatures})."
Check if this exchange matches the query exchange.
If they match, (True, None) will be returned.
If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.
Note that this is not an equality check, as a query exchange is often missing the response components. This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
def get_url(self) -> str:
    """ Return the URL path, with '#<anchor>' appended when this exchange has an anchor. """

    # No anchor means the path alone is the full URL.
    if (self.url_anchor is None):
        return self.url_path

    return self.url_path + ('#' + self.url_anchor)
Get the URL path and anchor combined.
def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Perform the HTTP request described by this exchange.

    The request is sent to `<base_url>/<path>[#<anchor>]` using this exchange's method, headers, parameters, and files.
    File content is taken from the file info when present (base64 decoding it when flagged),
    otherwise the file at the file info's path is opened and uploaded.
    Any file opened here is closed once the request has finished (whether or not it succeeded).

    If this exchange has a response modifier, it is fetched and applied to the body before returning.
    Any additional kwargs are passed on to the module-level make_request().

    Returns the response object and the (possibly modified) text body.
    """

    files = []

    # Track the file handles we open ourselves so they can always be closed.
    opened_handles = []

    try:
        for file_info in self.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            if (content is None):
                content = open(file_info.path, 'rb')  # type: ignore[assignment,arg-type] # pylint: disable=consider-using-with
                opened_handles.append(content)

            files.append((file_info.name, content))

        url = f"{base_url}/{self.get_url()}"

        response, body = make_request(self.method, url,
                headers = self.headers,
                data = self.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = self.allow_redirects,
                **kwargs,
        )
    finally:
        # The upload has been fully read (or the request failed) by this point,
        # so the handles are no longer needed.
        for handle in opened_handles:
            handle.close()

    if (self.response_modifier is not None):
        modify_func = edq.util.pyimport.fetch(self.response_modifier)
        body = modify_func(response, body)

    return response, body
Perform the HTTP request described by this exchange.
def match_response(self, response: requests.Response,
        override_body: typing.Union[str, None] = None,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
    """
    Check if this exchange matches the given response.
    If they match, `(True, None)` will be returned.
    If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

    The status code is compared first, then the body, then the headers (ignoring `headers_to_skip`,
    which defaults to DEFAULT_EXCHANGE_IGNORE_HEADERS).
    For non-JSON exchanges, `override_body` (when given) is compared instead of the response's text.
    For JSON exchanges, both the expected and actual bodies are normalized
    (parsed and re-serialized) before comparison so that formatting differences do not cause a mismatch.
    """

    if (headers_to_skip is None):
        headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

    response_body = override_body
    if (response_body is None):
        response_body = response.text

    if (self.response_code != response.status_code):
        return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

    expected_body = self.response_body
    actual_body = None

    if (self.json_body):
        # NOTE(review): override_body is not consulted for JSON bodies (the response is always re-parsed);
        # confirm whether callers expect the override to apply here as well.
        actual_body = response.json()

        # Normalize the actual and expected bodies.

        actual_body = edq.util.json.dumps(actual_body)

        if (isinstance(expected_body, str)):
            expected_body = edq.util.json.loads(expected_body)

        expected_body = edq.util.json.dumps(expected_body)
    else:
        actual_body = response_body

    # Compare against the (possibly normalized) expected body, not the raw stored body.
    if (expected_body != actual_body):
        body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
        return False, f"body does not match ({body_hint})"

    match, hint = self._match_dict('header', response.headers, self.response_headers,
            keys_to_skip = headers_to_skip,
            query_label = 'response', target_label = 'exchange')

    if (not match):
        return False, hint

    return True, None
Check if this exchange matches the given response.
If they match, (True, None) will be returned.
If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.
def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
    """
    Build a consistent, semi-unique, relative path for this exchange.

    The directory comes from the leading components of the URL,
    and the filename from the last component (or '_index_' for an empty/root URL),
    followed by the (possibly clipped) encoded parameters, the HTTP method, and the extension.
    """

    url = self.get_url().strip()

    if (url in ('', '/')):
        dirname, filename = '', '_index_'
    else:
        # Everything before the last '/' is the directory, the rest is the base filename.
        *dir_parts, filename = url.split('/')
        dirname = os.path.join(*dir_parts) if dir_parts else ''

    # Parameters go in sorted order so the same exchange always produces the same path.
    encodable = {key: self.parameters[key] for key in sorted(self.parameters.keys())}

    # Treat files as params as well.
    for info in self.files:
        encodable[f"file-{info.name}"] = info.hash_content()

    query_suffix = ''
    query = urllib.parse.urlencode(encodable)
    if (query != ''):
        # The query can get very long, so we may have to clip it.
        clipped_query = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

        # Note that the '?' is URL encoded.
        query_suffix = f"%3F{clipped_query}"

    filename = f"{filename}{query_suffix}_{self.method}{http_exchange_extension}"

    return os.path.join(dirname, filename)
Create a consistent, semi-unique, and relative path for this exchange.
Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
    """
    Construct an exchange from a dict of constructor arguments.

    The dict is expanded as keyword arguments to the class this method is called on,
    so subclasses get an instance of their own type instead of always getting the base class.
    """

    return cls(**data)
Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_path(cls, path: str,
        set_source_path: bool = True,
        ) -> 'HTTPExchange':
    """
    Read an exchange from the file at the given path.
    When requested, the exchange's source path is set to the absolute form of the given path.
    The exchange's file paths are always resolved against the directory containing the file.
    """

    loaded = edq.util.json.load_object_path(path, HTTPExchange)
    exchange = typing.cast(HTTPExchange, loaded)

    if (set_source_path):
        exchange.source_path = os.path.abspath(path)

    # Paths inside the exchange are relative to the file it was loaded from.
    base_dir = os.path.abspath(os.path.dirname(path))
    exchange.resolve_paths(base_dir)

    return exchange
Load an exchange from a file. This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.
@classmethod
def from_response(cls,
        response: requests.Response,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        ) -> 'HTTPExchange':
    """
    Create a full exchange (request and response components) from a completed response.

    Header keys are lowercased and stripped, and any header in `headers_to_skip`
    (defaulting to DEFAULT_EXCHANGE_IGNORE_HEADERS) is dropped from both the request and response headers.
    Request parameters in `params_to_skip` are dropped.
    If a module-level clean function is set, it is applied to the response body and recorded as the response modifier.
    """

    skipped_headers = DEFAULT_EXCHANGE_IGNORE_HEADERS if (headers_to_skip is None) else headers_to_skip
    skipped_params = [] if (params_to_skip is None) else params_to_skip

    body = response.text

    # Use a clean function (if one exists).
    if (_exchanges_clean_func is not None):
        clean_func = edq.util.pyimport.fetch(_exchanges_clean_func)
        body = clean_func(response, body)

    request = response.request

    # Normalize header keys, then drop every header that should not be recorded.
    dropped_keys = {key.lower() for key in skipped_headers}

    request_headers = {}
    for (key, value) in request.headers.items():
        request_headers[key.lower().strip()] = value

    response_headers = {}
    for (key, value) in response.headers.items():
        response_headers[key.lower().strip()] = value

    request_headers = {key: value for (key, value) in request_headers.items() if (key not in dropped_keys)}
    response_headers = {key: value for (key, value) in response_headers.items() if (key not in dropped_keys)}

    request_data, request_files = parse_request_data(request.url, request.headers, request.body)

    # Clean parameters.
    for param_key in skipped_params:
        request_data.pop(param_key, None)

    files = []
    for (name, content) in request_files.items():
        files.append(FileInfo(name = name, content = content))

    return HTTPExchange(
        method = request.method,
        url = request.url,
        url_anchor = request.headers.get(ANCHOR_HEADER_KEY, None),
        parameters = request_data,
        files = files,
        headers = request_headers,
        response_code = response.status_code,
        response_headers = response_headers,
        response_body = body,
        response_modifier = _exchanges_clean_func,
        allow_redirects = allow_redirects,
    )
Create a full exchange from a response.
@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    A function that can be called after a request has been made (and exchange constructed).
    make_request() accepts a callback of this shape via its exchange_complete_func argument
    and invokes it with the exchange built from the response.
    """

    def __call__(self,
            exchange: HTTPExchange
            ) -> str:
        """
        Called after an HTTP exchange has been completed.
        The exchange describes both the request that was made and the response that was received.
        NOTE(review): make_request() does not use the value returned from this call,
        so the declared str return type may not be meaningful -- confirm.
        """
A function that can be called after a request has been made (and exchange constructed).
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that blocks instantiation of protocol classes and,
    for non-protocol classes, swaps itself out for the first real `__init__` found in the MRO.

    NOTE(review): this looks like an internal helper from the standard library's typing module
    that was pulled into this listing by the documentation generator -- confirm it is not part of this module.
    """

    cls = type(self)

    # Classes flagged as protocols may never be instantiated directly.
    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only look at what each base defines itself; a base without its own `__init__` is skipped.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Finish this first instantiation using the `__init__` that was just installed.
    cls.__init__(self, *args, **kwargs)
def find_open_port(
        start_port: int = DEFAULT_START_PORT, end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    Ports are probed in ascending order by binding a socket to 127.0.0.1, and the first port that binds is returned.

    After a successful probe, this will wait `wait_time` seconds (if positive) for the port to clear before returning.

    Raises a ValueError if no port in the range is open.
    Socket errors other than "address in use" are propagated to the caller.
    """

    for port in range(start_port, end_port + 1):
        # Create the socket outside of the try so that the cleanup below always has a socket to close.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('127.0.0.1', port))
        except socket.error as ex:
            if (ex.errno == errno.EADDRINUSE):
                continue

            # Unknown error.
            raise
        finally:
            # Always release the probe socket, whether or not the bind worked.
            sock.close()

        # Wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        if (wait_time > 0):
            time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")
Find an open port on this machine within the given range (inclusive). If no open port is found, an error is raised.
def make_request(method: str, url: str,
        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
        files: typing.Union[typing.List[typing.Any], None] = None,
        raise_for_status: bool = True,
        timeout_secs: float = DEFAULT_REQUEST_TIMEOUT_SECS,
        output_dir: typing.Union[str, None] = None,
        send_anchor_header: bool = True,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION,
        add_http_prefix: bool = True,
        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        exchange_complete_func: typing.Union[HTTPExchangeComplete, None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Send an HTTP request and return the response object along with its text body.

    For GET requests `data` is sent as URL parameters, for all other methods it is sent as the request data.
    When `raise_for_status` is set, an error status raises
    (with the body attached to the reason for a 404 that has a non-blank body).
    If an output directory is available (passed in or set at the module level),
    the request/response pair is saved there as an HTTPExchange.
    Any exchange-complete callback (passed in or set at the module level) is called with that exchange.
    """

    needs_prefix = (add_http_prefix and (not url.lower().startswith('http')))
    if (needs_prefix):
        url = 'http://' + url

    # Fill in defaults for everything that was not provided.
    output_dir = _exchanges_out_dir if (output_dir is None) else output_dir
    headers = {} if (headers is None) else headers
    data = {} if (data is None) else data
    files = [] if (files is None) else files
    additional_requests_options = {} if (additional_requests_options is None) else additional_requests_options

    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
    if (send_anchor_header):
        # Work on a copy so the caller's headers are left alone.
        headers = headers.copy()

        fragment = urllib.parse.urlparse(url).fragment
        headers[ANCHOR_HEADER_KEY] = fragment.lstrip('#')

    options = additional_requests_options.copy()
    options['headers'] = headers
    options['files'] = files
    options['timeout'] = timeout_secs

    if (allow_redirects is not None):
        options['allow_redirects'] = allow_redirects

    data_key = 'params' if (method == 'GET') else 'data'
    options[data_key] = data

    logging.debug("Making %s request: '%s' (options = %s).", method, url, options)
    response = requests.request(method, url, **options)

    body = response.text
    logging.debug("Response:\n%s", body)

    if (raise_for_status):
        # Handle 404s a little special, as their body may contain useful information.
        not_found = (response.status_code == http.HTTPStatus.NOT_FOUND)
        if (not_found and (body is not None)):
            trimmed_body = body.strip()
            if (trimmed_body != ''):
                response.reason += f" (Body: '{trimmed_body}')"

        response.raise_for_status()

    # An exchange is only constructed when something is going to consume it.
    exchange_consumers = (output_dir, exchange_complete_func, _make_request_exchange_complete_func)
    if (all(consumer is None for consumer in exchange_consumers)):
        return response, body

    exchange = HTTPExchange.from_response(response,
            headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
            allow_redirects = options.get('allow_redirects', None))

    if (output_dir is not None):
        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
        out_path = os.path.abspath(os.path.join(output_dir, relpath))

        edq.util.dirent.mkdir(os.path.dirname(out_path))
        edq.util.json.dump_path(exchange, out_path, indent = 4, sort_keys = False)

    if (exchange_complete_func is not None):
        exchange_complete_func(exchange)

    if (_make_request_exchange_complete_func is not None):
        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable

    return response, body
Make an HTTP request and return the response object and text body.
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Convenience wrapper around make_request() for GET requests.
    All keyword arguments are passed through, and the response object and text body are returned.
    """

    method = 'GET'
    return make_request(method, url, **kwargs)
Make a GET request and return the response object and text body.
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Convenience wrapper around make_request() for POST requests.
    All keyword arguments are passed through, and the response object and text body are returned.
    """

    method = 'POST'
    return make_request(method, url, **kwargs)
Make a POST request and return the response object and text body.
def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Collect the data and files of an HTTP request from both its body and its URL.
    Parameters in the URL's query string are merged over the data parsed from the body.
    """

    # Start with whatever the body provides.
    body_data, body_files = parse_request_body_data(headers, body)

    # Then layer the URL's query parameters on top.
    query = urllib.parse.urlparse(url).query
    url_data = parse_query_string(query)
    body_data.update(url_data)

    return body_data, body_files
Parse data and files from an HTTP request URL and body.
def parse_request_body_data(
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Parse data and files from an HTTP request body.

    The body is only read when the Content-Length header is present and non-zero.
    URL-encoded form bodies (and bodies with no content type) are parsed as a query string,
    and multipart form bodies are split into normal parameters and files.
    The content type is matched on its media type alone,
    so any parameters on the header (e.g. "; charset=UTF-8") are accepted.

    Returns the parsed parameters and a dict of files (name to content).
    Raises a ValueError for an unsupported content type or a multipart section without a name.
    """

    data: typing.Dict[str, typing.Any] = {}
    files: typing.Dict[str, bytes] = {}

    length = int(headers.get('Content-Length', 0))
    if (length == 0):
        return data, files

    if (isinstance(body, io.BufferedIOBase)):
        raw_content = body.read(length)
    elif (isinstance(body, str)):
        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
    else:
        raw_content = body

    content_type = headers.get('Content-Type', '')

    # Only the media type decides how to parse the body.
    # Parameters after the first ';' (charset, boundary, etc) and casing are not relevant to that choice.
    media_type = content_type.split(';', 1)[0].strip().lower()

    if (media_type in ['', 'application/x-www-form-urlencoded']):
        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
        return data, files

    if (media_type.startswith('multipart/form-data')):
        # The decoder needs the full header (it contains the boundary).
        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
            raw_content, content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)

        for multipart_section in decoder.parts:
            values = parse_content_dispositions(multipart_section.headers)

            name = values.get('name', None)
            if (name is None):
                raise ValueError("Could not find name for multipart section.")

            # Look for a "filename" field to indicate a multipart section is a file.
            # The file's desired name is still in "name", but an alternate name is in "filename".
            if ('filename' in values):
                if (name == ''):
                    raise ValueError("Unable to find filename for multipart section.")

                files[name] = multipart_section.content
            else:
                # Normal Parameter
                data[name] = multipart_section.text

        return data, files

    raise ValueError(f"Unknown content type: '{content_type}'.")
Parse data and files from an HTTP request body.
def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every "content-disposition" header (matched case-insensitively) is split on ';',
    and each `key=value` part is collected (with surrounding whitespace and double quotes removed from the value).
    Parts without an '=' (like the leading "form-data") are ignored.
    Only the first '=' separates the key from the value, so values may themselves contain '='.

    Returns a dict of the collected keys and values (empty if there is no content disposition).
    """

    values = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            # Split on the first '=' only, the value is allowed to contain more.
            (cd_key, separator, cd_value) = part.strip().partition('=')
            if (separator == ''):
                continue

            values[cd_key.strip()] = cd_value.strip().strip('"')

    return values
Parse a request's content dispositions from headers.
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If `replace_single_lists` is set, lists with single values (as returned from urllib.parse.parse_qs())
    will be replaced with the single value.
    If `keep_blank_values` is set, keys with blank values are kept (with an empty string as the value),
    otherwise they are dropped.
    Any additional kwargs are ignored.
    """

    results = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values)

    if (not replace_single_lists):
        return results

    # Unwrap single-element lists, and leave true multi-value keys as lists.
    return {key: (values[0] if (len(values) == 1) else values) for (key, values) in results.items()}
Parse a query string (like urllib.parse.parse_qs()), and normalize the result. If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.
def set_cli_args(parser: argparse.ArgumentParser, extra_state: typing.Dict[str, typing.Any]) -> None:
    """
    Register this module's common CLI arguments on the given parser.
    This pairs with init_from_args(), which reads the values of the arguments added here.
    """

    # (flag, destination, help text) for each optional string argument.
    argument_specs = [
        (
            '--http-exchanges-out-dir',
            'http_exchanges_out_dir',
            'If set, write all outgoing HTTP requests as exchanges to this directory.',
        ),
        (
            '--http-exchanges-clean-func',
            'http_exchanges_clean_func',
            'If set, default all created exchanges to this modifier function.',
        ),
    ]

    for (flag, dest, help_text) in argument_specs:
        parser.add_argument(flag, dest = dest,
            action = 'store', type = str, default = None,
            help = help_text)
Set common CLI arguments. This is a sibling to init_from_args(), as the arguments set here can be interpreted there.
def init_from_args(
        parser: argparse.ArgumentParser,
        args: argparse.Namespace,
        extra_state: typing.Dict[str, typing.Any]) -> None:
    """
    Apply the arguments registered by set_cli_args() to this module's settings.
    Each argument that was provided (is not None) overwrites the matching module-level value
    (the exchange output directory and the exchange clean function),
    and arguments that were not provided leave the current value untouched.
    """

    global _exchanges_out_dir, _exchanges_clean_func  # pylint: disable=global-statement

    out_dir = args.http_exchanges_out_dir
    if (out_dir is not None):
        _exchanges_out_dir = out_dir

    clean_func = args.http_exchanges_clean_func
    if (clean_func is not None):
        _exchanges_clean_func = clean_func
Take in args from a parser that was passed to set_cli_args(), and apply any provided values to this module's settings (the exchange output directory and the exchange clean function).