Skip to content

utils¤

General functions.

GitHead dataclass ¤

GitHead(hash, timestamp)

Information about the HEAD of a git repository.

This class is returned from the git_head function.

checksum ¤

checksum(
    file,
    *,
    algorithm="sha256",
    chunk_size=65536,
    shake_length=256,
)

Get the checksum of a file.

A checksum is a sequence of numbers and letters that acts as a fingerprint for a file, against which later comparisons can be made to detect errors or changes in the file. It can be used to verify the integrity of the file.

Parameters:

Name Type Description Default
file PathLike | FileLikeRead[bytes]

A file to get the checksum of.

required
algorithm str

The hash algorithm to use to compute the checksum. See hashlib for more details.

'sha256'
chunk_size int

The number of bytes to read at a time from the file. It is useful to tweak this parameter when reading a large file to improve performance.

65536
shake_length int

The digest length to use for the shake_128 or shake_256 algorithm. See hexdigest for more details.

256

Returns:

Type Description
str

The checksum value (which only contains hexadecimal digits).

Source code in src/msl/io/utils.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def checksum(
    file: PathLike | FileLikeRead[bytes], *, algorithm: str = "sha256", chunk_size: int = 65536, shake_length: int = 256
) -> str:
    """Compute the checksum of a file.

    The checksum is a hexadecimal fingerprint of the file content. Comparing
    the fingerprint of a file at different times (or of different copies)
    reveals whether the content has changed or been corrupted.

    Args:
        file: A path to a file, or a file-like object opened in binary mode. A file-like
            object is read from its current stream position and the position is
            restored once the checksum has been computed.
        algorithm: The name of the hash algorithm, passed to [hashlib.new][]. See [hashlib][]
            for more details.
        chunk_size: The number of bytes to read from the file per iteration. Tuning this
            value may improve the performance when reading a large file.
        shake_length: The digest length to use for the `shake_128` or `shake_256` algorithm.
            See [hexdigest][hashlib.shake.hexdigest] for more details.

    Returns:
        The checksum value (which only contains hexadecimal digits).
    """
    import hashlib  # noqa: PLC0415

    hasher = hashlib.new(algorithm)

    def _consume(stream: FileLikeRead[bytes]) -> None:
        # feed the hash in chunks so that the whole file is never held in memory
        block = stream.read(chunk_size)
        while block:
            hasher.update(block)
            block = stream.read(chunk_size)

    if not isinstance(file, (str, bytes, os.PathLike)):
        # file-like object: remember where the caller was and go back there afterwards
        origin = file.tell()
        _consume(file)
        _ = file.seek(origin)
    else:
        with Path(os.fsdecode(file)).open("rb") as handle:
            _consume(handle)

    try:
        digest = hasher.hexdigest()
    except TypeError:
        # the variable-length SHAKE algorithms require the digest length
        digest = hasher.hexdigest(shake_length)  # type: ignore[call-arg]  # pyright: ignore[reportCallIssue,reportUnknownVariableType]
    return digest

copy ¤

copy(
    source,
    destination,
    *,
    overwrite=False,
    include_metadata=True,
    follow_symlinks=True,
)

Copy a file.

Parameters:

Name Type Description Default
source PathLike

The path to a file to copy.

required
destination PathLike

A directory to copy the file to or a full path (i.e., includes the basename). If the directory does not exist then it, and all intermediate directories, will be created.

required
overwrite bool

Whether to overwrite the destination file if it already exists. If destination already exists and overwrite is False then a FileExistsError is raised.

False
include_metadata bool

Whether to also copy information such as the file permissions, the latest access time and latest modification time with the file.

True
follow_symlinks bool

Whether to follow symbolic links.

True

Returns:

Type Description
Path

The path to where the file was copied.

Source code in src/msl/io/utils.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def copy(
    source: PathLike,
    destination: PathLike,
    *,
    overwrite: bool = False,
    include_metadata: bool = True,
    follow_symlinks: bool = True,
) -> Path:
    """Copy a file to a new location.

    Args:
        source: The path of the file to copy.
        destination: Either an existing directory (the file keeps its basename) or the
            full path of the new file. Missing parent directories of a full path are created.
        overwrite: Whether an already-existing destination file may be replaced. When
            `False` and the destination file exists, a [FileExistsError][] is raised.
        include_metadata: Whether to also copy the file permissions, the latest access
            time and the latest modification time.
        follow_symlinks: Whether to follow symbolic links.

    Returns:
        The path to where the file was copied.
    """
    import shutil  # noqa: PLC0415

    origin = Path(os.fsdecode(source))
    target = Path(os.fsdecode(destination))

    if not target.is_dir():
        # destination is a full file path, make sure its parent directory exists
        target.parent.mkdir(parents=True, exist_ok=True)
    else:
        # destination is an existing directory, keep the basename of the source
        target = target / origin.name

    refuse = not overwrite and target.is_file()
    if refuse:
        msg = f"Will not overwrite {destination!r}"
        raise FileExistsError(msg)

    _ = shutil.copyfile(origin, target, follow_symlinks=follow_symlinks)
    if include_metadata:
        shutil.copystat(origin, target, follow_symlinks=follow_symlinks)
    return target

get_basename ¤

get_basename(obj)

Get the basename (the final path component) of a file.

Parameters:

