edq.util.net

Utilities for network and HTTP.

   1"""
   2Utilities for network and HTTP.
   3"""
   4
   5import argparse
   6import email.message
   7import errno
   8import http.server
   9import io
  10import logging
  11import os
  12import pathlib
  13import socket
  14import time
  15import typing
  16import urllib.parse
  17
  18import requests
  19import requests_toolbelt.multipart.decoder
  20
  21import edq.util.dirent
  22import edq.util.encoding
  23import edq.util.hash
  24import edq.util.json
  25import edq.util.pyimport
  26
# The default (inclusive) port range that find_open_port() scans.
DEFAULT_START_PORT: int = 30000
DEFAULT_END_PORT: int = 40000
# The default pause (in seconds) for find_open_port() after it closes its probe socket.
DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01

# The default timeout (in seconds) that make_request() hands to the requests library.
DEFAULT_REQUEST_TIMEOUT_SECS: float = 10.0

# The default file extension that HTTPExchange.compute_relpath() appends to exchange filenames.
DEFAULT_HTTP_EXCHANGE_EXTENSION: str= '.httpex.json'
  34
  35QUERY_CLIP_LENGTH: int = 100
""" If the query portion of the filename of an HTTPExchange being saved is longer than this, then clip it. """
  37
ANCHOR_HEADER_KEY: str = 'edq-anchor'
"""
By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL.
Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.
"""

# HTTPExchange.__init__() upper-cases the requested method and rejects anything not listed here.
ALLOWED_METHODS: typing.List[str] = [
    'DELETE',
    'GET',
    'HEAD',
    'OPTIONS',
    'PATCH',
    'POST',
    'PUT',
]
""" Allowed HTTP methods for an HTTPExchange. """

# All entries are lower-case.
# This list is the default `headers_to_skip` for HTTPExchange.match(), match_response(), and from_response().
# The anchor header is included so the synthetic header added by make_request() is skipped as well.
DEFAULT_EXCHANGE_IGNORE_HEADERS: typing.List[str] = [
    'accept',
    'accept-encoding',
    'accept-language',
    'cache-control',
    'connection',
    'content-length',
    'content-security-policy',
    'content-type',
    'cookie',
    'date',
    'dnt',
    'etag',
    'host',
    'link',
    'location',
    'priority',
    'referrer-policy',
    'sec-fetch-dest',
    'sec-fetch-mode',
    'sec-fetch-site',
    'sec-fetch-user',
    'sec-gpc',
    'server',
    'server-timing',
    'set-cookie',
    'upgrade-insecure-requests',
    'user-agent',
    'x-content-type-options',
    'x-download-options',
    'x-permitted-cross-domain-policies',
    'x-rate-limit-remaining',
    'x-request-context-id',
    'x-request-cost',
    'x-runtime',
    'x-session-id',
    'x-xss-protection',
    ANCHOR_HEADER_KEY,
]
"""
By default, ignore these headers during exchange matching.
Some are sent automatically and we don't need to record (like content-length),
and some are additional information we don't need.
"""

# make_request() falls back to this directory when its `output_dir` argument is None.
_exchanges_out_dir: typing.Union[str, None] = None
""" If not None, all requests made via make_request() will be saved as an HTTPExchange in this directory. """

