edq.net.util
import email.message
import errno
import io
import socket
import time
import typing
import urllib.parse

import requests_toolbelt.multipart.decoder

import edq.util.dirent

DEFAULT_START_PORT: int = 30000
DEFAULT_END_PORT: int = 40000
DEFAULT_PORT_SEARCH_WAIT_SEC: float = 0.01

def find_open_port(
        start_port: int = DEFAULT_START_PORT,
        end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC,
        ) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    If no open port is found, an error is raised.

    Each candidate port is probed by binding a TCP socket to it on 127.0.0.1.
    Once a bind succeeds, the socket is closed and the function waits `wait_time` seconds before returning the port.
    A ValueError is raised if every port in the range is already in use,
    and any socket error other than EADDRINUSE is re-raised.
    """

    for port in range(start_port, end_port + 1):
        # Create the socket before the try block so cleanup never touches an unbound (or stale) socket.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('127.0.0.1', port))
        except socket.error as ex:
            if (ex.errno == errno.EADDRINUSE):
                continue

            # Unknown error.
            raise
        finally:
            # Always release the probe socket, whether or not the bind worked.
            sock.close()

        # Wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        if (wait_time > 0):
            time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")

def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Parse data and files from an HTTP request URL and body.
    Parameters from the URL's query string are applied after the body's parameters,
    so a query string parameter replaces a body parameter with the same name.
    """

    # Parse data from the request body.
    request_data, request_files = parse_request_body_data(headers, body)

    # Parse parameters from the URL.
    url_parts = urllib.parse.urlparse(url)
    request_data.update(parse_query_string(url_parts.query))

    return request_data, request_files

