edq.net.request

import atexit
import contextlib
import http
import logging
import os
import time
import typing
import urllib.parse

import requests
import urllib3

import edq.core.errors
import edq.net.exchange
import edq.net.exchangeserver
import edq.util.dirent
import edq.util.encoding
import edq.util.hash
import edq.util.json
import edq.util.pyimport
 19
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Seeds the 'timeout' option that make_request() hands to requests.request().
DEFAULT_REQUEST_TIMEOUT_SECS: float = 10.0
""" Default timeout for an HTTP request. """

# Base wait used between retry attempts:
# before retry number N (1-indexed), the code sleeps for (N * RETRY_BACKOFF_SECS) seconds.
RETRY_BACKOFF_SECS: float = 0.5

# Fallback for make_request()'s cache_dir argument when the caller passes None.
_exchanges_cache_dir: typing.Union[str, None] = None  # pylint: disable=invalid-name
""" If not None, all requests made via make_request() will attempt to look in this directory for a matching exchange first. """

# Fallback for make_request()'s output_dir argument when the caller passes None.
_exchanges_out_dir: typing.Union[str, None] = None  # pylint: disable=invalid-name
""" If not None, all requests made via make_request() will be saved as an HTTPExchange in this directory. """

# Merged into the request options by make_request() (unless use_module_options is False).
# _disable_https_verification() writes the 'verify' key here.
_module_makerequest_options: typing.Union[typing.Dict[str, typing.Any], None] = None  # pylint: disable=invalid-name
"""
Module-wide options for requests.request().
These should generally only be used in testing.
"""

# Entries are added by _ensure_cache_server() and removed by _cleanup_cache_server().
_cache_servers: typing.Dict[str, edq.net.exchangeserver.HTTPExchangeServer] = {}
""" A mapping of cache dirs to their active cache server. """

