edq.net.exchange

  1import copy
  2import http
  3import os
  4import pathlib
  5import typing
  6import urllib.parse
  7
  8import requests
  9
 10import edq.net.util
 11import edq.util.dirent
 12import edq.util.encoding
 13import edq.util.hash
 14import edq.util.json
 15import edq.util.pyimport
 16
 17DEFAULT_HTTP_EXCHANGE_EXTENSION: str= '.httpex.json'
 18
 19QUERY_CLIP_LENGTH: int = 100
 20""" If the filename of an HTTPExhange being saved is longer than this, then clip it. """
 21
 22ANCHOR_HEADER_KEY: str = 'edq-anchor'
 23"""
 24By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL.
 25Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.
 26"""
 27
 28ALLOWED_METHODS: typing.List[str] = [
 29    'DELETE',
 30    'GET',
 31    'HEAD',
 32    'OPTIONS',
 33    'PATCH',
 34    'POST',
 35    'PUT',
 36]
 37""" Allowed HTTP methods for an HTTPExchange. """
 38
 39DEFAULT_EXCHANGE_IGNORE_HEADERS: typing.List[str] = [
 40    'accept',
 41    'accept-encoding',
 42    'accept-language',
 43    'cache-control',
 44    'connection',
 45    'content-length',
 46    'content-security-policy',
 47    'content-type',
 48    'cookie',
 49    'date',
 50    'dnt',
 51    'etag',
 52    'host',
 53    'link',
 54    'location',
 55    'priority',
 56    'referrer-policy',
 57    'sec-fetch-dest',
 58    'sec-fetch-mode',
 59    'sec-fetch-site',
 60    'sec-fetch-user',
 61    'sec-gpc',
 62    'server',
 63    'server-timing',
 64    'set-cookie',
 65    'upgrade-insecure-requests',
 66    'user-agent',
 67    'x-content-type-options',
 68    'x-download-options',
 69    'x-permitted-cross-domain-policies',
 70    'x-rate-limit-remaining',
 71    'x-request-context-id',
 72    'x-request-cost',
 73    'x-runtime',
 74    'x-session-id',
 75    'x-xss-protection',
 76    ANCHOR_HEADER_KEY,
 77]
 78"""
 79By default, ignore these headers during exchange matching.
 80Some are sent automatically and we don't need to record (like content-length),
 81and some are additional information we don't need.
 82"""
 83
 84_exchanges_clean_func: typing.Union[str, None] = None  # pylint: disable=invalid-name
 85"""
 86If not None, all created exchanges (in HTTPExchange.make_request() and HTTPExchange.from_response()) will use this response modifier.
 87This function will be called with the response and response body before parsing the rest of the data to build the exchange.
 88"""
 89
 90_exchanges_finalize_func: typing.Union[str, None] = None  # pylint: disable=invalid-name
 91"""
 92If not None, all created exchanges (in HTTPExchange.make_request()) will use this finalize function.
 93This function will be called with the created exchange right after construction and before passing back to the caller
 94(or writing).
 95"""
 96
 97class FileInfo(edq.util.json.DictConverter):
 98    """ Store info about files used in HTTP exchanges. """
 99