def parse_request_body_data(
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Parse data and files from an HTTP request body.

    The "Content-Length" header decides whether there is a body at all (missing or 0 means no data),
    and the "Content-Type" header decides how the body is parsed:
     - text/plain: the stripped text is stored in the data under the empty key ('').
     - (empty) or application/x-www-form-urlencoded: the body is parsed as a query string.
     - multipart/form-data: sections with a "filename" become files (keyed by their "name"),
       all other sections become data.
    A ValueError is raised for any other content type, or for a multipart section without a name.
    """

    data: typing.Dict[str, typing.Any] = {}
    files: typing.Dict[str, bytes] = {}

    length = int(headers.get('Content-Length', 0))
    if (length == 0):
        return data, files

    if (isinstance(body, io.BufferedIOBase)):
        raw_content = body.read(length)
    elif (isinstance(body, str)):
        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
    else:
        raw_content = body

    # Keep the header as it was sent: the multipart boundary inside of it is case-sensitive.
    # The lowercased copy is only used to recognize the media type.
    raw_content_type = headers.get('Content-Type', '')
    content_type = raw_content_type.lower()

    if (content_type.startswith('text/plain')):
        data[''] = raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip()
        return data, files

    # Match on the prefix so parameters (e.g., "; charset=UTF-8") do not cause the type to be rejected.
    if ((content_type == '') or content_type.startswith('application/x-www-form-urlencoded')):
        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
        return data, files

    if (content_type.startswith('multipart/form-data')):
        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
            raw_content, raw_content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)

        for multipart_section in decoder.parts:
            values = parse_content_dispositions(multipart_section.headers)

            name = values.get('name', None)
            if (name is None):
                raise ValueError("Could not find name for multipart section.")

            # Look for a "filename" field to indicate a multipart section is a file.
            # The file's desired name is still in "name", but an alternate name is in "filename".
            if ('filename' in values):
                if (name == ''):
                    raise ValueError("Unable to find filename for multipart section.")

                files[name] = multipart_section.content
            else:
                # Normal Parameter
                data[name] = multipart_section.text

        return data, files

    raise ValueError(f"Unknown content type: '{content_type}'.")

def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every header named "Content-Disposition" (case-insensitive) is split on ';',
    and each "key=value" parameter is collected into the returned dict
    (surrounding whitespace and double quotes are removed from the value).
    Parts without an '=' (like a leading "form-data") are skipped.
    Header names and values given as bytes are decoded with the default encoding.
    """

    values: typing.Dict[str, typing.Any] = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            # Only split on the first '=', so values that contain an '=' are kept instead of dropped.
            (cd_key, separator, cd_value) = part.strip().partition('=')
            if (separator == ''):
                continue

            values[cd_key.strip()] = cd_value.strip().strip('"')

    return values

def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

    `keep_blank_values` and any additional keyword arguments are passed to urllib.parse.parse_qs().
    """

    parsed = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values, **kwargs)
    if (not replace_single_lists):
        return parsed

    results: typing.Dict[str, typing.Any] = {}
    for (key, value) in parsed.items():
        if (len(value) == 1):
            results[key] = value[0]
        else:
            results[key] = value

    return results
DEFAULT_START_PORT: int =
30000
DEFAULT_END_PORT: int =
40000
DEFAULT_PORT_SEARCH_WAIT_SEC: float =
0.01
def
find_open_port( start_port: int = 30000, end_port: int = 40000, wait_time: float = 0.01) -> int:
def find_open_port(
        start_port: int = DEFAULT_START_PORT,
        end_port: int = DEFAULT_END_PORT,
        wait_time: float = DEFAULT_PORT_SEARCH_WAIT_SEC,
        ) -> int:
    """
    Find an open port on this machine within the given range (inclusive).
    If no open port is found, an error is raised.

    Each candidate port is probed by binding a TCP socket to it on 127.0.0.1.
    Once a bind succeeds, the socket is closed and the function waits `wait_time` seconds before returning the port.
    A ValueError is raised if every port in the range is already in use,
    and any socket error other than EADDRINUSE is re-raised.
    """

    for port in range(start_port, end_port + 1):
        # Create the socket before the try block so cleanup never touches an unbound (or stale) socket.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('127.0.0.1', port))
        except socket.error as ex:
            if (ex.errno == errno.EADDRINUSE):
                continue

            # Unknown error.
            raise
        finally:
            # Always release the probe socket, whether or not the bind worked.
            sock.close()

        # Wait a short amount of time for the port to clear.
        # This should not be required because of the socket option above,
        # but the cost is small.
        if (wait_time > 0):
            time.sleep(wait_time)

        return port

    raise ValueError(f"Could not find open port in [{start_port}, {end_port}].")
Find an open port on this machine within the given range (inclusive). If no open port is found, an error is raised.
def
parse_request_data( url: str, headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
def parse_request_data(
        url: str,
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Collect the parameters and files carried by an HTTP request.
    Parameters come from both the request body and the URL's query string, files only come from the body.
    """

    # The body provides the initial parameters and all of the files.
    (params, uploads) = parse_request_body_data(headers, body)

    # Query string parameters are merged in last,
    # so they replace any body parameter that uses the same key.
    query = urllib.parse.urlparse(url).query
    for (key, value) in parse_query_string(query).items():
        params[key] = value

    return params, uploads
Parse data and files from an HTTP request URL and body.
def
parse_request_body_data( headers: Union[email.message.Message, Dict[str, Any]], body: Union[bytes, str, io.BufferedIOBase]) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
def parse_request_body_data(
        headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]],
        body: typing.Union[bytes, str, io.BufferedIOBase],
        ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, bytes]]:
    """
    Parse data and files from an HTTP request body.

    The "Content-Length" header decides whether there is a body at all (missing or 0 means no data),
    and the "Content-Type" header decides how the body is parsed:
     - text/plain: the stripped text is stored in the data under the empty key ('').
     - (empty) or application/x-www-form-urlencoded: the body is parsed as a query string.
     - multipart/form-data: sections with a "filename" become files (keyed by their "name"),
       all other sections become data.
    A ValueError is raised for any other content type, or for a multipart section without a name.
    """

    data: typing.Dict[str, typing.Any] = {}
    files: typing.Dict[str, bytes] = {}

    length = int(headers.get('Content-Length', 0))
    if (length == 0):
        return data, files

    if (isinstance(body, io.BufferedIOBase)):
        raw_content = body.read(length)
    elif (isinstance(body, str)):
        raw_content = body.encode(edq.util.dirent.DEFAULT_ENCODING)
    else:
        raw_content = body

    # Keep the header as it was sent: the multipart boundary inside of it is case-sensitive.
    # The lowercased copy is only used to recognize the media type.
    raw_content_type = headers.get('Content-Type', '')
    content_type = raw_content_type.lower()

    if (content_type.startswith('text/plain')):
        data[''] = raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip()
        return data, files

    # Match on the prefix so parameters (e.g., "; charset=UTF-8") do not cause the type to be rejected.
    if ((content_type == '') or content_type.startswith('application/x-www-form-urlencoded')):
        data = parse_query_string(raw_content.decode(edq.util.dirent.DEFAULT_ENCODING).strip())
        return data, files

    if (content_type.startswith('multipart/form-data')):
        decoder = requests_toolbelt.multipart.decoder.MultipartDecoder(
            raw_content, raw_content_type, encoding = edq.util.dirent.DEFAULT_ENCODING)

        for multipart_section in decoder.parts:
            values = parse_content_dispositions(multipart_section.headers)

            name = values.get('name', None)
            if (name is None):
                raise ValueError("Could not find name for multipart section.")

            # Look for a "filename" field to indicate a multipart section is a file.
            # The file's desired name is still in "name", but an alternate name is in "filename".
            if ('filename' in values):
                if (name == ''):
                    raise ValueError("Unable to find filename for multipart section.")

                files[name] = multipart_section.content
            else:
                # Normal Parameter
                data[name] = multipart_section.text

        return data, files

    raise ValueError(f"Unknown content type: '{content_type}'.")
