edq.net.exchange
import copy
import http
import os
import pathlib
import typing
import urllib.parse

import requests

import edq.net.util
import edq.util.dirent
import edq.util.encoding
import edq.util.hash
import edq.util.json
import edq.util.pyimport

DEFAULT_HTTP_EXCHANGE_EXTENSION: str = '.httpex.json'

QUERY_CLIP_LENGTH: int = 100
""" If the query portion of an HTTPExchange's filename is longer than this, then clip it. """

ANCHOR_HEADER_KEY: str = 'edq-anchor'
"""
By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL.
Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.
"""

ALLOWED_METHODS: typing.List[str] = [
    'DELETE',
    'GET',
    'HEAD',
    'OPTIONS',
    'PATCH',
    'POST',
    'PUT',
]
""" Allowed HTTP methods for an HTTPExchange. """

DEFAULT_EXCHANGE_IGNORE_HEADERS: typing.List[str] = [
    'accept',
    'accept-encoding',
    'accept-language',
    'cache-control',
    'connection',
    'content-length',
    'content-security-policy',
    'content-type',
    'cookie',
    'date',
    'dnt',
    'etag',
    'host',
    'link',
    'location',
    'priority',
    'referrer-policy',
    'sec-fetch-dest',
    'sec-fetch-mode',
    'sec-fetch-site',
    'sec-fetch-user',
    'sec-gpc',
    'server',
    'server-timing',
    'set-cookie',
    'upgrade-insecure-requests',
    'user-agent',
    'x-content-type-options',
    'x-download-options',
    'x-permitted-cross-domain-policies',
    'x-rate-limit-remaining',
    'x-request-context-id',
    'x-request-cost',
    'x-runtime',
    'x-session-id',
    'x-xss-protection',
    ANCHOR_HEADER_KEY,
]
"""
By default, ignore these headers during exchange matching.
Some are sent automatically and we don't need to record (like content-length),
and some are additional information we don't need.
"""

_exchanges_clean_func: typing.Union[str, None] = None  # pylint: disable=invalid-name
"""
If not None, all created exchanges (in HTTPExchange.make_request() and HTTPExchange.from_response()) will use this response modifier.
This function will be called with the response and response body before parsing the rest of the data to build the exchange.
"""

_exchanges_finalize_func: typing.Union[str, None] = None  # pylint: disable=invalid-name
"""
If not None, all created exchanges (in HTTPExchange.make_request()) will use this finalize function.
This function will be called with the created exchange right after construction and before passing back to the caller
(or writing).
"""