Name Type Description Default
obj PathLike | ReadLike | WriteLike

The object to get the basename of. If obj is an in-memory file-like object then the class __name__ of obj is returned.

required

Returns:

Type Description
str

The basename of obj.

Source code in src/msl/io/utils.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
def get_basename(obj: PathLike | ReadLike | WriteLike) -> str:
    r"""Get the basename (the final path component) of a file.

    Args:
        obj: The object to get the basename of. If `obj` is a file-like object that
            is not associated with a file path (e.g., an in-memory stream, or a stream
            whose `name` attribute is an integer file descriptor) then the class
            [\_\_name\_\_][definition.__name__] of `obj` is returned.

    Returns:
        The basename of `obj`.
    """
    if isinstance(obj, (str, bytes, os.PathLike)):
        return Path(os.fsdecode(obj)).name

    # A file-like object exposes its path via the "name" attribute, but "name" is
    # not always a path: a stream created from a file descriptor (e.g., open(fd)
    # or tempfile.TemporaryFile on POSIX) has an integer name, which Path() rejects
    # with a TypeError. Only treat "name" as a path when it is path-like.
    name = getattr(obj, "name", None)
    if isinstance(name, (str, bytes, os.PathLike)):
        return Path(os.fsdecode(name)).name
    return obj.__class__.__name__

get_bytes ¤

get_bytes(file, *positions)

Return bytes from a file.

Parameters:

Name Type Description Default
file FileLikeRead[bytes] | PathLike

The file to read bytes from.

required
positions int | None

The position(s) in the file to retrieve bytes from.

()

Examples:

get_bytes(file)  # returns all bytes
get_bytes(file, 5)  # returns the first 5 bytes
get_bytes(file, -5)  # returns the last 5 bytes
get_bytes(file, 5, 10)  # returns bytes 5 through 10 (inclusive)
get_bytes(file, 3, -1)  # skips the first 2 bytes and returns the rest
get_bytes(file, -8, -4)  # returns the eighth- through fourth-last bytes (inclusive)
get_bytes(file, 1, -1, 2)  # returns every other byte

Returns:

Type Description
bytes

The bytes from the file.

Source code in src/msl/io/utils.py
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def get_bytes(file: FileLikeRead[bytes] | PathLike, *positions: int | None) -> bytes:  # noqa: C901, PLR0912, PLR0915
    """Return bytes from a file.

    Positions are 1-based and inclusive. A negative position is counted from the
    end of the file (`-1` is the last byte). A request that extends beyond the
    file is clamped to the bytes that are available. If `file` is a file-like
    object, its stream position is restored before returning.

    Args:
        file: The file to read bytes from.
        positions: The position(s) in the file to retrieve bytes from. Zero, one
            (`stop`), two (`start`, `stop`) or three (`start`, `stop`, `step`) values
            may be specified.

    **Examples:**
    ```python
    get_bytes(file)  # returns all bytes
    get_bytes(file, 5)  # returns the first 5 bytes
    get_bytes(file, -5)  # returns the last 5 bytes
    get_bytes(file, 5, 10)  # returns bytes 5 through 10 (inclusive)
    get_bytes(file, 3, -1)  # skips the first 2 bytes and returns the rest
    get_bytes(file, -8, -4)  # returns the eighth- through fourth-last bytes (inclusive)
    get_bytes(file, 1, -1, 2)  # returns every other byte
    ```

    Returns:
        The bytes from the file.
    """
    size: int
    path: Path | None
    if isinstance(file, (bytes, str, os.PathLike)):
        path = Path(os.fsdecode(file))
        try:
            size = path.stat().st_size
        except OSError:
            # A file on a mapped network drive can raise the following:
            #   [WinError 87] The parameter is incorrect
            # for Python 3.5, 3.6 and 3.7. Also, calling os.path.getsize
            # on a file on a mapped network drive could return 0
            # (without raising OSError) on Python 3.8 and 3.9, which is
            # why we set size=0 on an OSError
            size = 0

        if size == 0:
            # fall back to measuring the size by seeking to the end of the file
            with path.open("rb") as f:
                _ = f.seek(0, os.SEEK_END)
                size = f.tell()
    else:
        path = None
        # measure the size of the stream without disturbing the caller's position
        position = file.tell()
        _ = file.seek(0, os.SEEK_END)
        size = file.tell()
        _ = file.seek(position)

    if not positions:
        start, stop, step = 0, size, 1
    elif len(positions) == 1:
        start, step = 0, 1
        stop = size if positions[0] is None else positions[0]
        if stop < 0:
            # The last "-stop" bytes were requested. Clamp to the beginning of the
            # file *before* converting to the 1-based start position, otherwise a
            # request for more bytes than the file contains produces a negative
            # start which is then misinterpreted as an offset from the end of the
            # file (and only a fraction of the file would be returned).
            start, stop = max(size + stop, 0) + 1, size
    elif len(positions) == 2:  # noqa: PLR2004
        start, step = positions[0] or 0, 1
        stop = size if positions[1] is None or positions[1] == -1 else positions[1]
    else:
        start, stop, step = positions[0] or 0, positions[1] or size, positions[2] or 1

    # convert the (1-based, possibly negative) start position to a 0-based offset
    if start < 0:
        start = max(size + start, 0)
    elif start > 0:
        start -= 1
    start = min(size, start)

    # convert the (inclusive, possibly negative) stop position to an exclusive offset
    if stop < 0:
        stop += size + 1
    stop = min(size, stop)

    n_bytes = max(0, stop - start)
    if isinstance(file, (bytes, str, os.PathLike)):
        assert path is not None  # noqa: S101
        with path.open("rb") as f:
            _ = f.seek(start)
            data = f.read(n_bytes)
    else:
        position = file.tell()
        _ = file.seek(start)
        data = file.read(n_bytes)
        _ = file.seek(position)

    if step != 1:
        return data[::step]
    return data

