File size: 2,167 Bytes
eaa3d8a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Callable
# Directory that log files are written to. The path is relative, so it is
# resolved against the current working directory at the time it is used.
LOGS_DIRECTORY_PATH = Path('../logs')
# Upper bound on the number of worker processes in the process pool that is
# used when executing functions in parallel.
MAX_PROCESS_COUNT = 20
def set_up_logging(log_name):
    """Set up the root logger to log to both the console and a file.

    Handlers already attached to the root logger are closed and removed, so
    calling this function again replaces the previous configuration instead
    of leaving earlier log files open.

    Args:
        log_name: Name (or relative path) of the log file, joined onto
            LOGS_DIRECTORY_PATH. Missing parent directories are created.

    Returns:
        The configured root logger, with level INFO, a console handler
        (time-only timestamps) and a file handler (full timestamps and
        logger name).
    """
    log_path = LOGS_DIRECTORY_PATH / log_name
    log_path.parent.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger()
    # Set the level explicitly: logging.basicConfig() does nothing when the
    # root logger already has handlers, which would leave the level unchanged.
    logger.setLevel(logging.INFO)

    # Remove any existing handlers, closing them so that resources such as
    # previously opened log files are released.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    # Console output: short, time-only timestamps.
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S')
    stream_handler.setFormatter(stream_formatter)
    logger.addHandler(stream_handler)

    # File output: full date and the name of the emitting logger.
    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    return logger
def strip_and_remove_empty_strings(strings):
    """Return the stripped strings, leaving out any that end up empty.

    Each item has its leading and trailing whitespace removed; items that
    are empty after stripping are dropped. The order of the remaining items
    is preserved and a new list is returned.
    """
    cleaned = []
    for text in strings:
        trimmed = text.strip()
        if trimmed:
            cleaned.append(trimmed)
    return cleaned
def execute_function_in_parallel(function: Callable, *argument_lists,
                                 logger=None):
    """Execute a function in parallel using multiple processes.

    The argument lists are zipped together, and the function is called once
    per resulting tuple, i.e. ``function(list_1[i], list_2[i], ...)``. As
    with ``zip``, the number of calls is set by the shortest argument list.

    Args:
        function: The callable to run in the worker processes.
        *argument_lists: One iterable per positional parameter of
            ``function``.
        logger: Optional logger. If given, an exception raised by a call is
            logged with its traceback before being re-raised.

    Returns:
        A list with one result per call, in the same order as the arguments
        were supplied (not the order in which the calls finished).

    Raises:
        Exception: The first exception observed from any call is re-raised.
    """
    with ProcessPoolExecutor(max_workers=MAX_PROCESS_COUNT) as executor:
        # Remember each future's submission position so that results can be
        # returned in input order even though calls finish in any order.
        future_to_index = {
            executor.submit(function, *arguments): index
            for index, arguments in enumerate(zip(*argument_lists))
        }
        results = [None] * len(future_to_index)
        # Consume futures as they complete so a failure surfaces as early
        # as possible rather than only when its turn comes up.
        for future in as_completed(future_to_index):
            try:
                results[future_to_index[future]] = future.result()
            except Exception:
                if logger:
                    logger.exception('Exception')
                raise
    return results