class FileInfo(edq.util.json.DictConverter):
    """ Store info about files used in HTTP exchanges. """

    def __init__(self,
            path: typing.Union[str, None] = None,
            name: typing.Union[str, None] = None,
            content: typing.Union[str, bytes, None] = None,
            b64_encoded: bool = False,
            **kwargs: typing.Any) -> None:
        """
        Create a file description from a path and/or explicit content.
        If no name is given, the basename of the path is used.
        A ValueError is raised if no name can be determined, or if neither a path nor content is given.
        Any additional keyword arguments are accepted and ignored.
        """

        # Normalize the path from POSIX-style to the system's style.
        if (path is not None):
            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))

        self.path: typing.Union[str, None] = path
        """ The on-disk path to a file. """

        if ((name is None) and (self.path is not None)):
            name = os.path.basename(self.path)

        if (name is None):
            raise ValueError("No name was provided for file.")

        self.name: str = name
        """ The name for this file used in an HTTP request. """

        self.content: typing.Union[str, bytes, None] = content
        """ The contents of this file. """

        self.b64_encoded: bool = b64_encoded
        """ Whether the content is a string encoded in Base64. """

        if ((self.path is None) and (self.content is None)):
            raise ValueError("File must have either path or content specified.")

    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
        """
        Resolve this path relative to the given base dir.
        If requested (and no content is already present), the file's bytes are also loaded into the content.
        """

        if ((self.path is not None) and (not os.path.isabs(self.path))):
            self.path = os.path.abspath(os.path.join(base_dir, self.path))

        if ((self.path is not None) and (self.content is None) and load_file):
            self.content = edq.util.dirent.read_file_bytes(self.path)

    def hash_content(self) -> str:
        """
        Compute a hash for the content present.
        Base64 string content is decoded before hashing.
        If no content is provided, use the path.
        """

        hash_content = self.content

        if (self.b64_encoded and isinstance(hash_content, str)):
            hash_content = edq.util.encoding.from_base64(hash_content)

        if (hash_content is None):
            hash_content = self.path

        return edq.util.hash.sha256_hex(hash_content)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """ Get a (shallow) copy of this object's fields that is safe to encode as JSON. """

        data = vars(self).copy()

        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
        if (isinstance(self.content, bytes)):
            data['content'] = edq.util.encoding.to_base64(self.content)
            data['b64_encoded'] = True

        return data

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Build an instance from a dict (like one produced by to_dict()). """

        # Use cls (not a hard-coded class) so subclasses get back instances of themselves.
        return cls(**data)

class HTTPExchange(edq.util.json.DictConverter):
    """
    The request and response making up a full HTTP exchange.
    """

    def __init__(self,
            method: str = 'GET',
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            response_code: int = http.HTTPStatus.OK,
            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            json_body: typing.Union[bool, None] = None,
            response_body: typing.Union[str, dict, list, None] = None,
            source_path: typing.Union[str, None] = None,
            response_modifier: typing.Union[str, None] = None,
            finalize: typing.Union[str, None] = None,
            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            **kwargs: typing.Any) -> None:
        """
        Build an exchange from its request and response components.
        A ValueError is raised on a disallowed method or if no URL path can be determined.
        Any unrecognized keyword arguments are stored in the extra options.
        """

        method = str(method).upper()
        if (method not in ALLOWED_METHODS):
            raise ValueError(f"Got unknown/disallowed method: '{method}'.")

        self.method: str = method
        """ The HTTP method for this exchange. """

        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)

        self.url_path: str = url_path
        """
        The path portion of the request URL.
        Only the path (not domain, port, params, anchor, etc) should be included.
        """

        self.url_anchor: typing.Union[str, None] = url_anchor
        """
        The anchor portion of the request URL (if it exists).
        """

        self.parameters: typing.Dict[str, typing.Any] = parameters
        """
        The parameters/arguments for this request.
        Parameters should be provided here and not encoded into URLs,
        regardless of the request method.
        With the exception of files, all parameters should be placed here.
        """

        if (files is None):
            files = []

        parsed_files = []
        for file in files:
            if (isinstance(file, FileInfo)):
                parsed_files.append(file)
            else:
                parsed_files.append(FileInfo(**file))

        self.files: typing.List[FileInfo] = parsed_files
        """
        A list of files to include in the request.
        The files are represented as dicts with a
        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
        These paths must be POSIX-style paths,
        they will be converted to system-specific paths.
        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
        However, when serialized these paths should probably be relative.
        To reconcile this, resolve_paths() should be called before using this exchange.
        """

        if (headers is None):
            headers = {}

        self.headers: typing.Dict[str, typing.Any] = headers
        """ Headers in the request. """

        if (allow_redirects is None):
            allow_redirects = True

        self.allow_redirects: bool = allow_redirects
        """ Follow redirects. """

        self.response_code: int = response_code
        """ The HTTP status code of the response. """

        if (response_headers is None):
            response_headers = {}

        self.response_headers: typing.Dict[str, typing.Any] = response_headers
        """ Headers in the response. """

        if (json_body is None):
            json_body = isinstance(response_body, (dict, list))

        self.json_body: bool = json_body
        """
        Indicates that the response is JSON and should be converted to/from a string.
        If the response body is passed in a dict/list and this is passed as None,
        then this will be set as true.
        """

        if (self.json_body and isinstance(response_body, (dict, list))):
            response_body = edq.util.json.dumps(response_body)

        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
        """
        The response that should be sent in this exchange.
        """

        self.response_modifier: typing.Union[str, None] = response_modifier
        """
        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
        before sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.finalize: typing.Union[str, None] = finalize
        """
        This function reference will be used to finalize exchanges before sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.source_path: typing.Union[str, None] = source_path
        """
        The path that this exchange was loaded from (if it was loaded from a file).
        This value should never be serialized, but can be useful for testing.
        """

        if (extra_options is None):
            extra_options = {}

        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
        """
        Additional options for this exchange.
        This library will not use these options, but others may.
        kwargs will also be added to this.
        """

        self.extra_options.update(kwargs)

    def _parse_url_components(self,
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
        """
        Parse out all URL-based components from raw inputs.
        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
        If content is present in both places, they must match (or a ValueError will be raised).
        Query parameters may be provided in the full URL,
        but will be overwritten by any that are provided separately.
        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
        Note that path parameters (not query parameters) will be ignored.
        The final url path, url anchor, and parameters will be returned.
        The passed in parameters dict is never modified.
        """

        # Do base initialization and cleanup.

        if (url_path is not None):
            # An explicitly empty path stays as an empty string (it is not converted to None).
            url_path = url_path.strip().lstrip('/')

        if (url_anchor is not None):
            url_anchor = url_anchor.strip()
            if (url_anchor == ''):
                url_anchor = None
            else:
                url_anchor = url_anchor.lstrip('#')

        # Work on a copy so that parameters pulled from the URL never leak into the caller's dict.
        if (parameters is None):
            parameters = {}
        else:
            parameters = dict(parameters)

        # Parse the URL (if present).

        if ((url is not None) and (url.strip() != '')):
            parts = urllib.parse.urlparse(url)

            # Handle the path.

            path = parts.path.lstrip('/')

            if ((url_path is not None) and (url_path != path)):
                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")

            url_path = path

            # Check the optional anchor/fragment.

            if (parts.fragment != ''):
                fragment = parts.fragment.lstrip('#')

                if ((url_anchor is not None) and (url_anchor != fragment)):
                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")

                url_anchor = fragment

            # Check for any parameters.
            # Explicitly provided parameters take priority over ones in the URL.

            url_params = edq.net.util.parse_query_string(parts.query)
            for (key, value) in url_params.items():
                if (key not in parameters):
                    parameters[key] = value

        if (url_path is None):
            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')

        # Sort parameter keys for consistency.
        parameters = {key: parameters[key] for key in sorted(parameters.keys())}

        return url_path, url_anchor, parameters

    def resolve_paths(self, base_dir: str) -> None:
        """ Resolve any paths relative to the given base dir. """

        for file_info in self.files:
            file_info.resolve_path(base_dir)

    def match(self, query: 'HTTPExchange',
            match_headers: bool = True,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the query exchange.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        Note that this is not an equality check,
        as a query exchange is often missing the response components.
        This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
        """

        if (query.method != self.method):
            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."

        if (query.url_path != self.url_path):
            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."

        if (query.url_anchor != self.url_anchor):
            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        if (match_headers):
            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
            if (not match):
                return False, hint

        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
        if (not match):
            return False, hint

        # Check file names and hash contents.
        query_filenames = {(file.name, file.hash_content()) for file in query.files}
        target_filenames = {(file.name, file.hash_content()) for file in self.files}
        if (query_filenames != target_filenames):
            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."

        return True, None

    def _match_dict(self, label: str,
            query_dict: typing.Dict[str, typing.Any],
            target_dict: typing.Dict[str, typing.Any],
            keys_to_skip: typing.Union[typing.List[str], None] = None,
            query_label: str = 'query',
            target_label: str = 'target',
            normalize_key_case: bool = True,
            ) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        A subcheck in match(), specifically for a dictionary.
        Skipped keys are removed from both sides before the key sets and then the values are compared.
        Keys are compared case-insensitively unless key case normalization is disabled.
        """

        if (keys_to_skip is None):
            keys_to_skip = []

        if (normalize_key_case):
            keys_to_skip = [key.lower() for key in keys_to_skip]
            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
            target_dict = {key.lower(): value for (key, value) in target_dict.items()}

        query_keys = set(query_dict.keys()) - set(keys_to_skip)
        target_keys = set(target_dict.keys()) - set(keys_to_skip)

        if (query_keys != target_keys):
            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."

        for key in sorted(query_keys):
            query_value = query_dict[key]
            target_value = target_dict[key]

            if (query_value != target_value):
                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."

        return True, None

    def get_url(self) -> str:
        """ Get the URL path and anchor combined. """

        url = self.url_path

        if (self.url_anchor is not None):
            url += ('#' + self.url_anchor)

        return url

    def match_response(self, response: requests.Response,
            override_body: typing.Union[str, None] = None,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the given response.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        If an override body is provided, it is used in place of the response's own body.
        For JSON exchanges, both bodies are parsed and re-serialized before comparison,
        so formatting differences do not cause a mismatch.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (self.response_code != response.status_code):
            return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

        expected_body: typing.Any = self.response_body
        actual_body: typing.Any = None

        if (self.json_body):
            # Honor the override (if given) instead of silently falling back to the raw response.
            if (override_body is not None):
                actual_body = edq.util.json.loads(override_body)
            else:
                actual_body = response.json()

            # Normalize the actual and expected bodies.

            actual_body = edq.util.json.dumps(actual_body)

            if (isinstance(expected_body, str)):
                expected_body = edq.util.json.loads(expected_body)

            expected_body = edq.util.json.dumps(expected_body)
        else:
            actual_body = override_body
            if (actual_body is None):
                actual_body = response.text

        # Compare against the (possibly normalized) expected body, not the raw stored one.
        if (expected_body != actual_body):
            body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
            return False, f"body does not match ({body_hint})"

        match, hint = self._match_dict('header', response.headers, self.response_headers,
                keys_to_skip = headers_to_skip,
                query_label = 'response', target_label = 'exchange')

        if (not match):
            return False, hint

        return True, None

    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
        """
        Create a consistent, semi-unique, and relative path for this exchange.
        The path is built from the URL (directories and base filename),
        the URL-encoded parameters and file hashes (clipped if too long),
        the HTTP method, and the given extension.
        """

        url = self.get_url().strip()
        parts = url.split('/')

        if (url in ['', '/']):
            filename = '_index_'
            dirname = ''
        else:
            filename = parts[-1]

            if (len(parts) > 1):
                dirname = os.path.join(*parts[0:-1])
            else:
                dirname = ''

        parameters = {key: self.parameters[key] for key in sorted(self.parameters.keys())}

        # Treat files as params as well.
        for file_info in self.files:
            parameters[f"file-{file_info.name}"] = file_info.hash_content()

        query = urllib.parse.urlencode(parameters)
        if (query != ''):
            # The query can get very long, so we may have to clip it.
            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

            # Note that the '?' is URL encoded.
            filename += f"%3F{query_text}"

        filename += f"_{self.method}{http_exchange_extension}"

        return os.path.join(dirname, filename)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """ Get a (shallow) copy of this object's fields. """

        # Copy so callers adding/removing keys cannot alter this object's attributes.
        return vars(self).copy()

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Build an instance from a dict (like one produced by to_dict()). """

        # Use cls (not a hard-coded class) so subclasses get back instances of themselves.
        return cls(**data)

    @classmethod
    def from_path(cls, path: str,
            set_source_path: bool = True,
            ) -> 'HTTPExchange':
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.
        """

        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))

        if (set_source_path):
            exchange.source_path = os.path.abspath(path)

        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))

        return exchange

    @classmethod
    def from_response(cls,
            response: requests.Response,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            ) -> 'HTTPExchange':
        """
        Create a full exchange from a response.
        The request components are pulled from the request attached to the response.
        Skipped headers are removed from both the request and response headers,
        and skipped parameters are removed from the request parameters.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        body = response.text

        # Use a clean function (if one exists).
        if (_exchanges_clean_func is not None):
            # Make a copy of the response to avoid cleaning functions modifying it.
            # Note that this is not a very complete solution, since we can't rely on the deep copy getting everything right.
            response = copy.deepcopy(response)

            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
            body = modify_func(response, body)

        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}

        # Clean headers.
        for key in headers_to_skip:
            key = key.lower()

            request_headers.pop(key, None)
            response_headers.pop(key, None)

        request_data, request_files = edq.net.util.parse_request_data(response.request.url, response.request.headers, response.request.body)

        # Clean parameters.
        for key in params_to_skip:
            request_data.pop(key, None)

        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]

        data = {
            'method': response.request.method,
            'url': response.request.url,
            # The anchor is read from the original (uncleaned) request headers.
            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
            'parameters': request_data,
            'files': files,
            'headers': request_headers,
            'response_code': response.status_code,
            'response_headers': response_headers,
            'response_body': body,
            'response_modifier': _exchanges_clean_func,
            'allow_redirects': allow_redirects,
        }

        exchange = cls(**data)

        # Use a finalize function (if one exists).
        if (_exchanges_finalize_func is not None):
            finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)

            exchange = finalize_func(exchange)
            exchange.finalize = _exchanges_finalize_func

        return exchange

@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    A function that can be called after a request has been made (and exchange constructed).
    """

    def __call__(self,
            exchange: HTTPExchange
            ) -> str:
        """
        Called after an HTTP exchange has been completed.
        """