get_extension ¤

get_extension(file)

Return the extension of the file.

Parameters:

Name Type Description Default
file PathLike | ReadLike | WriteLike

The file to get the extension of.

required

Returns:

Type Description
str

The extension (including the .).

Source code in src/msl/io/utils.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def get_extension(file: PathLike | ReadLike | WriteLike) -> str:
    """Get the extension of a file.

    Args:
        file: A file path, or a file-like object. For a file-like object the
            extension is taken from its `name` attribute.

    Returns:
        The extension (including the `.`). An empty string is returned if
            the extension cannot be determined.
    """
    path_like = (bytes, str, os.PathLike)
    if not isinstance(file, path_like):
        # not a path: defer to whatever the "name" attribute refers to
        try:
            suffix = get_extension(file.name)
        except AttributeError:
            suffix = ""
        return suffix

    return Path(os.fsdecode(file)).suffix

get_lines ¤

get_lines(
    file: FileLikeRead[str] | PathLike,
    *lines: int | None,
    remove_empty_lines: bool = False,
    encoding: str | None = "utf-8",
    errors: Literal["strict", "ignore"] | None = "strict",
) -> list[str]
get_lines(
    file: FileLikeRead[bytes],
    *lines: int | None,
    remove_empty_lines: bool = False,
    encoding: str | None = None,
    errors: Literal["strict", "ignore"] | None = None,
) -> list[bytes]
get_lines(
    file,
    *lines,
    remove_empty_lines=False,
    encoding="utf-8",
    errors="strict",
)

Return lines from a file.

Parameters:

Name Type Description Default
file PathLike | ReadLike

The file to read lines from.

required
lines int | None

The line(s) in the file to get.

()
remove_empty_lines bool

Whether to remove all empty lines.

False
encoding str | None

The name of the encoding to use to decode the file.

'utf-8'
errors Literal['strict', 'ignore'] | None

How encoding errors are to be handled.

'strict'

Examples:

get_lines(file)  # returns all lines
get_lines(file, 5)  # returns the first 5 lines
get_lines(file, -5)  # returns the last 5 lines
get_lines(file, 2, 4)  # returns lines 2, 3 and 4
get_lines(file, 2, -2)  # skips the first and last lines and returns the rest
get_lines(file, -4, -2)  # returns the fourth-, third- and second-last lines
get_lines(file, 1, -1, 6)  # returns every sixth line in the file

Returns:

Type Description
list[bytes] | list[str]

The lines from the file. Trailing whitespace is stripped from each line. A list[bytes] is returned if file is a file-like object opened in binary mode, otherwise a list[str] is returned.

Source code in src/msl/io/utils.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
def get_lines(  # noqa: PLR0912
    file: PathLike | ReadLike,
    *lines: int | None,
    remove_empty_lines: bool = False,
    encoding: str | None = "utf-8",
    errors: Literal["strict", "ignore"] | None = "strict",
) -> list[bytes] | list[str]:
    """Return lines from a file.

    Line numbers are 1-based and inclusive. A negative line number is counted
    from the end of the file. If `file` is a file-like object, lines are read
    from its current stream position and the position is restored afterwards.

    Args:
        file: The file to read lines from.
        lines: The line(s) in the file to get.
        remove_empty_lines: Whether to remove all empty lines.
        encoding: The name of the encoding to use to decode the file. Only used
            when `file` is a path.
        errors: How encoding errors are to be handled. Only used when `file` is a path.

    **Examples:**
    ```python
    get_lines(file)  # returns all lines
    get_lines(file, 5)  # returns the first 5 lines
    get_lines(file, -5)  # returns the last 5 lines
    get_lines(file, 2, 4)  # returns lines 2, 3 and 4
    get_lines(file, 2, -2)  # skips the first and last lines and returns the rest
    get_lines(file, -4, -2)  # returns the fourth-, third- and second-last lines
    get_lines(file, 1, -1, 6)  # returns every sixth line in the file
    ```

    Returns:
        The lines from the `file`. Trailing whitespace is stripped from each line.
            A [list][][[bytes][]] is returned if `file` is a file-like object
            opened in binary mode, otherwise a [list][][[str][]] is returned.
    """
    if len(lines) > 1:
        first, last = lines[0], lines[1]
        # a negative "stop" is shifted by one so that the "stop" line is included
        if last is not None and last < 0:
            last = None if last == -1 else last + 1
        # a positive "start" becomes a 0-based index so that the "start" line is included
        if first is not None and first > 0:
            first -= 1
        lines = (first, last, *lines[2:])

    # itertools.islice rejects negative indices, so when any index is negative
    # every line is read and the selection is made by slicing the list afterwards
    slice_afterwards = any(val < 0 for val in lines if val)
    if not slice_afterwards and not lines:
        lines = (None,)

    def _stripped(stream):  # type: ignore[no-untyped-def]  # pyright: ignore
        chosen = stream if slice_afterwards else itertools.islice(stream, *lines)
        return [text.rstrip() for text in chosen]

    result: list[bytes] | list[str]
    if isinstance(file, (bytes, str, os.PathLike)):
        with Path(os.fsdecode(file)).open(encoding=encoding, errors=errors) as handle:
            result = _stripped(handle)
    else:
        origin = file.tell()
        result = _stripped(file)
        _ = file.seek(origin)

    if slice_afterwards:
        window = slice(lines[0], None) if len(lines) == 1 else slice(*lines[:3])
        result = result[window]

    if not remove_empty_lines:
        return result
    return [text for text in result if text]  # type: ignore[return-value]  # pyright: ignore[reportReturnType]

