edq.util.dirent

Operations relating to directory entries (dirents).

These operations are designed for clarity and compatibility, not performance.

Only directories, files, and links will be handled. Other types of dirents may result in an error being raised.

In general, recursive operations do not follow symlinks by default and instead treat the link as a file.

  1"""
  2Operations relating to directory entries (dirents).
  3
  4These operations are designed for clarity and compatibility, not performance.
  5
  6Only directories, files, and links will be handled.
  7Other types of dirents may result in an error being raised.
  8
  9In general, all recursive operations do not follow symlinks by default and instead treat the link as a file.
 10"""
 11
 12import atexit
 13import os
 14import shutil
 15import tempfile
 16import typing
 17import uuid
 18
 19DEFAULT_ENCODING: str = 'utf-8'
 20""" The default encoding that will be used when reading and writing. """
 21
 22DEPTH_LIMIT: int = 10000
 23
 24def exists(path: str) -> bool:
 25    """
 26    Check if a path exists.
 27    This will transparently call os.path.lexists(),
 28    which will include broken links.
 29    """
 30
 31    return os.path.lexists(path)
 32
 33def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
 34    """
 35    Get a path to a valid (but not currently existing) temp dirent.
 36    If rm is True, then the dirent will be attempted to be deleted on exit
 37    (no error will occur if the path is not there).
 38    """
 39
 40    path = None
 41    while ((path is None) or exists(path)):
 42        path = os.path.join(tempfile.gettempdir(), prefix + str(uuid.uuid4()) + suffix)
 43
 44    path = os.path.realpath(path)
 45
 46    if (rm):
 47        atexit.register(remove, path)
 48
 49    return path
 50
 51def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
 52    """
 53    Get a temp directory.
 54    The directory will exist when returned.
 55    """
 56
 57    path = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
 58    mkdir(path)
 59    return path
 60
 61def mkdir(raw_path: str) -> None:
 62    """
 63    Make a directory (including any required parent directories).
 64    Does not complain if the directory (or parents) already exist
 65    (this includes if the directory or parents are links to directories).
 66    """
 67
 68    path = os.path.abspath(raw_path)
 69
 70    if (exists(path)):
 71        if (os.path.isdir(path)):
 72            return
 73
 74        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")
 75
 76    _check_parent_dirs(raw_path)
 77
 78    os.makedirs(path, exist_ok = True)
 79
 80def _check_parent_dirs(raw_path: str) -> None:
 81    """
 82    Check all parents to ensure that they are all dirs (or don't exist).
 83    This is naturally handled by os.makedirs(),
 84    but the error messages are not consistent between POSIX and Windows.
 85    """
 86
 87    path = os.path.abspath(raw_path)
 88
 89    parent_path = path
 90    for _ in range(DEPTH_LIMIT):
 91        new_parent_path = os.path.dirname(parent_path)
 92        if (parent_path == new_parent_path):
 93            # We have reached root (are our own parent).
 94            return
 95
 96        parent_path = new_parent_path
 97
 98        if (os.path.exists(parent_path) and (not os.path.isdir(parent_path))):
 99            raise ValueError(f"Target of mkdir contains parent ('{os.path.basename(parent_path)}') that exists and is not a dir: '{raw_path}'.")
100
101    raise ValueError("Depth limit reached.")
102
def remove(path: str) -> None:
    """
    Delete whatever is at the given path.
    Files and links are removed with os.remove(), directories with shutil.rmtree().
    A path that does not exist is silently ignored.
    A ValueError is raised for any other kind of dirent.
    """

    if (not exists(path)):
        return

    # Links are checked alongside files so that a link to a dir is unlinked rather than followed.
    if (os.path.islink(path) or os.path.isfile(path)):
        os.remove(path)
        return

    if (os.path.isdir(path)):
        shutil.rmtree(path)
        return

    raise ValueError(f"Unknown type of dirent: '{path}'.")
119
def same(a: str, b: str) -> bool:
    """
    Decide whether two paths refer to the same dirent.
    False is returned when either path fails the exists() check.
    Otherwise the answer comes from os.path.samefile(), which resolves links
    (so a link and its target are considered the "same").
    """

    both_present = (exists(a) and exists(b))
    if (not both_present):
        return False

    return os.path.samefile(a, b)
129
def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Move the source dirent to the given destination.

    If the destination is an existing directory, the source is moved inside of it (keeping its base name).
    Any dirent already at the final destination is removed before moving,
    unless no_clobber is true (in which case a ValueError is raised).
    Moving a dirent onto itself does nothing.
    Missing parent directories of the destination are created.

    A ValueError is also raised if the source does not exist,
    or if the source is a directory and the destination lies inside of it.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (not exists(source)):
        raise ValueError(f"Source of move does not exist: '{raw_source}'.")

    # If dest is a dir, then resolve the path.
    if (os.path.isdir(dest)):
        dest = os.path.abspath(os.path.join(dest, os.path.basename(source)))

    # Skip if this is self.
    if (same(source, dest)):
        return

    # A real directory can never be moved inside of itself.
    # This must be checked before clobbering,
    # since an existing destination inside the source is part of what is being moved.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of move cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    # Check for clobber.
    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of move already exists: '{raw_dest}'.")

        remove(dest)

    # Create any required parents.
    os.makedirs(os.path.dirname(dest), exist_ok = True)

    shutil.move(source, dest)
161
def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a dirent (file, directory, or link) to a destination.

    The destination will be overwritten if it exists (and no_clobber is false).
    For copying the contents of a directory INTO another directory, use copy_contents().

    No copy is made if the source and dest refer to the same dirent.
    Links are copied as links (they are not followed), and directories are copied recursively.

    A ValueError is raised if the source does not exist,
    if the destination exists and no_clobber is true,
    if the destination contains the source,
    if the source is a directory that contains the destination,
    or if the source is not a dir, file, or link.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        return

    if (not exists(source)):
        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")

    # A real directory cannot be copied into itself:
    # the new destination would show up in the source listing and the recursion would never end.
    # This is checked before clobbering, since an existing destination here is part of the source.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of copy cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")

        if (contains_path(dest, source)):
            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

        remove(dest)

    mkdir(os.path.dirname(dest))

    if (os.path.islink(source)):
        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
        link_target = os.readlink(source)
        os.symlink(link_target, dest)
    elif (os.path.isfile(source)):
        shutil.copy2(source, dest, follow_symlinks = False)
    elif (os.path.isdir(source)):
        mkdir(dest)

        # The destination was just created fresh, so the children have nothing to clobber.
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
    else:
        raise ValueError(f"Source of copy is not a dir, file, or link: '{raw_source}'.")
205
def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
    If the destination exists, it must be a directory.
    The destination directory is created if it does not exist.

    The source and destination should not be the same file,
    and the destination cannot be inside of a source directory.

    For a file, this is equivalent to `mkdir -p dest && cp source dest`
    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`

    The no_clobber flag is passed to copy() for each dirent copied into the destination.
    A ValueError is raised for any violated condition above,
    or if the source is not a dir, file, or link.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")

    if (exists(dest) and (not os.path.isdir(dest))):
        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")

    # A destination inside of a (real) source directory would itself be listed as a child to copy,
    # which would make the copy recurse into its own output without end.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of contents copy cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    mkdir(dest)

    if (os.path.isfile(source) or os.path.islink(source)):
        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
    elif (os.path.isdir(source)):
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)
    else:
        raise ValueError(f"Source of contents copy is not a dir, file, or link: '{raw_source}'.")
235
def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
    """
    Read a file as text and return its contents.
    If strip is true, leading and trailing whitespace is removed from the result.
    A ValueError is raised if nothing exists at the path.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read does not exist: '{raw_path}'.")

    with open(path, 'r', encoding = encoding) as file:
        text = file.read()

    return text.strip() if (strip) else text
251
def write_file(
        raw_path: str, contents: typing.Union[str, None],
        strip: bool = True, newline: bool = True,
        encoding: str = DEFAULT_ENCODING,
        no_clobber: bool = False) -> None:
    """
    Write text to a file.
    None contents are treated as an empty string.
    If strip is true, surrounding whitespace is removed from the contents;
    if newline is true, a single newline is then appended.
    An existing dirent at the path is removed before writing,
    unless no_clobber is true (in which case a ValueError is raised).
    """

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write already exists: '{raw_path}'.")

        remove(path)

    text = '' if (contents is None) else contents

    if (strip):
        text = text.strip()

    if (newline):
        text = text + "\n"

    with open(path, 'w', encoding = encoding) as file:
        file.write(text)
281
def read_file_bytes(raw_path: str) -> bytes:
    """
    Read a file in binary mode and return its raw bytes.
    A ValueError is raised if nothing exists at the path.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")

    with open(path, 'rb') as file:
        data = file.read()

    return data
292
def write_file_bytes(
        raw_path: str, contents: typing.Union[bytes, str, None],
        no_clobber: bool = False) -> None:
    """
    Write bytes to a file.
    None contents are treated as empty, and str contents are encoded with DEFAULT_ENCODING.
    An existing dirent at the path is removed before writing,
    unless no_clobber is true (in which case a ValueError is raised).
    """

    # Normalize the contents before touching the disk.
    if (contents is None):
        payload = b''
    elif (isinstance(contents, str)):
        payload = contents.encode(DEFAULT_ENCODING)
    else:
        payload = contents

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")

        remove(path)

    with open(path, 'wb') as file:
        file.write(payload)
317
def contains_path(parent: str, child: str) -> bool:
    """
    Decide whether the child path lies somewhere beneath the parent path.
    Only the (absolute) path strings are compared, the disk is never consulted.
    A path does not contain itself, so equal paths give False.
    An empty string for either argument also gives False.
    """

    if ((parent == '') or (child == '')):
        return False

    target = os.path.abspath(parent)

    # Start from the child's own directory so that identical paths never match.
    ancestor = os.path.dirname(os.path.abspath(child))

    for _ in range(DEPTH_LIMIT):
        if (ancestor == target):
            return True

        next_ancestor = os.path.dirname(ancestor)
        if (next_ancestor == ancestor):
            # The root is its own parent, there is nothing further up to check.
            return False

        ancestor = next_ancestor

    raise ValueError("Depth limit reached.")
DEFAULT_ENCODING: str = 'utf-8'

The default encoding that will be used when reading and writing.

DEPTH_LIMIT: int = 10000
def exists(path: str) -> bool:
def exists(path: str) -> bool:
    """
    Report whether anything is present at the given path.
    This delegates to os.path.lexists(),
    so a link counts as existing even when it is broken.
    """

    found = os.path.lexists(path)
    return found

Check if a path exists. This will transparently call os.path.lexists(), which will include broken links.

def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
def get_temp_path(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Pick a path inside the system temp directory that is not currently in use.
    Nothing is created at the path; the name is the prefix, a random UUID, then the suffix.
    The returned path has been passed through os.path.realpath().
    If rm is True, remove() is registered to run on the path at interpreter exit
    (remove() does nothing when the path is not there).
    """

    temp_root = tempfile.gettempdir()

    # Keep drawing random names until one is found that does not already exist.
    while (True):
        candidate = os.path.join(temp_root, prefix + str(uuid.uuid4()) + suffix)
        if (not exists(candidate)):
            break

    resolved = os.path.realpath(candidate)

    if (rm):
        atexit.register(remove, resolved)

    return resolved

Get a path to a valid (but not currently existing) temp dirent. If rm is True, an attempt will be made to delete the dirent on exit (no error will occur if the path is not there).

def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
def get_temp_dir(prefix: str = '', suffix: str = '', rm: bool = True) -> str:
    """
    Create a fresh temp directory and return its path.
    The path comes from get_temp_path() (prefix, suffix, and rm are passed through unchanged),
    and the directory has been created by the time this function returns.
    """

    temp_dir = get_temp_path(prefix = prefix, suffix = suffix, rm = rm)
    mkdir(temp_dir)

    return temp_dir

Get a temp directory. The directory will exist when returned.

def mkdir(raw_path: str) -> None:
def mkdir(raw_path: str) -> None:
    """
    Create a directory along with any missing parent directories.
    An existing directory (or a link to a directory) at the target is silently accepted.
    A ValueError is raised when the target exists but is not a directory.
    """

    target = os.path.abspath(raw_path)

    # Nothing to do when the target is already a dir (or a link to one).
    if (os.path.isdir(target)):
        return

    # Anything else that exists here (file, broken link, ...) is in the way.
    if (exists(target)):
        raise ValueError(f"Target of mkdir already exists, and is not a dir: '{raw_path}'.")

    _check_parent_dirs(raw_path)

    os.makedirs(target, exist_ok = True)

Make a directory (including any required parent directories). Does not complain if the directory (or parents) already exist (this includes if the directory or parents are links to directories).

def remove(path: str) -> None:
def remove(path: str) -> None:
    """
    Delete whatever is at the given path.
    Files and links are removed with os.remove(), directories with shutil.rmtree().
    A path that does not exist is silently ignored.
    A ValueError is raised for any other kind of dirent.
    """

    if (not exists(path)):
        return

    # Links are checked alongside files so that a link to a dir is unlinked rather than followed.
    if (os.path.islink(path) or os.path.isfile(path)):
        os.remove(path)
        return

    if (os.path.isdir(path)):
        shutil.rmtree(path)
        return

    raise ValueError(f"Unknown type of dirent: '{path}'.")

Remove the given path. The path can be of any type (dir, file, link), and does not need to exist.

def same(a: str, b: str) -> bool:
def same(a: str, b: str) -> bool:
    """
    Decide whether two paths refer to the same dirent.
    False is returned when either path fails the exists() check.
    Otherwise the answer comes from os.path.samefile(), which resolves links
    (so a link and its target are considered the "same").
    """

    both_present = (exists(a) and exists(b))
    if (not both_present):
        return False

    return os.path.samefile(a, b)

Check if two paths represent the same dirent. If either (or both) paths do not exist, false will be returned. If either path is a link, it is resolved before checking (so a link and its target file are considered the "same").

def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
def move(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Move the source dirent to the given destination.

    If the destination is an existing directory, the source is moved inside of it (keeping its base name).
    Any dirent already at the final destination is removed before moving,
    unless no_clobber is true (in which case a ValueError is raised).
    Moving a dirent onto itself does nothing.
    Missing parent directories of the destination are created.

    A ValueError is also raised if the source does not exist,
    or if the source is a directory and the destination lies inside of it.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (not exists(source)):
        raise ValueError(f"Source of move does not exist: '{raw_source}'.")

    # If dest is a dir, then resolve the path.
    if (os.path.isdir(dest)):
        dest = os.path.abspath(os.path.join(dest, os.path.basename(source)))

    # Skip if this is self.
    if (same(source, dest)):
        return

    # A real directory can never be moved inside of itself.
    # This must be checked before clobbering,
    # since an existing destination inside the source is part of what is being moved.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of move cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    # Check for clobber.
    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of move already exists: '{raw_dest}'.")

        remove(dest)

    # Create any required parents.
    os.makedirs(os.path.dirname(dest), exist_ok = True)

    shutil.move(source, dest)

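Move the source dirent to the given destination. If the destination is an existing directory, the source is moved inside of it (keeping its base name). Any dirent already at the final destination will be removed before moving, unless no_clobber is true, in which case a ValueError is raised.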

def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
def copy(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a dirent (file, directory, or link) to a destination.

    The destination will be overwritten if it exists (and no_clobber is false).
    For copying the contents of a directory INTO another directory, use copy_contents().

    No copy is made if the source and dest refer to the same dirent.
    Links are copied as links (they are not followed), and directories are copied recursively.

    A ValueError is raised if the source does not exist,
    if the destination exists and no_clobber is true,
    if the destination contains the source,
    if the source is a directory that contains the destination,
    or if the source is not a dir, file, or link.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        return

    if (not exists(source)):
        raise ValueError(f"Source of copy does not exist: '{raw_source}'.")

    # A real directory cannot be copied into itself:
    # the new destination would show up in the source listing and the recursion would never end.
    # This is checked before clobbering, since an existing destination here is part of the source.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of copy cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    if (exists(dest)):
        if (no_clobber):
            raise ValueError(f"Destination of copy already exists: '{raw_dest}'.")

        if (contains_path(dest, source)):
            raise ValueError(f"Destination of copy cannot contain the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

        remove(dest)

    mkdir(os.path.dirname(dest))

    if (os.path.islink(source)):
        # shutil.copy2() can generally handle (broken) links, but Windows is inconsistent (between 3.11 and 3.12) on link handling.
        link_target = os.readlink(source)
        os.symlink(link_target, dest)
    elif (os.path.isfile(source)):
        shutil.copy2(source, dest, follow_symlinks = False)
    elif (os.path.isdir(source)):
        mkdir(dest)

        # The destination was just created fresh, so the children have nothing to clobber.
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child))
    else:
        raise ValueError(f"Source of copy is not a dir, file, or link: '{raw_source}'.")

Copy a dirent or directory to a destination.

The destination will be overwritten if it exists (and no_clobber is false). For copying the contents of a directory INTO another directory, use copy_contents().

No copy is made if the source and dest refer to the same dirent.

def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
def copy_contents(raw_source: str, raw_dest: str, no_clobber: bool = False) -> None:
    """
    Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination.
    If the destination exists, it must be a directory.
    The destination directory is created if it does not exist.

    The source and destination should not be the same file,
    and the destination cannot be inside of a source directory.

    For a file, this is equivalent to `mkdir -p dest && cp source dest`
    For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`

    The no_clobber flag is passed to copy() for each dirent copied into the destination.
    A ValueError is raised for any violated condition above,
    or if the source is not a dir, file, or link.
    """

    source = os.path.abspath(raw_source)
    dest = os.path.abspath(raw_dest)

    if (same(source, dest)):
        raise ValueError(f"Source and destination of contents copy cannot be the same: '{raw_source}'.")

    if (exists(dest) and (not os.path.isdir(dest))):
        raise ValueError(f"Destination of contents copy exists and is not a dir: '{raw_dest}'.")

    # A destination inside of a (real) source directory would itself be listed as a child to copy,
    # which would make the copy recurse into its own output without end.
    if (os.path.isdir(source) and (not os.path.islink(source)) and contains_path(source, dest)):
        raise ValueError(f"Destination of contents copy cannot be inside the source. Destination: '{raw_dest}', Source: '{raw_source}'.")

    mkdir(dest)

    if (os.path.isfile(source) or os.path.islink(source)):
        copy(source, os.path.join(dest, os.path.basename(source)), no_clobber = no_clobber)
    elif (os.path.isdir(source)):
        for child in sorted(os.listdir(source)):
            copy(os.path.join(raw_source, child), os.path.join(raw_dest, child), no_clobber = no_clobber)
    else:
        raise ValueError(f"Source of contents copy is not a dir, file, or link: '{raw_source}'.")

Copy a file or the contents of a directory (excluding the top-level directory itself) into a destination. If the destination exists, it must be a directory.

The source and destination should not be the same file.

For a file, this is equivalent to `mkdir -p dest && cp source dest`. For a dir, this is equivalent to `mkdir -p dest && cp -r source/* dest`.

def read_file(raw_path: str, strip: bool = True, encoding: str = 'utf-8') -> str:
def read_file(raw_path: str, strip: bool = True, encoding: str = DEFAULT_ENCODING) -> str:
    """
    Read a file as text and return its contents.
    If strip is true, leading and trailing whitespace is removed from the result.
    A ValueError is raised if nothing exists at the path.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read does not exist: '{raw_path}'.")

    with open(path, 'r', encoding = encoding) as file:
        text = file.read()

    return text.strip() if (strip) else text

Read the contents of a file.

def write_file( raw_path: str, contents: Optional[str], strip: bool = True, newline: bool = True, encoding: str = 'utf-8', no_clobber: bool = False) -> None:
def write_file(
        raw_path: str, contents: typing.Union[str, None],
        strip: bool = True, newline: bool = True,
        encoding: str = DEFAULT_ENCODING,
        no_clobber: bool = False) -> None:
    """
    Write text to a file.
    None contents are treated as an empty string.
    If strip is true, surrounding whitespace is removed from the contents;
    if newline is true, a single newline is then appended.
    An existing dirent at the path is removed before writing,
    unless no_clobber is true (in which case a ValueError is raised).
    """

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write already exists: '{raw_path}'.")

        remove(path)

    text = '' if (contents is None) else contents

    if (strip):
        text = text.strip()

    if (newline):
        text = text + "\n"

    with open(path, 'w', encoding = encoding) as file:
        file.write(text)

Write the contents of a file. If clobbering, any existing dirent will be removed before write.

def read_file_bytes(raw_path: str) -> bytes:
def read_file_bytes(raw_path: str) -> bytes:
    """
    Read a file in binary mode and return its raw bytes.
    A ValueError is raised if nothing exists at the path.
    """

    path = os.path.abspath(raw_path)

    if (not exists(path)):
        raise ValueError(f"Source of read bytes does not exist: '{raw_path}'.")

    with open(path, 'rb') as file:
        data = file.read()

    return data

Read the contents of a file as bytes.

def write_file_bytes( raw_path: str, contents: Union[bytes, str, NoneType], no_clobber: bool = False) -> None:
def write_file_bytes(
        raw_path: str, contents: typing.Union[bytes, str, None],
        no_clobber: bool = False) -> None:
    """
    Write bytes to a file.
    None contents are treated as empty, and str contents are encoded with DEFAULT_ENCODING.
    An existing dirent at the path is removed before writing,
    unless no_clobber is true (in which case a ValueError is raised).
    """

    # Normalize the contents before touching the disk.
    if (contents is None):
        payload = b''
    elif (isinstance(contents, str)):
        payload = contents.encode(DEFAULT_ENCODING)
    else:
        payload = contents

    path = os.path.abspath(raw_path)

    if (exists(path)):
        if (no_clobber):
            raise ValueError(f"Destination of write bytes already exists: '{raw_path}'.")

        remove(path)

    with open(path, 'wb') as file:
        file.write(payload)

Write the contents of a file as bytes. If clobbering, any existing dirent will be removed before write.

def contains_path(parent: str, child: str) -> bool:
def contains_path(parent: str, child: str) -> bool:
    """
    Decide whether the child path lies somewhere beneath the parent path.
    Only the (absolute) path strings are compared, the disk is never consulted.
    A path does not contain itself, so equal paths give False.
    An empty string for either argument also gives False.
    """

    if ((parent == '') or (child == '')):
        return False

    target = os.path.abspath(parent)

    # Start from the child's own directory so that identical paths never match.
    ancestor = os.path.dirname(os.path.abspath(child))

    for _ in range(DEPTH_LIMIT):
        if (ancestor == target):
            return True

        next_ancestor = os.path.dirname(ancestor)
        if (next_ancestor == ancestor):
            # The root is its own parent, there is nothing further up to check.
            return False

        ancestor = next_ancestor

    raise ValueError("Depth limit reached.")

Check if the parent path contains the child path. This is pure lexical analysis, no dirent stats are checked. Will return false if the (absolute) paths are the same (this function does not allow a path to contain itself).