Source code for idpconfgen.libs.libmulticore

"""Multi-core related objects."""
import traceback
from multiprocessing import Pool

from idpconfgen import log
from idpconfgen.core.exceptions import IDPConfGenException, ReportOnCrashError
from idpconfgen.libs.libtimer import ProgressWatcher


def starunpack(func, *args, **kwargs):
    """Unpack the first argument of `args` into a call to `func`."""
    return func(*args[0], **kwargs)
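
A minimal usage sketch (not part of the module): `starunpack` lets the
single-argument ``Pool`` mapping methods call a function that expects
several positional arguments, playing the role of :func:`itertools.starmap`.
The ``pow`` worker below is illustrative only::

    from functools import partial
    from multiprocessing import Pool

    # compute pow(base, exp) for pairs packed into a single tuple argument
    star_pow = partial(starunpack, pow)
    with Pool(2) as pool:
        results = list(pool.imap(star_pow, [(2, 3), (3, 2)]))
    # results == [8, 9]
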
# USED OKAY
# can't use closures here because they can't be pickled
def consume_iterable_in_list(func, *args, **kwargs):
    """
    Consume a generator into a list.

    Used as a wrapper to send generators to the Python multiprocessing lib.

    Returns
    -------
    list of length N
        Where N is the number of times `func` yields.
    """
    return list(func(*args, **kwargs))
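
A sketch of why this wrapper exists: ``Pool`` workers must return picklable
values, so wrapping a generator function makes each worker return a finished
list instead of an unpicklable generator. The ``count_up`` generator is
hypothetical::

    from functools import partial

    def count_up(n):
        """Yield the integers 0 .. n-1."""
        yield from range(n)

    consume = partial(consume_iterable_in_list, count_up)
    # each call now returns a plain list, which multiprocessing can pickle
    assert consume(3) == [0, 1, 2]
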
# USED OKAY
def flat_results_from_chunk(execute, func, *args, **kwargs):
    """
    Flatten results coming from a `consume_iterable_in_list` execution.

    Parameters
    ----------
    execute : callable
        The prepared multiprocessing function.
        Each item of its iteration should be an iterable of iterables.

    func : callable
        The function that processes the flattened results of each
        multiprocessing chunk.
    """
    for chunk in execute():
        # each result is a list coming from consume_iterable_in_list,
        # where each element is a list yielded from, for example,
        # extract_secondary_structure
        flatted = (yielded for result in chunk for yielded in result)
        func(flatted, *args, **kwargs)
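
A sketch of the shapes involved, using hypothetical stand-ins: ``execute``
yields chunks, each chunk is a list of per-item result lists (the output of
`consume_iterable_in_list`), and ``func`` receives one flattened stream per
chunk::

    def fake_execute():
        """Yield two chunks, each a list of per-item result lists."""
        yield [[1, 2], [3]]
        yield [[4], [5, 6]]

    collected = []
    flat_results_from_chunk(fake_execute, collected.extend)
    assert collected == [1, 2, 3, 4, 5, 6]
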
# USED OKAY
def pool_function(func, items, method='imap_unordered', ncores=1):
    """
    Execute a function over an iterable using a multiprocessing Pool.

    Parameters
    ----------
    func : callable
        The function to execute along the iterable `items`.
        If this function expects additional `args` and `kwargs`,
        prepare it beforehand using `functools.partial`.

    items : iterable
        Elements to pass to `func`.

    method : str
        The :class:`Pool` method to execute.
        Defaults to `imap_unordered`.

    ncores : int
        The number of cores to use. Defaults to `1`.
    """
    with Pool(ncores) as pool, \
            ProgressWatcher(items, suffix=f'on {ncores} cpus') as pb:
        imap = getattr(pool, method)(func, items)
        # `while` is needed here, instead of `for`,
        # to allow try/except handling around each `next` call
        while True:
            try:
                yield next(imap)
                pb.increment()
            except StopIteration:
                break
            except IndexError:
                log.error('IndexError of multiprocessing, ignoring something')
            except ReportOnCrashError:
                # nothing to do, the report was already emitted
                continue
            except IDPConfGenException as err:
                log.debug(traceback.format_exc())
                log.error(err)
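
A usage sketch, assuming a picklable worker; ``square`` is hypothetical.
Note that `pool_function` is a generator, so nothing runs until it is
consumed, and the default ``imap_unordered`` does not preserve input order::

    # hypothetical worker; must be defined at module level so it pickles
    def square(x):
        return x * x

    results = list(pool_function(square, range(10), ncores=2))
    assert sorted(results) == [x * x for x in range(10)]
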
# USED OKAY
def pool_function_in_chunks(func, items, chunks=5_000, **kwargs):
    """
    Execute ``func`` over ``items`` in chunks of size ``chunks`` using `Pool`.

    Yields the results after each chunk.

    Parameters
    ----------
    func : callable
        The function to execute over ``items``.
        If this function expects additional `args` and `kwargs`,
        prepare it beforehand using `functools.partial`.

    items : sequence
        The items to process by ``func``.
        Must support ``len()`` and slicing.

    chunks : int
        The size of each chunk processed by multiprocessing
        before yielding.

    **kwargs : any
        Additional keyword arguments to send to `pool_function`,
        for example ``method`` or ``ncores``.

    Yields
    ------
    list
        Containing the results of each chunk.
    """
    for i in range(0, len(items), chunks):
        task = items[i: i + chunks]
        yield list(pool_function(func, task, **kwargs))
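
A sketch of chunked consumption, reusing the hypothetical ``square`` worker
from the previous example; each yielded list holds one chunk of results, so
peak memory is bounded by ``chunks`` rather than by ``len(items)``::

    items = list(range(10_000))
    for chunk_results in pool_function_in_chunks(
            square, items, chunks=2_500, ncores=2):
        # four iterations of 2,500 results each; process, then discard
        print(len(chunk_results))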