edq.net.util

  1import email.message
  2import errno
  3import io
  4import socket
  5import time
  6import typing
  7import urllib.parse
  8
  9import requests_toolbelt.multipart.decoder
 10
 11import edq.util.dirent
 12
 13DEFAULT_START_PORT: int = 30000
 14DEFAULT_END_PORT: int = 40000
 15DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01
 16
 17def find_open_port(
 18        start_port: int = DEFAULT_START_PORT,
 19        end_port: int = DEFAULT_END_PORT,
 20        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC,
 21        ) -> int:
 22    """
 23    Find an open port on this machine within the given range (inclusive).
 24    If no open port is found, an error is raised.
 25    """
 26
 27    for port in range(start_port, end_port + 1):
 28        try:
 29            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 30            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 31            sock.bind(('127.0.0.1', port))
 32
 33            # Explicitly close the port and wait a short amount of time for the port to clear.
 34            # This should not be required because of the socket option above,
 35            # but the cost is small.
 36            sock.close()
 37            time.sleep(DEFAULT_PORT_SEARCH_WAIT_SEC)
 38
 39            return port
 40        except socket.error as ex:
 41            sock.close()
 42
 43            if (ex.errno == errno.EADDRINUSE):
 44                continue
 45
 46            # Unknown error.
 47            raise ex
 48
 49    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")
 50
 51def parse_request_data(
 52        url: str,
 53        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
 54        body: typing.Union[bytes, str, io.BufferedIOBase],
 55        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
 56    """ Parse data and files from an HTTP request URL and body. """
 57
 58    # Parse data from the request body.
 59    request_data, request_files = parse_request_body_data(headers, body)
 60
 61    # Parse parameters from the URL.
 62    url_parts = urllib.parse.urlparse(url)
 63    request_data.update(parse_query_string(url_parts.query))
 64
 65    return request_data, request_files
 66
 67def parse_request_body_data(
 68        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
 69        body: typing.Union[bytes, str, io.BufferedIOBase],
 70        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
 71    """ Parse data and files from an HTTP request body. """
 72
 73    data: typing.Dict[str, typing.Any] = {}
 74    files: typing.Dict[str, bytes] = {}
 75
 76    length = int(headers.get('Content-Length', 0))
 77    if (length == 0):
 78        return data, files
 79
 80    if (isinstance(body, io.BufferedIOBase)):
 81        raw_content = body.read(length)
 82    elif (isinstance(body, str)):
 83        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
 84    else:
 85        raw_content = body
 86
 87    content_type = headers.get('Content-Type', '').lower()
 88
 89    if (content_type.startswith('text/plain')):
 90        data[''] = raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip()
 91        return data, files
 92
 93    if (content_type in ['', 'application/x-www-form-urlencoded']):
 94        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
 95        return data, files
 96
 97    if (content_type.startswith('multipart/form-data')):
 98        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
 99            raw_content, content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)
100
101        for multipart_section in decoder.parts:
102            values = parse_content_dispositions(multipart_section.headers)
103
104            name = values.get('name', None)
105            if (name is None):
106                raise ValueError("Could not find name for multipart section.")
107
108            # Look for a "filename" field to indicate a multipart section is a file.
109            # The file's desired name is still in "name", but an alternate name is in "filename".
110            if ('filename' in values):
111                filename = values.get('name', '')
112                if (filename == ''):
113                    raise ValueError("Unable to find filename for multipart section.")
114
115                files[filename] = multipart_section.content
116            else:
117                # Normal Parameter
118                data[name] = multipart_section.text
119
120        return data, files
121
122    raise ValueError(f"Unknown content type: '{content_type}'.")
123
def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every 'Content-Disposition' header (matched case-insensitively) is split on ';',
    and each 'key=value' piece is collected into the returned dict
    (with surrounding whitespace and double quotes removed from the value).
    Pieces without an '=' (like the leading 'form-data') are ignored.
    Header keys and values given as bytes are decoded first.
    """

    values: typing.Dict[str, typing.Any] = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            # Split on the first '=' only, so values that contain '=' themselves (e.g., filenames) are kept.
            (cd_key, separator, cd_value) = part.strip().partition('=')
            if (separator == ''):
                continue

            values[cd_key.strip()] = cd_value.strip().strip('"')

    return values
154
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

    `keep_blank_values` and any additional keyword arguments are passed directly to urllib.parse.parse_qs().
    """

    parsed = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values, **kwargs)
    if (not replace_single_lists):
        return parsed

    results: typing.Dict[str, typing.Any] = {}
    for (key, values) in parsed.items():
        # Unwrap lists that hold exactly one value, keep real multi-value lists as they are.
        results[key] = values[0] if (len(values) == 1) else values

    return results