# This is a string reference (not a callable), HTTPExchange.from_response() resolves it with edq.util.pyimport.fetch().
_exchanges_clean_func: typing.Union[str, None] = None
""" If not None, all created exchanges (in HTTPExchange.make_request() and HTTPExchange.from_response()) will use this response modifier. """
 105
 106@typing.runtime_checkable
 107class ResponseModifierFunction(typing.Protocol):
 108    """
 109    A function that can be used to modify an exchange's response.
 110    Exchanges can use these functions to normalize their responses before saving.
 111    """
 112
 113    def __call__(self,
 114            response: requests.Response,
 115            body: str,
 116            ) -> str:
 117        """
 118        Modify the http response.
 119        Headers may be modified in the response directly,
 120        while the modified (or same) body must be returned.
 121        """
 122
 123class FileInfo(edq.util.json.DictConverter):
 124    """ Store info about files used in HTTP exchanges. """
 125
 126    def __init__(self,
 127            path: typing.Union[str, None] = None,
 128            name: typing.Union[str, None] = None,
 129            content: typing.Union[str, bytes, None] = None,
 130            b64_encoded: bool = False,
 131            **kwargs: typing.Any) -> None:
 132        # Normalize the path from POSIX-style to the system's style.
 133        if (path is not None):
 134            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))
 135
 136        self.path: typing.Union[str, None] = path
 137        """ The on-disk path to a file. """
 138
 139        if ((name is None) and (self.path is not None)):
 140            name = os.path.basename(self.path)
 141
 142        if (name is None):
 143            raise ValueError("No name was provided for file.")
 144
 145        self.name: str = name
 146        """ The name for this file used in an HTTP request. """
 147
 148        self.content: typing.Union[str, bytes, None] = content
 149        """ The contents of this file. """
 150
 151        self.b64_encoded: bool = b64_encoded
 152        """ Whether the content is a string encoded in Base64. """
 153
 154        if ((self.path is None) and (self.content is None)):
 155            raise ValueError("File must have either path or content specified.")
 156
 157    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
 158        """ Resolve this path relative to the given base dir. """
 159
 160        if ((self.path is not None) and (not os.path.isabs(self.path))):
 161            self.path = os.path.abspath(os.path.join(base_dir, self.path))
 162
 163        if ((self.path is not None) and (self.content is None) and load_file):
 164            self.content = edq.util.dirent.read_file_bytes(self.path)
 165
 166    def hash_content(self) -> str:
 167        """
 168        Compute a hash for the content present.
 169        If no content is provided, use the path.
 170        """
 171
 172        hash_content = self.content
 173
 174        if (self.b64_encoded and isinstance(hash_content, str)):
 175            hash_content = edq.util.encoding.from_base64(hash_content)
 176
 177        if (hash_content is None):
 178            hash_content = self.path
 179
 180        return edq.util.hash.sha256_hex(hash_content)
 181
 182    def to_dict(self) -> typing.Dict[str, typing.Any]:
 183        data = vars(self).copy()
 184
 185        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
 186        if (isinstance(self.content, bytes)):
 187            data['content'] = edq.util.encoding.to_base64(self.content)
 188            data['b64_encoded'] = True
 189
 190        return data
 191
 192    @classmethod
 193    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
 194        return FileInfo(**data)
 195
 196class HTTPExchange(edq.util.json.DictConverter):
 197    """
 198    The request and response making up a full HTTP exchange.
 199    """
 200
 201    def __init__(self,
 202            method: str = 'GET',
 203            url: typing.Union[str, None] = None,
 204            url_path: typing.Union[str, None] = None,
 205            url_anchor: typing.Union[str, None] = None,
 206            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
 207            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
 208            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 209            allow_redirects: typing.Union[bool, None] = None,
 210            response_code: int = http.HTTPStatus.OK,
 211            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 212            json_body: typing.Union[bool, None] = None,
 213            response_body: typing.Union[str, dict, list, None] = None,
 214            source_path: typing.Union[str, None] = None,
 215            response_modifier: typing.Union[str, None] = None,
 216            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 217            **kwargs: typing.Any) -> None:
 218        method = str(method).upper()
 219        if (method not in ALLOWED_METHODS):
 220            raise ValueError(f"Got unknown/disallowed method: '{method}'.")
 221
 222        self.method: str = method
 223        """ The HTTP method for this exchange. """
 224
 225        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)
 226
 227        self.url_path: str = url_path
 228        """
 229        The path portion of the request URL.
 230        Only the path (not domain, port, params, anchor, etc) should be included.
 231        """
 232
 233        self.url_anchor: typing.Union[str, None] = url_anchor
 234        """
 235        The anchor portion of the request URL (if it exists).
 236        """
 237
 238        self.parameters: typing.Dict[str, typing.Any] = parameters
 239        """
 240        The parameters/arguments for this request.
 241        Parameters should be provided here and not encoded into URLs,
 242        regardless of the request method.
 243        With the exception of files, all parameters should be placed here.
 244        """
 245
 246        if (files is None):
 247            files = []
 248
 249        parsed_files = []
 250        for file in files:
 251            if (isinstance(file, FileInfo)):
 252                parsed_files.append(file)
 253            else:
 254                parsed_files.append(FileInfo(**file))
 255
 256        self.files: typing.List[FileInfo] = parsed_files
 257        """
 258        A list of files to include in the request.
 259        The files are represented as dicts with a
 260        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
 261        These paths must be POSIX-style paths,
 262        they will be converted to system-specific paths.
 263        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
 264        However, when serialized these paths should probably be relative.
 265        To reconcile this, resolve_paths() should be called before using this exchange.
 266        """
 267
 268        if (headers is None):
 269            headers = {}
 270
 271        self.headers: typing.Dict[str, typing.Any] = headers
 272        """ Headers in the request. """
 273
 274        if (allow_redirects is None):
 275            allow_redirects = True
 276
 277        self.allow_redirects: bool = allow_redirects
 278        """ Follow redirects. """
 279
 280        self.response_code: int = response_code
 281        """ The HTTP status code of the response. """
 282
 283        if (response_headers is None):
 284            response_headers = {}
 285
 286        self.response_headers: typing.Dict[str, typing.Any] = response_headers
 287        """ Headers in the response. """
 288
 289        if (json_body is None):
 290            json_body = isinstance(response_body, (dict, list))
 291
 292        self.json_body: bool = json_body
 293        """
 294        Indicates that the response is JSON and should be converted to/from a string.
 295        If the response body is passed in a dict/list and this is passed as None,
 296        then this will be set as true.
 297        """
 298
 299        if (self.json_body and isinstance(response_body, (dict, list))):
 300            response_body = edq.util.json.dumps(response_body)
 301
 302        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
 303        """
 304        The response that should be sent in this exchange.
 305        """
 306
 307        self.response_modifier: typing.Union[str, None] = response_modifier
 308        """
 309        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
 310        before sent back to the caller.
 311        This reference must be importable via edq.util.pyimport.fetch().
 312        """
 313
 314        self.source_path: typing.Union[str, None] = source_path
 315        """
 316        The path that this exchange was loaded from (if it was loaded from a file).
 317        This value should never be serialized, but can be useful for testing.
 318        """
 319
 320        if (extra_options is None):
 321            extra_options = {}
 322
 323        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
 324        """
 325        Additional options for this exchange.
 326        This library will not use these options, but other's may.
 327        kwargs will also be added to this.
 328        """
 329
 330        self.extra_options.update(kwargs)
 331
 332    def _parse_url_components(self,
 333            url: typing.Union[str, None] = None,
 334            url_path: typing.Union[str, None] = None,
 335            url_anchor: typing.Union[str, None] = None,
 336            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
 337            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
 338        """
 339        Parse out all URL-based components from raw inputs.
 340        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
 341        If content is present in both places, they much match (or an error will be raised).
 342        Query parameters may be provided in the full URL,
 343        but will be overwritten by any that are provided separately.
 344        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
 345        Note that path parameters (not query parameters) will be ignored.
 346        The final url path, url anchor, and parameters will be returned.
 347        """
 348
 349        # Do base initialization and cleanup.
 350
 351        if (url_path is not None):
 352            url_path = url_path.strip()
 353            if (url_path == ''):
 354                url_path = ''
 355            else:
 356                url_path = url_path.lstrip('/')
 357
 358        if (url_anchor is not None):
 359            url_anchor = url_anchor.strip()
 360            if (url_anchor == ''):
 361                url_anchor = None
 362            else:
 363                url_anchor = url_anchor.lstrip('#')
 364
 365        if (parameters is None):
 366            parameters = {}
 367
 368        # Parse the URL (if present).
 369
 370        if ((url is not None) and (url.strip() != '')):
 371            parts = urllib.parse.urlparse(url)
 372
 373            # Handle the path.
 374
 375            path = parts.path.lstrip('/')
 376
 377            if ((url_path is not None) and (url_path != path)):
 378                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")
 379
 380            url_path = path
 381
 382            # Check the optional anchor/fragment.
 383
 384            if (parts.fragment != ''):
 385                fragment = parts.fragment.lstrip('#')
 386
 387                if ((url_anchor is not None) and (url_anchor != fragment)):
 388                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")
 389
 390                url_anchor = fragment
 391
 392            # Check for any parameters.
 393
 394            url_params = parse_query_string(parts.query)
 395            for (key, value) in url_params.items():
 396                if (key not in parameters):
 397                    parameters[key] = value
 398
 399        if (url_path is None):
 400            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')
 401
 402        # Sort parameter keys for consistency.
 403        parameters = {key: parameters[key] for key in sorted(parameters.keys())}
 404
 405        return url_path, url_anchor, parameters
 406
 407    def resolve_paths(self, base_dir: str) -> None:
 408        """ Resolve any paths relative to the given base dir. """
 409
 410        for file_info in self.files:
 411            file_info.resolve_path(base_dir)
 412
 413    def match(self, query: 'HTTPExchange',
 414            match_headers: bool = True,
 415            headers_to_skip: typing.Union[typing.List[str], None] = None,
 416            params_to_skip: typing.Union[typing.List[str], None] = None,
 417            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
 418        """
 419        Check if this exchange matches the query exchange.
 420        If they match, `(True, None)` will be returned.
 421        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
 422
 423        Note that this is not an equality check,
 424        as a query exchange is often missing the response components.
 425        This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange.
 426        """
 427
 428        if (query.method != self.method):
 429            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."
 430
 431        if (query.url_path != self.url_path):
 432            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."
 433
 434        if (query.url_anchor != self.url_anchor):
 435            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."
 436
 437        if (headers_to_skip is None):
 438            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
 439
 440        if (params_to_skip is None):
 441            params_to_skip = []
 442
 443        if (match_headers):
 444            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
 445            if (not match):
 446                return False, hint
 447
 448        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
 449        if (not match):
 450            return False, hint
 451
 452        # Check file names and hash contents.
 453        query_filenames = {(file.name, file.hash_content()) for file in query.files}
 454        target_filenames = {(file.name, file.hash_content()) for file in self.files}
 455        if (query_filenames != target_filenames):
 456            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
 457
 458        return True, None
 459
 460    def _match_dict(self, label: str,
 461            query_dict: typing.Dict[str, typing.Any],
 462            target_dict: typing.Dict[str, typing.Any],
 463            keys_to_skip: typing.Union[typing.List[str], None] = None,
 464            query_label: str = 'query',
 465            target_label: str = 'target',
 466            normalize_key_case: bool = True,
 467            ) -> typing.Tuple[bool, typing.Union[str, None]]:
 468        """ A subcheck in match(), specifically for a dictionary. """
 469
 470        if (keys_to_skip is None):
 471            keys_to_skip = []
 472
 473        if (normalize_key_case):
 474            keys_to_skip = [key.lower() for key in keys_to_skip]
 475            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
 476            target_dict = {key.lower(): value for (key, value) in target_dict.items()}
 477
 478        query_keys = set(query_dict.keys()) - set(keys_to_skip)
 479        target_keys = set(target_dict.keys()) - set(keys_to_skip)
 480
 481        if (query_keys != target_keys):
 482            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."
 483
 484        for key in sorted(query_keys):
 485            query_value = query_dict[key]
 486            target_value = target_dict[key]
 487
 488            if (query_value != target_value):
 489                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
 490                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."
 491
 492        return True, None
 493
 494    def get_url(self) -> str:
 495        """ Get the URL path and anchor combined. """
 496
 497        url = self.url_path
 498
 499        if (self.url_anchor is not None):
 500            url += ('#' + self.url_anchor)
 501
 502        return url
 503
 504    def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 505        """ Perform the HTTP request described by this exchange. """
 506
 507        files = []
 508        for file_info in self.files:
 509            content = file_info.content
 510
 511            # Content is base64 encoded.
 512            if (file_info.b64_encoded and isinstance(content, str)):
 513                content = edq.util.encoding.from_base64(content)
 514
 515            # Content is missing and must be in a file.
 516            if (content is None):
 517                content = open(file_info.path, 'rb')  # type: ignore[assignment,arg-type]  # pylint: disable=consider-using-with
 518
 519            files.append((file_info.name, content))
 520
 521        url = f"{base_url}/{self.get_url()}"
 522
 523        response, body = make_request(self.method, url,
 524                headers = self.headers,
 525                data = self.parameters,
 526                files = files,
 527                raise_for_status = raise_for_status,
 528                allow_redirects = self.allow_redirects,
 529                **kwargs,
 530        )
 531
 532        if (self.response_modifier is not None):
 533            modify_func = edq.util.pyimport.fetch(self.response_modifier)
 534            body = modify_func(response, body)
 535
 536        return response, body
 537
 538    def match_response(self, response: requests.Response,
 539            override_body: typing.Union[str, None] = None,
 540            headers_to_skip: typing.Union[typing.List[str], None] = None,
 541            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
 542        """
 543        Check if this exchange matches the given response.
 544        If they match, `(True, None)` will be returned.
 545        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
 546        """
 547
 548        if (headers_to_skip is None):
 549            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
 550
 551        response_body = override_body
 552        if (response_body is None):
 553            response_body = response.text
 554
 555        if (self.response_code != response.status_code):
 556            return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})"
 557
 558        expected_body = self.response_body
 559        actual_body = None
 560
 561        if (self.json_body):
 562            actual_body = response.json()
 563
 564            # Normalize the actual and expected bodies.
 565
 566            actual_body = edq.util.json.dumps(actual_body)
 567
 568            if (isinstance(expected_body, str)):
 569                expected_body = edq.util.json.loads(expected_body)
 570
 571            expected_body = edq.util.json.dumps(expected_body)
 572        else:
 573            actual_body = response_body
 574
 575        if (self.response_body != actual_body):
 576            body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'"
 577            return False, f"body does not match ({body_hint})"
 578
 579        match, hint = self._match_dict('header', response.headers, self.response_headers,
 580                keys_to_skip = headers_to_skip,
 581                query_label = 'response', target_label = 'exchange')
 582
 583        if (not match):
 584            return False, hint
 585
 586        return True, None
 587
 588    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
 589        """ Create a consistent, semi-unique, and relative path for this exchange. """
 590
 591        url = self.get_url().strip()
 592        parts = url.split('/')
 593
 594
 595        if (url in ['', '/']):
 596            filename = '_index_'
 597            dirname = ''
 598        else:
 599            filename = parts[-1]
 600
 601            if (len(parts) > 1):
 602                dirname = os.path.join(*parts[0:-1])
 603            else:
 604                dirname = ''
 605
 606        parameters = {}
 607        for key in sorted(self.parameters.keys()):
 608            parameters[key] = self.parameters[key]
 609
 610        # Treat files as params as well.
 611        for file_info in self.files:
 612            parameters[f"file-{file_info.name}"] = file_info.hash_content()
 613
 614        query = urllib.parse.urlencode(parameters)
 615        if (query != ''):
 616            # The query can get very long, so we may have to clip it.
 617            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)
 618
 619            # Note that the '?' is URL encoded.
 620            filename += f"%3F{query_text}"
 621
 622        filename += f"_{self.method}{http_exchange_extension}"
 623
 624        return os.path.join(dirname, filename)
 625
 626    def to_dict(self) -> typing.Dict[str, typing.Any]:
 627        return vars(self)
 628
 629    @classmethod
 630    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
 631        return HTTPExchange(**data)
 632
 633    @classmethod
 634    def from_path(cls, path: str,
 635            set_source_path: bool = True,
 636            ) -> 'HTTPExchange':
 637        """
 638        Load an exchange from a file.
 639        This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths.
 640        """
 641
 642        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))
 643
 644        if (set_source_path):
 645            exchange.source_path = os.path.abspath(path)
 646
 647        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))
 648
 649        return exchange
 650
 651    @classmethod
 652    def from_response(cls,
 653            response: requests.Response,
 654            headers_to_skip: typing.Union[typing.List[str], None] = None,
 655            params_to_skip: typing.Union[typing.List[str], None] = None,
 656            allow_redirects: typing.Union[bool, None] = None,
 657            ) -> 'HTTPExchange':
 658        """ Create a full excahnge from a response. """
 659
 660        if (headers_to_skip is None):
 661            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
 662
 663        if (params_to_skip is None):
 664            params_to_skip = []
 665
 666        body = response.text
 667
 668        # Use a clean function (if one exists).
 669        if (_exchanges_clean_func is not None):
 670            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
 671            body = modify_func(response, body)
 672
 673        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
 674        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}
 675
 676        # Clean headers.
 677        for key in headers_to_skip:
 678            key = key.lower()
 679
 680            request_headers.pop(key, None)
 681            response_headers.pop(key, None)
 682
 683        request_data, request_files = parse_request_data(response.request.url, response.request.headers, response.request.body)
 684
 685        # Clean parameters.
 686        for key in params_to_skip:
 687            request_data.pop(key, None)
 688
 689        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]
 690
 691        data = {
 692            'method': response.request.method,
 693            'url': response.request.url,
 694            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
 695            'parameters': request_data,
 696            'files': files,
 697            'headers': request_headers,
 698            'response_code': response.status_code,
 699            'response_headers': response_headers,
 700            'response_body': body,
 701            'response_modifier': _exchanges_clean_func,
 702            'allow_redirects': allow_redirects,
 703        }
 704
 705        return HTTPExchange(**data)
 706
 707@typing.runtime_checkable
 708class HTTPExchangeComplete(typing.Protocol):
 709    """
 710    A function that can be called after a request has been made (and exchange constructed).
 711    """
 712
 713    def __call__(self,
 714            exchange: HTTPExchange
 715            ) -> str:
 716        """
 717        Called after an HTTP exchange has been completed.
 718        """
 719