git_head ¤

git_head(directory)

Get information about the HEAD of a repository.

This function requires that git is installed and that it is available on the PATH environment variable.

Parameters:

Name Type Description Default
directory PathLike

A directory that is under version control.

required

Returns:

Type Description
GitHead | None

Information about the most recent commit on the current branch. If directory is not a directory that is under version control then returns None.

Source code in src/msl/io/utils.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
def git_head(directory: PathLike) -> GitHead | None:
    """Get information about the [HEAD]{:target="_blank"} of a repository.

    The [git](https://git-scm.com/){:target="_blank"} executable is invoked in a
    subprocess, so it must be installed and be available on the `PATH`
    environment variable.

    [HEAD]: https://git-scm.com/docs/gitglossary#def_HEAD

    Args:
        directory: A directory that is under version control.

    Returns:
        The hash and the timestamp of the most recent commit on the current branch.
            If the `git` command exits with a non-zero status (e.g., `directory`
            is not under version control) then `None` is returned.
    """
    # %H is the full commit hash and %ct is the committer date as a UNIX timestamp
    command = ["git", "show", "-s", "--format=%H %ct", "HEAD"]
    try:
        stdout = subprocess.check_output(command, cwd=directory, stderr=subprocess.PIPE)  # noqa: S603
    except subprocess.CalledProcessError:
        return None

    commit, epoch = stdout.split()
    commit_hash = commit.decode("ascii")
    committed_at = datetime.fromtimestamp(int(epoch))  # noqa: DTZ006
    return GitHead(hash=commit_hash, timestamp=committed_at)

is_admin ¤

is_admin()

Check if the current process is being run as an administrator.

Returns:

Type Description
bool

True if the current process is being run as an administrator, otherwise False.

Source code in src/msl/io/utils.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def is_admin() -> bool:
    """Check if the current process is being run as an administrator.

    On Windows the `IsUserAnAdmin` function of the `shell32` library is queried,
    otherwise the effective user ID of the process is compared with `0`.

    Returns:
        `True` if the current process is being run as an administrator, otherwise `False`.
    """
    import ctypes  # noqa: PLC0415

    try:
        elevated: int = ctypes.windll.shell32.IsUserAnAdmin()
    except AttributeError:
        # ctypes.windll (or the shell32 function) is not available
        return os.geteuid() == 0 if sys.platform != "win32" else False
    return elevated == 1

is_dir_accessible ¤

is_dir_accessible(path, *, strict=False)

Check if a directory exists and is accessible.

An accessible directory is one that the user has permission to access.

Parameters:

Name Type Description Default
path PathLike

The directory to check.

required
strict bool

Whether to raise an exception if the directory is not accessible.

False

Returns:

Type Description
bool

Whether the directory exists and is accessible.

Source code in src/msl/io/utils.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def is_dir_accessible(path: PathLike, *, strict: bool = False) -> bool:
    """Check if a directory exists and is accessible.

    The check is performed by temporarily changing the current working
    directory of the process to `path`. The original working directory is
    restored if the change succeeds.

    Args:
        path: The directory to check.
        strict: Whether to raise an exception if the directory is not accessible.

    Returns:
        Whether the directory exists and is accessible.
    """
    original = Path.cwd()
    try:
        os.chdir(path)
    except (OSError, TypeError):
        if not strict:
            return False
        raise

    # the directory could be entered, go back to where the process was
    os.chdir(original)
    return True

is_file_readable ¤

is_file_readable(file, *, strict=False)

Check if a file exists and is readable.

Parameters:

Name Type Description Default
file PathLike

The file to check.

required
strict bool

Whether to raise an exception if the file does not exist or is not readable.

False

Returns:

Type Description
bool

Whether the file exists and is readable.

Source code in src/msl/io/utils.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def is_file_readable(file: PathLike, *, strict: bool = False) -> bool:
    """Check if a file exists and is readable.

    The check is performed by opening the file in binary-read mode.

    Args:
        file: The file to check.
        strict: Whether to raise an exception if the file does not exist or is not readable.

    Returns:
        Whether the file exists and is readable.
    """
    try:
        with Path(os.fsdecode(file)).open("rb"):
            pass
    except (OSError, TypeError):
        if not strict:
            return False
        raise
    return True

remove_write_permissions ¤

remove_write_permissions(path)

Remove all write permissions of a file.

On Windows, this function will set the file attribute to be read only.

