edq.net.request

import atexit
import contextlib
import http
import logging
import os
import time
import typing
import urllib.parse

import requests
import urllib3

import edq.core.errors
import edq.net.exchange
import edq.net.exchangeserver
import edq.util.dirent
import edq.util.encoding
import edq.util.hash
import edq.util.json
import edq.util.pyimport
 19
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# 30 second connection timeout, 30 minute read timeout.
DEFAULT_REQUEST_TIMEOUT_SECS: typing.Union[float, typing.Tuple[float, float]] = (30.0, 60.0 * 30)
"""
Default timeout for an HTTP request.
Can be a single float for both the connection and read timeouts.
Or a tuple for the connection and read timeouts, respectively.

See: https://docs.python-requests.org/en/latest/user/advanced/#timeouts
"""

# Base delay (in seconds) used between retry attempts in _make_request_with_cache().
# The wait before the Nth retry is N * RETRY_BACKOFF_SECS (the delay grows linearly).
RETRY_BACKOFF_SECS: float = 0.5

_exchanges_cache_dir: typing.Union[str, None] = None  # pylint: disable=invalid-name
""" If not None, all requests made via make_request() will attempt to look in this directory for a matching exchange first. """

_exchanges_out_dir: typing.Union[str, None] = None  # pylint: disable=invalid-name
""" If not None, all requests made via make_request() will be saved as an HTTPExchange in this directory. """

_module_makerequest_options: typing.Union[typing.Dict[str, typing.Any], None] = None  # pylint: disable=invalid-name
"""
Module-wide options for requests.request().
These should generally only be used in testing.
"""

# Populated lazily by _ensure_cache_server() and emptied by _cleanup_cache_server().
_cache_servers: typing.Dict[str, edq.net.exchangeserver.HTTPExchangeServer] = {}
""" A mapping of cache dirs to their active cache server. """