# Invoked by make_request() in addition to (and after) any per-call exchange_complete_func.
_make_request_exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None  # pylint: disable=invalid-name
""" If not None, call this func after make_request() has created its HTTPExchange. """
 44
 45@typing.runtime_checkable
 46class ResponseModifierFunction(typing.Protocol):
 47    """
 48    A function that can be used to modify an exchange's response.
 49    Exchanges can use these functions to normalize their responses before saving.
 50    """
 51
 52    def __call__(self,
 53            response: requests.Response,
 54            body: str,
 55            ) -> str:
 56        """
 57        Modify the http response.
 58        Headers may be modified in the response directly,
 59        while the modified (or same) body must be returned.
 60        """
 61
 62def make_request(method: str, url: str,
 63        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 64        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
 65        files: typing.Union[typing.List[typing.Any], None] = None,
 66        raise_for_status: bool = True,
 67        timeout_secs: float = DEFAULT_REQUEST_TIMEOUT_SECS,
 68        cache_dir: typing.Union[str, None] = None,
 69        ignore_cache: bool = False,
 70        output_dir: typing.Union[str, None] = None,
 71        send_anchor_header: bool = True,
 72        headers_to_skip: typing.Union[typing.List[str], None] = None,
 73        params_to_skip: typing.Union[typing.List[str], None] = None,
 74        http_exchange_extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION,
 75        add_http_prefix: bool = True,
 76        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 77        exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None,
 78        allow_redirects: typing.Union[bool, None] = None,
 79        use_module_options: bool = True,
 80        retries: int = 0,
 81        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 82    """
 83    Make an HTTP request and return the response object and text body.
 84    """
 85
 86    if (add_http_prefix and (not url.lower().startswith('http'))):
 87        url = 'http://' + url
 88
 89    retries = max(0, retries)
 90
 91    if (cache_dir is None):
 92        cache_dir = _exchanges_cache_dir
 93
 94    if (ignore_cache):
 95        cache_dir = None
 96
 97    if (output_dir is None):
 98        output_dir = _exchanges_out_dir
 99
100    if (headers is None):
101        headers = {}
102
103    if (data is None):
104        data = {}
105
106    if (files is None):
107        files = []
108
109    if (additional_requests_options is None):
110        additional_requests_options = {}
111
112    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
113    if (send_anchor_header):
114        headers = headers.copy()
115
116        parts = urllib.parse.urlparse(url)
117        headers[edq.net.exchange.ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')
118
119    options: typing.Dict[str, typing.Any] = {
120        'timeout': timeout_secs,
121    }
122
123    if (use_module_options and (_module_makerequest_options is not None)):
124        options.update(_module_makerequest_options)
125
126    options.update(additional_requests_options)
127
128    options.update({
129        'headers': headers,
130        'files': files,
131    })
132
133    if (allow_redirects is not None):
134        options['allow_redirects'] = allow_redirects
135
136    if (method == 'GET'):
137        options['params'] = data
138    else:
139        options['data'] = data
140
141    _logger.debug("Making %s request: '%s' (options = %s).", method, url, options)
142    response = _make_request_with_cache(method, url, options, cache_dir, retries)
143
144    body = response.text
145    if (_logger.level <= logging.DEBUG):
146        log_body = body
147        if (response.encoding is None):
148            log_body = f"<hash> {edq.util.hash.sha256_hex(response.content)}"
149
150        _logger.debug("Response:\n%s", log_body)
151
152    if (raise_for_status):
153        # Handle 404s a little special, as their body may contain useful information.
154        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
155            response.reason += f" (Body: '{body.strip()}')"
156
157        response.raise_for_status()
158
159    exchange = None
160    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
161        exchange = edq.net.exchange.HTTPExchange.from_response(response,
162                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
163                allow_redirects = options.get('allow_redirects', None))
164
165    if ((output_dir is not None) and (exchange is not None)):
166        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
167        path = os.path.abspath(os.path.join(output_dir, relpath))
168
169        edq.util.dirent.mkdir(os.path.dirname(path))
170        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)
171
172    if ((exchange_complete_func is not None) and (exchange is not None)):
173        exchange_complete_func(exchange)
174
175    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
176        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable
177
178    return response, body
179
def make_with_exchange(
        exchange: edq.net.exchange.HTTPExchange,
        base_url: str,
        raise_for_status: bool = True,
        **kwargs: typing.Any,
        ) -> typing.Tuple[requests.Response, str]:
    """
    Perform the HTTP request described by the given exchange.

    The exchange supplies the method, URL (appended to `base_url`), headers, parameters, files, and redirect policy.
    Any additional keyword arguments are forwarded to make_request().
    If the exchange names a response modifier, it is fetched and applied to the body before returning.

    Returns the response and the (possibly modified) text body.
    """

    # Any file opened here is closed once the request has finished (successfully or not).
    with contextlib.ExitStack() as opened_files:
        files = []
        for file_info in exchange.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            if (content is None):
                content = opened_files.enter_context(open(file_info.path, 'rb'))  # type: ignore[assignment,arg-type]

            files.append((file_info.name, content))

        url = f"{base_url}/{exchange.get_url()}"

        response, body = make_request(exchange.method, url,
                headers = exchange.headers,
                data = exchange.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = exchange.allow_redirects,
                **kwargs,
        )

    if (exchange.response_modifier is not None):
        modify_func = edq.util.pyimport.fetch(exchange.response_modifier)
        body = modify_func(response, body)

    return response, body
218
219
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Issue a GET request via make_request() (all keyword arguments are forwarded)
    and return the response object and text body.
    """

    response, body = make_request('GET', url, **kwargs)
    return response, body
226
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Issue a POST request via make_request() (all keyword arguments are forwarded)
    and return the response object and text body.
    """

    response, body = make_request('POST', url, **kwargs)
    return response, body
233
def _make_request_with_cache(
        method: str,
        url: str,
        options: typing.Dict[str, typing.Any],
        cache_dir: typing.Union[str, None],
        retries: int,
        ) -> requests.Response:
    """
    Fetch a response, consulting the exchange cache (if one is given) before sending the real request.
    The real request is attempted once, plus once more for each allowed retry.
    If every attempt raises, a RetryError holding all the collected errors is raised.
    """

    # A cache hit short-circuits everything else.
    if (cache_dir is not None):
        cached_response = _cache_lookup(method, url, options, cache_dir)
        if (cached_response is not None):
            return cached_response

    # The initial attempt plus each allowed retry.
    attempt_count = retries + 1

    response: typing.Union[requests.Response, None] = None
    errors = []
    for attempt_index in range(attempt_count):
        # Every attempt after the first waits a little longer than the previous one.
        if (attempt_index > 0):
            time.sleep(attempt_index * RETRY_BACKOFF_SECS)

        try:
            response = requests.request(method, url, **options)  # pylint: disable=missing-timeout
        except Exception as ex:
            errors.append(ex)
        else:
            # Success, no more attempts are needed.
            break

    # One error per attempt means that no attempt succeeded.
    if (len(errors) == attempt_count):
        raise edq.core.errors.RetryError(f"HTTP {method} for '{url}'", attempt_count, retry_errors = errors)

    return response
269
def _cache_lookup(
        method: str,
        url: str,
        options: typing.Dict[str, typing.Any],
        cache_dir: str,
        ) -> typing.Union[requests.Response, None]:
    """
    Try to answer a request from the exchange cache.
    The request is re-targeted at the local cache server for the given dir.
    Returns None if the cache dir is unusable or the cache server responds with a 404.
    """

    if (not os.path.isdir(cache_dir)):
        _logger.warning("Cache dir does not exist or is not a dir: '%s'.", cache_dir)
        return None

    # Cache servers are keyed by absolute path.
    server = _ensure_cache_server(os.path.abspath(cache_dir))

    # Keep the path/query/fragment of the original URL, but point it at the cache server.
    original_parts = urllib.parse.urlparse(url)
    cache_parts = original_parts._replace(scheme = 'http', netloc = f"127.0.0.1:{server.port}")
    cache_url = cache_parts.geturl()

    response = requests.request(method, cache_url, **options)  # pylint: disable=missing-timeout

    # A 404 from the cache server is treated as a cache miss.
    is_miss = (response.status_code == http.HTTPStatus.NOT_FOUND)
    return None if is_miss else response
295
def _ensure_cache_server(cache_dir: str) -> edq.net.exchangeserver.HTTPExchangeServer:
    """
    Get the cache server for the specified dir, starting a new one if none is active.
    A newly started server is registered to be cleaned up when the interpreter exits.
    """

    existing_server = _cache_servers.get(cache_dir)
    if (existing_server is None):
        edq.util.dirent.mkdir(cache_dir)

        new_server = edq.net.exchangeserver.HTTPExchangeServer()
        new_server.load_exchanges_dir(cache_dir)
        new_server.start()

        # Make sure the server is stopped when the program ends.
        atexit.register(_cleanup_cache_server, cache_dir)

        _cache_servers[cache_dir] = new_server
        return new_server

    return existing_server
316
def _cleanup_cache_server(cache_dir: str) -> None:
    """
    Stop the cache server for the given dir and forget about it.
    Does nothing if no server is registered for the dir.
    """

    server = _cache_servers.get(cache_dir)
    if (server is not None):
        # Only drop the entry once the server has been stopped.
        server.stop()
        del _cache_servers[cache_dir]
326
def _clear_cache_servers() -> None:
    """ Stop every active cache server and empty the mapping of cache servers. """

    # Iterate over a snapshot of the keys, since cleanup removes entries from the mapping.
    active_dirs = tuple(_cache_servers)
    for active_dir in active_dirs:
        _cleanup_cache_server(active_dir)
332
def _disable_https_verification() -> None:
    """
    Turn off SSL certificate checks for HTTPS requests
    by setting 'verify' to False in the module-wide request options,
    and silence the insecure request warnings from urllib3.
    """

    global _module_makerequest_options  # pylint: disable=global-statement

    module_options = _module_makerequest_options
    if (module_options is None):
        # Create the module-wide options on first use.
        module_options = {}
        _module_makerequest_options = module_options

    module_options['verify'] = False

    # Ignore insecure warnings.
    insecure_warning = urllib3.exceptions.InsecureRequestWarning
    urllib3.disable_warnings(insecure_warning)
DEFAULT_REQUEST_TIMEOUT_SECS: float = 10.0

Default timeout for an HTTP request.

RETRY_BACKOFF_SECS: float = 0.5
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
    """
    The call signature for a hook that rewrites the response of an exchange.
    An exchange can name such a hook so that its response is normalized before it is saved.
    """

    def __call__(self, response: requests.Response, body: str) -> str:
        """
        Adjust the given HTTP response.
        Header changes are made in-place on the response object,
        and the body to use (changed or unchanged) is the return value.
        """

A function that can be used to modify an exchange's response. Exchanges can use these functions to normalize their responses before saving.

ResponseModifierFunction(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that either refuses instantiation or swaps itself out for a real `__init__`.

    Raises TypeError if the class of `self` is flagged as a protocol (`cls._is_protocol`).
    Otherwise, if the class still uses this placeholder as its `__init__`,
    the first `__init__` found in the MRO that is not this placeholder is installed on the class
    (falling back to `object.__init__`) and then called with the given arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only look at `__init__` defined directly on each base (not inherited ones).
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the newly installed `__init__` for this first instantiation.
    cls.__init__(self, *args, **kwargs)
def make_request( method: str, url: str, headers: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, files: Optional[List[Any]] = None, raise_for_status: bool = True, timeout_secs: float = 10.0, cache_dir: Optional[str] = None, ignore_cache: bool = False, output_dir: Optional[str] = None, send_anchor_header: bool = True, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, http_exchange_extension: str = '.httpex.json', add_http_prefix: bool = True, additional_requests_options: Optional[Dict[str, Any]] = None, exchange_complete_func: Optional[edq.net.exchange.HTTPExchangeComplete] = None, allow_redirects: Optional[bool] = None, use_module_options: bool = True, retries: int = 0, **kwargs: Any) -> Tuple[requests.models.Response, str]:
 63def make_request(method: str, url: str,
 64        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 65        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
 66        files: typing.Union[typing.List[typing.Any], None] = None,
 67        raise_for_status: bool = True,
 68        timeout_secs: float = DEFAULT_REQUEST_TIMEOUT_SECS,
 69        cache_dir: typing.Union[str, None] = None,
 70        ignore_cache: bool = False,
 71        output_dir: typing.Union[str, None] = None,
 72        send_anchor_header: bool = True,
 73        headers_to_skip: typing.Union[typing.List[str], None] = None,
 74        params_to_skip: typing.Union[typing.List[str], None] = None,
 75        http_exchange_extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION,
 76        add_http_prefix: bool = True,
 77        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 78        exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None,
 79        allow_redirects: typing.Union[bool, None] = None,
 80        use_module_options: bool = True,
 81        retries: int = 0,
 82        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 83    """
 84    Make an HTTP request and return the response object and text body.
 85    """
 86
 87    if (add_http_prefix and (not url.lower().startswith('http'))):
 88        url = 'http://' + url
 89
 90    retries = max(0, retries)
 91
 92    if (cache_dir is None):
 93        cache_dir = _exchanges_cache_dir
 94
 95    if (ignore_cache):
 96        cache_dir = None
 97
 98    if (output_dir is None):
 99        output_dir = _exchanges_out_dir
100
101    if (headers is None):
102        headers = {}
103
104    if (data is None):
105        data = {}
106
107    if (files is None):
108        files = []
109
110    if (additional_requests_options is None):
111        additional_requests_options = {}
112
113    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
114    if (send_anchor_header):
115        headers = headers.copy()
116
117        parts = urllib.parse.urlparse(url)
118        headers[edq.net.exchange.ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')
119
120    options: typing.Dict[str, typing.Any] = {
121        'timeout': timeout_secs,
122    }
123
124    if (use_module_options and (_module_makerequest_options is not None)):
125        options.update(_module_makerequest_options)
126
127    options.update(additional_requests_options)
128
129    options.update({
130        'headers': headers,
131        'files': files,
132    })
133
134    if (allow_redirects is not None):
135        options['allow_redirects'] = allow_redirects
136
137    if (method == 'GET'):
138        options['params'] = data
139    else:
140        options['data'] = data
141
142    _logger.debug("Making %s request: '%s' (options = %s).", method, url, options)
143    response = _make_request_with_cache(method, url, options, cache_dir, retries)
144
145    body = response.text
146    if (_logger.level <= logging.DEBUG):
147        log_body = body
148        if (response.encoding is None):
149            log_body = f"<hash> {edq.util.hash.sha256_hex(response.content)}"
150
151        _logger.debug("Response:\n%s", log_body)
152
153    if (raise_for_status):
154        # Handle 404s a little special, as their body may contain useful information.
155        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
156            response.reason += f" (Body: '{body.strip()}')"
157
158        response.raise_for_status()
159
160    exchange = None
161    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
162        exchange = edq.net.exchange.HTTPExchange.from_response(response,
163                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
164                allow_redirects = options.get('allow_redirects', None))
165
166    if ((output_dir is not None) and (exchange is not None)):
167        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
168        path = os.path.abspath(os.path.join(output_dir, relpath))
169
170        edq.util.dirent.mkdir(os.path.dirname(path))
171        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)
172
173    if ((exchange_complete_func is not None) and (exchange is not None)):
174        exchange_complete_func(exchange)
175
176    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
177        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable
178
179    return response, body

Make an HTTP request and return the response object and text body.

def make_with_exchange( exchange: edq.net.exchange.HTTPExchange, base_url: str, raise_for_status: bool = True, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_with_exchange(
        exchange: edq.net.exchange.HTTPExchange,
        base_url: str,
        raise_for_status: bool = True,
        **kwargs: typing.Any,
        ) -> typing.Tuple[requests.Response, str]:
    """
    Perform the HTTP request described by the given exchange.

    The exchange supplies the method, URL (appended to `base_url`), headers, parameters, files, and redirect policy.
    Any additional keyword arguments are forwarded to make_request().
    If the exchange names a response modifier, it is fetched and applied to the body before returning.

    Returns the response and the (possibly modified) text body.
    """

    # Any file opened here is closed once the request has finished (successfully or not).
    with contextlib.ExitStack() as opened_files:
        files = []
        for file_info in exchange.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            if (content is None):
                content = opened_files.enter_context(open(file_info.path, 'rb'))  # type: ignore[assignment,arg-type]

            files.append((file_info.name, content))

        url = f"{base_url}/{exchange.get_url()}"

        response, body = make_request(exchange.method, url,
                headers = exchange.headers,
                data = exchange.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = exchange.allow_redirects,
                **kwargs,
        )

    if (exchange.response_modifier is not None):
        modify_func = edq.util.pyimport.fetch(exchange.response_modifier)
        body = modify_func(response, body)

    return response, body

Perform the HTTP request described by the given exchange.

def make_get(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Issue a GET request via make_request() (all keyword arguments are forwarded)
    and return the response object and text body.
    """

    response, body = make_request('GET', url, **kwargs)
    return response, body

Make a GET request and return the response object and text body.

def make_post(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """
    Issue a POST request via make_request() (all keyword arguments are forwarded)
    and return the response object and text body.
    """

    response, body = make_request('POST', url, **kwargs)
    return response, body

Make a POST request and return the response object and text body.