On Linux and macOS, write permission is removed for the User, Group and Others. The read and execute permissions are preserved.

Parameters:

Name Type Description Default
path PathLike

The path to remove the write permissions of.

required
Source code in src/msl/io/utils.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
def remove_write_permissions(path: PathLike) -> None:
    """Remove all write permissions of a file.

    On Windows, this function will set the file attribute to be read only.

    On Linux and macOS, write permission is removed for the User,
    Group and Others. The read and execute permissions are preserved.

    If `path` is a symbolic link, the permissions of the file that the
    link refers to are modified.

    Args:
        path: The path to remove the write permissions of.
    """
    import stat  # noqa: PLC0415

    # os.chmod follows symbolic links, so the current mode must also be read from
    # the link target (os.stat) and not from the link itself (os.lstat). The mode of
    # a link is typically 0o777, which would grant the target read and execute
    # permissions that it did not have.
    current_permissions = stat.S_IMODE(os.stat(path).st_mode)  # noqa: PTH116
    disable_writing = ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH
    os.chmod(path, current_permissions & disable_writing)  # noqa: PTH101

run_as_admin ¤

run_as_admin(
    args=None,
    *,
    executable=None,
    cwd=None,
    capture_stderr=False,
    blocking=True,
    show=False,
    **kwargs,
)

Run a process as an administrator and return its output.

Parameters:

Name Type Description Default
args PathLike | Sequence[PathLike] | None

A sequence of program arguments or else a command string. Providing a sequence of arguments is generally preferred, as it allows the subprocess to take care of any required escaping and quoting of arguments (e.g., to permit spaces in file names).

None
executable PathLike | None

The executable to pass the args to.

None
cwd PathLike | None

The working directory to use for the elevated process.

None
capture_stderr bool

Whether to send the stderr stream to stdout.

False
blocking bool

Whether to wait for the process to finish before returning to the calling program.

True
show bool

Whether to show the elevated console (Windows only). If True, the stdout stream of the process is not captured.

False
kwargs Any

If the current process already has admin privileges or if the operating system is not Windows then all additional keyword arguments are passed to subprocess.check_output. Otherwise, only a timeout keyword argument is used (Windows).

{}

Returns:

Type Description
int | bytes | Popen[Any]

The returned object depends on whether the process is executed in blocking or non-blocking mode and whether Python is already running with admin privileges. If blocking, bytes are returned (the stdout stream of the process). If non-blocking, the returned object will either be the subprocess.Popen instance that is running the process (POSIX) or an int which is the process ID (Windows).

