edq.util.dirent

Operations relating to directory entries (dirents).

These operations are designed for clarity and compatibility, not performance.

Only directories, files, and links will be handled. Other types of dirents may result in an error being raised.

In general, all recursive operations do not follow symlinks by default and instead treat the link as a file.

  1"""
  2Operations relating to directory entries (dirents).
  3
  4These operations are designed for clarity and compatibility, not performance.
  5
  6Only directories, files, and links will be handled.
  7Other types of dirents may result in an error being raised.
  8
  9In general, all recursive operations do not follow symlinks by default and instead treat the link as a file.
 10"""
 11
 12import atexit
 13import os
 14import shutil
 15import tempfile
 16import typing
 17import uuid
 18
 19DEFAULT_ENCODING: str = 'utf-8'
 20""" The default encoding that will be used when reading and writing. """
 21
 22DEPTH_LIMIT: int = 10000
 23
 24def exists(path: str) -> bool:
 25    """
 26    Check if a path exists.
 27    This will transparently call os.path.lexists(),
 28    which will include broken links.
 29    """
 30
 31    return os.path.lexists(path)
 32
 33def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
 34    """
 35    Get a path to a valid (but not currently existing) temp dirent.
 36    If rm is True, then the dirent will be attempted to be deleted on exit
 37    (no error will occur if the path is not there).
 38    """
 39
 40    path = None
 41    while ((path is None) or exists(path)):
 42        path = os.path.join(tempfile.gettempdir(), prefix + str(uuid.uuid4()) + suffix)
 43
 44    if (rm):
 45        atexit.register(remove, path)
 46
 47    return path
 48
 49def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
 50    """
 51    Get a temp directory.
 52    The directory will exist when returned.
 53    """
 54
 55    path = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
 56    mkdir(path)
 57    return path
 58
 59def mkdir(raw_path: str) -> None:
 60    """
 61    Make a directory (including any required parent directories).
 62    Does not complain if the directory (or parents) already exist
 63    (this includes if the directory or parents are links to directories).
 64    """
 65
 66    path = os.path.abspath(raw_path)
 67
 68    if (exists(path)):
 69        if (os.path.isdir(path)):
 70            return
 71
 72        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")
 73
 74    _check_parent_dirs(raw_path)
 75
 76    os.makedirs(path, exist_ok = True)
 77
 78def _check_parent_dirs(raw_path: str) -> None:
 79    """
 80    Check all parents to ensure that they are all dirs (or don't exist).
 81    This is naturally handled by os.makedirs(),
 82    but the error messages are not consistent between POSIX and Windows.
 83    """
 84
 85    path = os.path.abspath(raw_path)
 86
 87    parent_path = path
 88    for _ in range(DEPTH_LIMIT):
 89        new_parent_path = os.path.dirname(parent_path)
 90        if (parent_path == new_parent_path):
 91            # We have reached root (are our own parent).
 92            return
 93
 94        parent_path = new_parent_path
 95
 96        if (os.path.exists(parent_path) and (not os.path.isdir(parent_path))):
 97            raise ValueError(f"Target of mkdir contains parent ('{os.path.basename(parent_path)}') that exists and is not a dir: '{raw_path}'.")
 98
 99    raise ValueError("Depth limit reached.")
100
def remove(path: str) -> None:
    """
    Delete whatever dirent lives at the given path.
    Files and links are unlinked, directories are removed recursively,
    and a path that does not exist is silently ignored.
    A ValueError is raised for any other type of dirent.
    """

    if not exists(path):
        return

    # Links are unlinked themselves, never followed into their target.
    if os.path.isfile(path) or os.path.islink(path):
        os.remove(path)
        return

    if os.path.isdir(path):
        shutil.rmtree(path)
        return

    raise ValueError(f"Unknown type of dirent: '{path}'.")
117
def same(a: str, b: str) -> bool:
    """
    Check if two paths represent the same dirent.
    If either (or both) paths do not exist, false will be returned.
    If either path is a link, it is resolved before checking
    (so a link and the target file are considered the "same").

    A broken link cannot be resolved, so when one is involved the links themselves are compared:
    a broken link is the same as itself, and different from everything else.
    """

    # Lexical existence (the same check as exists()), broken links count as existing.
    if (not (os.path.lexists(a) and os.path.lexists(b))):
        return False

    try:
        return os.path.samefile(a, b)
    except FileNotFoundError:
        # os.path.samefile() follows links, and fails on a broken link.
        # Fall back to comparing the dirents without following links.
        pass

    try:
        return os.path.samestat(os.lstat(a), os.lstat(b))
    except FileNotFoundError:
        # One of the paths vanished while we were checking.
        return False
127
def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Relocate the source dirent to the destination.
    If the destination is an existing directory, the source is moved inside of it (keeping its name).
    Moving a dirent onto itself is a no-op.
    An existing destination is removed first,
    unless no_clobber is true, in which case a ValueError is raised.
    A ValueError is also raised if the source does not exist.
    """

    src_path = os.path.abspath(raw_source)
    dest_path = os.path.abspath(raw_dest)

    if not exists(src_path):
        raise ValueError(f"Source of move does not exist: '{raw_source}'.")

    # Moving onto a directory means moving into that directory.
    if os.path.isdir(dest_path):
        inner_path = os.path.join(dest_path, os.path.basename(src_path))
        dest_path = os.path.abspath(inner_path)

    # Nothing to do when the source is already the destination.
    if same(src_path, dest_path):
        return

    dest_taken = exists(dest_path)
    if dest_taken and no_clobber:
        raise ValueError(f"Destination of move already exists: '{raw_dest}'.")

    if dest_taken:
        remove(dest_path)

    # Make sure the directory we are moving into is there.
    dest_parent = os.path.dirname(dest_path)
    os.makedirs(dest_parent, exist_ok = True)

    shutil.move(src_path, dest_path)
159
def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a dirent or directory to a destination.

    The destination will be overwritten if it exists (and no_clobber is false).
    For copying the contents of a directory INTO another directory, use copy_contents().

    No copy is made if the source and dest refer to the same dirent.

    Links are copied as links (they are not followed), and directories are copied recursively.
    Any missing parent directories of the destination are created.

    A ValueError is raised if:
    the source does not exist,
    the destination exists and no_clobber is true,
    the (existing) destination contains the source,
    the source is a directory that contains the destination,
    or the source is not a dir, file, or link.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        return

    if (not exists(source)):
        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")

    # A real directory cannot be copied into itself:
    # the new destination would show up as a child of the source and be copied again without end.
    # Links are excluded since they are copied as links and never descended into.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Source of copy cannot contain the destination. Source: '{raw_source}', Destination: '{raw_dest}'.")

    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")

        # Removing the destination would also remove the source.
        if (contains_path(dest, source)):
            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

        remove(dest)

    mkdir(os.path.dirname(dest))

    if (os.path.islink(source)):
        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
        link_target = os.readlink(source)
        os.symlink(link_target, dest)
    elif (os.path.isfile(source)):
        shutil.copy2(source, dest, follow_symlinks = False)
    elif (os.path.isdir(source)):
        mkdir(dest)

        # The destination was just created, so children never need clobber checks.
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
    else:
        raise ValueError(f"Source of copy is not a dir, file, or link: '{raw_source}'.")
203
def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
    If the destination exists, it must be a directory.
    If the destination does not exist, it is created (along with any missing parents).

    The source and destination should not be the same file.

    For a file, this is equivalent to `mkdir -p dest && cp source dest`
    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`

    Each entry is copied using copy(), and no_clobber is applied to each copied entry.

    A ValueError is raised (before anything is created) if:
    the source and destination are the same,
    the source does not exist,
    the destination exists and is not a dir,
    or the source is a directory that contains the destination.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")

    # Check the source up front so a bad source does not leave a newly created destination behind.
    if (not exists(source)):
        raise ValueError(f"Source of contents copy does not exist: '{raw_source}'.")

    if (exists(dest) and (not os.path.isdir(dest))):
        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")

    # A real directory cannot have its contents copied into itself:
    # the destination would be one of the children being copied, and the copy would never end.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Source of contents copy cannot contain the destination. Source: '{raw_source}', Destination: '{raw_dest}'.")

    mkdir(dest)

    if (os.path.isfile(source) or os.path.islink(source)):
        # Files and links are copied into the destination under their own name.
        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
    elif (os.path.isdir(source)):
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)
    else:
        raise ValueError(f"Source of contents copy is not a dir, file, or link: '{raw_source}'.")
233
def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
    """
    Read a text file and return its contents.
    Leading and trailing whitespace is removed when strip is true.
    A ValueError is raised if nothing exists at the path.
    """

    target = os.path.abspath(raw_path)

    if not exists(target):
        raise ValueError(f"Source of read does not exist: '{raw_path}'.")

    with open(target, 'r', encoding = encoding) as handle:
        text = handle.read()

    return text.strip() if strip else text
249
def write_file(
        raw_path: str, contents: typing.Union[str, None],
        strip: bool = True, newline: bool = True,
        encoding: str = DEFAULT_ENCODING,
        no_clobber: bool = False) -> None:
    """
    Write text to a file.
    None contents are treated as an empty string.
    When strip is true, the contents are stripped of leading and trailing whitespace,
    and when newline is true, a single newline is then appended.
    Any dirent already at the path is removed before writing,
    unless no_clobber is true, in which case a ValueError is raised.
    """

    target = os.path.abspath(raw_path)

    target_taken = exists(target)
    if target_taken and no_clobber:
        raise ValueError(f"Destination of write already exists: '{raw_path}'.")

    if target_taken:
        remove(target)

    text = '' if contents is None else contents

    # Strip first so that the requested newline survives.
    if strip:
        text = text.strip()

    if newline:
        text = text + "\n"

    with open(target, 'w', encoding = encoding) as handle:
        handle.write(text)
279
def read_file_bytes(raw_path: str) -> bytes:
    """
    Read a file and return its raw bytes.
    A ValueError is raised if nothing exists at the path.
    """

    target = os.path.abspath(raw_path)

    if not exists(target):
        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")

    with open(target, 'rb') as handle:
        data = handle.read()

    return data
290
def write_file_bytes(
        raw_path: str, contents: typing.Union[bytes, None],
        no_clobber: bool = False) -> None:
    """
    Write raw bytes to a file.
    None contents are treated as empty bytes.
    Any dirent already at the path is removed before writing,
    unless no_clobber is true, in which case a ValueError is raised.
    """

    target = os.path.abspath(raw_path)

    target_taken = exists(target)
    if target_taken and no_clobber:
        raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")

    if target_taken:
        remove(target)

    data = b'' if contents is None else contents

    with open(target, 'wb') as handle:
        handle.write(data)
312
def contains_path(parent: str, child: str) -> bool:
    """
    Decide whether the child path lies somewhere underneath the parent path.
    Only the (absolute) path strings are compared, the filesystem is never consulted.
    A path is not considered to contain itself,
    and an empty parent or child always results in false.
    A ValueError is raised if the walk up from the child exceeds DEPTH_LIMIT steps.
    """

    if parent == '' or child == '':
        return False

    parent_abs = os.path.abspath(parent)

    # Start one level above the child so that identical paths do not match.
    ancestor = os.path.dirname(os.path.abspath(child))

    for _ in range(DEPTH_LIMIT):
        if ancestor == parent_abs:
            return True

        next_ancestor = os.path.dirname(ancestor)
        if next_ancestor == ancestor:
            # Reached the root without meeting the parent.
            return False

        ancestor = next_ancestor

    raise ValueError("Depth limit reached.")
DEFAULT_ENCODING: str = 'utf-8'

The default encoding that will be used when reading and writing.

DEPTH_LIMIT: int = 10000
def exists(path: str) -> bool:
25def exists(path: str) -> bool:
26    """
27    Check if a path exists.
28    This will transparently call os.path.lexists(),
29    which will include broken links.
30    """
31
32    return os.path.lexists(path)

Check if a path exists. This will transparently call os.path.lexists(), which will include broken links.

def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
34def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
35    """
36    Get a path to a valid (but not currently existing) temp dirent.
37    If rm is True, then the dirent will be attempted to be deleted on exit
38    (no error will occur if the path is not there).
39    """
40
41    path = None
42    while ((path is None) or exists(path)):
43        path = os.path.join(tempfile.gettempdir(), prefix + str(uuid.uuid4()) + suffix)
44
45    if (rm):
46        atexit.register(remove, path)
47
48    return path

Get a path to a valid (but not currently existing) temp dirent. If rm is True, then an attempt will be made to delete the dirent on exit (no error will occur if the path is not there).

def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
50def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
51    """
52    Get a temp directory.
53    The directory will exist when returned.
54    """
55
56    path = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
57    mkdir(path)
58    return path

Get a temp directory. The directory will exist when returned.

def mkdir(raw_path: str) -> None:
60def mkdir(raw_path: str) -> None:
61    """
62    Make a directory (including any required parent directories).
63    Does not complain if the directory (or parents) already exist
64    (this includes if the directory or parents are links to directories).
65    """
66
67    path = os.path.abspath(raw_path)
68
69    if (exists(path)):
70        if (os.path.isdir(path)):
71            return
72
73        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")
74
75    _check_parent_dirs(raw_path)
76
77    os.makedirs(path, exist_ok = True)

Make a directory (including any required parent directories). Does not complain if the directory (or parents) already exist (this includes if the directory or parents are links to directories).

def remove(path: str) -> None:
102def remove(path: str) -> None:
103    """
104    Remove the given path.
105    The path can be of any type (dir, file, link),
106    and does not need to exist.
107    """
108
109    if (not exists(path)):
110        return
111
112    if (os.path.isfile(path) or os.path.islink(path)):
113        os.remove(path)
114    elif (os.path.isdir(path)):
115        shutil.rmtree(path)
116    else:
117        raise ValueError(f"Unknown type of dirent: '{path}'.")

Remove the given path. The path can be of any type (dir, file, link), and does not need to exist.

def same(a: str, b: str) -> bool:
119def same(a: str, b: str) -> bool:
120    """
121    Check if two paths represent the same dirent.
122    If either (or both) paths do not exist, false will be returned.
123    If either paths are links, they are resolved before checking
124    (so a link and the target file are considered the "same").
125    """
126
127    return (exists(a) and exists(b) and os.path.samefile(a, b))

Check if two paths represent the same dirent. If either (or both) paths do not exist, false will be returned. If either path is a link, it is resolved before checking (so a link and the target file are considered the "same").

def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
129def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
130    """
131    Move the source dirent to the given destination.
132    Any existing destination will be removed before moving.
133    """
134
135    source = os.path.abspath(raw_source)
136    dest = os.path.abspath(raw_dest)
137
138    if (not exists(source)):
139        raise ValueError(f"Source of move does not exist: '{raw_source}'.")
140
141    # If dest is a dir, then resolve the path.
142    if (os.path.isdir(dest)):
143        dest = os.path.abspath(os.path.join(dest, os.path.basename(source)))
144
145    # Skip if this is self.
146    if (same(source, dest)):
147        return
148
149    # Check for clobber.
150    if (exists(dest)):
151        if (no_clobber):
152            raise ValueError(f"Destination of move already exists: '{raw_dest}'.")
153
154        remove(dest)
155
156    # Create any required parents.
157    os.makedirs(os.path.dirname(dest), exist_ok = True)
158
159    shutil.move(source, dest)

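Move the source dirent to the given destination. If the destination is an existing directory, the source is moved into it. Any existing destination will be removed before moving, unless no_clobber is true, in which case an error is raised instead.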

def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
161def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
162    """
163    Copy a dirent or directory to a destination.
164
165    The destination will be overwritten if it exists (and no_clobber is false).
166    For copying the contents of a directory INTO another directory, use copy_contents().
167
168    No copy is made if the source and dest refer to the same dirent.
169    """
170
171    source = os.path.abspath(raw_source)
172    dest = os.path.abspath(raw_dest)
173
174    if (same(source, dest)):
175        return
176
177    if (not exists(source)):
178        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")
179
180    if (exists(dest)):
181        if (no_clobber):
182            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")
183
184        if (contains_path(dest, source)):
185            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")
186
187        remove(dest)
188
189    mkdir(os.path.dirname(dest))
190
191    if (os.path.islink(source)):
192        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
193        link_target = os.readlink(source)
194        os.symlink(link_target, dest)
195    elif (os.path.isfile(source)):
196        shutil.copy2(source, dest, follow_symlinks = False)
197    elif (os.path.isdir(source)):
198        mkdir(dest)
199
200        for child in sorted(os.listdir(source)):
201            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
202    else:
203        raise ValueError(f"Source of copy is not a dir, fie, or link: '{raw_source}'.")

Copy a dirent or directory to a destination.

The destination will be overwritten if it exists (and no_clobber is false). For copying the contents of a directory INTO another directory, use copy_contents().

No copy is made if the source and dest refer to the same dirent.

def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
205def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
206    """
207    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
208    If the destination exists, it must be a directory.
209
210    The source and destination should not be the same file.
211
212    For a file, this is equivalent to `mkdir -p dest && cp source dest`
213    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`
214    """
215
216    source = os.path.abspath(raw_source)
217    dest = os.path.abspath(raw_dest)
218
219    if (same(source, dest)):
220        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")
221
222    if (exists(dest) and (not os.path.isdir(dest))):
223        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")
224
225    mkdir(dest)
226
227    if (os.path.isfile(source) or os.path.islink(source)):
228        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
229    elif (os.path.isdir(source)):
230        for child in sorted(os.listdir(source)):
231            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)
232    else:
233        raise ValueError(f"Source of contents copy is not a dir, fie, or link: '{raw_source}'.")

Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination. If the destination exists, it must be a directory.

The source and destination should not be the same file.

For a file, this is equivalent to `mkdir -p dest && cp source dest`. For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`.

def read_file(raw_path: str, strip: bool = True, encoding: str = 'utf-8') -> str:
235def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
236    """ Read the contents of a file. """
237
238    path = os.path.abspath(raw_path)
239
240    if (not exists(path)):
241        raise ValueError(f"Source of read does not exist: '{raw_path}'.")
242
243    with open(path, 'r', encoding = encoding) as file:
244        contents = file.read()
245
246    if (strip):
247        contents = contents.strip()
248
249    return contents

Read the contents of a file.

def write_file( raw_path: str, contents: Optional[str], strip: bool = True, newline: bool = True, encoding: str = 'utf-8', no_clobber: bool = False) -> None:
251def write_file(
252        raw_path: str, contents: typing.Union[str, None],
253        strip: bool = True, newline: bool = True,
254        encoding: str = DEFAULT_ENCODING,
255        no_clobber: bool = False) -> None:
256    """
257    Write the contents of a file.
258    If clobbering, any existing dirent will be removed before write.
259    """
260
261    path = os.path.abspath(raw_path)
262
263    if (exists(path)):
264        if (no_clobber):
265            raise ValueError(f"Destination of write already exists: '{raw_path}'.")
266
267        remove(path)
268
269    if (contents is None):
270        contents = ''
271
272    if (strip):
273        contents = contents.strip()
274
275    if (newline):
276        contents += "\n"
277
278    with open(path, 'w', encoding = encoding) as file:
279        file.write(contents)

Write the contents of a file. If clobbering, any existing dirent will be removed before write.

def read_file_bytes(raw_path: str) -> bytes:
281def read_file_bytes(raw_path: str) -> bytes:
282    """ Read the contents of a file as bytes. """
283
284    path = os.path.abspath(raw_path)
285
286    if (not exists(path)):
287        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")
288
289    with open(path, 'rb') as file:
290        return file.read()

Read the contents of a file as bytes.

def write_file_bytes( raw_path: str, contents: Optional[bytes], no_clobber: bool = False) -> None:
292def write_file_bytes(
293        raw_path: str, contents: typing.Union[bytes, None],
294        no_clobber: bool = False) -> None:
295    """
296    Write the contents of a file as bytes.
297    If clobbering, any existing dirent will be removed before write.
298    """
299
300    path = os.path.abspath(raw_path)
301
302    if (exists(path)):
303        if (no_clobber):
304            raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")
305
306        remove(path)
307
308    if (contents is None):
309        contents = b''
310
311    with open(path, 'wb') as file:
312        file.write(contents)

Write the contents of a file as bytes. If clobbering, any existing dirent will be removed before write.

def contains_path(parent: str, child: str) -> bool:
314def contains_path(parent: str, child: str) -> bool:
315    """
316    Check if the parent path contains the child path.
317    This is pure lexical analysis, no dirent stats are checked.
318    Will return false if the (absolute) paths are the same
319    (this function does not allow a path to contain itself).
320    """
321
322    if ((parent == '') or (child == '')):
323        return False
324
325    parent = os.path.abspath(parent)
326    child = os.path.abspath(child)
327
328    child = os.path.dirname(child)
329    for _ in range(DEPTH_LIMIT):
330        if (parent == child):
331            return True
332
333        new_child = os.path.dirname(child)
334        if (child == new_child):
335            return False
336
337        child = new_child
338
339    raise ValueError("Depth limit reached.")

Check if the parent path contains the child path. This is pure lexical analysis, no dirent stats are checked. Will return false if the (absolute) paths are the same (this function does not allow a path to contain itself).