Parse data and files from an HTTP request body.
def
parse_content_dispositions(headers: Union[email.message.Message, Dict[str, Any]]) -> Dict[str, Any]:
def parse_content_dispositions(headers: typing.Union[email.message.Message, typing.Dict[str, typing.Any]]) -> typing.Dict[str, typing.Any]:
    """
    Parse a request's content dispositions from headers.

    Every header named "Content-Disposition" (case-insensitive) is split on ';',
    and each "key=value" parameter is collected into the returned dict
    (surrounding whitespace and double quotes are removed from the value).
    Parts without an '=' (like a leading "form-data") are skipped.
    Header names and values given as bytes are decoded with the default encoding.
    """

    values: typing.Dict[str, typing.Any] = {}
    for (key, value) in headers.items():
        if (isinstance(key, bytes)):
            key = key.decode(edq.util.dirent.DEFAULT_ENCODING)

        if (isinstance(value, bytes)):
            value = value.decode(edq.util.dirent.DEFAULT_ENCODING)

        key = key.strip().lower()
        if (key != 'content-disposition'):
            continue

        # The Python stdlib recommends using the email library for this parsing,
        # but I have not had a good experience with it.
        for part in value.strip().split(';'):
            # Only split on the first '=', so values that contain an '=' are kept instead of dropped.
            (cd_key, separator, cd_value) = part.strip().partition('=')
            if (separator == ''):
                continue

            values[cd_key.strip()] = cd_value.strip().strip('"')

    return values
Parse a request's content dispositions from headers.
def
parse_query_string( text: str, replace_single_lists: bool = True, keep_blank_values: bool = True, **kwargs: Any) -> Dict[str, Any]:
def parse_query_string(text: str,
        replace_single_lists: bool = True,
        keep_blank_values: bool = True,
        **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
    """
    Parse a query string (like urllib.parse.parse_qs()), and normalize the result.
    If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.

    `keep_blank_values` and any additional keyword arguments are passed to urllib.parse.parse_qs().
    """

    parsed = urllib.parse.parse_qs(text, keep_blank_values = keep_blank_values, **kwargs)
    if (not replace_single_lists):
        return parsed

    results: typing.Dict[str, typing.Any] = {}
    for (key, value) in parsed.items():
        if (len(value) == 1):
            results[key] = value[0]
        else:
            results[key] = value

    return results
Parse a query string (like urllib.parse.parse_qs()), and normalize the result. If specified, lists with single values (as returned from urllib.parse.parse_qs()) will be replaced with the single value.