Source code in src/msl/io/utils.py
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
def run_as_admin(  # noqa: C901, PLR0912, PLR0913, PLR0915
    args: PathLike | Sequence[PathLike] | None = None,
    *,
    executable: PathLike | None = None,
    cwd: PathLike | None = None,
    capture_stderr: bool = False,
    blocking: bool = True,
    show: bool = False,
    **kwargs: Any,
) -> int | bytes | Popen[Any]:
    """Run a process as an administrator and return its output.

    Args:
        args: A sequence of program arguments or else a command string. Providing a sequence of
            arguments is generally preferred, as it allows the subprocess to take care of any required
            escaping and quoting of arguments (e.g., to permit spaces in file names).
        executable: The executable to pass the `args` to.
        cwd: The working directory to use for the elevated process.
        capture_stderr: Whether to send the _stderr_ stream to _stdout_.
        blocking: Whether to wait for the process to finish before returning to the calling program.
        show: Whether to show the elevated console (Windows only). If `True`, the _stdout_ stream of
            the process is not captured.
        kwargs: If the current process already has admin privileges or if the operating system is
            not Windows then all additional keyword arguments are passed to [subprocess.check_output][].
            Otherwise, only a `timeout` keyword argument is used (Windows).

    Returns:
        The returned object depends on whether the process is executed in blocking or non-blocking mode
            and whether Python is already running with admin privileges. If blocking, [bytes][] are returned
            (the _stdout_ stream of the process). If non-blocking, the returned object will either be the
            [subprocess.Popen][] instance that is running the process (POSIX) or an [int][] which is the
            process ID (Windows).
    """
    if not args and not executable:
        msg = "Must specify the args and/or an executable"
        raise ValueError(msg)

    stderr = subprocess.STDOUT if capture_stderr else None
    process = subprocess.check_output if blocking else subprocess.Popen

    if is_admin():
        if not args:
            assert executable is not None  # noqa: S101
            return process(executable, cwd=cwd, stderr=stderr, **kwargs)  # pyright: ignore[reportUnknownVariableType]
        return process(args, executable=executable, cwd=cwd, stderr=stderr, **kwargs)  # pyright: ignore[reportUnknownVariableType]

    exe = "" if executable is None else subprocess.list2cmdline([os.fsdecode(executable)])

    if os.name != "nt":
        if not args:
            command = ["sudo", exe]
        elif isinstance(args, (str, bytes, os.PathLike)):
            command = ["sudo", exe, os.fsdecode(args)]
        else:
            command = ["sudo", exe, *list(map(os.fsdecode, args))]
        return process(command, cwd=cwd, stderr=stderr, **kwargs)  # pyright: ignore[reportUnknownVariableType]

    # Windows is more complicated

    if args is None:
        args = ""
    elif isinstance(args, (bytes, os.PathLike)):
        args = os.fsdecode(args)

    if not isinstance(args, str):
        args = subprocess.list2cmdline(args)

    cwd = os.getcwd() if not cwd else os.fsdecode(cwd)  # noqa: PTH109

    # the 'runas' verb starts in C:\WINDOWS\system32
    cd = subprocess.list2cmdline(["cd", "/d", cwd, "&&"])

    # check if a Python environment needs to be activated
    activate = ""
    if exe == sys.executable or args.startswith(sys.executable):
        conda = os.getenv("CONDA_PREFIX")  # conda
        venv = os.getenv("VIRTUAL_ENV")  # venv
        if conda:
            env = os.getenv("CONDA_DEFAULT_ENV")
            if not env:
                msg = "CONDA_DEFAULT_ENV environment variable does not exist"
                raise ValueError(msg)
            if env == "base":
                bat = Path(conda) / "Scripts" / "activate.bat"
            else:
                bat = Path(conda).parent.parent / "Scripts" / "activate.bat"
            if not bat.is_file():
                msg = f"Cannot find {bat}"
                raise FileNotFoundError(msg)
            activate = subprocess.list2cmdline([bat, env, "&&"])
        elif venv:
            bat = Path(venv) / "Scripts" / "activate.bat"
            if not bat.is_file():
                msg = f"Cannot find {bat}"
                raise FileNotFoundError(msg)
            activate = subprocess.list2cmdline([bat, "&&"])

    # redirect stdout (stderr) to a file
    redirect = None
    stdout_file = None
    if not show:
        import tempfile  # noqa: PLC0415
        import uuid  # noqa: PLC0415

        stdout_file = Path(tempfile.gettempdir()) / str(uuid.uuid4())
        r = [">", str(stdout_file)]
        if capture_stderr:
            r.append("2>&1")
        redirect = subprocess.list2cmdline(r)
        if re.search(r"\d$", args):
            # this number is also considered as a file handle, so add a space
            redirect = " " + redirect

    # the string that is passed to cmd.exe
    params = f'/S /C "{cd} {activate} {exe} {args}"{redirect}'

    import ctypes  # noqa: PLC0415
    from ctypes.wintypes import DWORD, HANDLE, HINSTANCE, HKEY, HWND, INT, LPCWSTR, ULONG  # noqa: PLC0415

    class ShellExecuteInfoW(ctypes.Structure):
        _fields_ = (  # pyright: ignore[reportUnannotatedClassAttribute]
            ("cbSize", DWORD),
            ("fMask", ULONG),
            ("hwnd", HWND),
            ("lpVerb", LPCWSTR),
            ("lpFile", LPCWSTR),
            ("lpParameters", LPCWSTR),
            ("lpDirectory", LPCWSTR),
            ("nShow", INT),
            ("hInstApp", HINSTANCE),
            ("lpIDList", ctypes.c_void_p),
            ("lpClass", LPCWSTR),
            ("hkeyClass", HKEY),
            ("dwHotKey", DWORD),
            ("hIcon", HANDLE),
            ("hProcess", HANDLE),
        )

    sei = ShellExecuteInfoW()
    sei.fMask = 0x00000040 | 0x00008000  # SEE_MASK_NOCLOSEPROCESS | SEE_MASK_NO_CONSOLE
    sei.lpVerb = kwargs.get("verb", "runas")  # change the verb when running the tests
    sei.lpFile = "cmd.exe"
    sei.lpParameters = params
    sei.lpDirectory = f"{cwd}" if cwd else None
    sei.nShow = int(show)
    sei.cbSize = ctypes.sizeof(sei)
    if not ctypes.windll.Shell32.ShellExecuteExW(ctypes.byref(sei)):
        raise ctypes.WinError()

    if not blocking:
        return cast("int", sei.hProcess)

    kernel32 = ctypes.windll.kernel32
    timeout = kwargs.get("timeout", -1)  # INFINITE = -1
    milliseconds = int(timeout * 1e3) if timeout > 0 else timeout

    ret = kernel32.WaitForSingleObject(sei.hProcess, milliseconds)
    if ret == 0:  # WAIT_OBJECT_0
        stdout = b""
        if stdout_file is not None and stdout_file.is_file():
            stdout = stdout_file.read_bytes()
            stdout_file.unlink()

        code = DWORD()
        if not kernel32.GetExitCodeProcess(sei.hProcess, ctypes.byref(code)):
            raise ctypes.WinError()

        if code.value != 0:
            msg = ctypes.FormatError(code.value)
            out_str = stdout.decode("utf-8", "ignore").rstrip()
            if show:
                msg += "\nSet show=False to capture the stdout stream."
            else:
                if not capture_stderr:
                    msg += "\nSet capture_stderr=True to see if more information is available."
                if out_str:
                    msg += f"\n{out_str}"
            raise ctypes.WinError(code.value, msg)

        kernel32.CloseHandle(sei.hProcess)
        return stdout

    if ret == 0xFFFFFFFF:  # WAIT_FAILED  # noqa: PLR2004
        raise ctypes.WinError()

    if ret == 0x00000080:  # WAIT_ABANDONED  # noqa: PLR2004
        msg = (
            "The specified object is a mutex object that was not "
            "released by the thread that owned the mutex object before "
            "the owning thread terminated. Ownership of the mutex "
            "object is granted to the calling thread and the mutex state "
            "is set to non-signalled. If the mutex was protecting persistent "
            "state information, you should check it for consistency."
        )
    elif ret == 0x00000102:  # WAIT_TIMEOUT  # noqa: PLR2004
        msg = f"The timeout interval elapsed after {timeout} second(s) and the object's state is non-signalled."
    else:
        msg = f"Unknown return value 0x{ret:x}"

    msg = f"WaitForSingleObject: {msg}"
    raise OSError(msg)