# A module-wide hook.
# make_request() will construct an exchange when this is set (even if no output dir or per-call hook is given).
_make_request_exchange_complete_func: typing.Union[HTTPExchangeComplete, None] = None  # pylint: disable=invalid-name
""" If not None, call this func after make_request() has created its HTTPExchange. """
 722
 723def find_open_port(
 724        start_port: int = DEFAULT_START_PORT, end_port: int = DEFAULT_END_PORT,
 725        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC) -> int:
 726    """
 727    Find an open port on this machine within the given range (inclusive).
 728    If no open port is found, an error is raised.
 729    """
 730
 731    for port in range(start_port, end_port + 1):
 732        try:
 733            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 734            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 735            sock.bind(('127.0.0.1', port))
 736
 737            # Explicitly close the port and wait a short amount of time for the port to clear.
 738            # This should not be required because of the socket option above,
 739            # but the cost is small.
 740            sock.close()
 741            time.sleep(DEFAULT_PORT_SEARCH_WAIT_SEC)
 742
 743            return port
 744        except socket.error as ex:
 745            sock.close()
 746
 747            if (ex.errno == errno.EADDRINUSE):
 748                continue
 749
 750            # Unknown error.
 751            raise ex
 752
 753    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")
 754
 755def make_request(method: str, url: str,
 756        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 757        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
 758        files: typing.Union[typing.List[typing.Any], None] = None,
 759        raise_for_status: bool = True,
 760        timeout_secs: float = DEFAULT_REQUEST_TIMEOUT_SECS,
 761        output_dir: typing.Union[str, None] = None,
 762        send_anchor_header: bool = True,
 763        headers_to_skip: typing.Union[typing.List[str], None] = None,
 764        params_to_skip: typing.Union[typing.List[str], None] = None,
 765        http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION,
 766        add_http_prefix: bool = True,
 767        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 768        exchange_complete_func: typing.Union[HTTPExchangeComplete, None] = None,
 769        allow_redirects: typing.Union[bool, None] = None,
 770        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 771    """
 772    Make an HTTP request and return the response object and text body.
 773    """
 774
 775    if (add_http_prefix and (not url.lower().startswith('http'))):
 776        url = 'http://' + url
 777
 778    if (output_dir is None):
 779        output_dir = _exchanges_out_dir
 780
 781    if (headers is None):
 782        headers = {}
 783
 784    if (data is None):
 785        data = {}
 786
 787    if (files is None):
 788        files = []
 789
 790    if (additional_requests_options is None):
 791        additional_requests_options = {}
 792
 793    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
 794    if (send_anchor_header):
 795        headers = headers.copy()
 796
 797        parts = urllib.parse.urlparse(url)
 798        headers[ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')
 799
 800    options = additional_requests_options.copy()
 801    options.update({
 802        'headers': headers,
 803        'files': files,
 804        'timeout': timeout_secs,
 805    })
 806
 807    if (allow_redirects is not None):
 808        options['allow_redirects'] = allow_redirects
 809
 810    if (method == 'GET'):
 811        options['params'] = data
 812    else:
 813        options['data'] = data
 814
 815    logging.debug("Making %s request: '%s' (options = %s).", method, url, options)
 816    response = requests.request(method, url, **options)
 817
 818    body = response.text
 819    logging.debug("Response:\n%s", body)
 820
 821    if (raise_for_status):
 822        # Handle 404s a little special, as their body may contain useful information.
 823        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
 824            response.reason += f" (Body: '{body.strip()}')"
 825
 826        response.raise_for_status()
 827
 828    exchange = None
 829    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
 830        exchange = HTTPExchange.from_response(response,
 831                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
 832                allow_redirects = options.get('allow_redirects', None))
 833
 834    if ((output_dir is not None) and (exchange is not None)):
 835        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
 836        path = os.path.abspath(os.path.join(output_dir, relpath))
 837
 838        edq.util.dirent.mkdir(os.path.dirname(path))
 839        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)
 840
 841    if ((exchange_complete_func is not None) and (exchange is not None)):
 842        exchange_complete_func(exchange)
 843
 844    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
 845        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable
 846
 847    return response, body
 848
 849def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 850    """
 851    Make a GET request and return the response object and text body.
 852    """
 853
 854    return make_request('GET', url, **kwargs)
 855
 856def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 857    """
 858    Make a POST request and return the response object and text body.
 859    """
 860
 861    return make_request('POST', url, **kwargs)
 862
 863def parse_request_data(
 864        url: str,
 865        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
 866        body: typing.Union[bytes, str, io.BufferedIOBase],
 867        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
 868    """ Parse data and files from an HTTP request URL and body. """
 869
 870    # Parse data from the request body.
 871    request_data, request_files = parse_request_body_data(headers, body)
 872
 873    # Parse parameters from the URL.
 874    url_parts = urllib.parse.urlparse(url)
 875    request_data.update(parse_query_string(url_parts.query))
 876
 877    return request_data, request_files
 878
 879def parse_request_body_data(
 880        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
 881        body: typing.Union[bytes, str, io.BufferedIOBase],
 882        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
 883    """ Parse data and files from an HTTP request body. """
 884
 885    data: typing.Dict[str, typing.Any] = {}
 886    files: typing.Dict[str, bytes] = {}
 887
 888    length = int(headers.get('Content-Length', 0))
 889    if (length == 0):
 890        return data, files
 891
 892    if (isinstance(body, io.BufferedIOBase)):
 893        raw_content = body.read(length)
 894    elif (isinstance(body, str)):
 895        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
 896    else:
 897        raw_content = body
 898
 899    content_type = headers.get('Content-Type', '')
 900
 901    if (content_type in ['', 'application/x-www-form-urlencoded']):
 902        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
 903        return data, files
 904
 905    if (content_type.startswith('multipart/form-data')):
 906        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
 907            raw_content, content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)
 908
 909        for multipart_section in decoder.parts:
 910            values = parse_content_dispositions(multipart_section.headers)
 911
 912            name = values.get('name', None)
 913            if (name is None):
 914                raise ValueError("Could not find name for multipart section.")
 915
 916            # Look for a "filename" field to indicate a multipart section is a file.
 917            # The file's desired name is still in "name", but an alternate name is in "filename".
 918            if ('filename' in values):
 919                filename = values.get('name', '')
 920                if (filename == ''):
 921                    raise ValueError("Unable to find filename for multipart section.")
 922
 923                files[filename] = multipart_section.content
 924            else:
 925                # Normal Parameter
 926                data[name] = multipart_section.text
 927
 928        return data, files
 929
 930    raise ValueError(f"Unknown content type: '{content_type}'.")
 931
 932def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
 933    """ Parse a request's content dispositions from headers. """
 934
 935    values = {}
 936    for (key, value) in headers.items():
 937        if (isinstance(key, bytes)):
 938            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)
 939
 940        if (isinstance(value, bytes)):
 941            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)
 942
 943        key = key.strip().lower()
 944        if (key != 'content-disposition'):
 945            continue
 946
 947        # The Python stdlib recommends using the email library for this parsing,
 948        # but I have not had a good experience with it.
 949        for part in value.strip().split(';'):
 950            part = part.strip()
 951
 952            parts = part.split('=')
 953            if (len(parts) != 2):
 954                continue
 955
 956            cd_key = parts[0].strip()
 957            cd_value = parts[1].strip().strip('"')
 958
 959            values[cd_key] = cd_value
 960
 961    return values
 962
 963def parse_query_string(text: str,
 964        replace_single_lists: bool = True,
 965        keep_blank_values: bool = True,
 966        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
 967    """
 968    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
 969    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.
 970    """
 971
 972    results = urllib.parse.parse_qs(text, keep_blank_values = True)
 973    for (key, value) in results.items():
 974        if (replace_single_lists and (len(value) == 1)):
 975            results[key] = value[0]  # type: ignore[assignment]
 976
 977    return results
 978
 979def set_cli_args(parser: argparse.ArgumentParser, extra_state: typing.Dict[str, typing.Any]) -> None:
 980    """
 981    Set common CLI arguments.
 982    This is a sibling to init_from_args(), as the arguments set here can be interpreted there.
 983    """
 984
 985    parser.add_argument('--http-exchanges-out-dir', dest = 'http_exchanges_out_dir',
 986        action = 'store', type = str, default = None,
 987        help = 'If set, write all outgoing HTTP requests as exchanges to this directory.')
 988
 989    parser.add_argument('--http-exchanges-clean-func', dest = 'http_exchanges_clean_func',
 990        action = 'store', type = str, default = None,
 991        help = 'If set, default all created exchanges to this modifier function.')
 992
 993def init_from_args(
 994        parser: argparse.ArgumentParser,
 995        args: argparse.Namespace,
 996        extra_state: typing.Dict[str, typing.Any]) -> None:
 997    """
 998    Take in args from a parser that was passed to set_cli_args(),
 999    and call init() with the appropriate arguments.
1000    """
1001
1002    global _exchanges_out_dir  # pylint: disable=global-statement
1003    if (args.http_exchanges_out_dir is not None):
1004        _exchanges_out_dir = args.http_exchanges_out_dir
1005
1006    global _exchanges_clean_func  # pylint: disable=global-statement
1007    if (args.http_exchanges_clean_func is not None):
1008        _exchanges_clean_func = args.http_exchanges_clean_func
DEFAULT_START_PORT: int = 30000
DEFAULT_END_PORT: int = 40000
DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01
DEFAULT_REQUEST_TIMEOUT_SECS: float = 10.0
DEFAULT_HTTP_EXCHANGE_EXTENSION: str = '.httpex.json'
QUERY_CLIP_LENGTH: int = 100

If the filename of an HTTPExchange being saved is longer than this, then clip it.

ANCHOR_HEADER_KEY: str = 'edq-anchor'

By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL. Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.

ALLOWED_METHODS: List[str] = ['DELETE', 'GET', 'HEAD', 'OPTIONS', 'PATCH', 'POST', 'PUT']

Allowed HTTP methods for an HTTPExchange.

DEFAULT_EXCHANGE_IGNORE_HEADERS: List[str] = ['accept', 'accept-encoding', 'accept-language', 'cache-control', 'connection', 'content-length', 'content-security-policy', 'content-type', 'cookie', 'date', 'dnt', 'etag', 'host', 'link', 'location', 'priority', 'referrer-policy', 'sec-fetch-dest', 'sec-fetch-mode', 'sec-fetch-site', 'sec-fetch-user', 'sec-gpc', 'server', 'server-timing', 'set-cookie', 'upgrade-insecure-requests', 'user-agent', 'x-content-type-options', 'x-download-options', 'x-permitted-cross-domain-policies', 'x-rate-limit-remaining', 'x-request-context-id', 'x-request-cost', 'x-runtime', 'x-session-id', 'x-xss-protection', 'edq-anchor']

By default, ignore these headers during exchange matching. Some are sent automatically and we don't need to record (like content-length), and some are additional information we don't need.

@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
# Marked runtime-checkable so that isinstance() can be used against this protocol.
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
    """
    A function that can be used to modify an exchange's response.
    Exchanges can use these functions to normalize their responses before saving.

    Any callable that takes a response and its text body, and returns the (possibly modified) body fits this protocol.
    """

    def __call__(self,
            response: requests.Response,
            body: str,
            ) -> str:
        """
        Modify the HTTP response.
        Headers may be modified in the response directly,
        while the modified (or same) body must be returned.
        """

A function that can be used to modify an exchange's response. Exchanges can use these functions to normalize their responses before saving.

ResponseModifierFunction(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Stand-in `__init__` that either refuses instantiation or replaces itself with a real `__init__`.

    Raises a TypeError when the class of `self` has a truthy `_is_protocol`.
    If the class already has a different `__init__`, nothing is done.
    Otherwise, the first `__init__` along the class's MRO that is not this function
    is installed on the class (falling back to `object.__init__`) and then called with the given arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (i.e. `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Classes that do not define their own `__init__` are treated like this placeholder and skipped.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
class FileInfo(edq.util.json.DictConverter):
class FileInfo(edq.util.json.DictConverter):
    """
    Store info about files used in HTTP exchanges.

    A file is described by an on-disk path, by its content, or by both.
    Content may be raw bytes, plain text, or a Base64-encoded string (flagged with `b64_encoded`).
    """

    def __init__(self,
            path: typing.Union[str, None] = None,
            name: typing.Union[str, None] = None,
            content: typing.Union[str, bytes, None] = None,
            b64_encoded: bool = False,
            **kwargs: typing.Any) -> None:
        """
        Create the file info.
        If no name is given, the basename of the path is used.
        A ValueError is raised if no name can be determined,
        or if neither a path nor content is given.
        Extra `kwargs` are accepted but not used.
        """

        # Normalize the path from POSIX-style to the system's style.
        if (path is not None):
            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))

        self.path: typing.Union[str, None] = path
        """ The on-disk path to a file. """

        if ((name is None) and (self.path is not None)):
            name = os.path.basename(self.path)

        if (name is None):
            raise ValueError("No name was provided for file.")

        self.name: str = name
        """ The name for this file used in an HTTP request. """

        self.content: typing.Union[str, bytes, None] = content
        """ The contents of this file. """

        self.b64_encoded: bool = b64_encoded
        """ Whether the content is a string encoded in Base64. """

        if ((self.path is None) and (self.content is None)):
            raise ValueError("File must have either path or content specified.")

    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
        """
        Resolve this path relative to the given base dir.
        A relative path is made absolute against `base_dir`, an absolute path is left alone.
        If `load_file` is true and there is no content yet, the file's bytes are read in as the content.
        Nothing happens when there is no path.
        """

        if ((self.path is not None) and (not os.path.isabs(self.path))):
            self.path = os.path.abspath(os.path.join(base_dir, self.path))

        if ((self.path is not None) and (self.content is None) and load_file):
            self.content = edq.util.dirent.read_file_bytes(self.path)

    def hash_content(self) -> str:
        """
        Compute a hash for the content present.
        Base64-encoded string content is decoded before hashing.
        If no content is provided, use the path.
        """

        hash_content = self.content

        if (self.b64_encoded and isinstance(hash_content, str)):
            hash_content = edq.util.encoding.from_base64(hash_content)

        if (hash_content is None):
            hash_content = self.path

        return edq.util.hash.sha256_hex(hash_content)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """
        Return a copy of this object's attributes as a dict.
        Bytes content is replaced with its Base64 encoding (and flagged as such).
        """

        data = vars(self).copy()

        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
        if (isinstance(self.content, bytes)):
            data['content'] = edq.util.encoding.to_base64(self.content)
            data['b64_encoded'] = True

        return data

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """
        Build an instance from a dict, using the dict's items as constructor keyword arguments.
        """

        # Construct through cls (not a hard-coded class) so that subclasses get instances of themselves.
        return cls(**data)

Store info about files used in HTTP exchanges.

FileInfo( path: Optional[str] = None, name: Optional[str] = None, content: Union[str, bytes, NoneType] = None, b64_encoded: bool = False, **kwargs: Any)
127    def __init__(self,
128            path: typing.Union[str, None] = None,
129            name: typing.Union[str, None] = None,
130            content: typing.Union[str, bytes, None] = None,
131            b64_encoded: bool = False,
132            **kwargs: typing.Any) -> None:
133        # Normalize the path from POSIX-style to the system's style.
134        if (path is not None):
135            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))
136
137        self.path: typing.Union[str, None] = path
138        """ The on-disk path to a file. """
139
140        if ((name is None) and (self.path is not None)):
141            name = os.path.basename(self.path)
142
143        if (name is None):
144            raise ValueError("No name was provided for file.")
145
146        self.name: str = name
147        """ The name for this file used in an HTTP request. """
148
149        self.content: typing.Union[str, bytes, None] = content
150        """ The contents of this file. """
151
152        self.b64_encoded: bool = b64_encoded
153        """ Whether the content is a string encoded in Base64. """
154
155        if ((self.path is None) and (self.content is None)):
156            raise ValueError("File must have either path or content specified.")
path: Optional[str]

The on-disk path to a file.

name: str

The name for this file used in an HTTP request.

content: Union[str, bytes, NoneType]

The contents of this file.

b64_encoded: bool

Whether the content is a string encoded in Base64.

def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
158    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
159        """ Resolve this path relative to the given base dir. """
160
161        if ((self.path is not None) and (not os.path.isabs(self.path))):
162            self.path = os.path.abspath(os.path.join(base_dir, self.path))
163
164        if ((self.path is not None) and (self.content is None) and load_file):
165            self.content = edq.util.dirent.read_file_bytes(self.path)

Resolve this path relative to the given base dir.

def hash_content(self) -> str:
167    def hash_content(self) -> str:
168        """
169        Compute a hash for the content present.
170        If no content is provided, use the path.
171        """
172
173        hash_content = self.content
174
175        if (self.b64_encoded and isinstance(hash_content, str)):
176            hash_content = edq.util.encoding.from_base64(hash_content)
177
178        if (hash_content is None):
179            hash_content = self.path
180
181        return edq.util.hash.sha256_hex(hash_content)

Compute a hash for the content present. If no content is provided, use the path.

def to_dict(self) -> Dict[str, Any]:
183    def to_dict(self) -> typing.Dict[str, typing.Any]:
184        data = vars(self).copy()
185
186        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
187        if (isinstance(self.content, bytes)):
188            data['content'] = edq.util.encoding.to_base64(self.content)
189            data['b64_encoded'] = True
190
191        return data

Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Any:
193    @classmethod
194    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
195        return FileInfo(**data)

Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.

A general (but inefficient) implementation is provided by default.

class HTTPExchange(edq.util.json.DictConverter):
197class HTTPExchange(edq.util.json.DictConverter):
198    """
199    The request and response making up a full HTTP exchange.
200    """
201
202    def __init__(self,
203            method: str = 'GET',
204            url: typing.Union[str, None] = None,
205            url_path: typing.Union[str, None] = None,
206            url_anchor: typing.Union[str, None] = None,
207            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
208            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
209            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
210            allow_redirects: typing.Union[bool, None] = None,
211            response_code: int = http.HTTPStatus.OK,
212            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
213            json_body: typing.Union[bool, None] = None,
214            response_body: typing.Union[str, dict, list, None] = None,
215            source_path: typing.Union[str, None] = None,
216            response_modifier: typing.Union[str, None] = None,
217            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
218            **kwargs: typing.Any) -> None:
219        method = str(method).upper()
220        if (method not in ALLOWED_METHODS):
221            raise ValueError(f"Got unknown/disallowed method: '{method}'.")
222
223        self.method: str = method
224        """ The HTTP method for this exchange. """
225
226        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)
227
228        self.url_path: str = url_path
229        """
230        The path portion of the request URL.
231        Only the path (not domain, port, params, anchor, etc) should be included.
232        """
233
234        self.url_anchor: typing.Union[str, None] = url_anchor
235        """
236        The anchor portion of the request URL (if it exists).
237        """
238
239        self.parameters: typing.Dict[str, typing.Any] = parameters
240        """
241        The parameters/arguments for this request.
242        Parameters should be provided here and not encoded into URLs,
243        regardless of the request method.
244        With the exception of files, all parameters should be placed here.
245        """
246
247        if (files is None):
248            files = []
249
250        parsed_files = []
251        for file in files:
252            if (isinstance(file, FileInfo)):
253                parsed_files.append(file)
254            else:
255                parsed_files.append(FileInfo(**file))
256
257        self.files: typing.List[FileInfo] = parsed_files
258        """
259        A list of files to include in the request.
260        The files are represented as dicts with a
261        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
262        These paths must be POSIX-style paths,
263        they will be converted to system-specific paths.
264        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
265        However, when serialized these paths should probably be relative.
266        To reconcile this, resolve_paths() should be called before using this exchange.
267        """
268
269        if (headers is None):
270            headers = {}
271
272        self.headers: typing.Dict[str, typing.Any] = headers
273        """ Headers in the request. """
274
275        if (allow_redirects is None):
276            allow_redirects = True
277
278        self.allow_redirects: bool = allow_redirects
279        """ Follow redirects. """
280
281        self.response_code: int = response_code
282        """ The HTTP status code of the response. """
283
284        if (response_headers is None):
285            response_headers = {}
286
287        self.response_headers: typing.Dict[str, typing.Any] = response_headers
288        """ Headers in the response. """
289
290        if (json_body is None):
291            json_body = isinstance(response_body, (dict, list))
292
293        self.json_body: bool = json_body
294        """
295        Indicates that the response is JSON and should be converted to/from a string.
296        If the response body is passed in a dict/list and this is passed as None,
297        then this will be set as true.
298        """
299
300        if (self.json_body and isinstance(response_body, (dict, list))):
301            response_body = edq.util.json.dumps(response_body)
302
303        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
304        """
305        The response that should be sent in this exchange.
306        """
307
308        self.response_modifier: typing.Union[str, None] = response_modifier
309        """
310        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
311        before sent back to the caller.
312        This reference must be importable via edq.util.pyimport.fetch().
313        """
314
315        self.source_path: typing.Union[str, None] = source_path
316        """
317        The path that this exchange was loaded from (if it was loaded from a file).
318        This value should never be serialized, but can be useful for testing.
319        """
320
321        if (extra_options is None):
322            extra_options = {}
323
324        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
325        """
326        Additional options for this exchange.
327        This library will not use these options, but other's may.
328        kwargs will also be added to this.
329        """
330
331        self.extra_options.update(kwargs)
332
333    def _parse_url_components(self,
334            url: typing.Union[str, None] = None,
335            url_path: typing.Union[str, None] = None,
336            url_anchor: typing.Union[str, None] = None,
337            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
338            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
339        """
340        Parse out all URL-based components from raw inputs.
341        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
342        If content is present in both places, they much match (or an error will be raised).
343        Query parameters may be provided in the full URL,
344        but will be overwritten by any that are provided separately.
345        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
346        Note that path parameters (not query parameters) will be ignored.
347        The final url path, url anchor, and parameters will be returned.
348        """
349
350        # Do base initialization and cleanup.
351
352        if (url_path is not None):
353            url_path = url_path.strip()
354            if (url_path == ''):
355                url_path = ''
356            else:
357                url_path = url_path.lstrip('/')
358
359        if (url_anchor is not None):
360            url_anchor = url_anchor.strip()
361            if (url_anchor == ''):
362                url_anchor = None
363            else:
364                url_anchor = url_anchor.lstrip('#')
365
366        if (parameters is None):
367            parameters = {}
368
369        # Parse the URL (if present).
370
371        if ((url is not None) and (url.strip() != '')):
372            parts = urllib.parse.urlparse(url)
373
374            # Handle the path.
375
376            path = parts.path.lstrip('/')
377
378            if ((url_path is not None) and (url_path != path)):
379                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")
380
381            url_path = path
382
383            # Check the optional anchor/fragment.
384
385            if (parts.fragment != ''):
386                fragment = parts.fragment.lstrip('#')
387
388                if ((url_anchor is not None) and (url_anchor != fragment)):
389                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")
390
391                url_anchor = fragment
392
393            # Check for any parameters.
394
395            url_params = parse_query_string(parts.query)
396            for (key, value) in url_params.items():
397                if (key not in parameters):
398                    parameters[key] = value
399
400        if (url_path is None):
401            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')
402
403        # Sort parameter keys for consistency.
404        parameters = {key: parameters[key] for key in sorted(parameters.keys())}
405
406        return url_path, url_anchor, parameters
407
408    def resolve_paths(self, base_dir: str) -> None:
409        """ Resolve any paths relative to the given base dir. """
410
411        for file_info in self.files:
412            file_info.resolve_path(base_dir)
413
414    def match(self, query: 'HTTPExchange',
415            match_headers: bool = True,
416            headers_to_skip: typing.Union[typing.List[str], None] = None,
417            params_to_skip: typing.Union[typing.List[str], None] = None,
418            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
419        """
420        Check if this exchange matches the query exchange.
421        If they match, `(True, None)` will be returned.
422        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
423
424        Note that this is not an equality check,
425        as a query exchange is often missing the response components.
426        This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange.
427        """
428
429        if (query.method != self.method):
430            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."
431
432        if (query.url_path != self.url_path):
433            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."
434
435        if (query.url_anchor != self.url_anchor):
436            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."
437
438        if (headers_to_skip is None):
439            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
440
441        if (params_to_skip is None):
442            params_to_skip = []
443
444        if (match_headers):
445            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
446            if (not match):
447                return False, hint
448
449        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
450        if (not match):
451            return False, hint
452
453        # Check file names and hash contents.
454        query_filenames = {(file.name, file.hash_content()) for file in query.files}
455        target_filenames = {(file.name, file.hash_content()) for file in self.files}
456        if (query_filenames != target_filenames):
457            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
458
459        return True, None
460
461    def _match_dict(self, label: str,
462            query_dict: typing.Dict[str, typing.Any],
463            target_dict: typing.Dict[str, typing.Any],
464            keys_to_skip: typing.Union[typing.List[str], None] = None,
465            query_label: str = 'query',
466            target_label: str = 'target',
467            normalize_key_case: bool = True,
468            ) -> typing.Tuple[bool, typing.Union[str, None]]:
469        """ A subcheck in match(), specifically for a dictionary. """
470
471        if (keys_to_skip is None):
472            keys_to_skip = []
473
474        if (normalize_key_case):
475            keys_to_skip = [key.lower() for key in keys_to_skip]
476            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
477            target_dict = {key.lower(): value for (key, value) in target_dict.items()}
478
479        query_keys = set(query_dict.keys()) - set(keys_to_skip)
480        target_keys = set(target_dict.keys()) - set(keys_to_skip)
481
482        if (query_keys != target_keys):
483            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."
484
485        for key in sorted(query_keys):
486            query_value = query_dict[key]
487            target_value = target_dict[key]
488
489            if (query_value != target_value):
490                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
491                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."
492
493        return True, None
494
495    def get_url(self) -> str:
496        """ Get the URL path and anchor combined. """
497
498        url = self.url_path
499
500        if (self.url_anchor is not None):
501            url += ('#' + self.url_anchor)
502
503        return url
504
505    def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
506        """ Perform the HTTP request described by this exchange. """
507
508        files = []
509        for file_info in self.files:
510            content = file_info.content
511
512            # Content is base64 encoded.
513            if (file_info.b64_encoded and isinstance(content, str)):
514                content = edq.util.encoding.from_base64(content)
515
516            # Content is missing and must be in a file.
517            if (content is None):
518                content = open(file_info.path, 'rb')  # type: ignore[assignment,arg-type]  # pylint: disable=consider-using-with
519
520            files.append((file_info.name, content))
521
522        url = f"{base_url}/{self.get_url()}"
523
524        response, body = make_request(self.method, url,
525                headers = self.headers,
526                data = self.parameters,
527                files = files,
528                raise_for_status = raise_for_status,
529                allow_redirects = self.allow_redirects,
530                **kwargs,
531        )
532
533        if (self.response_modifier is not None):
534            modify_func = edq.util.pyimport.fetch(self.response_modifier)
535            body = modify_func(response, body)
536
537        return response, body
538
539    def match_response(self, response: requests.Response,
540            override_body: typing.Union[str, None] = None,
541            headers_to_skip: typing.Union[typing.List[str], None] = None,
542            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
543        """
544        Check if this exchange matches the given response.
545        If they match, `(True, None)` will be returned.
546        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
547        """
548
549        if (headers_to_skip is None):
550            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
551
552        response_body = override_body
553        if (response_body is None):
554            response_body = response.text
555
556        if (self.response_code != response.status_code):
557            return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})"
558
559        expected_body = self.response_body
560        actual_body = None
561
562        if (self.json_body):
563            actual_body = response.json()
564
565            # Normalize the actual and expected bodies.
566
567            actual_body = edq.util.json.dumps(actual_body)
568
569            if (isinstance(expected_body, str)):
570                expected_body = edq.util.json.loads(expected_body)
571
572            expected_body = edq.util.json.dumps(expected_body)
573        else:
574            actual_body = response_body
575
576        if (self.response_body != actual_body):
577            body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'"
578            return False, f"body does not match ({body_hint})"
579
580        match, hint = self._match_dict('header', response.headers, self.response_headers,
581                keys_to_skip = headers_to_skip,
582                query_label = 'response', target_label = 'exchange')
583
584        if (not match):
585            return False, hint
586
587        return True, None
588
589    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
590        """ Create a consistent, semi-unique, and relative path for this exchange. """
591
592        url = self.get_url().strip()
593        parts = url.split('/')
594
595
596        if (url in ['', '/']):
597            filename = '_index_'
598            dirname = ''
599        else:
600            filename = parts[-1]
601
602            if (len(parts) > 1):
603                dirname = os.path.join(*parts[0:-1])
604            else:
605                dirname = ''
606
607        parameters = {}
608        for key in sorted(self.parameters.keys()):
609            parameters[key] = self.parameters[key]
610
611        # Treat files as params as well.
612        for file_info in self.files:
613            parameters[f"file-{file_info.name}"] = file_info.hash_content()
614
615        query = urllib.parse.urlencode(parameters)
616        if (query != ''):
617            # The query can get very long, so we may have to clip it.
618            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)
619
620            # Note that the '?' is URL encoded.
621            filename += f"%3F{query_text}"
622
623        filename += f"_{self.method}{http_exchange_extension}"
624
625        return os.path.join(dirname, filename)
626
627    def to_dict(self) -> typing.Dict[str, typing.Any]:
628        return vars(self)
629
630    @classmethod
631    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
632        return HTTPExchange(**data)
633
634    @classmethod
635    def from_path(cls, path: str,
636            set_source_path: bool = True,
637            ) -> 'HTTPExchange':
638        """
639        Load an exchange from a file.
640        This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths.
641        """
642
643        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))
644
645        if (set_source_path):
646            exchange.source_path = os.path.abspath(path)
647
648        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))
649
650        return exchange
651
652    @classmethod
653    def from_response(cls,
654            response: requests.Response,
655            headers_to_skip: typing.Union[typing.List[str], None] = None,
656            params_to_skip: typing.Union[typing.List[str], None] = None,
657            allow_redirects: typing.Union[bool, None] = None,
658            ) -> 'HTTPExchange':
659        """ Create a full excahnge from a response. """
660
661        if (headers_to_skip is None):
662            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
663
664        if (params_to_skip is None):
665            params_to_skip = []
666
667        body = response.text
668
669        # Use a clean function (if one exists).
670        if (_exchanges_clean_func is not None):
671            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
672            body = modify_func(response, body)
673
674        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
675        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}
676
677        # Clean headers.
678        for key in headers_to_skip:
679            key = key.lower()
680
681            request_headers.pop(key, None)
682            response_headers.pop(key, None)
683
684        request_data, request_files = parse_request_data(response.request.url, response.request.headers, response.request.body)
685
686        # Clean parameters.
687        for key in params_to_skip:
688            request_data.pop(key, None)
689
690        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]
691
692        data = {
693            'method': response.request.method,
694            'url': response.request.url,
695            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
696            'parameters': request_data,
697            'files': files,
698            'headers': request_headers,
699            'response_code': response.status_code,
700            'response_headers': response_headers,
701            'response_body': body,
702            'response_modifier': _exchanges_clean_func,
703            'allow_redirects': allow_redirects,
704        }
705
706        return HTTPExchange(**data)

The request and response making up a full HTTP exchange.

HTTPExchange( method: str = 'GET', url: Optional[str] = None, url_path: Optional[str] = None, url_anchor: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, files: Optional[List[Union[FileInfo, Dict[str, Any]]]] = None, headers: Optional[Dict[str, Any]] = None, allow_redirects: Optional[bool] = None, response_code: int = <HTTPStatus.OK: 200>, response_headers: Optional[Dict[str, Any]] = None, json_body: Optional[bool] = None, response_body: Union[str, dict, list, NoneType] = None, source_path: Optional[str] = None, response_modifier: Optional[str] = None, extra_options: Optional[Dict[str, Any]] = None, **kwargs: Any)
202    def __init__(self,
203            method: str = 'GET',
204            url: typing.Union[str, None] = None,
205            url_path: typing.Union[str, None] = None,
206            url_anchor: typing.Union[str, None] = None,
207            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
208            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
209            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
210            allow_redirects: typing.Union[bool, None] = None,
211            response_code: int = http.HTTPStatus.OK,
212            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
213            json_body: typing.Union[bool, None] = None,
214            response_body: typing.Union[str, dict, list, None] = None,
215            source_path: typing.Union[str, None] = None,
216            response_modifier: typing.Union[str, None] = None,
217            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
218            **kwargs: typing.Any) -> None:
219        method = str(method).upper()
220        if (method not in ALLOWED_METHODS):
221            raise ValueError(f"Got unknown/disallowed method: '{method}'.")
222
223        self.method: str = method
224        """ The HTTP method for this exchange. """
225
226        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)
227
228        self.url_path: str = url_path
229        """
230        The path portion of the request URL.
231        Only the path (not domain, port, params, anchor, etc) should be included.
232        """
233
234        self.url_anchor: typing.Union[str, None] = url_anchor
235        """
236        The anchor portion of the request URL (if it exists).
237        """
238
239        self.parameters: typing.Dict[str, typing.Any] = parameters
240        """
241        The parameters/arguments for this request.
242        Parameters should be provided here and not encoded into URLs,
243        regardless of the request method.
244        With the exception of files, all parameters should be placed here.
245        """
246
247        if (files is None):
248            files = []
249
250        parsed_files = []
251        for file in files:
252            if (isinstance(file, FileInfo)):
253                parsed_files.append(file)
254            else:
255                parsed_files.append(FileInfo(**file))
256
257        self.files: typing.List[FileInfo] = parsed_files
258        """
259        A list of files to include in the request.
260        The files are represented as dicts with a
261        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
262        These paths must be POSIX-style paths,
263        they will be converted to system-specific paths.
264        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
265        However, when serialized these paths should probably be relative.
266        To reconcile this, resolve_paths() should be called before using this exchange.
267        """
268
269        if (headers is None):
270            headers = {}
271
272        self.headers: typing.Dict[str, typing.Any] = headers
273        """ Headers in the request. """
274
275        if (allow_redirects is None):
276            allow_redirects = True
277
278        self.allow_redirects: bool = allow_redirects
279        """ Follow redirects. """
280
281        self.response_code: int = response_code
282        """ The HTTP status code of the response. """
283
284        if (response_headers is None):
285            response_headers = {}
286
287        self.response_headers: typing.Dict[str, typing.Any] = response_headers
288        """ Headers in the response. """
289
290        if (json_body is None):
291            json_body = isinstance(response_body, (dict, list))
292
293        self.json_body: bool = json_body
294        """
295        Indicates that the response is JSON and should be converted to/from a string.
296        If the response body is passed in a dict/list and this is passed as None,
297        then this will be set as true.
298        """
299
300        if (self.json_body and isinstance(response_body, (dict, list))):
301            response_body = edq.util.json.dumps(response_body)
302
303        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
304        """
305        The response that should be sent in this exchange.
306        """
307
308        self.response_modifier: typing.Union[str, None] = response_modifier
309        """
310        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
311        before sent back to the caller.
312        This reference must be importable via edq.util.pyimport.fetch().
313        """
314
315        self.source_path: typing.Union[str, None] = source_path
316        """
317        The path that this exchange was loaded from (if it was loaded from a file).
318        This value should never be serialized, but can be useful for testing.
319        """
320
321        if (extra_options is None):
322            extra_options = {}
323
324        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
325        """
326        Additional options for this exchange.
327        This library will not use these options, but other's may.
328        kwargs will also be added to this.
329        """
330
331        self.extra_options.update(kwargs)
method: str

The HTTP method for this exchange.

url_path: str

The path portion of the request URL. Only the path (not domain, port, params, anchor, etc) should be included.

url_anchor: Optional[str]

The anchor portion of the request URL (if it exists).

parameters: Dict[str, Any]

The parameters/arguments for this request. Parameters should be provided here and not encoded into URLs, regardless of the request method. With the exception of files, all parameters should be placed here.

files: List[FileInfo]

A list of files to include in the request. The files are represented as dicts with a "path" (path to the file on disk) and "name" (the filename to send in the request) field. These paths must be POSIX-style paths, they will be converted to system-specific paths. Once this exchange is ready for use, these paths should be resolved (and probably absolute). However, when serialized these paths should probably be relative. To reconcile this, resolve_paths() should be called before using this exchange.

headers: Dict[str, Any]

Headers in the request.

allow_redirects: bool

Follow redirects.

response_code: int

The HTTP status code of the response.

response_headers: Dict[str, Any]

Headers in the response.

json_body: bool

Indicates that the response is JSON and should be converted to/from a string. If the response body is passed in a dict/list and this is passed as None, then this will be set as true.

response_body: Optional[str]

The response that should be sent in this exchange.

response_modifier: Optional[str]

This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response()) before they are sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().

source_path: Optional[str]

The path that this exchange was loaded from (if it was loaded from a file). This value should never be serialized, but can be useful for testing.

extra_options: Dict[str, Any]

Additional options for this exchange. This library will not use these options, but others may. kwargs will also be added to this.

def resolve_paths(self, base_dir: str) -> None:
408    def resolve_paths(self, base_dir: str) -> None:
409        """ Resolve any paths relative to the given base dir. """
410
411        for file_info in self.files:
412            file_info.resolve_path(base_dir)

Resolve any paths relative to the given base dir.

def match( self, query: HTTPExchange, match_headers: bool = True, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, **kwargs: Any) -> Tuple[bool, Optional[str]]:
414    def match(self, query: 'HTTPExchange',
415            match_headers: bool = True,
416            headers_to_skip: typing.Union[typing.List[str], None] = None,
417            params_to_skip: typing.Union[typing.List[str], None] = None,
418            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
419        """
420        Check if this exchange matches the query exchange.
421        If they match, `(True, None)` will be returned.
422        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
423
424        Note that this is not an equality check,
425        as a query exchange is often missing the response components.
426        This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange.
427        """
428
429        if (query.method != self.method):
430            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."
431
432        if (query.url_path != self.url_path):
433            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."
434
435        if (query.url_anchor != self.url_anchor):
436            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."
437
438        if (headers_to_skip is None):
439            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
440
441        if (params_to_skip is None):
442            params_to_skip = []
443
444        if (match_headers):
445            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
446            if (not match):
447                return False, hint
448
449        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
450        if (not match):
451            return False, hint
452
453        # Check file names and hash contents.
454        query_filenames = {(file.name, file.hash_content()) for file in query.files}
455        target_filenames = {(file.name, file.hash_content()) for file in self.files}
456        if (query_filenames != target_filenames):
457            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
458
459        return True, None

Check if this exchange matches the query exchange. If they match, (True, None) will be returned. If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.

Note that this is not an equality check, as a query exchange is often missing the response components. This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.

def get_url(self) -> str:
495    def get_url(self) -> str:
496        """ Get the URL path and anchor combined. """
497
498        url = self.url_path
499
500        if (self.url_anchor is not None):
501            url += ('#' + self.url_anchor)
502
503        return url

Get the URL path and anchor combined.

def make_request( self, base_url: str, raise_for_status: bool = True, **kwargs: Any) -> Tuple[requests.models.Response, str]:
505    def make_request(self, base_url: str, raise_for_status: bool = True, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
506        """ Perform the HTTP request described by this exchange. """
507
508        files = []
509        for file_info in self.files:
510            content = file_info.content
511
512            # Content is base64 encoded.
513            if (file_info.b64_encoded and isinstance(content, str)):
514                content = edq.util.encoding.from_base64(content)
515
516            # Content is missing and must be in a file.
517            if (content is None):
518                content = open(file_info.path, 'rb')  # type: ignore[assignment,arg-type]  # pylint: disable=consider-using-with
519
520            files.append((file_info.name, content))
521
522        url = f"{base_url}/{self.get_url()}"
523
524        response, body = make_request(self.method, url,
525                headers = self.headers,
526                data = self.parameters,
527                files = files,
528                raise_for_status = raise_for_status,
529                allow_redirects = self.allow_redirects,
530                **kwargs,
531        )
532
533        if (self.response_modifier is not None):
534            modify_func = edq.util.pyimport.fetch(self.response_modifier)
535            body = modify_func(response, body)
536
537        return response, body

Perform the HTTP request described by this exchange.

def match_response( self, response: requests.models.Response, override_body: Optional[str] = None, headers_to_skip: Optional[List[str]] = None, **kwargs: Any) -> Tuple[bool, Optional[str]]:
539    def match_response(self, response: requests.Response,
540            override_body: typing.Union[str, None] = None,
541            headers_to_skip: typing.Union[typing.List[str], None] = None,
542            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
543        """
544        Check if this exchange matches the given response.
545        If they match, `(True, None)` will be returned.
546        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
547        """
548
549        if (headers_to_skip is None):
550            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
551
552        response_body = override_body
553        if (response_body is None):
554            response_body = response.text
555
556        if (self.response_code != response.status_code):
557            return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})"
558
559        expected_body = self.response_body
560        actual_body = None
561
562        if (self.json_body):
563            actual_body = response.json()
564
565            # Normalize the actual and expected bodies.
566
567            actual_body = edq.util.json.dumps(actual_body)
568
569            if (isinstance(expected_body, str)):
570                expected_body = edq.util.json.loads(expected_body)
571
572            expected_body = edq.util.json.dumps(expected_body)
573        else:
574            actual_body = response_body
575
576        if (self.response_body != actual_body):
577            body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'"
578            return False, f"body does not match ({body_hint})"
579
580        match, hint = self._match_dict('header', response.headers, self.response_headers,
581                keys_to_skip = headers_to_skip,
582                query_label = 'response', target_label = 'exchange')
583
584        if (not match):
585            return False, hint
586
587        return True, None

Check if this exchange matches the given response. If they match, (True, None) will be returned. If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.

def compute_relpath(self, http_exchange_extension: str = '.httpex.json') -> str:
589    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
590        """ Create a consistent, semi-unique, and relative path for this exchange. """
591
592        url = self.get_url().strip()
593        parts = url.split('/')
594
595
596        if (url in ['', '/']):
597            filename = '_index_'
598            dirname = ''
599        else:
600            filename = parts[-1]
601
602            if (len(parts) > 1):
603                dirname = os.path.join(*parts[0:-1])
604            else:
605                dirname = ''
606
607        parameters = {}
608        for key in sorted(self.parameters.keys()):
609            parameters[key] = self.parameters[key]
610
611        # Treat files as params as well.
612        for file_info in self.files:
613            parameters[f"file-{file_info.name}"] = file_info.hash_content()
614
615        query = urllib.parse.urlencode(parameters)
616        if (query != ''):
617            # The query can get very long, so we may have to clip it.
618            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)
619
620            # Note that the '?' is URL encoded.
621            filename += f"%3F{query_text}"
622
623        filename += f"_{self.method}{http_exchange_extension}"
624
625        return os.path.join(dirname, filename)

Create a consistent, semi-unique, and relative path for this exchange.

def to_dict(self) -> Dict[str, Any]:
627    def to_dict(self) -> typing.Dict[str, typing.Any]:
628        return vars(self)

Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Any:
630    @classmethod
631    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
632        return HTTPExchange(**data)

Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_path( cls, path: str, set_source_path: bool = True) -> HTTPExchange:
634    @classmethod
635    def from_path(cls, path: str,
636            set_source_path: bool = True,
637            ) -> 'HTTPExchange':
638        """
639        Load an exchange from a file.
640        This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths.
641        """
642
643        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))
644
645        if (set_source_path):
646            exchange.source_path = os.path.abspath(path)
647
648        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))
649
650        return exchange

Load an exchange from a file. This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.

@classmethod
def from_response( cls, response: requests.models.Response, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, allow_redirects: Optional[bool] = None) -> HTTPExchange:
652    @classmethod
653    def from_response(cls,
654            response: requests.Response,
655            headers_to_skip: typing.Union[typing.List[str], None] = None,
656            params_to_skip: typing.Union[typing.List[str], None] = None,
657            allow_redirects: typing.Union[bool, None] = None,
658            ) -> 'HTTPExchange':
659        """ Create a full excahnge from a response. """
660
661        if (headers_to_skip is None):
662            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
663
664        if (params_to_skip is None):
665            params_to_skip = []
666
667        body = response.text
668
669        # Use a clean function (if one exists).
670        if (_exchanges_clean_func is not None):
671            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
672            body = modify_func(response, body)
673
674        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
675        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}
676
677        # Clean headers.
678        for key in headers_to_skip:
679            key = key.lower()
680
681            request_headers.pop(key, None)
682            response_headers.pop(key, None)
683
684        request_data, request_files = parse_request_data(response.request.url, response.request.headers, response.request.body)
685
686        # Clean parameters.
687        for key in params_to_skip:
688            request_data.pop(key, None)
689
690        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]
691
692        data = {
693            'method': response.request.method,
694            'url': response.request.url,
695            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
696            'parameters': request_data,
697            'files': files,
698            'headers': request_headers,
699            'response_code': response.status_code,
700            'response_headers': response_headers,
701            'response_body': body,
702            'response_modifier': _exchanges_clean_func,
703            'allow_redirects': allow_redirects,
704        }
705
706        return HTTPExchange(**data)

Create a full exchange from a response.

@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    A function that can be called after a request has been made (and exchange constructed).
    This is a structural (duck-typed) interface: any callable with a matching signature satisfies it.
    """

    def __call__(self,
            exchange: HTTPExchange
            ) -> str:
        """
        Called after an HTTP exchange has been completed.
        Receives the exchange that was constructed for the request.
        NOTE(review): the meaning of the returned string is not visible here, confirm against the caller.
        """

A function that can be called after a request has been made (and exchange constructed).

HTTPExchangeComplete(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` for protocol classes and their subclasses.
    Raises TypeError if the class is itself a protocol.
    Otherwise, finds the first real `__init__` in the MRO, installs it on the class, and calls it with the given arguments.
    """

    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
def find_open_port( start_port: int = 30000, end_port: int = 40000, wait_time: float = 0.01) -> int:
def find_open_port(
        start_port: int = DEFAULT_START_PORT, end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    If no open port is found, an error is raised.

    Each candidate port is probed by binding a TCP socket to it on 127.0.0.1.
    After a successful probe the socket is closed and this function sleeps for
    `wait_time` seconds before returning the port.

    Raises a ValueError if every port in the range is already in use.
    Any socket error other than "address in use" is re-raised unchanged.
    """

    for port in range(start_port, end_port + 1):
        # Create the socket outside the try so a creation failure surfaces as itself
        # (and so the cleanup below never touches an unbound name).
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('127.0.0.1', port))
        except socket.error as ex:
            if (ex.errno == errno.EADDRINUSE):
                # This port is taken, try the next one.
                continue

            # Unknown error.
            raise
        finally:
            # Always release the probe socket, whether or not the bind worked.
            sock.close()

        # Wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")

Find an open port on this machine within the given range (inclusive). If no open port is found, an error is raised.

def make_request( method: str, url: str, headers: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, files: Optional[List[Any]] = None, raise_for_status: bool = True, timeout_secs: float = 10.0, output_dir: Optional[str] = None, send_anchor_header: bool = True, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, http_exchange_extension: str = '.httpex.json', add_http_prefix: bool = True, additional_requests_options: Optional[Dict[str, Any]] = None, exchange_complete_func: Optional[HTTPExchangeComplete] = None, allow_redirects: Optional[bool] = None, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_request(method: str, url: str,
        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
        files: typing.Union[typing.List[typing.Any], None] = None,
        raise_for_status: bool = True,
        timeout_secs: float = DEFAULT_REQUEST_TIMEOUT_SECS,
        output_dir: typing.Union[str, None] = None,
        send_anchor_header: bool = True,
        headers_to_skip: typing.Union[typing.List[str], None] = None,
        params_to_skip: typing.Union[typing.List[str], None] = None,
        http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION,
        add_http_prefix: bool = True,
        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
        exchange_complete_func: typing.Union[HTTPExchangeComplete, None] = None,
        allow_redirects: typing.Union[bool, None] = None,
        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Make an HTTP request and return the response object and text body.

    Arguments:
     - method: The HTTP method. 'GET' sends `data` as URL parameters, any other method sends it as the request body.
     - url: The target URL. If `add_http_prefix` is true and the URL has no http(s) scheme, 'http://' is prepended.
     - headers / data / files: Passed to requests (None is treated as empty).
     - raise_for_status: If true, raise on an error status.
       For a 404 with a non-empty body, the body is appended to the response's reason first.
     - timeout_secs: The timeout passed to requests.
     - output_dir: If not None (after falling back to the module-level exchange output directory),
       the exchange is written as JSON under this directory.
     - send_anchor_header: If true, send the URL's anchor/fragment in the ANCHOR_HEADER_KEY header.
     - headers_to_skip / params_to_skip / http_exchange_extension: Passed along when building/saving the exchange.
     - additional_requests_options: Base options for requests ('headers', 'files', and 'timeout' are always overridden).
     - exchange_complete_func: If not None, called with the exchange (in addition to any module-level completion function).
     - allow_redirects: Only passed to requests when not None.
     - kwargs: Accepted and ignored.

    An exchange is only constructed when something will consume it
    (an output directory or a completion function).
    """

    # Only a real scheme counts as already having a prefix.
    # A bare startswith('http') would incorrectly skip hosts like "httpbin.org".
    if (add_http_prefix and (not url.lower().startswith(('http://', 'https://')))):
        url = 'http://' + url

    if (output_dir is None):
        output_dir = _exchanges_out_dir

    if (headers is None):
        headers = {}

    if (data is None):
        data = {}

    if (files is None):
        files = []

    if (additional_requests_options is None):
        additional_requests_options = {}

    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
    if (send_anchor_header):
        # Copy so the caller's headers are not modified.
        headers = headers.copy()

        parts = urllib.parse.urlparse(url)
        headers[ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')

    # Explicit arguments take priority over anything in the additional options.
    options = additional_requests_options.copy()
    options.update({
        'headers': headers,
        'files': files,
        'timeout': timeout_secs,
    })

    if (allow_redirects is not None):
        options['allow_redirects'] = allow_redirects

    if (method == 'GET'):
        options['params'] = data
    else:
        options['data'] = data

    logging.debug("Making %s request: '%s' (options = %s).", method, url, options)
    response = requests.request(method, url, **options)

    body = response.text
    logging.debug("Response:\n%s", body)

    if (raise_for_status):
        # Handle 404s a little special, as their body may contain useful information.
        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
            response.reason += f" (Body: '{body.strip()}')"

        response.raise_for_status()

    # Only build the exchange if something is going to use it.
    exchange = None
    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
        exchange = HTTPExchange.from_response(response,
                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
                allow_redirects = options.get('allow_redirects', None))

    if ((output_dir is not None) and (exchange is not None)):
        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
        path = os.path.abspath(os.path.join(output_dir, relpath))

        edq.util.dirent.mkdir(os.path.dirname(path))
        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)

    if ((exchange_complete_func is not None) and (exchange is not None)):
        exchange_complete_func(exchange)

    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable

    return response, body

Make an HTTP request and return the response object and text body.

def make_get(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Convenience wrapper around make_request() for the GET method.
    All keyword arguments are passed through unchanged.
    Returns the response object and its text body.
    """

    response, body = make_request('GET', url, **kwargs)
    return response, body

Make a GET request and return the response object and text body.

def make_post(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Convenience wrapper around make_request() for the POST method.
    All keyword arguments are passed through unchanged.
    Returns the response object and its text body.
    """

    response, body = make_request('POST', url, **kwargs)
    return response, body

Make a POST request and return the response object and text body.

def parse_request_data( url: str, headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Collect the data and files of an HTTP request from both its body and its URL.
    Values from the URL's query string are applied after (and so override) values from the body.
    """

    # Start with whatever the body provides.
    data, files = parse_request_body_data(headers, body)

    # Then layer the URL's query parameters on top.
    query = urllib.parse.urlparse(url).query
    data.update(parse_query_string(query))

    return data, files

Parse data and files from an HTTP request URL and body.

def parse_request_body_data( headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
def parse_request_body_data(
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Parse data and files from an HTTP request body.

    The 'Content-Length' header decides whether there is anything to parse (a missing or zero length yields empty results),
    and the 'Content-Type' header decides how the body is parsed:
     - No content type or 'application/x-www-form-urlencoded': The body is parsed as a query string.
     - 'multipart/form-data': Each section becomes either a data value or, if it has a "filename", a file.
    Content type parameters (e.g., "; charset=UTF-8") and casing are ignored when choosing how to parse.

    Returns the parsed data and a mapping of file names to file contents.
    Raises a ValueError for an unknown content type or a malformed multipart section.
    """

    data: typing.Dict[str, typing.Any] = {}
    files: typing.Dict[str, bytes] = {}

    length = int(headers.get('Content-Length', 0))
    if (length == 0):
        return data, files

    if (isinstance(body, io.BufferedIOBase)):
        raw_content = body.read(length)
    elif (isinstance(body, str)):
        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
    else:
        raw_content = body

    content_type = headers.get('Content-Type', '')

    # Compare only the media type itself:
    # clients commonly append parameters (like a charset) and media types are case-insensitive.
    media_type = content_type.split(';', 1)[0].strip().lower()

    if (media_type in ['', 'application/x-www-form-urlencoded']):
        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
        return data, files

    if (media_type == 'multipart/form-data'):
        # The decoder needs the full header value (it contains the boundary).
        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
            raw_content, content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)

        for multipart_section in decoder.parts:
            values = parse_content_dispositions(multipart_section.headers)

            name = values.get('name', None)
            if (name is None):
                raise ValueError("Could not find name for multipart section.")

            # Look for a "filename" field to indicate a multipart section is a file.
            # The file's desired name is still in "name", but an alternate name is in "filename".
            if ('filename' in values):
                filename = values.get('name', '')
                if (filename == ''):
                    raise ValueError("Unable to find filename for multipart section.")

                files[filename] = multipart_section.content
            else:
                # Normal Parameter
                data[name] = multipart_section.text

        return data, files

    raise ValueError(f"Unknown content type: '{content_type}'.")

Parse data and files from an HTTP request body.

def parse_content_dispositions(headers: Union[email.message.Message, Dict[str, Any]]) -> Dict[str, Any]:
def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every 'Content-Disposition' header (matched case-insensitively) is split on ';',
    and each "key=value" piece is collected into the returned dict (surrounding double quotes are removed from values).
    Pieces without an '=' (e.g., "form-data") are ignored.
    Byte keys/values are decoded before processing.
    """

    values: typing.Dict[str, typing.Any] = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            part = part.strip()

            # Split on the first '=' only, so values that contain an '=' (e.g., filename="a=b.txt") are kept.
            parts = part.split('=', 1)
            if (len(parts) != 2):
                continue

            cd_key = parts[0].strip()
            cd_value = parts[1].strip().strip('"')

            values[cd_key] = cd_value

    return values

Parse a request's content dispositions from headers.

def parse_query_string( text: str, replace_single_lists: bool = True, keep_blank_values: bool = True, **kwargs: Any) -> Dict[str, Any]:
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

    `keep_blank_values` is passed to urllib.parse.parse_qs():
    when true, keys with an empty value are kept (with an empty string), otherwise they are dropped.
    Any additional keyword arguments are accepted and ignored.
    """

    raw_results = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values)

    if (not replace_single_lists):
        return raw_results

    # Build a new dict instead of rewriting entries while iterating.
    return {
        key: (value[0] if (len(value) == 1) else value)
        for (key, value) in raw_results.items()
    }

Parse a query string (like urllib.parse.parse_qs()), and normalize the result. If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

def set_cli_args(parser: argparse.ArgumentParser, extra_state: Dict[str, Any]) -> None:
def set_cli_args(parser: argparse.ArgumentParser, extra_state: typing.Dict[str, typing.Any]) -> None:
    """
    Register the HTTP exchange options on the given parser.
    The values parsed from these options are meant to be consumed by init_from_args().
    """

    # (flag, destination attribute, help text) for each optional string argument.
    option_specs = [
        (
            '--http-exchanges-out-dir', 'http_exchanges_out_dir',
            'If set, write all outgoing HTTP requests as exchanges to this directory.',
        ),
        (
            '--http-exchanges-clean-func', 'http_exchanges_clean_func',
            'If set, default all created exchanges to this modifier function.',
        ),
    ]

    for (flag, dest, help_text) in option_specs:
        parser.add_argument(flag, dest = dest,
            action = 'store', type = str, default = None,
            help = help_text)

Set common CLI arguments. This is a sibling to init_from_args(), as the arguments set here can be interpreted there.

def init_from_args( parser: argparse.ArgumentParser, args: argparse.Namespace, extra_state: Dict[str, Any]) -> None:
 994def init_from_args(
 995        parser: argparse.ArgumentParser,
 996        args: argparse.Namespace,
 997        extra_state: typing.Dict[str, typing.Any]) -> None:
 998    """
 999    Take in args from a parser that was passed to set_cli_args(),
1000    and call init() with the appropriate arguments.
1001    """
1002
1003    global _exchanges_out_dir  # pylint: disable=global-statement
1004    if (args.http_exchanges_out_dir is not None):
1005        _exchanges_out_dir = args.http_exchanges_out_dir
1006
1007    global _exchanges_clean_func  # pylint: disable=global-statement
1008    if (args.http_exchanges_clean_func is not None):
1009        _exchanges_clean_func = args.http_exchanges_clean_func

Take in args from a parser that was passed to set_cli_args(), and call init() with the appropriate arguments.