edq.util.dirent
Operations relating to directory entries (dirents).
These operations are designed for clarity and compatibility, not performance.
Only directories, files, and links will be handled. Other types of dirents may result in an error being raised.
In general, all recursive operations do not follow symlinks by default and instead treat the link as a file.
"""
Operations relating to directory entries (dirents).

These operations are designed for clarity and compatibility, not performance.

Only directories, files, and links will be handled.
Other types of dirents may result in an error being raised.

In general, all recursive operations do not follow symlinks by default and instead treat the link as a file.
"""

import atexit
import os
import shutil
import tempfile
import typing
import uuid

DEFAULT_ENCODING: str = 'utf-8'
""" The default encoding that will be used when reading and writing. """

# Upper bound on how many parent directories will be walked before giving up.
DEPTH_LIMIT: int = 10000

def exists(path: str) -> bool:
    """
    Check if a path exists.
    This will transparently call os.path.lexists(),
    which will include broken links.
    """

    return os.path.lexists(path)

def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Get a path to a valid (but not currently existing) temp dirent.
    If rm is True, then an attempt will be made to delete the dirent on exit
    (no error will occur if the path is not there).
    """

    path = None
    while ((path is None) or exists(path)):
        path = os.path.join(tempfile.gettempdir(), prefix + str(uuid.uuid4()) + suffix)

    path = os.path.realpath(path)

    if (rm):
        atexit.register(remove, path)

    return path

def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Get a temp directory.
    The directory will exist when returned.
    """

    path = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
    mkdir(path)
    return path

def mkdir(raw_path: str) -> None:
    """
    Make a directory (including any required parent directories).
    Does not complain if the directory (or parents) already exist
    (this includes if the directory or parents are links to directories).

    Raises a ValueError if the target (or one of its parents) exists and is not a dir.
    """

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (os.path.isdir(path)):
            return

        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")

    _check_parent_dirs(raw_path)

    os.makedirs(path, exist_ok = True)

def _check_parent_dirs(raw_path: str) -> None:
    """
    Check all parents to ensure that they are all dirs (or don't exist).
    This is naturally handled by os.makedirs(),
    but the error messages are not consistent between POSIX and Windows.
    """

    path = os.path.abspath(raw_path)

    parent_path = path
    for _ in range(DEPTH_LIMIT):
        new_parent_path = os.path.dirname(parent_path)
        if (parent_path == new_parent_path):
            # We have reached root (are our own parent).
            return

        parent_path = new_parent_path

        if (os.path.exists(parent_path) and (not os.path.isdir(parent_path))):
            raise ValueError(f"Target of mkdir contains parent ('{os.path.basename(parent_path)}') that exists and is not a dir: '{raw_path}'.")

    raise ValueError("Depth limit reached.")

def remove(path: str) -> None:
    """
    Remove the given path.
    The path can be of any type (dir, file, link),
    and does not need to exist.
    """

    if (not exists(path)):
        return

    if (os.path.isfile(path) or os.path.islink(path)):
        os.remove(path)
    elif (os.path.isdir(path)):
        shutil.rmtree(path)
    else:
        raise ValueError(f"Unknown type of dirent: '{path}'.")

def same(a: str, b: str) -> bool:
    """
    Check if two paths represent the same dirent.
    If either (or both) paths do not exist, false will be returned.
    If either path is a link, it is resolved before checking
    (so a link and the target file are considered the "same").
    A link that cannot be resolved (e.g., a broken link) is compared as the link itself.
    """

    if (not (exists(a) and exists(b))):
        return False

    try:
        return os.path.samefile(a, b)
    except OSError:
        # exists() accepts broken links, but samefile() must stat through them and fails.
        # Fall back to comparing the links themselves so a broken link is still the same as itself.
        return os.path.samestat(os.lstat(a), os.lstat(b))

def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Move the source dirent to the given destination.
    If the destination is an existing directory, the source is moved into it.

    Any existing destination will be removed before moving,
    unless no_clobber is true (in which case a ValueError is raised).
    A ValueError is also raised if the source does not exist,
    or if the destination that would be removed contains the source.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (not exists(source)):
        raise ValueError(f"Source of move does not exist: '{raw_source}'.")

    # If dest is a dir, then resolve the path.
    if (os.path.isdir(dest)):
        dest = os.path.abspath(os.path.join(dest, os.path.basename(source)))

    # Skip if this is self.
    if (same(source, dest)):
        return

    # Check for clobber.
    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of move already exists: '{raw_dest}'.")

        # Removing a destination that holds the source would destroy the source before it is moved.
        if (contains_path(dest, source)):
            raise ValueError(f"Destination of move cannot contain the source. Destination: '{dest}', Source: '{raw_source}'.")

        remove(dest)

    # Create any required parents.
    os.makedirs(os.path.dirname(dest), exist_ok = True)

    shutil.move(source, dest)

def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a dirent (file, link, or directory) to a destination.

    The destination will be overwritten if it exists (and no_clobber is false).
    For copying the contents of a directory INTO another directory, use copy_contents().

    No copy is made if the source and dest refer to the same dirent.

    Raises a ValueError if the source does not exist,
    if the destination exists and no_clobber is true,
    or if either path lexically contains the other in a way that would destroy the source or never terminate.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        return

    if (not exists(source)):
        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")

    # Copying a real directory into its own subtree would recurse forever
    # (each newly created destination dir becomes a child to copy).
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Source of copy cannot contain the destination. Destination: '{raw_dest}', Source: '{raw_source}'.")

    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")

        if (contains_path(dest, source)):
            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

        remove(dest)

    mkdir(os.path.dirname(dest))

    if (os.path.islink(source)):
        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
        link_target = os.readlink(source)
        os.symlink(link_target, dest)
    elif (os.path.isfile(source)):
        shutil.copy2(source, dest, follow_symlinks = False)
    elif (os.path.isdir(source)):
        mkdir(dest)

        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
    else:
        raise ValueError(f"Source of copy is not a dir, file, or link: '{raw_source}'.")

def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
    If the destination exists, it must be a directory.

    The source and destination should not be the same file.

    For a file, this is equivalent to `mkdir -p dest && cp source dest`
    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`

    All validation happens before the destination is created,
    so a call that raises a ValueError for a bad source/destination does not leave a new destination dir behind.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")

    if (exists(dest) and (not os.path.isdir(dest))):
        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")

    source_is_single = (os.path.isfile(source) or os.path.islink(source))
    if ((not source_is_single) and (not os.path.isdir(source))):
        raise ValueError(f"Source of contents copy is not a dir, file, or link: '{raw_source}'.")

    # A destination inside the source dir would be listed as one of the children to copy into itself.
    if ((not source_is_single) and contains_path(source, dest)):
        raise ValueError(f"Source of contents copy cannot contain the destination. Destination: '{raw_dest}', Source: '{raw_source}'.")

    mkdir(dest)

    if (source_is_single):
        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
        return

    for child in sorted(os.listdir(source)):
        copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)

def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
    """
    Read the contents of a file.
    If strip is true, leading and trailing whitespace is removed from the result.
    Raises a ValueError if the path does not exist.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read does not exist: '{raw_path}'.")

    with open(path, 'r', encoding = encoding) as file:
        contents = file.read()

    if (strip):
        contents = contents.strip()

    return contents

def write_file(
        raw_path: str, contents: typing.Union[str, None],
        strip: bool = True, newline: bool = True,
        encoding: str = DEFAULT_ENCODING,
        no_clobber: bool = False) -> None:
    """
    Write the contents of a file.
    If clobbering, any existing dirent will be removed before write.

    None contents are treated as an empty string.
    If strip is true, the contents are stripped before writing.
    If newline is true, a single newline is appended after any stripping.
    """

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write already exists: '{raw_path}'.")

        remove(path)

    if (contents is None):
        contents = ''

    if (strip):
        contents = contents.strip()

    if (newline):
        contents += "\n"

    with open(path, 'w', encoding = encoding) as file:
        file.write(contents)

def read_file_bytes(raw_path: str) -> bytes:
    """
    Read the contents of a file as bytes.
    Raises a ValueError if the path does not exist.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")

    with open(path, 'rb') as file:
        return file.read()

def write_file_bytes(
        raw_path: str, contents: typing.Union[bytes, str, None],
        no_clobber: bool = False) -> None:
    """
    Write the contents of a file as bytes.
    If clobbering, any existing dirent will be removed before write.

    None contents are treated as empty, and string contents are encoded using DEFAULT_ENCODING.
    """

    if (contents is None):
        contents = b''

    if (isinstance(contents, str)):
        contents = contents.encode(DEFAULT_ENCODING)

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")

        remove(path)

    with open(path, 'wb') as file:
        file.write(contents)

def contains_path(parent: str, child: str) -> bool:
    """
    Check if the parent path contains the child path.
    This is pure lexical analysis, no dirent stats are checked.
    Will return false if the (absolute) paths are the same
    (this function does not allow a path to contain itself).
    """

    if ((parent == '') or (child == '')):
        return False

    parent = os.path.abspath(parent)
    child = os.path.abspath(child)

    # Start from the child's parent so that identical paths are never reported as containing each other.
    child = os.path.dirname(child)
    for _ in range(DEPTH_LIMIT):
        if (parent == child):
            return True

        new_child = os.path.dirname(child)
        if (child == new_child):
            # Reached the root without finding the parent.
            return False

        child = new_child

    raise ValueError("Depth limit reached.")
The default encoding that will be used when reading and writing.
def exists(path: str) -> bool:
    """
    Report whether any dirent is present at the given path.
    The check is delegated to os.path.lexists(),
    so a link counts as existing even when its target is missing.
    """

    found = os.path.lexists(path)
    return found
Check if a path exists. This will transparently call os.path.lexists(), which will include broken links.
def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Produce a path inside the system temp dir at which nothing currently exists.
    The name is the prefix, a random UUID, and the suffix.
    When rm is True, removal of the path is scheduled for interpreter exit
    (a path that is missing at exit is not an error).
    """

    while (True):
        candidate = os.path.join(tempfile.gettempdir(), prefix + str(uuid.uuid4()) + suffix)
        if (not exists(candidate)):
            break

    resolved = os.path.realpath(candidate)

    if (rm):
        atexit.register(remove, resolved)

    return resolved
Get a path to a valid (but not currently existing) temp dirent. If rm is True, then an attempt will be made to delete the dirent on exit (no error will occur if the path is not there).
def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Create a fresh temp directory and return its path.
    The arguments are passed straight through to get_temp_path(),
    and the directory is created before returning.
    """

    temp_dir = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
    mkdir(temp_dir)

    return temp_dir
Get a temp directory. The directory will exist when returned.
def mkdir(raw_path: str) -> None:
    """
    Ensure that a directory exists at the given path, creating any missing parents.
    An existing directory (or a link to one) is silently accepted.
    Any other existing dirent at the path results in a ValueError.
    """

    target = os.path.abspath(raw_path)

    if (not exists(target)):
        # Validate the parents ourselves so errors are consistent across platforms.
        _check_parent_dirs(raw_path)
        os.makedirs(target, exist_ok = True)
        return

    if (not os.path.isdir(target)):
        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")
Make a directory (including any required parent directories). Does not complain if the directory (or parents) already exist (this includes if the directory or parents are links to directories).
def remove(path: str) -> None:
    """
    Delete whatever dirent lives at the given path (dir, file, or link).
    A path that does not exist is silently ignored.
    A dirent that is not a file, link, or dir results in a ValueError.
    """

    if (not exists(path)):
        return

    # Links are unlinked (never followed), even when they point at a directory.
    if (os.path.islink(path) or os.path.isfile(path)):
        os.remove(path)
        return

    if (os.path.isdir(path)):
        shutil.rmtree(path)
        return

    raise ValueError(f"Unknown type of dirent: '{path}'.")
Remove the given path. The path can be of any type (dir, file, link), and does not need to exist.
def same(a: str, b: str) -> bool:
    """
    Check if two paths represent the same dirent.
    If either (or both) paths do not exist, false will be returned.
    If either path is a link, it is resolved before checking
    (so a link and the target file are considered the "same").
    A link that cannot be resolved (e.g., a broken link) is compared as the link itself.
    """

    if (not (exists(a) and exists(b))):
        return False

    try:
        return os.path.samefile(a, b)
    except OSError:
        # exists() accepts broken links, but samefile() must stat through them and fails.
        # Fall back to comparing the links themselves so a broken link is still the same as itself.
        return os.path.samestat(os.lstat(a), os.lstat(b))
Check if two paths represent the same dirent. If either (or both) paths do not exist, false will be returned. If either path is a link, it is resolved before checking (so a link and the target file are considered the "same").
def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Move the source dirent to the given destination.
    If the destination is an existing directory, the source is moved into it.

    Any existing destination will be removed before moving,
    unless no_clobber is true (in which case a ValueError is raised).
    A ValueError is also raised if the source does not exist,
    or if the destination that would be removed contains the source.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (not exists(source)):
        raise ValueError(f"Source of move does not exist: '{raw_source}'.")

    # If dest is a dir, then resolve the path.
    if (os.path.isdir(dest)):
        dest = os.path.abspath(os.path.join(dest, os.path.basename(source)))

    # Skip if this is self.
    if (same(source, dest)):
        return

    # Check for clobber.
    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of move already exists: '{raw_dest}'.")

        # Removing a destination that holds the source would destroy the source before it is moved.
        if (contains_path(dest, source)):
            raise ValueError(f"Destination of move cannot contain the source. Destination: '{dest}', Source: '{raw_source}'.")

        remove(dest)

    # Create any required parents.
    os.makedirs(os.path.dirname(dest), exist_ok = True)

    shutil.move(source, dest)
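Move the source dirent to the given destination. If the destination is an existing directory, the source is moved into it. Any existing destination will be removed before moving, unless no_clobber is true (in which case an error is raised instead).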
def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a dirent (file, link, or directory) to a destination.

    The destination will be overwritten if it exists (and no_clobber is false).
    For copying the contents of a directory INTO another directory, use copy_contents().

    No copy is made if the source and dest refer to the same dirent.

    Raises a ValueError if the source does not exist,
    if the destination exists and no_clobber is true,
    or if either path lexically contains the other in a way that would destroy the source or never terminate.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        return

    if (not exists(source)):
        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")

    # Copying a real directory into its own subtree would recurse forever
    # (each newly created destination dir becomes a child to copy).
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Source of copy cannot contain the destination. Destination: '{raw_dest}', Source: '{raw_source}'.")

    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")

        # Removing a destination that holds the source would destroy the source before it is copied.
        if (contains_path(dest, source)):
            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

        remove(dest)

    mkdir(os.path.dirname(dest))

    if (os.path.islink(source)):
        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
        link_target = os.readlink(source)
        os.symlink(link_target, dest)
    elif (os.path.isfile(source)):
        shutil.copy2(source, dest, follow_symlinks = False)
    elif (os.path.isdir(source)):
        mkdir(dest)

        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
    else:
        raise ValueError(f"Source of copy is not a dir, file, or link: '{raw_source}'.")
Copy a dirent or directory to a destination.
The destination will be overwritten if it exists (and no_clobber is false). For copying the contents of a directory INTO another directory, use copy_contents().
No copy is made if the source and dest refer to the same dirent.
def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
    If the destination exists, it must be a directory.

    The source and destination should not be the same file.

    For a file, this is equivalent to `mkdir -p dest && cp source dest`
    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`

    All validation happens before the destination is created,
    so a call that raises a ValueError for a bad source/destination does not leave a new destination dir behind.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")

    if (exists(dest) and (not os.path.isdir(dest))):
        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")

    source_is_single = (os.path.isfile(source) or os.path.islink(source))
    if ((not source_is_single) and (not os.path.isdir(source))):
        raise ValueError(f"Source of contents copy is not a dir, file, or link: '{raw_source}'.")

    # A destination inside the source dir would be listed as one of the children to copy into itself.
    if ((not source_is_single) and contains_path(source, dest)):
        raise ValueError(f"Source of contents copy cannot contain the destination. Destination: '{raw_dest}', Source: '{raw_source}'.")

    mkdir(dest)

    if (source_is_single):
        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
        return

    for child in sorted(os.listdir(source)):
        copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)
Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination. If the destination exists, it must be a directory.
The source and destination should not be the same file.
For a file, this is equivalent to mkdir -p dest && cp source dest
For a dir, this is equivalent to mkdir -p dest && cp -r source/* dest
def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
    """
    Return the text stored in a file, decoded with the given encoding.
    When strip is true, surrounding whitespace is trimmed from the result.
    A missing path results in a ValueError.
    """

    target = os.path.abspath(raw_path)

    if (not exists(target)):
        raise ValueError(f"Source of read does not exist: '{raw_path}'.")

    with open(target, 'r', encoding = encoding) as handle:
        text = handle.read()

    return text.strip() if (strip) else text
Read the contents of a file.
def write_file(
        raw_path: str, contents: typing.Union[str, None],
        strip: bool = True, newline: bool = True,
        encoding: str = DEFAULT_ENCODING,
        no_clobber: bool = False) -> None:
    """
    Store text in a file.
    Unless no_clobber is set (which raises a ValueError instead),
    whatever dirent already sits at the path is removed first.
    None is written as an empty string, stripping happens before the optional trailing newline is added.
    """

    target = os.path.abspath(raw_path)

    if (exists(target)):
        if (no_clobber):
            raise ValueError(f"Destination of write already exists: '{raw_path}'.")

        remove(target)

    text = '' if (contents is None) else contents

    if (strip):
        text = text.strip()

    if (newline):
        text = text + "\n"

    with open(target, 'w', encoding = encoding) as handle:
        handle.write(text)
Write the contents of a file. If clobbering, any existing dirent will be removed before write.
def read_file_bytes(raw_path: str) -> bytes:
    """
    Return the raw bytes stored in a file.
    A missing path results in a ValueError.
    """

    target = os.path.abspath(raw_path)

    if (not exists(target)):
        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")

    with open(target, 'rb') as handle:
        data = handle.read()

    return data
Read the contents of a file as bytes.
def write_file_bytes(
        raw_path: str, contents: typing.Union[bytes, str, None],
        no_clobber: bool = False) -> None:
    """
    Store raw bytes in a file.
    None is written as empty content, and text is encoded with DEFAULT_ENCODING first.
    Unless no_clobber is set (which raises a ValueError instead),
    whatever dirent already sits at the path is removed before writing.
    """

    if (contents is None):
        payload = b''
    elif (isinstance(contents, str)):
        payload = contents.encode(DEFAULT_ENCODING)
    else:
        payload = contents

    target = os.path.abspath(raw_path)

    if (exists(target)):
        if (no_clobber):
            raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")

        remove(target)

    with open(target, 'wb') as handle:
        handle.write(payload)
Write the contents of a file as bytes. If clobbering, any existing dirent will be removed before write.
def contains_path(parent: str, child: str) -> bool:
    """
    Decide lexically whether the child path lies somewhere beneath the parent path.
    Nothing on disk is inspected, only the absolute forms of the two paths are compared.
    Identical paths are not considered to contain each other,
    and an empty string for either argument always yields false.
    """

    if ('' in (parent, child)):
        return False

    target = os.path.abspath(parent)

    # Begin at the child's own parent so equal paths never match.
    ancestor = os.path.dirname(os.path.abspath(child))

    for _ in range(DEPTH_LIMIT):
        if (ancestor == target):
            return True

        next_ancestor = os.path.dirname(ancestor)
        if (next_ancestor == ancestor):
            # The root is its own parent, so there is nothing left to check.
            return False

        ancestor = next_ancestor

    raise ValueError("Depth limit reached.")
Check if the parent path contains the child path. This is pure lexical analysis, no dirent stats are checked. Will return false if the (absolute) paths are the same (this function does not allow a path to contain itself).