search ¤

search(
    directory,
    *,
    depth=0,
    include=None,
    exclude=None,
    flags=0,
    ignore_os_error=True,
    ignore_hidden_folders=True,
    follow_symlinks=True,
)

Search for files starting from a root directory.

Parameters:

Name Type Description Default
directory PathLike

The root directory to begin searching for files.

required
depth int | None

The number of sub-directories to recursively search for files. If 0, only files in directory are searched, if 1 then files in directory and in one sub-directory are searched, etc. If None, search directory and recursively search all sub-directories.

0
include str | Pattern[str] | None

A regular-expression pattern to use to include files. If None, no filtering is applied and all files are yielded (that are not excluded). For example,

  • r"data" → find files with the word data in the file path
  • r"\.png$" → find files with the extension .png
  • r"\.jpe*g$" → find files with the extension .jpeg or .jpg
None
exclude str | Pattern[str] | None

A regular-expression pattern to use to exclude files. The exclude pattern has precedence over the include pattern. For example,

  • r"bin" → exclude all files that contain the word bin in the file path
  • r"bin|lib" → exclude all files that contain the word bin or lib in the file path
None
flags int

The flags to use to compile the regular-expression pattern (if it is a str type).

0
ignore_os_error bool

Whether to ignore an OSError, if one occurs, while iterating through a directory. This type of error can occur if a directory does not have the appropriate read permission.

True
ignore_hidden_folders bool

Whether to ignore a hidden directory from the search. A hidden directory starts with a . (a dot).

True
follow_symlinks bool

Whether to search for files by following symbolic links.

True

Yields:

Type Description
Generator[Path]

The path to a file.

Source code in src/msl/io/utils.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def search(  # noqa: C901, PLR0913
    directory: PathLike,
    *,
    depth: int | None = 0,
    include: str | re.Pattern[str] | None = None,
    exclude: str | re.Pattern[str] | None = None,
    flags: int = 0,
    ignore_os_error: bool = True,
    ignore_hidden_folders: bool = True,
    follow_symlinks: bool = True,
) -> Generator[Path]:
    r"""Search for files starting from a root directory.

    Args:
        directory: The root directory to begin searching for files.
        depth: The number of sub-directories to recursively search for files.
            If `0`, only files in `directory` are searched, if `1` then files in `directory`
            and in one sub-directory are searched, etc. If `None`, search `directory` and
            recursively search all sub-directories.
        include: A regular-expression pattern to use to include files. If `None`, no filtering
            is applied and all files are yielded (that are not `exclude`d). For example,

            * `r"data"` &#8594; find files with the word `data` in the file path
            * `r"\.png$"` &#8594; find files with the extension `.png`
            * `r"\.jpe*g$"` &#8594; find files with the extension `.jpeg` or `.jpg`

        exclude: A regular-expression pattern to use to exclude files. The `exclude` pattern
            has precedence over the `include` pattern. For example,

            * `r"bin"` &#8594; exclude all files that contain the word `bin` in the file path
            * `r"bin|lib"` &#8594; exclude all files that contain the word `bin` or `lib` in the file path

        flags: The flags to use to compile the regular-expression pattern (if it is a [str][] type).
        ignore_os_error: Whether to ignore an [OSError][], if one occurs, while iterating through a directory.
            This type of error can occur if a directory does not have the appropriate read permission.
        ignore_hidden_folders: Whether to ignore a hidden directory from the search. A hidden directory
            starts with a `.` (a dot).
        follow_symlinks: Whether to search for files by following symbolic links.

    Yields:
        The path to a file.
    """
    if depth is not None and depth < 0:
        return

    folder = Path(os.fsdecode(directory))

    if ignore_hidden_folders and folder.name.startswith("."):
        logger.debug("search ignored hidden folder '%s'", folder)
        return

    if isinstance(exclude, str):
        exclude = re.compile(exclude, flags=flags)

    if isinstance(include, str):
        include = re.compile(include, flags=flags)

    try:
        with os.scandir(folder) as it:
            for entry in it:
                if entry.is_file():
                    path = entry.path
                    if exclude and exclude.search(path):
                        logger.debug("search excluded file %r", path)
                    elif include is None or include.search(path):
                        yield Path(path)
                elif entry.is_dir(follow_symlinks=follow_symlinks):
                    yield from search(
                        entry,
                        depth=None if depth is None else depth - 1,
                        include=include,
                        exclude=exclude,
                        flags=flags,
                        ignore_os_error=ignore_os_error,
                        ignore_hidden_folders=ignore_hidden_folders,
                        follow_symlinks=follow_symlinks,
                    )
    except OSError:
        logger.debug("search raised OSError for '%s'", folder)
        if not ignore_os_error:
            raise