100    def __init__(self,
101            path: typing.Union[str, None] = None,
102            name: typing.Union[str, None] = None,
103            content: typing.Union[str, bytes, None] = None,
104            b64_encoded: bool = False,
105            **kwargs: typing.Any) -> None:
106        # Normalize the path from POSIX-style to the system's style.
107        if (path is not None):
108            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))
109
110        self.path: typing.Union[str, None] = path
111        """ The on-disk path to a file. """
112
113        if ((name is None) and (self.path is not None)):
114            name = os.path.basename(self.path)
115
116        if (name is None):
117            raise ValueError("No name was provided for file.")
118
119        self.name: str = name
120        """ The name for this file used in an HTTP request. """
121
122        self.content: typing.Union[str, bytes, None] = content
123        """ The contents of this file. """
124
125        self.b64_encoded: bool = b64_encoded
126        """ Whether the content is a string encoded in Base64. """
127
128        if ((self.path is None) and (self.content is None)):
129            raise ValueError("File must have either path or content specified.")
130
131    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
132        """ Resolve this path relative to the given base dir. """
133
134        if ((self.path is not None) and (not os.path.isabs(self.path))):
135            self.path = os.path.abspath(os.path.join(base_dir, self.path))
136
137        if ((self.path is not None) and (self.content is None) and load_file):
138            self.content = edq.util.dirent.read_file_bytes(self.path)
139
140    def hash_content(self) -> str:
141        """
142        Compute a hash for the content present.
143        If no content is provided, use the path.
144        """
145
146        hash_content = self.content
147
148        if (self.b64_encoded and isinstance(hash_content, str)):
149            hash_content = edq.util.encoding.from_base64(hash_content)
150
151        if (hash_content is None):
152            hash_content = self.path
153
154        return edq.util.hash.sha256_hex(hash_content)
155
156    def to_dict(self) -> typing.Dict[str, typing.Any]:
157        data = vars(self).copy()
158
159        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
160        if (isinstance(self.content, bytes)):
161            data['content'] = edq.util.encoding.to_base64(self.content)
162            data['b64_encoded'] = True
163
164        return data
165
166    @classmethod
167    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
168        return FileInfo(**data)
169
170class HTTPExchange(edq.util.json.DictConverter):
171    """
172    The request and response making up a full HTTP exchange.
173    """
174
175    def __init__(self,
176            method: str = 'GET',
177            url: typing.Union[str, None] = None,
178            url_path: typing.Union[str, None] = None,
179            url_anchor: typing.Union[str, None] = None,
180            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
181            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
182            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
183            allow_redirects: typing.Union[bool, None] = None,
184            response_code: int = http.HTTPStatus.OK,
185            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
186            json_body: typing.Union[bool, None] = None,
187            response_body: typing.Union[str, dict, list, None] = None,
188            source_path: typing.Union[str, None] = None,
189            response_modifier: typing.Union[str, None] = None,
190            finalize: typing.Union[str, None] = None,
191            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
192            **kwargs: typing.Any) -> None:
193        method = str(method).upper()
194        if (method not in ALLOWED_METHODS):
195            raise ValueError(f"Got unknown/disallowed method: '{method}'.")
196
197        self.method: str = method
198        """ The HTTP method for this exchange. """
199
200        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)
201
202        self.url_path: str = url_path
203        """
204        The path portion of the request URL.
205        Only the path (not domain, port, params, anchor, etc) should be included.
206        """
207
208        self.url_anchor: typing.Union[str, None] = url_anchor
209        """
210        The anchor portion of the request URL (if it exists).
211        """
212
213        self.parameters: typing.Dict[str, typing.Any] = parameters
214        """
215        The parameters/arguments for this request.
216        Parameters should be provided here and not encoded into URLs,
217        regardless of the request method.
218        With the exception of files, all parameters should be placed here.
219        """
220
221        if (files is None):
222            files = []
223
224        parsed_files = []
225        for file in files:
226            if (isinstance(file, FileInfo)):
227                parsed_files.append(file)
228            else:
229                parsed_files.append(FileInfo(**file))
230
231        self.files: typing.List[FileInfo] = parsed_files
232        """
233        A list of files to include in the request.
234        The files are represented as dicts with a
235        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
236        These paths must be POSIX-style paths,
237        they will be converted to system-specific paths.
238        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
239        However, when serialized these paths should probably be relative.
240        To reconcile this, resolve_paths() should be called before using this exchange.
241        """
242
243        if (headers is None):
244            headers = {}
245
246        self.headers: typing.Dict[str, typing.Any] = headers
247        """ Headers in the request. """
248
249        if (allow_redirects is None):
250            allow_redirects = True
251
252        self.allow_redirects: bool = allow_redirects
253        """ Follow redirects. """
254
255        self.response_code: int = response_code
256        """ The HTTP status code of the response. """
257
258        if (response_headers is None):
259            response_headers = {}
260
261        self.response_headers: typing.Dict[str, typing.Any] = response_headers
262        """ Headers in the response. """
263
264        if (json_body is None):
265            json_body = isinstance(response_body, (dict, list))
266
267        self.json_body: bool = json_body
268        """
269        Indicates that the response is JSON and should be converted to/from a string.
270        If the response body is passed in a dict/list and this is passed as None,
271        then this will be set as true.
272        """
273
274        if (self.json_body and isinstance(response_body, (dict, list))):
275            response_body = edq.util.json.dumps(response_body)
276
277        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
278        """
279        The response that should be sent in this exchange.
280        """
281
282        self.response_modifier: typing.Union[str, None] = response_modifier
283        """
284        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
285        before sent back to the caller.
286        This reference must be importable via edq.util.pyimport.fetch().
287        """
288
289        self.finalize: typing.Union[str, None] = finalize
290        """
291        This function reference will be used to finalize echanges before sent back to the caller.
292        This reference must be importable via edq.util.pyimport.fetch().
293        """
294
295        self.source_path: typing.Union[str, None] = source_path
296        """
297        The path that this exchange was loaded from (if it was loaded from a file).
298        This value should never be serialized, but can be useful for testing.
299        """
300
301        if (extra_options is None):
302            extra_options = {}
303
304        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
305        """
306        Additional options for this exchange.
307        This library will not use these options, but other's may.
308        kwargs will also be added to this.
309        """
310
311        self.extra_options.update(kwargs)
312
313    def _parse_url_components(self,
314            url: typing.Union[str, None] = None,
315            url_path: typing.Union[str, None] = None,
316            url_anchor: typing.Union[str, None] = None,
317            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
318            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
319        """
320        Parse out all URL-based components from raw inputs.
321        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
322        If content is present in both places, they much match (or an error will be raised).
323        Query parameters may be provided in the full URL,
324        but will be overwritten by any that are provided separately.
325        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
326        Note that path parameters (not query parameters) will be ignored.
327        The final url path, url anchor, and parameters will be returned.
328        """
329
330        # Do base initialization and cleanup.
331
332        if (url_path is not None):
333            url_path = url_path.strip()
334            if (url_path == ''):
335                url_path = ''
336            else:
337                url_path = url_path.lstrip('/')
338
339        if (url_anchor is not None):
340            url_anchor = url_anchor.strip()
341            if (url_anchor == ''):
342                url_anchor = None
343            else:
344                url_anchor = url_anchor.lstrip('#')
345
346        if (parameters is None):
347            parameters = {}
348
349        # Parse the URL (if present).
350
351        if ((url is not None) and (url.strip() != '')):
352            parts = urllib.parse.urlparse(url)
353
354            # Handle the path.
355
356            path = parts.path.lstrip('/')
357
358            if ((url_path is not None) and (url_path != path)):
359                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")
360
361            url_path = path
362
363            # Check the optional anchor/fragment.
364
365            if (parts.fragment != ''):
366                fragment = parts.fragment.lstrip('#')
367
368                if ((url_anchor is not None) and (url_anchor != fragment)):
369                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")
370
371                url_anchor = fragment
372
373            # Check for any parameters.
374
375            url_params = edq.net.util.parse_query_string(parts.query)
376            for (key, value) in url_params.items():
377                if (key not in parameters):
378                    parameters[key] = value
379
380        if (url_path is None):
381            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')
382
383        # Sort parameter keys for consistency.
384        parameters = {key: parameters[key] for key in sorted(parameters.keys())}
385
386        return url_path, url_anchor, parameters
387
388    def resolve_paths(self, base_dir: str) -> None:
389        """ Resolve any paths relative to the given base dir. """
390
391        for file_info in self.files:
392            file_info.resolve_path(base_dir)
393
394    def match(self, query: 'HTTPExchange',
395            match_headers: bool = True,
396            headers_to_skip: typing.Union[typing.List[str], None] = None,
397            params_to_skip: typing.Union[typing.List[str], None] = None,
398            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
399        """
400        Check if this exchange matches the query exchange.
401        If they match, `(True, None)` will be returned.
402        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
403
404        Note that this is not an equality check,
405        as a query exchange is often missing the response components.
406        This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange.
407        """
408
409        if (query.method != self.method):
410            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."
411
412        if (query.url_path != self.url_path):
413            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."
414
415        if (query.url_anchor != self.url_anchor):
416            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."
417
418        if (headers_to_skip is None):
419            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
420
421        if (params_to_skip is None):
422            params_to_skip = []
423
424        if (match_headers):
425            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
426            if (not match):
427                return False, hint
428
429        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
430        if (not match):
431            return False, hint
432
433        # Check file names and hash contents.
434        query_filenames = {(file.name, file.hash_content()) for file in query.files}
435        target_filenames = {(file.name, file.hash_content()) for file in self.files}
436        if (query_filenames != target_filenames):
437            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
438
439        return True, None
440
441    def _match_dict(self, label: str,
442            query_dict: typing.Dict[str, typing.Any],
443            target_dict: typing.Dict[str, typing.Any],
444            keys_to_skip: typing.Union[typing.List[str], None] = None,
445            query_label: str = 'query',
446            target_label: str = 'target',
447            normalize_key_case: bool = True,
448            ) -> typing.Tuple[bool, typing.Union[str, None]]:
449        """ A subcheck in match(), specifically for a dictionary. """
450
451        if (keys_to_skip is None):
452            keys_to_skip = []
453
454        if (normalize_key_case):
455            keys_to_skip = [key.lower() for key in keys_to_skip]
456            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
457            target_dict = {key.lower(): value for (key, value) in target_dict.items()}
458
459        query_keys = set(query_dict.keys()) - set(keys_to_skip)
460        target_keys = set(target_dict.keys()) - set(keys_to_skip)
461
462        if (query_keys != target_keys):
463            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."
464
465        for key in sorted(query_keys):
466            query_value = query_dict[key]
467            target_value = target_dict[key]
468
469            if (query_value != target_value):
470                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
471                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."
472
473        return True, None
474
475    def get_url(self) -> str:
476        """ Get the URL path and anchor combined. """
477
478        url = self.url_path
479
480        if (self.url_anchor is not None):
481            url += ('#' + self.url_anchor)
482
483        return url
484
485    def match_response(self, response: requests.Response,
486            override_body: typing.Union[str, None] = None,
487            headers_to_skip: typing.Union[typing.List[str], None] = None,
488            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
489        """
490        Check if this exchange matches the given response.
491        If they match, `(True, None)` will be returned.
492        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
493        """
494
495        if (headers_to_skip is None):
496            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
497
498        response_body = override_body
499        if (response_body is None):
500            response_body = response.text
501
502        if (self.response_code != response.status_code):
503            return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})"
504
505        expected_body = self.response_body
506        actual_body = None
507
508        if (self.json_body):
509            actual_body = response.json()
510
511            # Normalize the actual and expected bodies.
512
513            actual_body = edq.util.json.dumps(actual_body)
514
515            if (isinstance(expected_body, str)):
516                expected_body = edq.util.json.loads(expected_body)
517
518            expected_body = edq.util.json.dumps(expected_body)
519        else:
520            actual_body = response_body
521
522        if (self.response_body != actual_body):
523            body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'"
524            return False, f"body does not match ({body_hint})"
525
526        match, hint = self._match_dict('header', response.headers, self.response_headers,
527                keys_to_skip = headers_to_skip,
528                query_label = 'response', target_label = 'exchange')
529
530        if (not match):
531            return False, hint
532
533        return True, None
534
535    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
536        """ Create a consistent, semi-unique, and relative path for this exchange. """
537
538        url = self.get_url().strip()
539        parts = url.split('/')
540
541
542        if (url in ['', '/']):
543            filename = '_index_'
544            dirname = ''
545        else:
546            filename = parts[-1]
547
548            if (len(parts) > 1):
549                dirname = os.path.join(*parts[0:-1])
550            else:
551                dirname = ''
552
553        parameters = {}
554        for key in sorted(self.parameters.keys()):
555            parameters[key] = self.parameters[key]
556
557        # Treat files as params as well.
558        for file_info in self.files:
559            parameters[f"file-{file_info.name}"] = file_info.hash_content()
560
561        query = urllib.parse.urlencode(parameters)
562        if (query != ''):
563            # The query can get very long, so we may have to clip it.
564            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)
565
566            # Note that the '?' is URL encoded.
567            filename += f"%3F{query_text}"
568
569        filename += f"_{self.method}{http_exchange_extension}"
570
571        return os.path.join(dirname, filename)
572
573    def to_dict(self) -> typing.Dict[str, typing.Any]:
574        return vars(self)
575
576    @classmethod
577    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
578        return HTTPExchange(**data)
579
580    @classmethod
581    def from_path(cls, path: str,
582            set_source_path: bool = True,
583            ) -> 'HTTPExchange':
584        """
585        Load an exchange from a file.
586        This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths.
587        """
588
589        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))
590
591        if (set_source_path):
592            exchange.source_path = os.path.abspath(path)
593
594        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))
595
596        return exchange
597
598    @classmethod
599    def from_response(cls,
600            response: requests.Response,
601            headers_to_skip: typing.Union[typing.List[str], None] = None,
602            params_to_skip: typing.Union[typing.List[str], None] = None,
603            allow_redirects: typing.Union[bool, None] = None,
604            ) -> 'HTTPExchange':
605        """ Create a full excahnge from a response. """
606
607        if (headers_to_skip is None):
608            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
609
610        if (params_to_skip is None):
611            params_to_skip = []
612
613        body = response.text
614
615        # Use a clean function (if one exists).
616        if (_exchanges_clean_func is not None):
617            # Make a copy of the response to avoid cleaning functions modifying it.
618            # Note that this is not a very complete solution, since we can't rely on the deep copy getting everything right.
619            response = copy.deepcopy(response)
620
621            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
622            body = modify_func(response, body)
623
624        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
625        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}
626
627        # Clean headers.
628        for key in headers_to_skip:
629            key = key.lower()
630
631            request_headers.pop(key, None)
632            response_headers.pop(key, None)
633
634        request_data, request_files = edq.net.util.parse_request_data(response.request.url, response.request.headers, response.request.body)
635
636        # Clean parameters.
637        for key in params_to_skip:
638            request_data.pop(key, None)
639
640        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]
641
642        data = {
643            'method': response.request.method,
644            'url': response.request.url,
645            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
646            'parameters': request_data,
647            'files': files,
648            'headers': request_headers,
649            'response_code': response.status_code,
650            'response_headers': response_headers,
651            'response_body': body,
652            'response_modifier': _exchanges_clean_func,
653            'allow_redirects': allow_redirects,
654        }
655
656        exchange = HTTPExchange(**data)
657
658        # Use a finalize function (if one exists).
659        if (_exchanges_finalize_func is not None):
660            finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)
661
662            exchange = finalize_func(exchange)
663            exchange.finalize = _exchanges_finalize_func
664
665        return exchange
666
@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    The shape of a callback that is handed an exchange
    after a request has been made (and the exchange constructed).
    """

    def __call__(self, exchange: HTTPExchange) -> str:
        """ Invoked with the exchange once the HTTP exchange has been completed. """
DEFAULT_HTTP_EXCHANGE_EXTENSION: str = '.httpex.json'
QUERY_CLIP_LENGTH: int = 100

If the filename of an HTTPExchange being saved is longer than this, then clip it.

ANCHOR_HEADER_KEY: str = 'edq-anchor'

By default, requests made via make_request() will send a header with this key that includes the anchor component of the URL. Anchors are not traditionally sent in requests, but this will allow exchanges to capture this extra piece of information.

ALLOWED_METHODS: List[str] = ['DELETE', 'GET', 'HEAD', 'OPTIONS', 'PATCH', 'POST', 'PUT']

Allowed HTTP methods for an HTTPExchange.

DEFAULT_EXCHANGE_IGNORE_HEADERS: List[str] = ['accept', 'accept-encoding', 'accept-language', 'cache-control', 'connection', 'content-length', 'content-security-policy', 'content-type', 'cookie', 'date', 'dnt', 'etag', 'host', 'link', 'location', 'priority', 'referrer-policy', 'sec-fetch-dest', 'sec-fetch-mode', 'sec-fetch-site', 'sec-fetch-user', 'sec-gpc', 'server', 'server-timing', 'set-cookie', 'upgrade-insecure-requests', 'user-agent', 'x-content-type-options', 'x-download-options', 'x-permitted-cross-domain-policies', 'x-rate-limit-remaining', 'x-request-context-id', 'x-request-cost', 'x-runtime', 'x-session-id', 'x-xss-protection', 'edq-anchor']

By default, ignore these headers during exchange matching. Some are sent automatically and we don't need to record (like content-length), and some are additional information we don't need.

class FileInfo(edq.util.json.DictConverter):
 98class FileInfo(edq.util.json.DictConverter):
 99    """ Store info about files used in HTTP exchanges. """
100
101    def __init__(self,
102            path: typing.Union[str, None] = None,
103            name: typing.Union[str, None] = None,
104            content: typing.Union[str, bytes, None] = None,
105            b64_encoded: bool = False,
106            **kwargs: typing.Any) -> None:
107        # Normalize the path from POSIX-style to the system's style.
108        if (path is not None):
109            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))
110
111        self.path: typing.Union[str, None] = path
112        """ The on-disk path to a file. """
113
114        if ((name is None) and (self.path is not None)):
115            name = os.path.basename(self.path)
116
117        if (name is None):
118            raise ValueError("No name was provided for file.")
119
120        self.name: str = name
121        """ The name for this file used in an HTTP request. """
122
123        self.content: typing.Union[str, bytes, None] = content
124        """ The contents of this file. """
125
126        self.b64_encoded: bool = b64_encoded
127        """ Whether the content is a string encoded in Base64. """
128
129        if ((self.path is None) and (self.content is None)):
130            raise ValueError("File must have either path or content specified.")
131
132    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
133        """ Resolve this path relative to the given base dir. """
134
135        if ((self.path is not None) and (not os.path.isabs(self.path))):
136            self.path = os.path.abspath(os.path.join(base_dir, self.path))
137
138        if ((self.path is not None) and (self.content is None) and load_file):
139            self.content = edq.util.dirent.read_file_bytes(self.path)
140
141    def hash_content(self) -> str:
142        """
143        Compute a hash for the content present.
144        If no content is provided, use the path.
145        """
146
147        hash_content = self.content
148
149        if (self.b64_encoded and isinstance(hash_content, str)):
150            hash_content = edq.util.encoding.from_base64(hash_content)
151
152        if (hash_content is None):
153            hash_content = self.path
154
155        return edq.util.hash.sha256_hex(hash_content)
156
157    def to_dict(self) -> typing.Dict[str, typing.Any]:
158        data = vars(self).copy()
159
160        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
161        if (isinstance(self.content, bytes)):
162            data['content'] = edq.util.encoding.to_base64(self.content)
163            data['b64_encoded'] = True
164
165        return data
166
167    @classmethod
168    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
169        return FileInfo(**data)

Store info about files used in HTTP exchanges.

FileInfo( path: Optional[str] = None, name: Optional[str] = None, content: Union[str, bytes, NoneType] = None, b64_encoded: bool = False, **kwargs: Any)
101    def __init__(self,
102            path: typing.Union[str, None] = None,
103            name: typing.Union[str, None] = None,
104            content: typing.Union[str, bytes, None] = None,
105            b64_encoded: bool = False,
106            **kwargs: typing.Any) -> None:
107        # Normalize the path from POSIX-style to the system's style.
108        if (path is not None):
109            path = str(pathlib.PurePath(pathlib.PurePosixPath(path)))
110
111        self.path: typing.Union[str, None] = path
112        """ The on-disk path to a file. """
113
114        if ((name is None) and (self.path is not None)):
115            name = os.path.basename(self.path)
116
117        if (name is None):
118            raise ValueError("No name was provided for file.")
119
120        self.name: str = name
121        """ The name for this file used in an HTTP request. """
122
123        self.content: typing.Union[str, bytes, None] = content
124        """ The contents of this file. """
125
126        self.b64_encoded: bool = b64_encoded
127        """ Whether the content is a string encoded in Base64. """
128
129        if ((self.path is None) and (self.content is None)):
130            raise ValueError("File must have either path or content specified.")
path: Optional[str]

The on-disk path to a file.

name: str

The name for this file used in an HTTP request.

content: Union[str, bytes, NoneType]

The contents of this file.

b64_encoded: bool

Whether the content is a string encoded in Base64.

def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
132    def resolve_path(self, base_dir: str, load_file: bool = True) -> None:
133        """ Resolve this path relative to the given base dir. """
134
135        if ((self.path is not None) and (not os.path.isabs(self.path))):
136            self.path = os.path.abspath(os.path.join(base_dir, self.path))
137
138        if ((self.path is not None) and (self.content is None) and load_file):
139            self.content = edq.util.dirent.read_file_bytes(self.path)

Resolve this path relative to the given base dir.

def hash_content(self) -> str:
141    def hash_content(self) -> str:
142        """
143        Compute a hash for the content present.
144        If no content is provided, use the path.
145        """
146
147        hash_content = self.content
148
149        if (self.b64_encoded and isinstance(hash_content, str)):
150            hash_content = edq.util.encoding.from_base64(hash_content)
151
152        if (hash_content is None):
153            hash_content = self.path
154
155        return edq.util.hash.sha256_hex(hash_content)

Compute a hash for the content present. If no content is provided, use the path.

def to_dict(self) -> Dict[str, Any]:
157    def to_dict(self) -> typing.Dict[str, typing.Any]:
158        data = vars(self).copy()
159
160        # JSON does not support raw bytes, so we will need to base64 encode any binary content.
161        if (isinstance(self.content, bytes)):
162            data['content'] = edq.util.encoding.to_base64(self.content)
163            data['b64_encoded'] = True
164
165        return data

Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Any:
167    @classmethod
168    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
169        return FileInfo(**data)

Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.

A general (but inefficient) implementation is provided by default.

class HTTPExchange(edq.util.json.DictConverter):
class HTTPExchange(edq.util.json.DictConverter):
    """
    The request and response making up a full HTTP exchange.
    """

    def __init__(self,
            method: str = 'GET',
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            response_code: int = http.HTTPStatus.OK,
            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
            json_body: typing.Union[bool, None] = None,
            response_body: typing.Union[str, dict, list, None] = None,
            source_path: typing.Union[str, None] = None,
            response_modifier: typing.Union[str, None] = None,
            finalize: typing.Union[str, None] = None,
            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
            **kwargs: typing.Any) -> None:
        """
        Create an exchange from its request and response components.

        The URL path/anchor/query may be given through `url`, through the dedicated arguments, or both
        (see _parse_url_components() for how they are reconciled).
        Files may be given as FileInfo objects or as dicts of FileInfo constructor arguments.
        Any unrecognized keyword arguments are stored in `extra_options`.
        A ValueError is raised for a method that is not in ALLOWED_METHODS.
        """

        method = str(method).upper()
        if (method not in ALLOWED_METHODS):
            raise ValueError(f"Got unknown/disallowed method: '{method}'.")

        self.method: str = method
        """ The HTTP method for this exchange. """

        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)

        self.url_path: str = url_path
        """
        The path portion of the request URL.
        Only the path (not domain, port, params, anchor, etc) should be included.
        """

        self.url_anchor: typing.Union[str, None] = url_anchor
        """
        The anchor portion of the request URL (if it exists).
        """

        self.parameters: typing.Dict[str, typing.Any] = parameters
        """
        The parameters/arguments for this request.
        Parameters should be provided here and not encoded into URLs,
        regardless of the request method.
        With the exception of files, all parameters should be placed here.
        """

        if (files is None):
            files = []

        parsed_files = []
        for file in files:
            if (isinstance(file, FileInfo)):
                parsed_files.append(file)
            else:
                parsed_files.append(FileInfo(**file))

        self.files: typing.List[FileInfo] = parsed_files
        """
        A list of files to include in the request.
        The files are represented as dicts with a
        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
        These paths must be POSIX-style paths,
        they will be converted to system-specific paths.
        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
        However, when serialized these paths should probably be relative.
        To reconcile this, resolve_paths() should be called before using this exchange.
        """

        if (headers is None):
            headers = {}

        self.headers: typing.Dict[str, typing.Any] = headers
        """ Headers in the request. """

        if (allow_redirects is None):
            allow_redirects = True

        self.allow_redirects: bool = allow_redirects
        """ Follow redirects. """

        self.response_code: int = response_code
        """ The HTTP status code of the response. """

        if (response_headers is None):
            response_headers = {}

        self.response_headers: typing.Dict[str, typing.Any] = response_headers
        """ Headers in the response. """

        if (json_body is None):
            json_body = isinstance(response_body, (dict, list))

        self.json_body: bool = json_body
        """
        Indicates that the response is JSON and should be converted to/from a string.
        If the response body is passed in a dict/list and this is passed as None,
        then this will be set as true.
        """

        # Structured JSON bodies are always stored in their serialized (string) form.
        if (self.json_body and isinstance(response_body, (dict, list))):
            response_body = edq.util.json.dumps(response_body)

        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
        """
        The response that should be sent in this exchange.
        """

        self.response_modifier: typing.Union[str, None] = response_modifier
        """
        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
        before sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.finalize: typing.Union[str, None] = finalize
        """
        This function reference will be used to finalize exchanges before sent back to the caller.
        This reference must be importable via edq.util.pyimport.fetch().
        """

        self.source_path: typing.Union[str, None] = source_path
        """
        The path that this exchange was loaded from (if it was loaded from a file).
        This value should never be serialized, but can be useful for testing.
        """

        if (extra_options is None):
            extra_options = {}

        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
        """
        Additional options for this exchange.
        This library will not use these options, but others may.
        kwargs will also be added to this.
        """

        self.extra_options.update(kwargs)

    def _parse_url_components(self,
            url: typing.Union[str, None] = None,
            url_path: typing.Union[str, None] = None,
            url_anchor: typing.Union[str, None] = None,
            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
            ) -> typing.Tuple[str, typing.Union[str, None], typing.Dict[str, typing.Any]]:
        """
        Parse out all URL-based components from raw inputs.
        The URL's path and anchor can either be supplied separately, or as part of the full given URL.
        If content is present in both places, they must match (or a ValueError will be raised).
        Query parameters may be provided in the full URL,
        but will be overwritten by any that are provided separately.
        Any information from the URL aside from the path, anchor/fragment, and query will be ignored.
        Note that path parameters (not query parameters) will be ignored.
        The final url path, url anchor, and parameters will be returned.
        The passed-in parameters dict is never modified, a new (key-sorted) dict is returned.
        A ValueError is raised if no URL path can be determined.
        """

        # Do base initialization and cleanup.

        if (url_path is not None):
            # An empty path stays empty, anything else loses its leading slash(es).
            url_path = url_path.strip().lstrip('/')

        if (url_anchor is not None):
            url_anchor = url_anchor.strip()
            if (url_anchor == ''):
                url_anchor = None
            else:
                url_anchor = url_anchor.lstrip('#')

        # Work on a copy so that merging in URL query parameters does not alter the caller's dict.
        if (parameters is None):
            parameters = {}
        else:
            parameters = dict(parameters)

        # Parse the URL (if present).

        if ((url is not None) and (url.strip() != '')):
            parts = urllib.parse.urlparse(url)

            # Handle the path.

            path = parts.path.lstrip('/')

            if ((url_path is not None) and (url_path != path)):
                raise ValueError(f"Mismatched URL paths where supplied implicitly ('{path}') and explicitly ('{url_path}').")

            url_path = path

            # Check the optional anchor/fragment.

            if (parts.fragment != ''):
                fragment = parts.fragment.lstrip('#')

                if ((url_anchor is not None) and (url_anchor != fragment)):
                    raise ValueError(f"Mismatched URL anchors where supplied implicitly ('{fragment}') and explicitly ('{url_anchor}').")

                url_anchor = fragment

            # Check for any parameters.
            # Explicitly supplied parameters take priority over those in the URL.

            url_params = edq.net.util.parse_query_string(parts.query)
            for (key, value) in url_params.items():
                if (key not in parameters):
                    parameters[key] = value

        if (url_path is None):
            raise ValueError('URL path cannot be empty, it must be explicitly set via `url_path`, or indirectly via `url`.')

        # Sort parameter keys for consistency.
        parameters = {key: parameters[key] for key in sorted(parameters.keys())}

        return url_path, url_anchor, parameters

    def resolve_paths(self, base_dir: str) -> None:
        """ Resolve any paths relative to the given base dir. """

        for file_info in self.files:
            file_info.resolve_path(base_dir)

    def match(self, query: 'HTTPExchange',
            match_headers: bool = True,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the query exchange.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        The method, URL path, URL anchor, headers (unless `match_headers` is false), parameters,
        and files (by name and content hash) are compared.
        When `headers_to_skip` is None, DEFAULT_EXCHANGE_IGNORE_HEADERS is used.

        Note that this is not an equality check,
        as a query exchange is often missing the response components.
        This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.
        """

        if (query.method != self.method):
            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."

        if (query.url_path != self.url_path):
            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."

        if (query.url_anchor != self.url_anchor):
            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        if (match_headers):
            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
            if (not match):
                return False, hint

        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
        if (not match):
            return False, hint

        # Check file names and hash contents.
        query_filenames = {(file.name, file.hash_content()) for file in query.files}
        target_filenames = {(file.name, file.hash_content()) for file in self.files}
        if (query_filenames != target_filenames):
            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."

        return True, None

    def _match_dict(self, label: str,
            query_dict: typing.Dict[str, typing.Any],
            target_dict: typing.Dict[str, typing.Any],
            keys_to_skip: typing.Union[typing.List[str], None] = None,
            query_label: str = 'query',
            target_label: str = 'target',
            normalize_key_case: bool = True,
            ) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        A subcheck in match(), specifically for a dictionary.
        Both dicts must have the same keys (ignoring any keys to skip) and equal values for those keys.
        Keys are compared case-insensitively unless `normalize_key_case` is false.
        Returns `(True, None)` on a match and `(False, <hint>)` otherwise.
        """

        if (keys_to_skip is None):
            keys_to_skip = []

        if (normalize_key_case):
            keys_to_skip = [key.lower() for key in keys_to_skip]
            query_dict = {key.lower(): value for (key, value) in query_dict.items()}
            target_dict = {key.lower(): value for (key, value) in target_dict.items()}

        skip_keys = set(keys_to_skip)
        query_keys = set(query_dict.keys()) - skip_keys
        target_keys = set(target_dict.keys()) - skip_keys

        if (query_keys != target_keys):
            return False, f"{label.title()} keys do not match ({query_label} = {query_keys}, {target_label} = {target_keys})."

        # Check keys in sorted order so the reported mismatch is deterministic.
        for key in sorted(query_keys):
            query_value = query_dict[key]
            target_value = target_dict[key]

            if (query_value != target_value):
                comparison = f"{query_label} = '{query_value}', {target_label} = '{target_value}'"
                return False, f"{label.title()} '{key}' has a non-matching value ({comparison})."

        return True, None

    def get_url(self) -> str:
        """ Get the URL path and anchor combined. """

        url = self.url_path

        if (self.url_anchor is not None):
            url += ('#' + self.url_anchor)

        return url

    def match_response(self, response: requests.Response,
            override_body: typing.Union[str, None] = None,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
        """
        Check if this exchange matches the given response.
        If they match, `(True, None)` will be returned.
        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.

        The status code, body, and headers are compared (in that order).
        If `override_body` is given, it is used as the response's body instead of the body on the response itself.
        For JSON exchanges, both the expected and actual bodies are normalized
        (parsed and re-serialized) before being compared, so formatting differences are ignored.
        When `headers_to_skip` is None, DEFAULT_EXCHANGE_IGNORE_HEADERS is used.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (self.response_code != response.status_code):
            return False, f"http status code does not match (expected: {self.response_code}, actual: {response.status_code})"

        expected_body: typing.Any = self.response_body
        actual_body: typing.Any = None

        if (self.json_body):
            if (override_body is not None):
                actual_body = edq.util.json.loads(override_body)
            else:
                actual_body = response.json()

            # Normalize the actual and expected bodies.

            actual_body = edq.util.json.dumps(actual_body)

            if (isinstance(expected_body, str)):
                expected_body = edq.util.json.loads(expected_body)

            expected_body = edq.util.json.dumps(expected_body)
        else:
            actual_body = override_body
            if (actual_body is None):
                actual_body = response.text

        # Compare the (possibly normalized) expected body, not the raw stored body.
        if (expected_body != actual_body):
            body_hint = f"expected: '{expected_body}', actual: '{actual_body}'"
            return False, f"body does not match ({body_hint})"

        match, hint = self._match_dict('header', response.headers, self.response_headers,
                keys_to_skip = headers_to_skip,
                query_label = 'response', target_label = 'exchange')

        if (not match):
            return False, hint

        return True, None

    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
        """
        Create a consistent, semi-unique, and relative path for this exchange.

        The directory comes from the leading components of the URL,
        and the filename is built from the last URL component,
        the (possibly clipped) URL-encoded parameters and file hashes,
        the HTTP method, and the given extension.
        """

        url = self.get_url().strip()

        if (url in ['', '/']):
            filename = '_index_'
            dirname = ''
        else:
            parts = url.split('/')
            filename = parts[-1]

            if (len(parts) > 1):
                dirname = os.path.join(*parts[0:-1])
            else:
                dirname = ''

        parameters = {key: self.parameters[key] for key in sorted(self.parameters.keys())}

        # Treat files as params as well.
        for file_info in self.files:
            parameters[f"file-{file_info.name}"] = file_info.hash_content()

        query = urllib.parse.urlencode(parameters)
        if (query != ''):
            # The query can get very long, so we may have to clip it.
            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)

            # Note that the '?' is URL encoded.
            filename += f"%3F{query_text}"

        filename += f"_{self.method}{http_exchange_extension}"

        return os.path.join(dirname, filename)

    def to_dict(self) -> typing.Dict[str, typing.Any]:
        """
        Return this exchange's attributes as a dict.
        A shallow copy is returned, so changing the dict's keys does not alter this exchange.
        """

        return vars(self).copy()

    @classmethod
    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
        """
        Build an exchange by passing the dict's entries to the constructor as keyword arguments.
        The receiving class (cls) is used so that subclasses get an instance of their own type.
        """

        return cls(**data)

    @classmethod
    def from_path(cls, path: str,
            set_source_path: bool = True,
            ) -> 'HTTPExchange':
        """
        Load an exchange from a file.
        This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.
        File paths in the exchange are resolved relative to the directory containing the loaded file.
        """

        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))

        if (set_source_path):
            exchange.source_path = os.path.abspath(path)

        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))

        return exchange

    @classmethod
    def from_response(cls,
            response: requests.Response,
            headers_to_skip: typing.Union[typing.List[str], None] = None,
            params_to_skip: typing.Union[typing.List[str], None] = None,
            allow_redirects: typing.Union[bool, None] = None,
            ) -> 'HTTPExchange':
        """
        Create a full exchange from a response.

        The request components are taken from the request attached to the response.
        Skipped headers are removed from both the request and response headers,
        and skipped params are removed from the request parameters.
        When `headers_to_skip` is None, DEFAULT_EXCHANGE_IGNORE_HEADERS is used.
        If module-level clean/finalize functions are set, they are applied to the body/exchange respectively.
        """

        if (headers_to_skip is None):
            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS

        if (params_to_skip is None):
            params_to_skip = []

        body = response.text

        # Use a clean function (if one exists).
        if (_exchanges_clean_func is not None):
            # Make a copy of the response to avoid cleaning functions modifying it.
            # Note that this is not a very complete solution, since we can't rely on the deep copy getting everything right.
            response = copy.deepcopy(response)

            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
            body = modify_func(response, body)

        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}

        # Clean headers.
        for key in headers_to_skip:
            key = key.lower()

            request_headers.pop(key, None)
            response_headers.pop(key, None)

        request_data, request_files = edq.net.util.parse_request_data(response.request.url, response.request.headers, response.request.body)

        # Clean parameters.
        for key in params_to_skip:
            request_data.pop(key, None)

        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]

        data = {
            'method': response.request.method,
            'url': response.request.url,
            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
            'parameters': request_data,
            'files': files,
            'headers': request_headers,
            'response_code': response.status_code,
            'response_headers': response_headers,
            'response_body': body,
            'response_modifier': _exchanges_clean_func,
            'allow_redirects': allow_redirects,
        }

        exchange = HTTPExchange(**data)

        # Use a finalize function (if one exists).
        if (_exchanges_finalize_func is not None):
            finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)

            exchange = finalize_func(exchange)
            exchange.finalize = _exchanges_finalize_func

        return exchange

The request and response making up a full HTTP exchange.

HTTPExchange( method: str = 'GET', url: Optional[str] = None, url_path: Optional[str] = None, url_anchor: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, files: Optional[List[Union[FileInfo, Dict[str, Any]]]] = None, headers: Optional[Dict[str, Any]] = None, allow_redirects: Optional[bool] = None, response_code: int = <HTTPStatus.OK: 200>, response_headers: Optional[Dict[str, Any]] = None, json_body: Optional[bool] = None, response_body: Union[str, dict, list, NoneType] = None, source_path: Optional[str] = None, response_modifier: Optional[str] = None, finalize: Optional[str] = None, extra_options: Optional[Dict[str, Any]] = None, **kwargs: Any)
176    def __init__(self,
177            method: str = 'GET',
178            url: typing.Union[str, None] = None,
179            url_path: typing.Union[str, None] = None,
180            url_anchor: typing.Union[str, None] = None,
181            parameters: typing.Union[typing.Dict[str, typing.Any], None] = None,
182            files: typing.Union[typing.List[typing.Union[FileInfo, typing.Dict[str, typing.Any]]], None] = None,
183            headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
184            allow_redirects: typing.Union[bool, None] = None,
185            response_code: int = http.HTTPStatus.OK,
186            response_headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
187            json_body: typing.Union[bool, None] = None,
188            response_body: typing.Union[str, dict, list, None] = None,
189            source_path: typing.Union[str, None] = None,
190            response_modifier: typing.Union[str, None] = None,
191            finalize: typing.Union[str, None] = None,
192            extra_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
193            **kwargs: typing.Any) -> None:
194        method = str(method).upper()
195        if (method not in ALLOWED_METHODS):
196            raise ValueError(f"Got unknown/disallowed method: '{method}'.")
197
198        self.method: str = method
199        """ The HTTP method for this exchange. """
200
201        url_path, url_anchor, parameters = self._parse_url_components(url, url_path, url_anchor, parameters)
202
203        self.url_path: str = url_path
204        """
205        The path portion of the request URL.
206        Only the path (not domain, port, params, anchor, etc) should be included.
207        """
208
209        self.url_anchor: typing.Union[str, None] = url_anchor
210        """
211        The anchor portion of the request URL (if it exists).
212        """
213
214        self.parameters: typing.Dict[str, typing.Any] = parameters
215        """
216        The parameters/arguments for this request.
217        Parameters should be provided here and not encoded into URLs,
218        regardless of the request method.
219        With the exception of files, all parameters should be placed here.
220        """
221
222        if (files is None):
223            files = []
224
225        parsed_files = []
226        for file in files:
227            if (isinstance(file, FileInfo)):
228                parsed_files.append(file)
229            else:
230                parsed_files.append(FileInfo(**file))
231
232        self.files: typing.List[FileInfo] = parsed_files
233        """
234        A list of files to include in the request.
235        The files are represented as dicts with a
236        "path" (path to the file on disk) and "name" (the filename to send in the request) field.
237        These paths must be POSIX-style paths,
238        they will be converted to system-specific paths.
239        Once this exchange is ready for use, these paths should be resolved (and probably absolute).
240        However, when serialized these paths should probably be relative.
241        To reconcile this, resolve_paths() should be called before using this exchange.
242        """
243
244        if (headers is None):
245            headers = {}
246
247        self.headers: typing.Dict[str, typing.Any] = headers
248        """ Headers in the request. """
249
250        if (allow_redirects is None):
251            allow_redirects = True
252
253        self.allow_redirects: bool = allow_redirects
254        """ Follow redirects. """
255
256        self.response_code: int = response_code
257        """ The HTTP status code of the response. """
258
259        if (response_headers is None):
260            response_headers = {}
261
262        self.response_headers: typing.Dict[str, typing.Any] = response_headers
263        """ Headers in the response. """
264
265        if (json_body is None):
266            json_body = isinstance(response_body, (dict, list))
267
268        self.json_body: bool = json_body
269        """
270        Indicates that the response is JSON and should be converted to/from a string.
271        If the response body is passed in a dict/list and this is passed as None,
272        then this will be set as true.
273        """
274
275        if (self.json_body and isinstance(response_body, (dict, list))):
276            response_body = edq.util.json.dumps(response_body)
277
278        self.response_body: typing.Union[str, None] = response_body  # type: ignore[assignment]
279        """
280        The response that should be sent in this exchange.
281        """
282
283        self.response_modifier: typing.Union[str, None] = response_modifier
284        """
285        This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response())
286        before sent back to the caller.
287        This reference must be importable via edq.util.pyimport.fetch().
288        """
289
290        self.finalize: typing.Union[str, None] = finalize
291        """
292        This function reference will be used to finalize echanges before sent back to the caller.
293        This reference must be importable via edq.util.pyimport.fetch().
294        """
295
296        self.source_path: typing.Union[str, None] = source_path
297        """
298        The path that this exchange was loaded from (if it was loaded from a file).
299        This value should never be serialized, but can be useful for testing.
300        """
301
302        if (extra_options is None):
303            extra_options = {}
304
305        self.extra_options: typing.Dict[str, typing.Any] = extra_options.copy()
306        """
307        Additional options for this exchange.
308        This library will not use these options, but other's may.
309        kwargs will also be added to this.
310        """
311
312        self.extra_options.update(kwargs)
method: str

The HTTP method for this exchange.

url_path: str

The path portion of the request URL. Only the path (not domain, port, params, anchor, etc) should be included.

url_anchor: Optional[str]

The anchor portion of the request URL (if it exists).

parameters: Dict[str, Any]

The parameters/arguments for this request. Parameters should be provided here and not encoded into URLs, regardless of the request method. With the exception of files, all parameters should be placed here.

files: List[FileInfo]

A list of files to include in the request. The files are represented as dicts with a "path" (path to the file on disk) and "name" (the filename to send in the request) field. These paths must be POSIX-style paths, they will be converted to system-specific paths. Once this exchange is ready for use, these paths should be resolved (and probably absolute). However, when serialized these paths should probably be relative. To reconcile this, resolve_paths() should be called before using this exchange.

headers: Dict[str, Any]

Headers in the request.

allow_redirects: bool

Follow redirects.

response_code: int

The HTTP status code of the response.

response_headers: Dict[str, Any]

Headers in the response.

json_body: bool

Indicates that the response is JSON and should be converted to/from a string. If the response body is passed in a dict/list and this is passed as None, then this will be set as true.

response_body: Optional[str]

The response that should be sent in this exchange.

response_modifier: Optional[str]

This function reference will be used to modify responses (in HTTPExchange.make_request() and HTTPExchange.from_response()) before sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().

finalize: Optional[str]

This function reference will be used to finalize exchanges before sent back to the caller. This reference must be importable via edq.util.pyimport.fetch().

source_path: Optional[str]

The path that this exchange was loaded from (if it was loaded from a file). This value should never be serialized, but can be useful for testing.

extra_options: Dict[str, Any]

Additional options for this exchange. This library will not use these options, but others may. kwargs will also be added to this.

def resolve_paths(self, base_dir: str) -> None:
389    def resolve_paths(self, base_dir: str) -> None:
390        """ Resolve any paths relative to the given base dir. """
391
392        for file_info in self.files:
393            file_info.resolve_path(base_dir)

Resolve any paths relative to the given base dir.

def match( self, query: HTTPExchange, match_headers: bool = True, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, **kwargs: Any) -> Tuple[bool, Optional[str]]:
395    def match(self, query: 'HTTPExchange',
396            match_headers: bool = True,
397            headers_to_skip: typing.Union[typing.List[str], None] = None,
398            params_to_skip: typing.Union[typing.List[str], None] = None,
399            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
400        """
401        Check if this exchange matches the query exchange.
402        If they match, `(True, None)` will be returned.
403        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
404
405        Note that this is not an equality check,
406        as a query exchange is often missing the response components.
407        This method is often invoked the see if an incoming HTTP request (the query) matches an existing exchange.
408        """
409
410        if (query.method != self.method):
411            return False, f"HTTP method does not match (query = {query.method}, target = {self.method})."
412
413        if (query.url_path != self.url_path):
414            return False, f"URL path does not match (query = {query.url_path}, target = {self.url_path})."
415
416        if (query.url_anchor != self.url_anchor):
417            return False, f"URL anchor does not match (query = {query.url_anchor}, target = {self.url_anchor})."
418
419        if (headers_to_skip is None):
420            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
421
422        if (params_to_skip is None):
423            params_to_skip = []
424
425        if (match_headers):
426            match, hint = self._match_dict('header', query.headers, self.headers, headers_to_skip)
427            if (not match):
428                return False, hint
429
430        match, hint = self._match_dict('parameter', query.parameters, self.parameters, params_to_skip)
431        if (not match):
432            return False, hint
433
434        # Check file names and hash contents.
435        query_filenames = {(file.name, file.hash_content()) for file in query.files}
436        target_filenames = {(file.name, file.hash_content()) for file in self.files}
437        if (query_filenames != target_filenames):
438            return False, f"File names do not match (query = {query_filenames}, target = {target_filenames})."
439
440        return True, None

Check if this exchange matches the query exchange. If they match, (True, None) will be returned. If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.

Note that this is not an equality check, as a query exchange is often missing the response components. This method is often invoked to see if an incoming HTTP request (the query) matches an existing exchange.

def get_url(self) -> str:
476    def get_url(self) -> str:
477        """ Get the URL path and anchor combined. """
478
479        url = self.url_path
480
481        if (self.url_anchor is not None):
482            url += ('#' + self.url_anchor)
483
484        return url

Get the URL path and anchor combined.

def match_response( self, response: requests.models.Response, override_body: Optional[str] = None, headers_to_skip: Optional[List[str]] = None, **kwargs: Any) -> Tuple[bool, Optional[str]]:
486    def match_response(self, response: requests.Response,
487            override_body: typing.Union[str, None] = None,
488            headers_to_skip: typing.Union[typing.List[str], None] = None,
489            **kwargs: typing.Any) -> typing.Tuple[bool, typing.Union[str, None]]:
490        """
491        Check if this exchange matches the given response.
492        If they match, `(True, None)` will be returned.
493        If they do not match, `(False, <hint>)` will be returned, where `<hint>` points to where the mismatch is.
494        """
495
496        if (headers_to_skip is None):
497            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
498
499        response_body = override_body
500        if (response_body is None):
501            response_body = response.text
502
503        if (self.response_code != response.status_code):
504            return False, f"http status code does match (expected: {self.response_code}, actual: {response.status_code})"
505
506        expected_body = self.response_body
507        actual_body = None
508
509        if (self.json_body):
510            actual_body = response.json()
511
512            # Normalize the actual and expected bodies.
513
514            actual_body = edq.util.json.dumps(actual_body)
515
516            if (isinstance(expected_body, str)):
517                expected_body = edq.util.json.loads(expected_body)
518
519            expected_body = edq.util.json.dumps(expected_body)
520        else:
521            actual_body = response_body
522
523        if (self.response_body != actual_body):
524            body_hint = f"expected: '{self.response_body}', actual: '{actual_body}'"
525            return False, f"body does not match ({body_hint})"
526
527        match, hint = self._match_dict('header', response.headers, self.response_headers,
528                keys_to_skip = headers_to_skip,
529                query_label = 'response', target_label = 'exchange')
530
531        if (not match):
532            return False, hint
533
534        return True, None

Check if this exchange matches the given response. If they match, (True, None) will be returned. If they do not match, (False, <hint>) will be returned, where <hint> points to where the mismatch is.

def compute_relpath(self, http_exchange_extension: str = '.httpex.json') -> str:
536    def compute_relpath(self, http_exchange_extension: str = DEFAULT_HTTP_EXCHANGE_EXTENSION) -> str:
537        """ Create a consistent, semi-unique, and relative path for this exchange. """
538
539        url = self.get_url().strip()
540        parts = url.split('/')
541
542
543        if (url in ['', '/']):
544            filename = '_index_'
545            dirname = ''
546        else:
547            filename = parts[-1]
548
549            if (len(parts) > 1):
550                dirname = os.path.join(*parts[0:-1])
551            else:
552                dirname = ''
553
554        parameters = {}
555        for key in sorted(self.parameters.keys()):
556            parameters[key] = self.parameters[key]
557
558        # Treat files as params as well.
559        for file_info in self.files:
560            parameters[f"file-{file_info.name}"] = file_info.hash_content()
561
562        query = urllib.parse.urlencode(parameters)
563        if (query != ''):
564            # The query can get very long, so we may have to clip it.
565            query_text = edq.util.hash.clip_text(query, QUERY_CLIP_LENGTH)
566
567            # Note that the '?' is URL encoded.
568            filename += f"%3F{query_text}"
569
570        filename += f"_{self.method}{http_exchange_extension}"
571
572        return os.path.join(dirname, filename)

Create a consistent, semi-unique, and relative path for this exchange.

def to_dict(self) -> Dict[str, Any]:
574    def to_dict(self) -> typing.Dict[str, typing.Any]:
575        return vars(self)

Return a dict that can be used to represent this object. If the dict is passed to from_dict(), an identical object should be reconstructed.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Any:
577    @classmethod
578    def from_dict(cls, data: typing.Dict[str, typing.Any]) -> typing.Any:
579        return HTTPExchange(**data)

Return an instance of this subclass created using the given dict. If the dict came from to_dict(), the returned object should be identical to the original.

A general (but inefficient) implementation is provided by default.

@classmethod
def from_path( cls, path: str, set_source_path: bool = True) -> HTTPExchange:
581    @classmethod
582    def from_path(cls, path: str,
583            set_source_path: bool = True,
584            ) -> 'HTTPExchange':
585        """
586        Load an exchange from a file.
587        This will also handle setting the exchanges source path (if specified) and resolving the exchange's paths.
588        """
589
590        exchange = typing.cast(HTTPExchange, edq.util.json.load_object_path(path, HTTPExchange))
591
592        if (set_source_path):
593            exchange.source_path = os.path.abspath(path)
594
595        exchange.resolve_paths(os.path.abspath(os.path.dirname(path)))
596
597        return exchange

Load an exchange from a file. This will also handle setting the exchange's source path (if specified) and resolving the exchange's paths.

@classmethod
def from_response( cls, response: requests.models.Response, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, allow_redirects: Optional[bool] = None) -> HTTPExchange:
599    @classmethod
600    def from_response(cls,
601            response: requests.Response,
602            headers_to_skip: typing.Union[typing.List[str], None] = None,
603            params_to_skip: typing.Union[typing.List[str], None] = None,
604            allow_redirects: typing.Union[bool, None] = None,
605            ) -> 'HTTPExchange':
606        """ Create a full excahnge from a response. """
607
608        if (headers_to_skip is None):
609            headers_to_skip = DEFAULT_EXCHANGE_IGNORE_HEADERS
610
611        if (params_to_skip is None):
612            params_to_skip = []
613
614        body = response.text
615
616        # Use a clean function (if one exists).
617        if (_exchanges_clean_func is not None):
618            # Make a copy of the response to avoid cleaning functions modifying it.
619            # Note that this is not a very complete solution, since we can't rely on the deep copy getting everything right.
620            response = copy.deepcopy(response)
621
622            modify_func = edq.util.pyimport.fetch(_exchanges_clean_func)
623            body = modify_func(response, body)
624
625        request_headers = {key.lower().strip(): value for (key, value) in response.request.headers.items()}
626        response_headers = {key.lower().strip(): value for (key, value) in response.headers.items()}
627
628        # Clean headers.
629        for key in headers_to_skip:
630            key = key.lower()
631
632            request_headers.pop(key, None)
633            response_headers.pop(key, None)
634
635        request_data, request_files = edq.net.util.parse_request_data(response.request.url, response.request.headers, response.request.body)
636
637        # Clean parameters.
638        for key in params_to_skip:
639            request_data.pop(key, None)
640
641        files = [FileInfo(name = name, content = content) for (name, content) in request_files.items()]
642
643        data = {
644            'method': response.request.method,
645            'url': response.request.url,
646            'url_anchor': response.request.headers.get(ANCHOR_HEADER_KEY, None),
647            'parameters': request_data,
648            'files': files,
649            'headers': request_headers,
650            'response_code': response.status_code,
651            'response_headers': response_headers,
652            'response_body': body,
653            'response_modifier': _exchanges_clean_func,
654            'allow_redirects': allow_redirects,
655        }
656
657        exchange = HTTPExchange(**data)
658
659        # Use a finalize function (if one exists).
660        if (_exchanges_finalize_func is not None):
661            finalize_func = edq.util.pyimport.fetch(_exchanges_finalize_func)
662
663            exchange = finalize_func(exchange)
664            exchange.finalize = _exchanges_finalize_func
665
666        return exchange

Create a full exchange from a response.

@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
@typing.runtime_checkable
class HTTPExchangeComplete(typing.Protocol):
    """
    A function that can be called after a request has been made (and exchange constructed).

    This is a structural (typing.Protocol) type describing a callable that takes an exchange and returns a string.
    It is decorated with typing.runtime_checkable, so it can be used in isinstance() checks.
    """

    def __call__(self,
            exchange: HTTPExchange
            ) -> str:
        """
        Called after an HTTP exchange has been completed.
        The exchange is passed in, and a string is returned.
        NOTE(review): The meaning of the returned string is not visible here -- confirm against the callers.
        """

A function that can be called after a request has been made (and exchange constructed).

HTTPExchangeComplete(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that refuses to instantiate protocol classes.

    For a class that is itself a protocol, a TypeError is raised.
    Otherwise, the first `__init__` in the MRO that is not this placeholder is installed
    on the class (falling back to `object.__init__`) and then called with the given arguments.
    """

    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the `__init__` that was just installed for this first instantiation.
    cls.__init__(self, *args, **kwargs)