_make_request_exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None  # pylint: disable=invalid-name
""" If not None, call this func after make_request() has created its HTTPExchange. """
 50
 51@typing.runtime_checkable
 52class ResponseModifierFunction(typing.Protocol):
 53    """
 54    A function that can be used to modify an exchange's response.
 55    Exchanges can use these functions to normalize their responses before saving.
 56    """
 57
 58    def __call__(self,
 59            response: requests.Response,
 60            body: str,
 61            ) -> str:
 62        """
 63        Modify the http response.
 64        Headers may be modified in the response directly,
 65        while the modified (or same) body must be returned.
 66        """
 67
 68def make_request(method: str, url: str,
 69        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 70        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
 71        files: typing.Union[typing.List[typing.Any], None] = None,
 72        raise_for_status: bool = True,
 73        timeout_secs: typing.Union[float, typing.Tuple[float, float]] = DEFAULT_REQUEST_TIMEOUT_SECS,
 74        cache_dir: typing.Union[str, None] = None,
 75        ignore_cache: bool = False,
 76        output_dir: typing.Union[str, None] = None,
 77        send_anchor_header: bool = True,
 78        headers_to_skip: typing.Union[typing.List[str], None] = None,
 79        params_to_skip: typing.Union[typing.List[str], None] = None,
 80        http_exchange_extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION,
 81        add_http_prefix: bool = True,
 82        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 83        exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None,
 84        allow_redirects: typing.Union[bool, None] = None,
 85        use_module_options: bool = True,
 86        retries: int = 0,
 87        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 88    """
 89    Make an HTTP request and return the response object and text body.
 90    """
 91
 92    if (add_http_prefix and (not url.lower().startswith('http'))):
 93        url = 'http://' + url
 94
 95    retries = max(0, retries)
 96
 97    if (cache_dir is None):
 98        cache_dir = _exchanges_cache_dir
 99
100    if (ignore_cache):
101        cache_dir = None
102
103    if (output_dir is None):
104        output_dir = _exchanges_out_dir
105
106    if (headers is None):
107        headers = {}
108
109    if (data is None):
110        data = {}
111
112    if (files is None):
113        files = []
114
115    if (additional_requests_options is None):
116        additional_requests_options = {}
117
118    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
119    if (send_anchor_header):
120        headers = headers.copy()
121
122        parts = urllib.parse.urlparse(url)
123        headers[edq.net.exchange.ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')
124
125    options: typing.Dict[str, typing.Any] = {
126        'timeout': timeout_secs,
127    }
128
129    if (use_module_options and (_module_makerequest_options is not None)):
130        options.update(_module_makerequest_options)
131
132    options.update(additional_requests_options)
133
134    options.update({
135        'headers': headers,
136        'files': files,
137    })
138
139    if (allow_redirects is not None):
140        options['allow_redirects'] = allow_redirects
141
142    if (method == 'GET'):
143        options['params'] = data
144    else:
145        options['data'] = data
146
147    _logger.debug("Making %s request: '%s' (options = %s).", method, url, options)
148    response = _make_request_with_cache(method, url, options, cache_dir, retries)
149
150    body = response.text
151    if (_logger.level <= logging.DEBUG):
152        log_body = body
153        if (response.encoding is None):
154            log_body = f"<hash> {edq.util.hash.sha256_hex(response.content)}"
155
156        _logger.debug("Response:\n%s", log_body)
157
158    if (raise_for_status):
159        # Handle 404s a little special, as their body may contain useful information.
160        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
161            response.reason += f" (Body: '{body.strip()}')"
162
163        response.raise_for_status()
164
165    exchange = None
166    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
167        exchange = edq.net.exchange.HTTPExchange.from_response(response,
168                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
169                allow_redirects = options.get('allow_redirects', None))
170
171    if ((output_dir is not None) and (exchange is not None)):
172        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
173        path = os.path.abspath(os.path.join(output_dir, relpath))
174
175        edq.util.dirent.mkdir(os.path.dirname(path))
176        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)
177
178    if ((exchange_complete_func is not None) and (exchange is not None)):
179        exchange_complete_func(exchange)
180
181    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
182        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable
183
184    return response, body
185
def make_with_exchange(
        exchange: edq.net.exchange.HTTPExchange,
        base_url: str,
        raise_for_status: bool = True,
        **kwargs: typing.Any,
        ) -> typing.Tuple[requests.Response, str]:
    """
    Perform the HTTP request described by the given exchange.

    The exchange's method, headers, parameters, files, and redirect setting are passed to make_request(),
    along with any additional keyword arguments.
    The request URL is the base URL joined (with a '/') to the exchange's URL.
    If the exchange names a response modifier, it is fetched and applied to the body before returning.

    Returns the response object and the (possibly modified) text body.
    """

    # Any file opened below is registered with this stack,
    # so it is closed once the request is done (even if the request raises).
    with contextlib.ExitStack() as opened_files:
        files = []
        for file_info in exchange.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            if (content is None):
                content = opened_files.enter_context(open(file_info.path, 'rb'))  # type: ignore[assignment,arg-type]

            files.append((file_info.name, content))

        url = f"{base_url}/{exchange.get_url()}"

        response, body = make_request(exchange.method, url,
                headers = exchange.headers,
                data = exchange.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = exchange.allow_redirects,
                **kwargs,
        )

    if (exchange.response_modifier is not None):
        modify_func = edq.util.pyimport.fetch(exchange.response_modifier)
        body = modify_func(response, body)

    return response, body
224
225
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """ Issue a GET request through make_request() and return the response object and text body. """

    response, body = make_request('GET', url, **kwargs)
    return response, body
232
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """ Issue a POST request through make_request() and return the response object and text body. """

    response, body = make_request('POST', url, **kwargs)
    return response, body
239
def _make_request_with_cache(
        method: str,
        url: str,
        options: typing.Dict[str, typing.Any],
        cache_dir: typing.Union[str, None],
        retries: int,
        ) -> requests.Response:
    """
    Make a request, potentially using a cache.

    If a cache dir is given and it yields a response, that response is returned without touching the network.
    Otherwise the request is attempted once, plus up to `retries` more times.
    Before the Nth retry, this function sleeps for N * RETRY_BACKOFF_SECS seconds.

    Raises edq.core.errors.RetryError (chained to the last failure) if every attempt raises.
    """

    if (cache_dir is not None):
        cached_response = _cache_lookup(method, url, options, cache_dir)
        if (cached_response is not None):
            return cached_response

    # Try once and then the number of allowed retries.
    # Negative retry counts are clamped so at least one attempt is always made.
    attempt_count = 1 + max(0, retries)

    errors: typing.List[Exception] = []
    for attempt_index in range(attempt_count):
        if (attempt_index > 0):
            # Wait before the next retry.
            time.sleep(attempt_index * RETRY_BACKOFF_SECS)

        try:
            return requests.request(method, url, **options)  # pylint: disable=missing-timeout
        except Exception as ex:
            # Deliberately broad: any failure counts as a failed attempt and is reported via the RetryError.
            errors.append(ex)
            _logger.debug("HTTP %s for '%s' failed (attempt %d of %d): %s",
                    method, url, attempt_index + 1, attempt_count, ex)

    # Every attempt failed (a successful attempt returns from inside the loop).
    raise edq.core.errors.RetryError(f"HTTP {method} for '{url}'", attempt_count, retry_errors = errors) from errors[-1]
275
def _cache_lookup(
        method: str,
        url: str,
        options: typing.Dict[str, typing.Any],
        cache_dir: str,
        ) -> typing.Union[requests.Response, None]:
    """
    Try to answer a request from the exchanges in a cache dir.

    The request is replayed against the local cache server for that dir.
    None is returned when the cache dir is not an existing directory (a warning is logged),
    or when the cache server responds with a 404.
    """

    cache_dir_usable = os.path.isdir(cache_dir)
    if (not cache_dir_usable):
        _logger.warning("Cache dir does not exist or is not a dir: '%s'.", cache_dir)
        return None

    cache_server = _ensure_cache_server(os.path.abspath(cache_dir))

    # Point the original URL at the cache server, keeping everything except the scheme and host.
    local_netloc = f"127.0.0.1:{cache_server.port}"
    local_parts = urllib.parse.urlparse(url)._replace(scheme = 'http', netloc = local_netloc)
    cache_url = local_parts.geturl()

    cached_response = requests.request(method, cache_url, **options)  # pylint: disable=missing-timeout

    # A 404 from the cache server is treated as a cache miss.
    is_miss = (cached_response.status_code == http.HTTPStatus.NOT_FOUND)
    return None if is_miss else cached_response
301
def _ensure_cache_server(cache_dir: str) -> edq.net.exchangeserver.HTTPExchangeServer:
    """
    Ensure that a cache server is running on the specified dir.
    Return the cache server (an existing one if present, otherwise a newly started one).
    """

    existing_server = _cache_servers.get(cache_dir)
    if (existing_server is not None):
        return existing_server

    edq.util.dirent.mkdir(cache_dir)

    new_server = edq.net.exchangeserver.HTTPExchangeServer()
    new_server.load_exchanges_dir(cache_dir)
    new_server.start()

    # Make sure the server is stopped when the interpreter exits.
    atexit.register(_cleanup_cache_server, cache_dir)

    _cache_servers[cache_dir] = new_server
    return new_server
322
def _cleanup_cache_server(cache_dir: str) -> None:
    """
    Stop the cache server for the given dir and drop it from the mapping.
    Does nothing if no server is registered for that dir.
    """

    registered_server = _cache_servers.get(cache_dir)
    if (registered_server is not None):
        registered_server.stop()
        del _cache_servers[cache_dir]
332
def _clear_cache_servers() -> None:
    """ Stop every active cache server and remove it from the mapping. """

    # Snapshot the keys first: _cleanup_cache_server() deletes entries from the mapping as it goes.
    active_dirs = list(_cache_servers)
    for active_dir in active_dirs:
        _cleanup_cache_server(active_dir)
338
def _disable_https_verification() -> None:
    """
    Turn off SSL certificate checking for HTTPS requests.
    This sets 'verify' to False in the module-wide request options and silences urllib3's insecure request warning.
    """

    global _module_makerequest_options  # pylint: disable=global-statement

    module_options = _module_makerequest_options
    if (module_options is None):
        # No module options yet, create them.
        module_options = {}
        _module_makerequest_options = module_options

    module_options['verify'] = False

    # Ignore insecure warnings.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
DEFAULT_REQUEST_TIMEOUT_SECS: Union[float, Tuple[float, float]] = (30.0, 1800.0)

Default timeout for an HTTP request. Can be a single float for both the connection and read timeouts. Or a tuple for the connection and read timeouts, respectively.

See: https://docs.python-requests.org/en/latest/user/advanced/#timeouts

RETRY_BACKOFF_SECS: float = 0.5
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
@typing.runtime_checkable
class ResponseModifierFunction(typing.Protocol):
    """
    Protocol for callables that adjust an exchange's response.
    Exchanges can use such callables to normalize their responses before saving.
    """

    def __call__(self, response: requests.Response, body: str) -> str:
        """
        Modify the HTTP response.
        Header changes may be applied directly to the response object,
        while the body (modified or untouched) must always be returned.
        """

A function that can be used to modify an exchange's response. Exchanges can use these functions to normalize their responses before saving.

ResponseModifierFunction(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` that refuses to instantiate protocols
    and otherwise swaps itself out for the first real `__init__` found in the MRO.

    Raises TypeError if the instance's class is itself a protocol.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Bases that do not define their own `__init__` are treated like this placeholder and skipped.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the newly installed `__init__` for this first instantiation.
    cls.__init__(self, *args, **kwargs)
def make_request( method: str, url: str, headers: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, files: Optional[List[Any]] = None, raise_for_status: bool = True, timeout_secs: Union[float, Tuple[float, float]] = (30.0, 1800.0), cache_dir: Optional[str] = None, ignore_cache: bool = False, output_dir: Optional[str] = None, send_anchor_header: bool = True, headers_to_skip: Optional[List[str]] = None, params_to_skip: Optional[List[str]] = None, http_exchange_extension: str = '.httpex.json', add_http_prefix: bool = True, additional_requests_options: Optional[Dict[str, Any]] = None, exchange_complete_func: Optional[edq.net.exchange.HTTPExchangeComplete] = None, allow_redirects: Optional[bool] = None, use_module_options: bool = True, retries: int = 0, **kwargs: Any) -> Tuple[requests.models.Response, str]:
 69def make_request(method: str, url: str,
 70        headers: typing.Union[typing.Dict[str, typing.Any], None] = None,
 71        data: typing.Union[typing.Dict[str, typing.Any], None] = None,
 72        files: typing.Union[typing.List[typing.Any], None] = None,
 73        raise_for_status: bool = True,
 74        timeout_secs: typing.Union[float, typing.Tuple[float, float]] = DEFAULT_REQUEST_TIMEOUT_SECS,
 75        cache_dir: typing.Union[str, None] = None,
 76        ignore_cache: bool = False,
 77        output_dir: typing.Union[str, None] = None,
 78        send_anchor_header: bool = True,
 79        headers_to_skip: typing.Union[typing.List[str], None] = None,
 80        params_to_skip: typing.Union[typing.List[str], None] = None,
 81        http_exchange_extension: str = edq.net.exchange.DEFAULT_HTTP_EXCHANGE_EXTENSION,
 82        add_http_prefix: bool = True,
 83        additional_requests_options: typing.Union[typing.Dict[str, typing.Any], None] = None,
 84        exchange_complete_func: typing.Union[edq.net.exchange.HTTPExchangeComplete, None] = None,
 85        allow_redirects: typing.Union[bool, None] = None,
 86        use_module_options: bool = True,
 87        retries: int = 0,
 88        **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
 89    """
 90    Make an HTTP request and return the response object and text body.
 91    """
 92
 93    if (add_http_prefix and (not url.lower().startswith('http'))):
 94        url = 'http://' + url
 95
 96    retries = max(0, retries)
 97
 98    if (cache_dir is None):
 99        cache_dir = _exchanges_cache_dir
100
101    if (ignore_cache):
102        cache_dir = None
103
104    if (output_dir is None):
105        output_dir = _exchanges_out_dir
106
107    if (headers is None):
108        headers = {}
109
110    if (data is None):
111        data = {}
112
113    if (files is None):
114        files = []
115
116    if (additional_requests_options is None):
117        additional_requests_options = {}
118
119    # Add in the anchor as a header (since it is not traditionally sent in an HTTP request).
120    if (send_anchor_header):
121        headers = headers.copy()
122
123        parts = urllib.parse.urlparse(url)
124        headers[edq.net.exchange.ANCHOR_HEADER_KEY] = parts.fragment.lstrip('#')
125
126    options: typing.Dict[str, typing.Any] = {
127        'timeout': timeout_secs,
128    }
129
130    if (use_module_options and (_module_makerequest_options is not None)):
131        options.update(_module_makerequest_options)
132
133    options.update(additional_requests_options)
134
135    options.update({
136        'headers': headers,
137        'files': files,
138    })
139
140    if (allow_redirects is not None):
141        options['allow_redirects'] = allow_redirects
142
143    if (method == 'GET'):
144        options['params'] = data
145    else:
146        options['data'] = data
147
148    _logger.debug("Making %s request: '%s' (options = %s).", method, url, options)
149    response = _make_request_with_cache(method, url, options, cache_dir, retries)
150
151    body = response.text
152    if (_logger.level <= logging.DEBUG):
153        log_body = body
154        if (response.encoding is None):
155            log_body = f"<hash> {edq.util.hash.sha256_hex(response.content)}"
156
157        _logger.debug("Response:\n%s", log_body)
158
159    if (raise_for_status):
160        # Handle 404s a little special, as their body may contain useful information.
161        if ((response.status_code == http.HTTPStatus.NOT_FOUND) and (body is not None) and (body.strip() != '')):
162            response.reason += f" (Body: '{body.strip()}')"
163
164        response.raise_for_status()
165
166    exchange = None
167    if ((output_dir is not None) or (exchange_complete_func is not None) or (_make_request_exchange_complete_func is not None)):
168        exchange = edq.net.exchange.HTTPExchange.from_response(response,
169                headers_to_skip = headers_to_skip, params_to_skip = params_to_skip,
170                allow_redirects = options.get('allow_redirects', None))
171
172    if ((output_dir is not None) and (exchange is not None)):
173        relpath = exchange.compute_relpath(http_exchange_extension = http_exchange_extension)
174        path = os.path.abspath(os.path.join(output_dir, relpath))
175
176        edq.util.dirent.mkdir(os.path.dirname(path))
177        edq.util.json.dump_path(exchange, path, indent = 4, sort_keys = False)
178
179    if ((exchange_complete_func is not None) and (exchange is not None)):
180        exchange_complete_func(exchange)
181
182    if ((_make_request_exchange_complete_func is not None) and (exchange is not None)):
183        _make_request_exchange_complete_func(exchange)  # pylint: disable=not-callable
184
185    return response, body

Make an HTTP request and return the response object and text body.

def make_with_exchange( exchange: edq.net.exchange.HTTPExchange, base_url: str, raise_for_status: bool = True, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_with_exchange(
        exchange: edq.net.exchange.HTTPExchange,
        base_url: str,
        raise_for_status: bool = True,
        **kwargs: typing.Any,
        ) -> typing.Tuple[requests.Response, str]:
    """
    Perform the HTTP request described by the given exchange.

    The exchange's method, headers, parameters, files, and redirect setting are passed to make_request(),
    along with any additional keyword arguments.
    The request URL is the base URL joined (with a '/') to the exchange's URL.
    If the exchange names a response modifier, it is fetched and applied to the body before returning.

    Returns the response object and the (possibly modified) text body.
    """

    # Any file opened below is registered with this stack,
    # so it is closed once the request is done (even if the request raises).
    with contextlib.ExitStack() as opened_files:
        files = []
        for file_info in exchange.files:
            content = file_info.content

            # Content is base64 encoded.
            if (file_info.b64_encoded and isinstance(content, str)):
                content = edq.util.encoding.from_base64(content)

            # Content is missing and must be in a file.
            if (content is None):
                content = opened_files.enter_context(open(file_info.path, 'rb'))  # type: ignore[assignment,arg-type]

            files.append((file_info.name, content))

        url = f"{base_url}/{exchange.get_url()}"

        response, body = make_request(exchange.method, url,
                headers = exchange.headers,
                data = exchange.parameters,
                files = files,
                raise_for_status = raise_for_status,
                allow_redirects = exchange.allow_redirects,
                **kwargs,
        )

    if (exchange.response_modifier is not None):
        modify_func = edq.util.pyimport.fetch(exchange.response_modifier)
        body = modify_func(response, body)

    return response, body

Perform the HTTP request described by the given exchange.

def make_get(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_get(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """ Issue a GET request through make_request() and return the response object and text body. """

    response, body = make_request('GET', url, **kwargs)
    return response, body

Make a GET request and return the response object and text body.

def make_post(url: str, **kwargs: Any) -> Tuple[requests.models.Response, str]:
def make_post(url: str, **kwargs: typing.Any) -> typing.Tuple[requests.Response, str]:
    """ Issue a POST request through make_request() and return the response object and text body. """

    response, body = make_request('POST', url, **kwargs)
    return response, body

Make a POST request and return the response object and text body.