Skip to content

kaptive.utils

check_file(file, panic=False)

Checks a file exists and is non-empty and returns the absolute path

Source code in kaptive/utils.py
43
44
45
46
47
48
49
50
51
52
53
def check_file(file: str | os.PathLike, panic: bool = False) -> os.PathLike | None:
    """
    Validate that a path points to an existing, non-empty regular file.
    :param file: Path to validate
    :param panic: Report a failed check with quit_with_error instead of warning
    :return: Absolute path of the file if every check passes, otherwise whatever the reporting function returns
    """
    report = quit_with_error if panic else warning  # Choose how failures are reported up front
    if not os.path.exists(file):
        return report(f'{file} does not exist')
    if not os.path.isfile(file):
        return report(f'{file} is not a file')
    if os.path.getsize(file):  # Non-zero size, so the file has content
        return os.path.abspath(file)
    return report(f'{file} is empty')

check_out(path, mode='at', exist_ok=True)

Check if the user wants to create/append a file or directory. If it looks like/is already a file (has an extension), return the file object. If it looks like/is already a directory, return the directory path.

Source code in kaptive/utils.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def check_out(path: str | os.PathLike, mode: str = "at", exist_ok: bool = True) -> os.PathLike | TextIO:
    """
    Check if the user wants to create/append a file or directory.
    If it looks like/is already a file (has an extension), return the file object.
    If it looks like/is already a directory, return the directory path.
    """
    if path == '-':  # If the path is '-', return stdout
        return sys.stdout
    if os.path.splitext(path)[1]:  # If the path has an extension, it's probably a file
        try:
            return open(path, mode)  # Open the file
        except Exception as e:
            quit_with_error(f'Could not open {path}: {e}')
    if not os.path.exists(path):  # Assume directory
        try:
            os.makedirs(path, exist_ok=exist_ok)  # Create the directory if it doesn't exist
        except Exception as e:
            quit_with_error(f'Could not create {path}: {e}')
    return path

check_programs(progs, verbose=False)

Check if programs are installed and executable

Source code in kaptive/utils.py
29
30
31
32
33
34
35
36
37
38
39
40
def check_programs(progs: list[str], verbose: bool = False):
    """
    Check if programs are installed and executable
    :param progs: Names of the programs to look up
    :param verbose: Passed through to log() for each program that is found
    :return: None; quit_with_error is called for the first program that cannot be found
    """
    from shutil import which  # Function-scope import so this block does not rely on the module's import list

    for program in progs:
        # which() returns the first executable match in PATH order (the one a shell would run)
        # and returns None instead of raising when PATH is unset or a PATH entry is unreadable
        if (location := which(program)) is not None:
            log(f'{program}: {location}', verbose=verbose)
        else:
            quit_with_error(f'{program} not found')

merge_ranges(ranges, tolerance=0, skip_sort=False)

Merge overlapping ranges. Parameters: `ranges` is a list of tuples of start and end positions; `tolerance` is an integer or float tolerance for merging ranges; `skip_sort` skips sorting the ranges before merging. Returns a generator that yields the merged ranges.

Source code in kaptive/utils.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def merge_ranges(ranges: list[tuple[int | float, int | float]], tolerance: int | float = 0, skip_sort: bool = False
                 ) -> Generator[tuple[int | float, int | float], None, None]:
    """
    Merge overlapping ranges
    :param ranges: List of tuples of start and end positions
    :param tolerance: Integer or float of tolerance for merging ranges
    :param skip_sort: Skip sorting the ranges before merging
    :return: List of merged ranges
    """
    if not ranges:
        return None
    if len(ranges) == 1:
        yield ranges[0]
        return None
    current_range = (ranges := ranges if skip_sort else sorted(ranges, key=itemgetter(0)))[0]  # Start with the first range
    for start, end in ranges[1:]:  # Iterate through the ranges
        if start - tolerance <= current_range[1]:  # Overlap, merge the ranges
            current_range = (current_range[0], max(current_range[1], end))
        else:  # No overlap, add the current range to the merged list and start a new range
            yield current_range  # Yield the current range
            current_range = (start, end)   # Start a new range
    yield current_range  # Yield the last range

opener(file, verbose=False, *args, **kwargs)

Opens a file with the appropriate open function based on the magic bytes at the beginning of the data. Parameters: `file` is the file to open; `verbose` prints log messages to stderr. Returns the file handle.

Source code in kaptive/utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def opener(file: str | os.PathLike, verbose: bool = False, *args, **kwargs) -> TextIO | BinaryIO:
    """
    Opens a file with the appropriate open function based on the magic bytes at the beginning of the data
    :param file: File to open
    :param verbose: Print log messages to stderr
    :param args: Extra positional arguments passed to the selected open function
    :param kwargs: Extra keyword arguments passed to the selected open function
    :return: File handle, or the result of warning() if a compressed file could not be opened
    :raises FileNotFoundError: If check_file() rejects the path (missing, not a file, or empty)
    """
    # check_file() is called without panic, so it reports problems through its return value
    # (presumably None via warning()) rather than by raising; fail here with a clear error
    # instead of letting os.path.basename() choke on a non-path value.
    path = check_file(file)
    if not path:
        raise FileNotFoundError(f'{file} does not exist, is not a file, or is empty')
    basename = os.path.basename(path)
    with open(path, 'rb') as f:  # Open the file to read bytes
        first_bytes = f.read(_MIN_N_BYTES)  # Get the bytes necessary to guess the compression type
    for magic, compression in _MAGIC_BYTES.items():
        if first_bytes.startswith(magic):
            log(f"Assuming {basename} is compressed with {compression}", verbose=verbose)
            try:
                return _OPEN[compression](path, *args, **kwargs)
            except Exception as e:  # Best effort: report the failure rather than crash
                return warning(f"Error opening {basename} with {compression}; {first_bytes=}\n{e}")
    log(f"Assuming {basename} is uncompressed", verbose=verbose)
    return open(path, *args, **kwargs)

range_overlap(range1, range2, skip_sort=False)

Returns the overlap between two ranges. Parameters: `range1` and `range2` are each a tuple of start and end positions; `skip_sort` skips sorting each range before calculating the overlap. Returns the overlap as an integer.

Source code in kaptive/utils.py
143
144
145
146
147
148
149
150
151
152
153
154
155
def range_overlap(range1: tuple[int, int], range2: tuple[int, int], skip_sort: bool = False) -> int:
    """
    Compute how much two ranges overlap
    :param range1: Tuple of start and end positions
    :param range2: Tuple of start and end positions
    :param skip_sort: Trust that each range is already ordered (start, end) instead of sorting it
    :return: Length of the shared region, or 0 if the ranges do not overlap
    """
    bounds = []
    for rng in (range1, range2):
        low, high = rng if skip_sort else sorted(rng)
        bounds.append((low, high))
    (low1, high1), (low2, high2) = bounds
    left = max(low1, low2)  # The shared region starts at the later start
    right = min(high1, high2)  # ...and stops at the earlier end
    return max(0, right - left)  # A negative width means the ranges are disjoint