File size: 4,799 Bytes
0b11a42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import pandas as pd
import os
import errno
from pathlib import Path
from Bio.SeqIO.FastaIO import SimpleFastaParser
from datetime import datetime
from getpass import getuser

import logging
from rich.logging import RichHandler
from functools import wraps
from time import perf_counter
from typing import Callable

# Base directory prefix used by make_output_dir() when composing output folder paths.
# It is a relative path, so it resolves against the process's current working directory.
default_path = '../outputs/'

def humanize_time(time_in_seconds: float, /) -> str:
    """Return a nicely human-readable string of a time_in_seconds.

    The unit is chosen from the value *as it will be displayed* (rounded to one
    decimal), so inputs just below a unit boundary roll over to the next unit
    instead of producing strings such as "1000.0 ms", "60.0 sec" or
    "1 min 60.0 sec".

    Parameters
    ----------
    time_in_seconds : float
        Time in seconds, (not full seconds). Negative values are rendered
        with a leading "- ".

    Returns
    -------
    str
        A description of the time in one of the forms:
        - 300.1 ms
        - 4.5 sec
        - 5 min 43.1 sec
    """
    sgn = "" if time_in_seconds >= 0 else "- "
    magnitude = abs(time_in_seconds)

    # Compare the rounded millisecond value, not the raw one: 0.99996 s would
    # otherwise be printed as "1000.0 ms".
    if round(magnitude * 1e3, 1) < 1e3:
        return f"{sgn}{magnitude * 1e3:.1f} ms"

    # Round once to the displayed precision, then split into minutes/seconds,
    # so the seconds part can never be shown as "60.0".
    tenths = round(magnitude, 1)
    if tenths < 60:
        return f"{sgn}{tenths:.1f} sec"

    minutes, seconds = divmod(tenths, 60)
    return f"{sgn}{int(minutes)} min {seconds:.1f} sec"


class log_time:
    """A decorator / context manager to log the time a certain function / code block took.

    Usage either with:

        @log_time(log)
        def function_getting_logged_every_time(…):
            ...

    producing:

        function_getting_logged_every_time took 5.0 sec.

    or:

        with log_time(log, "Name of this codeblock"):
            ...

    producing:

        Name of this codeblock took 5.0 sec.

    One instance may be entered again before it has exited (for example a
    decorated function that calls itself, or nested ``with`` blocks on the same
    object); every level reports its own duration.
    """

    def __init__(self, logger: logging.Logger, name: str = None):
        """
        Parameters
        ----------
        logger : logging.Logger
            The logger to use for logging the time, if None use print.
        name : str, optional
            The name in the message, when used as a decorator this defaults to the function name, by default None
        """
        self.logger = logger
        self.name = name
        # Start times of the currently open timing scopes, innermost last.
        # A single attribute would be overwritten by a nested entry and make
        # the outer scope report the inner scope's start time.
        self._start_times = []

    def __call__(self, func: Callable):
        """Wrap ``func`` so that every call is timed and logged.

        If no name was given at construction, the function's ``__qualname__``
        is stored as the name used in the message.
        """
        if self.name is None:
            self.name = func.__qualname__

        @wraps(func)
        def inner(*args, **kwds):
            with self:
                return func(*args, **kwds)

        return inner

    def __enter__(self):
        """Start a timing scope and return this object (usable via ``as``)."""
        self.start_time = perf_counter()
        self._start_times.append(self.start_time)
        return self

    def __exit__(self, *exc):
        """Close the innermost timing scope and emit the message.

        The message is emitted whether or not the block raised; exceptions
        are never suppressed.
        """
        self.exit_time = perf_counter()
        started = self._start_times.pop()

        time_delta = humanize_time(self.exit_time - started)
        if self.logger is None:
            print(f"{self.name} took {time_delta}.")
        else:
            self.logger.info(f"{self.name} took {time_delta}.")
        return False


def write_2_log(log_file):
    """Configure logging to write to ``log_file`` and to the console via rich.

    Two handlers, both at INFO level, are handed to ``logging.basicConfig``:
    a ``FileHandler`` with a timestamp/name/level/message layout, and a
    ``RichHandler`` that prints the bare message.

    Parameters
    ----------
    log_file : str or path-like
        Destination passed to ``logging.FileHandler``.

    Notes
    -----
    NOTE(review): both handlers already carry their own formatter, so the
    ``datefmt`` given to ``basicConfig`` presumably has no visible effect, and
    ``basicConfig`` itself is documented to do nothing when the root logger is
    already configured - confirm against the logging documentation.
    """
    file_layout = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    console_layout = logging.Formatter("%(message)s")

    # File sink: full record details.
    to_file = logging.FileHandler(log_file)
    to_file.setLevel(logging.INFO)
    to_file.setFormatter(file_layout)

    # Console sink: message only. The level is hard-coded here; the original
    # author's note suggests it was meant to come from cli_args.log_level.
    to_console = RichHandler()
    to_console.setLevel(logging.INFO)
    to_console.setFormatter(console_layout)

    sinks = [to_file, to_console]
    logging.basicConfig(level=logging.INFO, datefmt="[%X]", handlers=sinks)