If the filename of an HTTPExchange being saved is longer than this, then clip it.
By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL. Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.
Allowed HTTP methods for an HTTPExchange.
By default, ignore these headers during exchange matching. Some are sent automatically and we don't need to record (like content-length), and some are additional information we don't need.
class FileInfo(edq.util.json.DictConverter):
    """ Store info about files used in HTTP exchanges. """

    def __init__(self,
            path: typing.Union[str, None] = None,
            name: typing.Union[str, None] = None,
            content: typing.Union[str, bytes, None] = None,
            b64_encoded: bool = False,
            **kwargs: typing.Any) -> None:
        """
        Create a file description from a path and/or explicit content.
        If no name is given, the basename of the path is used.
        A ValueError is raised if no name can be determined, or if neither a path nor content is given.
        Any additional keyword arguments are accepted and ignored.
        """

        # Normalize the path from POSIX-style to the system's style.
        if (path is not None):
            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))

        self.path: typing.Union[str, None] = path
        """ The on-disk path to a file. """

        if ((name is None) and (self.path is not None)):
            name = os.path.basename(self.path)

        if (name is None):
            raise ValueError("No name was provided for file.")

        self.name: str = name
        """ The name for this file used in an HTTP request. """

        self.content: typing.Union[str, bytes, None] = content
        """ The contents of this file. """

        self.b64_encoded: bool = b64_encoded
        """ Whether the content is a string encoded in Base64. """

        if ((self.path is None) and (self.content is None)):
            raise ValueError("File must have either path or content specified.")

    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
        """
        Resolve this path relative to the given base dir.
        If requested (and no content is already present), the file's bytes are also loaded into the content.
        """

        if ((self.path is not None) and (not os.path.isabs(self.path))):
            self.path = os.path.abspath(os.path.join(base_dir, self.path))

        if ((self.path is not None) and (self.content is None) and load_file):
            self.content = edq.util.dirent.read_file_bytes(self.path)

    def hash_content(self) -> str:
        """
        Compute a hash for the content present.
        Base64 string content is decoded before hashing.
        If no content is provided, use the path.
        """

        hash_content = self.content

        if (self.b64_encoded and isinstance(hash_content, str)):
            hash_content = edq.util.encoding.from_base64(hash_content)

        if (hash_content is None):
            hash_content = self.path

        return edq.util.hash.sha256_hex(hash_content)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """ Get a (shallow) copy of this object's fields that is safe to encode as JSON. """

        data = vars(self).copy()

        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
        if (isinstance(self.content, bytes)):
            data['content'] = edq.util.encoding.to_base64(self.content)
            data['b64_encoded'] = True

        return data

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Build an instance from a dict (like one produced by to_dict()). """

        # Use cls (not a hard-coded class) so subclasses get back instances of themselves.
        return cls(**data)
Store info about files used in HTTP exchanges.
def __init__(self,
        path: typing.Union[str, None] = None,
        name: typing.Union[str, None] = None,
        content: typing.Union[str, bytes, None] = None,
        b64_encoded: bool = False,
        **kwargs: typing.Any) -> None:
    """
    Describe a file by its path and/or its content.
    A missing name falls back to the basename of the path.
    A ValueError is raised when no name is available, or when both the path and content are missing.
    Extra keyword arguments are accepted and ignored.
    """

    # Incoming paths are POSIX-style, convert them to the system's style.
    system_path: typing.Union[str, None] = None
    if (path is not None):
        posix_path = pathlib.PurePosixPath(path)
        system_path = str(pathlib.PurePath(posix_path))

    self.path: typing.Union[str, None] = system_path
    """ The on-disk path to a file. """

    # Without an explicit name, the only fallback is the path's basename.
    if (name is None):
        if (self.path is None):
            raise ValueError("No name was provided for file.")

        name = os.path.basename(self.path)

    self.name: str = name
    """ The name for this file used in an HTTP request. """

    self.content: typing.Union[str, bytes, None] = content
    """ The contents of this file. """

    self.b64_encoded: bool = b64_encoded
    """ Whether the content is a string encoded in Base64. """

    has_source = ((self.path is not None) or (self.content is not None))
    if (not has_source):
        raise ValueError("File must have either path or content specified.")
def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
    """
    Make this file's path absolute (relative paths are anchored at the given base dir).
    When loading is requested and there is no content yet, the file's bytes are read into the content.
    Files without a path are left untouched.
    """

    # Nothing to resolve (or load) without a path.
    if (self.path is None):
        return

    if (not os.path.isabs(self.path)):
        joined_path = os.path.join(base_dir, self.path)
        self.path = os.path.abspath(joined_path)

    # Existing content is never overwritten.
    if (load_file and (self.content is None)):
        self.content = edq.util.dirent.read_file_bytes(self.path)
Resolve this path relative to the given base dir.
def hash_content(self) -> str:
    """
    Get a SHA-256 hex digest for this file.
    The content is hashed when present (Base64 string content is decoded first),
    otherwise the path is hashed.
    """

    payload = self.content

    # Base64 text is decoded first so the hash covers the decoded content.
    if (isinstance(payload, str) and self.b64_encoded):
        payload = edq.util.encoding.from_base64(payload)

    # Fall back to the path when there is no content to hash.
    target = self.path if (payload is None) else payload

    return edq.util.hash.sha256_hex(target)
Compute a hash for the content present. If no content is provided, use the path.
def to_dict(self) -> typing.Dict[str, typing.Any]:
    """
    Get a (shallow) copy of this object's fields for serialization.
    Binary content is swapped for its Base64 text and flagged as encoded.
    """

    fields = dict(vars(self))

    # Only raw bytes need special handling, since JSON cannot represent them.
    if (not isinstance(self.content, bytes)):
        return fields

    fields.update({
        'content': edq.util.encoding.to_base64(self.content),
        'b64_encoded': True,
    })

    return fields
Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
    """
    Build an instance from a dict (like one produced by to_dict()).
    The dict's entries are passed to the constructor as keyword arguments.
    """

    # Use cls (not a hard-coded class) so subclasses get back instances of themselves.
    return cls(**data)
Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.
A general (but inefficient) implementation is provided by default.
class HTTPExchange(edq.util.json.DictConverter):
    """
    The request and response making up a full HTTP exchange.
    """

    def __init__(self,
            method: str = 'GET',
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            response_code: int = http.HTTPStatus.OK,
            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            json_body: typing.Union[bool, None] = None,
            response_body: typing.Union[str, dict, list, None] = None,
            source_path: typing.Union[str, None] = None,
            response_modifier: typing.Union[str, None] = None,
            finalize: typing.Union[str, None] = None,
            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            **kwargs: typing.Any) -> None:
        method = str(method).upper()
        if (method not in ALLOWED_METHODS):
            raise ValueError(f"Got unknown/disallowed method: '{method}'.")

        self.method: str = method
        """ The HTTP method for this exchange. """

        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)

        self.url_path: str = url_path
        """
        The path portion of the request URL.
        Only the path (not domain, port, params, anchor, etc) should be included.
        """

        self.url_anchor: typing.Union[str, None] = url_anchor
        """
        The anchor portion of the request URL (if it exists).
        """

        self.parameters: typing.Dict[str, typing.Any] = parameters
        """
        The parameters/arguments for this request.
        Parameters should be provided here and not encoded into URLs,
        regardless of the request method.
        With the exception of files, all parameters should be placed here.
        """

        if (files is None):
            files = []

        parsed_files = []
        for file in files:
            if (isinstance(file, FileInfo)):
                parsed_files.append(file)
            else:
                parsed_files.append(FileInfo(**file))

        self.files: typing.List[FileInfo] = parsed_files
        """
        A list of files to include in the request.
        The files are represented as dicts with a
        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
        These paths must be POSIX-style paths,
        they will be converted to system-specific paths.
        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
        However, when serialized these paths should probably be relative.
        To reconcile this, resolve_paths() should be called before using this exchange.
        """

        if (headers is None):
            headers = {}

        self.headers: typing.Dict[str, typing.Any] = headers
        """ Headers in the request. """

        if (allow_redirects is None):
            allow_redirects = True

        self.allow_redirects: bool = allow_redirects
        """ Follow redirects. """

        self.response_code: int = response_code
        """ The HTTP status code of the response. """

        if (response_headers is None):
            response_headers = {}

        self.response_headers: typing.Dict[str, typing.Any] = response_headers
        """ Headers in the response. """

        if (json_body is None):
            json_body = isinstance(response_body, (dict, list))

        self.json_body: bool = json_body
        """
        Indicates that the response is JSON and should be converted to/from a string.
        If the response body is passed in a dict/list and this is passed as None,
        then this will be set as true.
        """

        if (self.json_body and isinstance(response_body, (dict, list))):
            response_body = edq.util.json.dumps(response_body)

        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
        """
        The response that should be sent in this exchange.
        """

        self.response_modifier: typing.Union[str, None] = response_modifier
        """
        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
        before they are sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.finalize: typing.Union[str, None] = finalize
        """
        This function reference will be used to finalize exchanges before they are sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.source_path: typing.Union[str, None] = source_path
        """
        The path that this exchange was loaded from (if it was loaded from a file).
        This value should never be serialized, but can be useful for testing.
        """

        if (extra_options is None):
            extra_options = {}

        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
        """
        Additional options for this exchange.
        This library will not use these options, but others may.
        kwargs will also be added to this.
        """

        self.extra_options.update(kwargs)

    def _parse_url_components(self,
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
        """
        Parse out all URL-based components from raw inputs.
        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
        If content is present in both places, they must match (or a ValueError will be raised).
        Query parameters may be provided in the full URL,
        but will be overwritten by any that are provided separately.
        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
        Note that path parameters (not query parameters) will be ignored.
        The final url path, url anchor, and parameters will be returned.
        The passed-in parameters dict is never modified.
        """

        # Do base initialization and cleanup.

        if (url_path is not None):
            # An empty path is kept as-is (it denotes the root), leading slashes are dropped.
            url_path = url_path.strip().lstrip('/')

        if (url_anchor is not None):
            url_anchor = url_anchor.strip()
            if (url_anchor == ''):
                url_anchor = None
            else:
                url_anchor = url_anchor.lstrip('#')

        # Work on a copy so that merging URL parameters below does not leak into the caller's dict.
        if (parameters is None):
            parameters = {}
        else:
            parameters = dict(parameters)

        # Parse the URL (if present).

        if ((url is not None) and (url.strip() != '')):
            parts = urllib.parse.urlparse(url)

            # Handle the path.

            path = parts.path.lstrip('/')

            if ((url_path is not None) and (url_path != path)):
                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")

            url_path = path

            # Check the optional anchor/fragment.

            if (parts.fragment != ''):
                fragment = parts.fragment.lstrip('#')

                if ((url_anchor is not None) and (url_anchor != fragment)):
                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")

                url_anchor = fragment

            # Check for any parameters.
            # Explicitly supplied parameters take priority over those in the URL.

            url_params = edq.net.util.parse_query_string(parts.query)
            for (key, value) in url_params.items():
                if (key not in parameters):
                    parameters[key] = value

        if (url_path is None):
            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')

        # Sort parameter keys for consistency.
        parameters = {key: parameters[key] for key in sorted(parameters.keys())}

        return url_path, url_anchor, parameters

    def resolve_paths(self, base_dir: str) -> None:
        """ Resolve any paths relative to the given base dir. """

        for file_info in self.files:
            file_info.resolve_path(base_dir)

    def match(self, query: 'HTTPExchange',
            match_headers: bool = True,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the query exchange.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        Note that this is not an equality check,
        as a query exchange is often missing the response components.
        This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
        """

        if (query.method != self.method):
            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."

        if (query.url_path != self.url_path):
            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."

        if (query.url_anchor != self.url_anchor):
            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        if (match_headers):
            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
            if (not match):
                return False, hint

        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
        if (not match):
            return False, hint

        # Check file names and hash contents.
        query_filenames = {(file.name, file.hash_content()) for file in query.files}
        target_filenames = {(file.name, file.hash_content()) for file in self.files}
        if (query_filenames != target_filenames):
            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."

        return True, None

    def _match_dict(self, label: str,
            query_dict: typing.Dict[str, typing.Any],
            target_dict: typing.Dict[str, typing.Any],
            keys_to_skip: typing.Union[typing.List[str], None] = None,
            query_label: str = 'query',
            target_label: str = 'target',
            normalize_key_case: bool = True,
            ) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        A subcheck in match(), specifically for a dictionary.
        Keys in keys_to_skip are left out of the comparison,
        and (by default) all keys are compared case-insensitively.
        Returns `(True, None)` on a match and `(False, <hint>)` otherwise.
        """

        if (keys_to_skip is None):
            keys_to_skip = []

        if (normalize_key_case):
            keys_to_skip = [key.lower() for key in keys_to_skip]
            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
            target_dict = {key.lower(): value for (key, value) in target_dict.items()}

        query_keys = set(query_dict.keys()) - set(keys_to_skip)
        target_keys = set(target_dict.keys()) - set(keys_to_skip)

        if (query_keys != target_keys):
            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."

        # Sorted so the first reported mismatch is deterministic.
        for key in sorted(query_keys):
            query_value = query_dict[key]
            target_value = target_dict[key]

            if (query_value != target_value):
                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."

        return True, None

    def get_url(self) -> str:
        """ Get the URL path and anchor combined. """

        url = self.url_path

        if (self.url_anchor is not None):
            url += ('#' + self.url_anchor)

        return url

    def match_response(self, response: requests.Response,
            override_body: typing.Union[str, None] = None,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the given response.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        The status code, body, and headers (minus headers_to_skip) are compared in that order.
        If override_body is given, it is compared instead of the body held by the response.
        For JSON exchanges, both bodies are parsed and re-serialized before comparison
        so that formatting differences do not cause a mismatch.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (self.response_code != response.status_code):
            return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

        expected_body = self.response_body

        if (self.json_body):
            if (override_body is not None):
                actual_body = edq.util.json.loads(override_body)
            else:
                actual_body = response.json()

            # Normalize the actual and expected bodies.

            actual_body = edq.util.json.dumps(actual_body)

            if (isinstance(expected_body, str)):
                expected_body = edq.util.json.loads(expected_body)

            expected_body = edq.util.json.dumps(expected_body)
        else:
            actual_body = override_body
            if (actual_body is None):
                actual_body = response.text

        # Compare against the (possibly normalized) expected body, not the raw stored one.
        if (expected_body != actual_body):
            body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
            return False, f"body does not match ({body_hint})"

        match, hint = self._match_dict('header', response.headers, self.response_headers,
                keys_to_skip = headers_to_skip,
                query_label = 'response', target_label = 'exchange')

        if (not match):
            return False, hint

        return True, None

    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
        """ Create a consistent, semi-unique, and relative path for this exchange. """

        url = self.get_url().strip()
        parts = url.split('/')

        if (url in ['', '/']):
            filename = '_index_'
            dirname = ''
        else:
            filename = parts[-1]

            if (len(parts) > 1):
                dirname = os.path.join(*parts[0:-1])
            else:
                dirname = ''

        parameters = {}
        for key in sorted(self.parameters.keys()):
            parameters[key] = self.parameters[key]

        # Treat files as params as well.
        for file_info in self.files:
            parameters[f"file-{file_info.name}"] = file_info.hash_content()

        query = urllib.parse.urlencode(parameters)
        if (query != ''):
            # The query can get very long, so we may have to clip it.
            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

            # Note that the '?' is URL encoded.
            filename += f"%3F{query_text}"

        filename += f"_{self.method}{http_exchange_extension}"

        return os.path.join(dirname, filename)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """
        Return a dict of this exchange's attributes.
        The dict is a shallow copy, so adding or removing keys does not alter this exchange.
        """

        return dict(vars(self))

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """ Construct an exchange by passing the dict's items as keyword arguments to the constructor. """

        return cls(**data)

    @classmethod
    def from_path(cls, path: str,
            set_source_path: bool = True,
            ) -> 'HTTPExchange':
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.
        """

        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))

        if (set_source_path):
            exchange.source_path = os.path.abspath(path)

        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))

        return exchange

    @classmethod
    def from_response(cls,
            response: requests.Response,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            ) -> 'HTTPExchange':
        """ Create a full exchange from a response. """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        body = response.text

        # Use a clean function (if one exists).
        if (_exchanges_clean_func is not None):
            # Make a copy of the response to avoid cleaning functions modifying it.
            # Note that this is not a very complete solution, since we can't rely on the deep copy getting everything right.
            response = copy.deepcopy(response)

            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
            body = modify_func(response, body)

        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}

        # Clean headers.
        for key in headers_to_skip:
            key = key.lower()

            request_headers.pop(key, None)
            response_headers.pop(key, None)

        request_data, request_files = edq.net.util.parse_request_data(response.request.url, response.request.headers, response.request.body)

        # Clean parameters.
        for key in params_to_skip:
            request_data.pop(key, None)

        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]

        data = {
            'method': response.request.method,
            'url': response.request.url,
            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
            'parameters': request_data,
            'files': files,
            'headers': request_headers,
            'response_code': response.status_code,
            'response_headers': response_headers,
            'response_body': body,
            'response_modifier': _exchanges_clean_func,
            'allow_redirects': allow_redirects,
        }

        exchange = HTTPExchange(**data)

        # Use a finalize function (if one exists).
        if (_exchanges_finalize_func is not None):
            finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)

            exchange = finalize_func(exchange)
            exchange.finalize = _exchanges_finalize_func

        return exchange
The request and response making up a full HTTP exchange.
def __init__(self,
        method: str = 'GET',
        url: typing.Union[str, None] = None,
        url_path: typing.Union[str, None] = None,
        url_anchor: typing.Union[str, None] = None,
        parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
        files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        response_code: int = http.HTTPStatus.OK,
        response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        json_body: typing.Union[bool, None] = None,
        response_body: typing.Union[str, dict, list, None] = None,
        source_path: typing.Union[str, None] = None,
        response_modifier: typing.Union[str, None] = None,
        finalize: typing.Union[str, None] = None,
        extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        **kwargs: typing.Any) -> None:
    # Validate the method first, nothing else is stored if it is not allowed.
    method = str(method).upper()
    if (method not in ALLOWED_METHODS):
        raise ValueError(f"Got unknown/disallowed method: '{method}'.")

    self.method: str = method
    """ The (upper case) HTTP method used by the request. """

    # The path, anchor, and parameters may come from the full URL and/or the dedicated arguments.
    (clean_path, clean_anchor, clean_parameters) = self._parse_url_components(url, url_path, url_anchor, parameters)

    self.url_path: str = clean_path
    """
    Only the path component of the request URL
    (no domain, port, params, anchor, etc).
    """

    self.url_anchor: typing.Union[str, None] = clean_anchor
    """
    The anchor component of the request URL (None when there is no anchor).
    """

    self.parameters: typing.Dict[str, typing.Any] = clean_parameters
    """
    The parameters/arguments of the request.
    Regardless of the request method, parameters belong here and should not be encoded into the URL.
    Everything but files should be placed here.
    """

    raw_files = [] if (files is None) else files

    self.files: typing.List[FileInfo] = [
        (file if isinstance(file, FileInfo) else FileInfo(**file))
        for file in raw_files
    ]
    """
    The files to include in the request.
    Files may be given as FileInfo objects or as dicts with a
    "path" (path to the file on disk) and "name" (the filename to send in the request) field.
    Paths must be POSIX-style, and will be converted to system-specific paths.
    Paths should be resolved (and probably absolute) when the exchange is used,
    but should probably be relative when serialized.
    Call resolve_paths() before using this exchange to reconcile the two.
    """

    self.headers: typing.Dict[str, typing.Any] = {} if (headers is None) else headers
    """ The request's headers. """

    self.allow_redirects: bool = True if (allow_redirects is None) else allow_redirects
    """ Whether redirects should be followed (defaults to true). """

    self.response_code: int = response_code
    """ The response's HTTP status code. """

    self.response_headers: typing.Dict[str, typing.Any] = {} if (response_headers is None) else response_headers
    """ The response's headers. """

    # When not told explicitly, infer a JSON body from a dict/list response body.
    if (json_body is None):
        json_body = isinstance(response_body, (dict, list))

    self.json_body: bool = json_body
    """
    Whether the response is JSON that should be converted to/from a string.
    When passed as None, this is true exactly when the response body is given as a dict/list.
    """

    # JSON bodies given as objects are stored in their string form.
    if (self.json_body and isinstance(response_body, (dict, list))):
        response_body = edq.util.json.dumps(response_body)

    self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
    """
    The body of the response for this exchange.
    """

    self.response_modifier: typing.Union[str, None] = response_modifier
    """
    A reference to the function used to modify responses
    (in HTTPExchange.make_request() and HTTPExchange.from_response()) before they are passed back to the caller.
    The reference must be importable via edq.util.pyimport.fetch().
    """

    self.finalize: typing.Union[str, None] = finalize
    """
    A reference to the function used to finalize exchanges before they are passed back to the caller.
    The reference must be importable via edq.util.pyimport.fetch().
    """

    self.source_path: typing.Union[str, None] = source_path
    """
    Where this exchange was loaded from (when loaded from a file).
    Useful for testing, but should never be serialized.
    """

    base_options = {} if (extra_options is None) else extra_options

    self.extra_options: typing.Dict[str, typing.Any] = base_options.copy()
    """
    Any additional options for this exchange (including all extra kwargs).
    These options are not used by this library, but may be used by others.
    """

    self.extra_options.update(kwargs)