send_email ¤

send_email(
    config, recipients, sender=None, subject=None, body=None
)

Send an email.

Parameters:

Name Type Description Default
config PathLike | SupportsRead[AnyStr]

An INI-style configuration file that contains information on how to send an email. There are two ways to send an email — Gmail API or SMTP server.

An example INI file to use the Gmail API is the following (see GMail for more details). Although all key-value pairs are optional, a [gmail] section must exist to use the Gmail API. If a key is omitted, the value passed to GMail is None.

[gmail]
account = work
credentials = path/to/client_secrets.json
scopes =
    https://www.googleapis.com/auth/gmail.send
    https://www.googleapis.com/auth/gmail.metadata
domain = @gmail.com

An example INI file for an SMTP server is the following. Only the host and port key-value pairs are required.

[smtp]
host = hostname or IP address of the SMTP server
port = port number to connect to on the SMTP server (e.g., 25)
starttls = true|yes|1|on -or- false|no|0|off (default: false)
username = the username to authenticate with (default: None)
password = the password for username (default: None)
domain = @company.com (default: None)

Warning

Since this information is specified in plain text in the configuration file, you should set the file permissions provided by your operating system to ensure that your authentication credentials are safe.

required
recipients str | list[str]

The email address(es) of the recipient(s). Can omit the @domain.com part if a domain key is specified in the config file. Can use the value 'me' if sending an email to yourself via Gmail.

required
sender str | None

The email address of the sender. Can omit the @domain.com part if a domain key is specified in the config file. If sender is not specified, it becomes the value of the first recipient if using SMTP or the value 'me' if using Gmail.

None
subject str | None

The text to include in the subject field.

None
body str | None

The text to include in the body of the email. The text can be enclosed in <html></html> tags to use HTML elements to format the message.

None
Source code in src/msl/io/utils.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def send_email(
    config: PathLike | SupportsRead[AnyStr],
    recipients: str | list[str],
    sender: str | None = None,
    subject: str | None = None,
    body: str | None = None,
) -> None:
    """Send an email.

    Args:
        config: An INI-style configuration file that describes how the email is sent.
            An email can be sent in one of two ways &mdash; Gmail API or SMTP server.

            To use the Gmail API, the file must contain a `[gmail]` section
            (see [GMail][msl.io.google_api.GMail] for more details). Every
            key-value pair in the section is optional and `None` is passed to
            [GMail][msl.io.google_api.GMail] for a key that is omitted.
            For example,

            ```ini
            [gmail]
            account = work
            credentials = path/to/client_secrets.json
            scopes =
                https://www.googleapis.com/auth/gmail.send
                https://www.googleapis.com/auth/gmail.metadata
            domain = @gmail.com
            ```

            To use an SMTP server, the file must contain an `[smtp]` section. Only
            the `host` and `port` key-value pairs are required. For example,

            ```ini
            [smtp]
            host = hostname or IP address of the SMTP server
            port = port number to connect to on the SMTP server (e.g., 25)
            starttls = true|yes|1|on -or- false|no|0|off (default: false)
            username = the username to authenticate with (default: None)
            password = the password for username (default: None)
            domain = @company.com (default: None)
            ```

            !!! warning
                The configuration file contains this information in plain text,
                so you should use the file permissions provided by your operating
                system to keep your authentication credentials safe.

        recipients: The email address(es) of the recipient(s). The `@domain.com` part
            can be omitted if the `config` file specifies a `domain` key. The value
            `'me'` can be used to send an email to yourself via Gmail.
        sender: The email address of the sender. The `@domain.com` part can be
            omitted if the `config` file specifies a `domain` key. If `sender`
            is not specified, the value of the first `recipient` is used for SMTP
            and the value `'me'` is used for Gmail.
        subject: The text to include in the subject field.
        body: The text to include in the body of the email. Enclose the text in
            `<html></html>` tags to use HTML elements to format the message.
    """
    settings = _prepare_email(config, recipients, sender)

    if not isinstance(settings, _SMTPConfig):
        # not an SMTP configuration, so the Gmail API sends the email
        with GMail(account=settings.account, credentials=settings.credentials, scopes=settings.scopes) as gmail:
            gmail.send(settings.to, sender=settings.frm, subject=subject, body=body)
        return

    # these modules are only needed when an SMTP server is used
    from email.mime.multipart import MIMEMultipart  # noqa: PLC0415
    from email.mime.text import MIMEText  # noqa: PLC0415
    from smtplib import SMTP  # noqa: PLC0415

    with SMTP(host=settings.host, port=settings.port) as smtp:
        if settings.starttls:
            # upgrade to a TLS connection, the server must be greeted again afterwards
            _ = smtp.ehlo()
            _ = smtp.starttls()
            _ = smtp.ehlo()

        # authentication is optional, both values are required to log in
        if settings.username and settings.password:
            _ = smtp.login(settings.username, settings.password)

        content = body if body else ""
        if content.startswith("<html>"):
            subtype = "html"
        else:
            subtype = "plain"

        message = MIMEMultipart()
        message["From"] = settings.frm
        message["To"] = ", ".join(settings.to)
        message["Subject"] = subject if subject else "(no subject)"
        message.attach(MIMEText(content, subtype))

        _ = smtp.sendmail(settings.frm, settings.to, message.as_string())