# First port tried by find_open_port() when no start_port is given.
DEFAULT_START_PORT: int = 30000
# Last port tried by find_open_port() (the range is inclusive) when no end_port is given.
DEFAULT_END_PORT: int = 40000
# Number of seconds find_open_port() sleeps after closing the probe socket of a free port.
DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01
def find_open_port( start_port: int = 30000, end_port: int = 40000, wait_time: float = 0.01) -> int:
def find_open_port(
        start_port: int = DEFAULT_START_PORT,
        end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC,
        ) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    Ports are probed in ascending order by binding a temporary socket to 127.0.0.1,
    and the first port that can be bound is returned.

    After a free port is found, the probe socket is closed and this function sleeps for `wait_time` seconds
    (non-positive values skip the sleep) to give the port time to clear.

    If no open port is found (including when the range is empty), a ValueError is raised.
    Any socket error other than "address already in use" is re-raised.
    """

    for port in range(start_port, end_port + 1):
        # Create the socket outside the try so the cleanup below always has a valid socket to close.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('127.0.0.1', port))
        except OSError as ex:
            # socket.error is an alias of OSError.
            if (ex.errno == errno.EADDRINUSE):
                continue

            # Unknown error.
            raise
        finally:
            # Always release the probe socket, whether or not the bind worked.
            sock.close()

        # Wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        if (wait_time > 0):
            time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")

Find an open port on this machine within the given range (inclusive). If no open port is found, an error is raised.

def parse_request_data( url: str, headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Collect the parameters and files carried by an HTTP request.

    The body is parsed first (via parse_request_body_data()),
    then the query string of the URL is parsed and merged into the body's data.
    When a key appears in both places, the value from the URL replaces the value from the body.

    Returns a tuple of (data, files).
    """

    data, files = parse_request_body_data(headers, body)

    # Layer the URL's query parameters on top of whatever came from the body.
    query = urllib.parse.urlparse(url).query
    for (key, value) in parse_query_string(query).items():
        data[key] = value

    return data, files

Parse data and files from an HTTP request URL and body.

def parse_request_body_data( headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
 68def parse_request_body_data(
 69        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
 70        body: typing.Union[bytes, str, io.BufferedIOBase],
 71        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
 72    """ Parse data and files from an HTTP request body. """
 73
 74    data: typing.Dict[str, typing.Any] = {}
 75    files: typing.Dict[str, bytes] = {}
 76
 77    length = int(headers.get('Content-Length', 0))
 78    if (length == 0):
 79        return data, files
 80
 81    if (isinstance(body, io.BufferedIOBase)):
 82        raw_content = body.read(length)
 83    elif (isinstance(body, str)):
 84        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
 85    else:
 86        raw_content = body
 87
 88    content_type = headers.get('Content-Type', '').lower()
 89
 90    if (content_type.startswith('text/plain')):
 91        data[''] = raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip()
 92        return data, files
 93
 94    if (content_type in ['', 'application/x-www-form-urlencoded']):
 95        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
 96        return data, files
 97
 98    if (content_type.startswith('multipart/form-data')):
 99        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
100            raw_content, content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)
101
102        for multipart_section in decoder.parts:
103            values = parse_content_dispositions(multipart_section.headers)
104
105            name = values.get('name', None)
106            if (name is None):
107                raise ValueError("Could not find name for multipart section.")
108
109            # Look for a "filename" field to indicate a multipart section is a file.
110            # The file's desired name is still in "name", but an alternate name is in "filename".
111            if ('filename' in values):
112                filename = values.get('name', '')
113                if (filename == ''):
114                    raise ValueError("Unable to find filename for multipart section.")
115
116                files[filename] = multipart_section.content
117            else:
118                # Normal Parameter
119                data[name] = multipart_section.text
120
121        return data, files
122
123    raise ValueError(f"Unknown content type: '{content_type}'.")

Parse data and files from an HTTP request body.

def parse_content_dispositions(headers: Union[email.message.Message, Dict[str, Any]]) -> Dict[str, Any]:
def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every 'Content-Disposition' header (matched case-insensitively) is split on ';',
    and each 'key=value' piece is collected into the returned dict
    (with surrounding whitespace and double quotes removed from the value).
    Pieces without an '=' (like the leading 'form-data') are ignored.
    Header keys and values given as bytes are decoded first.
    """

    values: typing.Dict[str, typing.Any] = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            # Split on the first '=' only, so values that contain '=' themselves (e.g., filenames) are kept.
            (cd_key, separator, cd_value) = part.strip().partition('=')
            if (separator == ''):
                continue

            values[cd_key.strip()] = cd_value.strip().strip('"')

    return values

Parse a request's content dispositions from headers.

def parse_query_string( text: str, replace_single_lists: bool = True, keep_blank_values: bool = True, **kwargs: Any) -> Dict[str, Any]:
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

    `keep_blank_values` and any additional keyword arguments are passed directly to urllib.parse.parse_qs().
    """

    parsed = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values, **kwargs)
    if (not replace_single_lists):
        return parsed

    results: typing.Dict[str, typing.Any] = {}
    for (key, values) in parsed.items():
        # Unwrap lists that hold exactly one value, keep real multi-value lists as they are.
        results[key] = values[0] if (len(values) == 1) else values

    return results

Parse a query string (like urllib.parse.parse_qs()), and normalize the result. If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.