| | """ |
| | Parallel processing utilities for data pipeline optimization. |
| | |
| | Supports: |
| | - Parallel data ingestion |
| | - Batch processing with multiprocessing |
| | - Distributed processing support (foundation for Dask/Beam) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import multiprocessing as mp |
| | from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed |
| | from functools import partial |
| | from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar |
| |
|
| | T = TypeVar("T") |
| | R = TypeVar("R") |
| |
|
| |
|
def parallel_map(
    func: Callable[[T], R],
    items: Iterable[T],
    max_workers: Optional[int] = None,
    use_threads: bool = False,
    chunk_size: int = 1,
    **func_kwargs,
) -> List[R]:
    """
    Apply *func* to every item in parallel, preserving input order.

    Args:
        func: Function to apply to each item. Must be picklable when
            ``use_threads`` is False (it runs in a ProcessPoolExecutor).
        items: Iterable of items to process (fully materialized up front).
        max_workers: Maximum number of workers (default: CPU count).
        use_threads: Use threads instead of processes (for I/O-bound tasks).
        chunk_size: Number of items per chunk (ProcessPoolExecutor only).
        **func_kwargs: Additional keyword arguments bound to ``func``.

    Returns:
        List of results in the same order as ``items``. In the thread
        path, an item whose call raised yields ``None`` in its slot (the
        error is printed); in the process path, the first exception
        propagates out of ``executor.map``.
    """
    if max_workers is None:
        max_workers = mp.cpu_count()

    items_list = list(items)
    if not items_list:
        return []

    # Bind extra kwargs once so workers receive a one-argument callable.
    if func_kwargs:
        func = partial(func, **func_kwargs)

    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor

    with executor_class(max_workers=max_workers) as executor:
        if not use_threads:
            # executor.map already yields results in input order.
            return list(executor.map(func, items_list, chunksize=chunk_size))

        futures = [executor.submit(func, item) for item in items_list]

        # BUGFIX: iterate futures in *submission* order, not via
        # as_completed() (which yields in completion order and would
        # scramble results relative to the input, breaking the
        # documented order guarantee).
        results: List[R] = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                # Best-effort semantics for the thread path: record the
                # failure and keep the slot so positions stay aligned.
                print(f"Error in parallel_map: {e}")
                results.append(None)
        return results
| |
|
| |
|
def batch_process(
    func: Callable[[List[T]], List[R]],
    items: Iterable[T],
    batch_size: int = 32,
    max_workers: Optional[int] = None,
    **func_kwargs,
) -> List[R]:
    """
    Split *items* into fixed-size batches and process the batches in parallel.

    Args:
        func: Function that takes one batch (a list) and returns a list
            of results for that batch.
        items: Iterable of items to process.
        batch_size: Number of items per batch.
        max_workers: Maximum number of batches processed concurrently
            (default: min(number of batches, CPU count)).
        **func_kwargs: Additional keyword arguments bound to ``func``.

    Returns:
        Flattened list of per-batch results.
    """
    all_items = list(items)
    chunks = [
        all_items[start : start + batch_size]
        for start in range(0, len(all_items), batch_size)
    ]

    if not chunks:
        return []

    # Bind extra kwargs once so each worker call takes only the batch.
    if func_kwargs:
        func = partial(func, **func_kwargs)

    worker_count = max_workers
    if worker_count is None:
        worker_count = min(len(chunks), mp.cpu_count())

    per_batch = parallel_map(
        func,
        chunks,
        max_workers=worker_count,
        use_threads=False,
    )

    # Flatten, skipping any falsy (empty) batch outputs.
    merged: List[R] = []
    for outputs in per_batch:
        if outputs:
            merged.extend(outputs)

    return merged
| |
|
| |
|
class ParallelProcessor:
    """Reusable parallel-processing front end with stored configuration."""

    def __init__(
        self,
        max_workers: Optional[int] = None,
        use_threads: bool = False,
        chunk_size: int = 1,
    ):
        # A falsy max_workers (None — or 0, which would be invalid anyway)
        # falls back to the machine's CPU count.
        self.max_workers = max_workers if max_workers else mp.cpu_count()
        self.use_threads = use_threads
        self.chunk_size = chunk_size

    def map(self, func: Callable[[T], R], items: Iterable[T], **func_kwargs) -> List[R]:
        """Apply ``func`` to each item in parallel using this processor's settings."""
        return parallel_map(
            func,
            items,
            max_workers=self.max_workers,
            use_threads=self.use_threads,
            chunk_size=self.chunk_size,
            **func_kwargs,
        )

    def batch_map(
        self,
        func: Callable[[List[T]], List[R]],
        items: Iterable[T],
        batch_size: int = 32,
        **func_kwargs,
    ) -> List[R]:
        """Process ``items`` in parallel batches using this processor's settings."""
        return batch_process(
            func,
            items,
            batch_size=batch_size,
            max_workers=self.max_workers,
            **func_kwargs,
        )
| |
|