def fasta2df(path):
    """Load a FASTA file into a DataFrame indexed by the header lines.

    Parameters
    ----------
    path : str or path-like
        FASTA file to read.

    Returns
    -------
    pandas.DataFrame
        One row per record, indexed by the header as yielded by
        ``SimpleFastaParser``, with a single ``'sequence'`` column in which
        every uppercase ``'U'`` has been replaced by ``'T'``.
    """
    with open(path) as handle:
        # Each parsed record is a (header, sequence) pair.
        records = list(SimpleFastaParser(handle))

    headers = [title for title, _ in records]
    sequences = [residues for _, residues in records]

    frame = pd.DataFrame(sequences, headers, columns=['sequence'])
    frame['sequence'] = frame['sequence'].apply(lambda residues: residues.replace('U', 'T'))
    return frame



def fasta2df_subheader(path, id_pos):
    """Load a FASTA file into a DataFrame indexed by one token of each header.

    Parameters
    ----------
    path : str or path-like
        FASTA file to read.
    id_pos : int
        Index of the whitespace-separated header token to use as row label.

    Returns
    -------
    pandas.DataFrame
        One row per record, indexed by the selected header token, with a
        single ``'sequence'`` column in which every uppercase ``'U'`` has been
        replaced by ``'T'``.

    Raises
    ------
    IndexError
        If a header has no token at position ``id_pos``.
    """
    with open(path) as handle:
        # Pick the identifier while streaming so a bad header fails at the
        # record where it occurs.
        labelled = [
            (title.split(None)[id_pos], residues)
            for title, residues in SimpleFastaParser(handle)
        ]

    labels = [label for label, _ in labelled]
    sequences = [residues for _, residues in labelled]

    frame = pd.DataFrame(sequences, labels, columns=['sequence'])
    frame['sequence'] = frame['sequence'].apply(lambda residues: residues.replace('U', 'T'))
    return frame



def build_bowtie_index(bowtie_index_file):
    """Build a bowtie index by running ``bowtie-build <base>.fa <base>``.

    The command is executed with an explicit argument list (no shell), so
    base paths containing spaces or shell metacharacters are passed through
    verbatim instead of being split or interpreted.

    Parameters
    ----------
    bowtie_index_file : str
        Index base path; the reference FASTA is expected at
        ``bowtie_index_file + '.fa'``.

    Returns
    -------
    None
        As before, failures do not raise: if the tool cannot be started or
        exits with a non-zero status, a message is printed instead.
    """
    # Local import so this function does not rely on a module-level import.
    import subprocess

    # The index is rebuilt on every call; an earlier "reuse an existing
    # .1.ebwt index" shortcut had been disabled in this function.
    print('-------- index is build --------')
    command = ["bowtie-build", bowtie_index_file + '.fa', bowtie_index_file]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        # E.g. bowtie-build is not installed / not on PATH.
        print(f"bowtie-build could not be started: {err}")
        return
    if completed.returncode != 0:
        print(f"bowtie-build exited with status {completed.returncode}")



def make_output_dir(fasta_file):
    """Create (if necessary) and return the dated output directory for a FASTA file.

    The path is ``default_path + '<YYYY-MM-DD>__<name>/'`` where ``<name>`` is
    ``fasta_file`` with one trailing ``.fasta`` or ``.fa`` extension removed and
    the date is today's local date.

    Parameters
    ----------
    fasta_file : str
        Name of the FASTA file the outputs belong to.

    Returns
    -------
    str
        The directory path, ending in ``'/'``.

    Raises
    ------
    OSError
        If the directory cannot be created. An already existing directory is
        not an error.
    """
    # Remove the extension only at the end of the name. A blanket
    # str.replace also deletes '.fa' inside the name, e.g.
    # 'E.faecalis.fasta' would become 'Eecalis'.
    stem = fasta_file
    for extension in ('.fasta', '.fa'):
        if stem.endswith(extension):
            stem = stem[:-len(extension)]
            break

    output_dir = default_path + datetime.now().strftime('%Y-%m-%d') + '__' + stem + '/'
    os.makedirs(output_dir, exist_ok=True)
    return output_dir


def reverse_complement(seq):
    """Return the reverse complement of an uppercase DNA sequence.

    Parameters
    ----------
    seq : str
        Sequence made up of the characters 'A', 'C', 'G' and 'T'.

    Returns
    -------
    str
        ``seq`` read backwards with every base swapped for its partner
        (A<->T, C<->G).

    Raises
    ------
    KeyError
        If ``seq`` contains any other character (lowercase, 'N', 'U', ...).
    """
    pairing = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
    backwards = seq[::-1]
    # Dict indexing (not .get) keeps the KeyError for unknown bases.
    return ''.join(map(pairing.__getitem__, backwards))