The path portion of the request URL. Only the path (not domain, port, params, anchor, etc) should be included.
The parameters/arguments for this request. Parameters should be provided here and not encoded into URLs, regardless of the request method. With the exception of files, all parameters should be placed here.
A list of files to include in the request. The files are represented as dicts with a "path" (path to the file on disk) and "name" (the filename to send in the request) field. These paths must be POSIX-style paths, they will be converted to system-specific paths. Once this exchange is ready for use, these paths should be resolved (and probably absolute). However, when serialized these paths should probably be relative. To reconcile this, resolve_paths() should be called before using this exchange.
Indicates that the response is JSON and should be converted to/from a string. If the response body is passed in a dict/list and this is passed as None, then this will be set as true.
This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response()) before sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().
This function reference will be used to finalize exchanges before they are sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().
The path that this exchange was loaded from (if it was loaded from a file). This value should never be serialized, but can be useful for testing.
Additional options for this exchange. This library will not use these options, but others may. kwargs will also be added to this.
def resolve_paths(self, base_dir: str) -> None:
    """ Ask every file attached to this exchange to resolve its path against the given base dir. """

    attached_files = self.files
    for attached_file in attached_files:
        attached_file.resolve_path(base_dir)
Resolve any paths relative to the given base dir.
def match(self, query: 'HTTPExchange',
        match_headers: bool = True,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
    """
    Decide whether the query exchange matches this (target) exchange.
    A match returns `(True, None)`,
    a mismatch returns `(False, <hint>)` where `<hint>` describes the first difference found.

    The method, URL path, URL anchor, headers (optional), parameters, and files are compared (in that order).
    Response components are not compared, so this is not an equality check.
    """

    # Cheap scalar comparisons come first.

    if (query.method != self.method):
        return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."

    if (query.url_path != self.url_path):
        return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."

    if (query.url_anchor != self.url_anchor):
        return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."

    # Headers are only compared on request.
    if (match_headers):
        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        headers_ok, headers_hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
        if (not headers_ok):
            return False, headers_hint

    if (params_to_skip is None):
        params_to_skip = []

    params_ok, params_hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
    if (not params_ok):
        return False, params_hint

    # Files are compared as a set of (name, content hash) pairs.
    query_filenames = set()
    for file in query.files:
        query_filenames.add((file.name, file.hash_content()))

    target_filenames = set()
    for file in self.files:
        target_filenames.add((file.name, file.hash_content()))

    if (query_filenames == target_filenames):
        return True, None

    return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
Check if this exchange matches the query exchange.
If they match, (True, None) will be returned.
If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.
Note that this is not an equality check, as a query exchange is often missing the response components. This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
def get_url(self) -> str:
    """ Build the URL for this exchange: the URL path, followed by '#' and the anchor when an anchor is set. """

    if (self.url_anchor is None):
        return self.url_path

    return self.url_path + ('#' + self.url_anchor)
Get the URL path and anchor combined.
def match_response(self, response: requests.Response,
        override_body: typing.Union[str, None] = None,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
    """
    Check if this exchange matches the given response.
    If they match, `(True, None)` will be returned.
    If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

    The status code, body, and headers (minus headers_to_skip) are compared in that order.
    If override_body is given, it is compared instead of the body held by the response.
    For JSON exchanges, both bodies are parsed and re-serialized before comparison
    so that formatting differences do not cause a mismatch.
    """

    if (headers_to_skip is None):
        headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

    if (self.response_code != response.status_code):
        return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

    expected_body = self.response_body

    if (self.json_body):
        if (override_body is not None):
            actual_body = edq.util.json.loads(override_body)
        else:
            actual_body = response.json()

        # Normalize the actual and expected bodies.

        actual_body = edq.util.json.dumps(actual_body)

        if (isinstance(expected_body, str)):
            expected_body = edq.util.json.loads(expected_body)

        expected_body = edq.util.json.dumps(expected_body)
    else:
        actual_body = override_body
        if (actual_body is None):
            actual_body = response.text

    # Compare against the (possibly normalized) expected body, not the raw stored one.
    if (expected_body != actual_body):
        body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
        return False, f"body does not match ({body_hint})"

    match, hint = self._match_dict('header', response.headers, self.response_headers,
            keys_to_skip = headers_to_skip,
            query_label = 'response', target_label = 'exchange')

    if (not match):
        return False, hint

    return True, None
Check if this exchange matches the given response.
If they match, (True, None) will be returned.
If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.
def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
    """
    Build a relative path for this exchange from its URL, parameters, files, and method.
    The URL (path and anchor) provides the directory and the start of the filename,
    the (possibly clipped) URL-encoded parameters and file hashes follow,
    and the method and extension come last.
    """

    url = self.get_url().strip()

    if (url in ['', '/']):
        # There is no path to work with, so use a placeholder name.
        dirname = ''
        filename = '_index_'
    else:
        # Everything before the last slash is the directory, the rest starts the filename.
        *parent_parts, filename = url.split('/')
        dirname = os.path.join(*parent_parts) if (len(parent_parts) > 0) else ''

    query_items = {key: self.parameters[key] for key in sorted(self.parameters.keys())}

    # Files are folded in as parameters keyed by name, with their content hash as the value.
    for file_info in self.files:
        query_items[f"file-{file_info.name}"] = file_info.hash_content()

    query = urllib.parse.urlencode(query_items)
    if (query != ''):
        # Long queries are clipped.
        query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

        # '%3F' is a URL encoded '?'.
        filename += f"%3F{query_text}"

    filename += f"_{self.method}{http_exchange_extension}"

    return os.path.join(dirname, filename)
Create a consistent, semi-unique, and relative path for this exchange.
Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
    """
    Construct an instance by passing the dict's items as keyword arguments to the constructor.
    The class this is called on is the one instantiated, so subclasses get instances of themselves.
    """

    return cls(**data)
Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.
A general (but inefficient) implementation is provided by default.
@classmethod
def from_path(cls, path: str,
        set_source_path: bool = True,
        ) -> 'HTTPExchange':
    """
    Read an exchange from the file at the given path.
    The exchange's source path is set to the absolute form of the path (unless disabled),
    and the exchange's paths are resolved against the absolute directory of the path.
    """

    base_dir = os.path.abspath(os.path.dirname(path))

    loaded = edq.util.json.load_object_path(path, HTTPExchange)
    exchange = typing.cast(HTTPExchange, loaded)

    if (set_source_path):
        exchange.source_path = os.path.abspath(path)

    exchange.resolve_paths(base_dir)

    return exchange
Load an exchange from a file. This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.
@classmethod
def from_response(cls,
        response: requests.Response,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        ) -> 'HTTPExchange':
    """
    Create a full exchange (request and response) from a response.
    The request components are taken from the request attached to the response.
    Skipped headers are removed from both the request and response headers,
    and skipped parameters are removed from the request parameters.
    If set, the module-level clean and finalize functions are applied.
    """

    if (headers_to_skip is None):
        headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

    if (params_to_skip is None):
        params_to_skip = []

    body = response.text

    # Give the clean function (if any) a chance to alter the body.
    if (_exchanges_clean_func is not None):
        # The clean function gets a copy of the response so that it cannot modify the original.
        # The deep copy may not get everything right, so this protection is not complete.
        response = copy.deepcopy(response)

        clean_func = edq.util.pyimport.fetch(_exchanges_clean_func)
        body = clean_func(response, body)

    request = response.request

    # Header names are normalized (lower case, no surrounding whitespace) before skipped names are dropped.
    request_headers = {name.lower().strip(): value for (name, value) in request.headers.items()}
    response_headers = {name.lower().strip(): value for (name, value) in response.headers.items()}

    skipped_names = [name.lower() for name in headers_to_skip]
    for skipped_name in skipped_names:
        request_headers.pop(skipped_name, None)
        response_headers.pop(skipped_name, None)

    request_data, request_files = edq.net.util.parse_request_data(request.url, request.headers, request.body)

    # Drop skipped parameters.
    for skipped_param in params_to_skip:
        request_data.pop(skipped_param, None)

    files = []
    for (name, content) in request_files.items():
        files.append(FileInfo(name = name, content = content))

    exchange = HTTPExchange(
        method = request.method,
        url = request.url,
        url_anchor = request.headers.get(ANCHOR_HEADER_KEY, None),
        parameters = request_data,
        files = files,
        headers = request_headers,
        response_code = response.status_code,
        response_headers = response_headers,
        response_body = body,
        response_modifier = _exchanges_clean_func,
        allow_redirects = allow_redirects,
    )

    # No finalize function means the exchange is done.
    if (_exchanges_finalize_func is None):
        return exchange

    finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)

    exchange = finalize_func(exchange)
    exchange.finalize = _exchanges_finalize_func

    return exchange
Create a full exchange from a response.
@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    The call signature for a function that is invoked once a request has been made
    and the exchange for it has been constructed.
    """

    def __call__(self,
            exchange: HTTPExchange
            ) -> str:
        """
        Invoked with the exchange once the HTTP exchange has been completed.
        """

        ...
A function that can be called after a request has been made (and exchange constructed).
def _no_init_or_replace_init(self, *args, **kwargs):
    # Placeholder `__init__` for protocol classes and their subclasses.
    # Protocol classes themselves refuse instantiation.
    # For a subclass, the first instantiation looks up the first real `__init__` in the MRO,
    # installs it on the subclass, and then calls it,
    # so later instantiations never come back through this placeholder.
    klass = type(self)

    if klass._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # A custom `__init__` is already in place (it called us explicitly),
    # so there is nothing to look up. Looking one up here can lead to
    # RecursionError. See bpo-45121.
    if klass.__init__ is not _no_init_or_replace_init:
        return

    candidates = (
        base.__dict__.get('__init__', _no_init_or_replace_init)
        for base in klass.__mro__
    )

    # Falling back to `object.__init__` should not happen,
    # since `object` is always in the MRO and defines `__init__`.
    klass.__init__ = next(
        (init for init in candidates if init is not _no_init_or_replace_init),
        object.__init__,
    )

    klass.__init__(self, *args, **kwargs)