o
    Nrf#                     @  s   d Z ddlmZ ddlmZmZ ddlmZmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ g dZ	 e ZeZd(ddZG dd dZG dd dZeddfd)d&d'ZdS )*zS
Experimental API for developing split-apply-combine style algorithms on
coolers.

    )annotations)partialreduce)AnyCallableIterableIteratorSequence)Lock   )
MapFunctor)Cooler)get)	partition)r   splitlockfuncslist[Callable]prepareCallable[[Any], Any] | Noner   Callable[[KeyType], Any]keyKeyTypereturnr   c                 C  sJ   ||}|d ur||}| D ]}|||}q|S |}| D ]}||}q|S N )r   r   r   r   chunkdatafuncr   r   X/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/cooler/parallel.pyapply_pipeline4   s   
r    c                   @  sn   e Zd ZdZd)dd	Zd*ddZd+ddZd,ddZd-ddZd.ddZ	d/ddZ
efd0d d!Zd1d&d'Zd(S )2MultiplexDataPipea*  
    Create an extendable pipeline of callables to be applied independently to
    each of a collection of inputs and produce a collection of outputs.

    New tasks are appended with the ``pipe`` method. Pipeline execution can be
    multiplexed using any ``map`` implementation, e.g. multiprocessing Pool.map
    or ipyparallel view.map for distributed execution.

    Depending on the ``map`` implementation results may be

    * yielded sequentially or online:
        Python 3 ``map`` or ``itertools.imap``, ``Pool.imap``,
        ``Pool.imap_unordered``

    * gathered and returned once all outputs are finished:
        Python 2 ``map``, ``Pool.map``, ``Pool.map_async``

    The pipeline can be run using one of:

    * ``gather``:
        Results are gathered and combined after all pipelines complete.

    * ``reduce``:
        Results are sequentially folded using a binary operator. This can
        save on memory when using a sequential or online ``map``
        implementation.

    Both methods are blocking, regardless of the blocking behavior of the
    ``map`` implementation (e.g., ``Pool.map_async``).

    Notes
    -----
    Python's multiprocessing module uses Pickle for serialization, which has
    several limitations. Consider using a parallel map implementation that uses
    a more versatile serializer, such as dill or cloudpickle.

    See also
    --------
    http://stackoverflow.com/a/26521507 for a discussion of the differences
    between multiprocessing Pool implementations.

    Examples
    --------
    >>> X = np.arange(30)
    >>> spans = [(0, 10), (10, 20), (20, 30)]
    >>> dp = MultiplexDataPipe(lambda span: X[span[0]:span[1]], spans, map)
    >>> dp = dp.pipe(lambda x: x - 1).pipe(sum)
    >>> dp.gather()
    [35, 135, 235]
    >>> dp.reduce(lambda x, y: x + y, 0)
    405

    r   r   keysIterable[KeyType]mapr   c                 C  s&   || _ t|| _|| _g | _d| _dS )aN  

        Parameters
        ----------
        get : callable
            Callable used to be used by workers that fetches the data
            corresponding to any of the provided keys

        keys : iterable
            Keys corresponding to input data

        map : callable
            Implementation of a map functor

        N)r   listr"   r$   r   _prepare)selfr   r"   r$   r   r   r   __init__}   s
   

zMultiplexDataPipe.__init__r   c                 C  s,   |  | j| j| j}t| j|_| j|_|S r   )	__class__r   r"   r$   r%   r   r&   )r'   otherr   r   r   __copy__   s   zMultiplexDataPipe.__copy__dictc                 C  s   | j  }|dd  |S )Nr$   )__dict__copypop)r'   dr   r   r   
__reduce__   s   
zMultiplexDataPipe.__reduce__Iterator[Any]c                 C  s   t |  S r   iterrun)r'   r   r   r   __iter__   s   zMultiplexDataPipe.__iter__r   Callable[[Any], Any]c                 C  s
   || _ | S )ae  
        Prepend a task that initializes the data for transformation.

        This optional step allows one to keep the original data chunk pristine.
        The callable ``func`` should return an initial object to pass along the
        pipeline for transformation. Subsequent callables in the pipeline will
        take two arguments instead of one:

        * chunk: original data chunk
        * data: transformed data passed along the pipeline

        Parameters
        ----------
        func : function/callable

        Returns
        -------
        A new datapipe with the initializer set.

        )r&   )r'   r   r   r   r   r      s   zMultiplexDataPipe.prepare0Callable[[Any], Any] | Callable[[Any, Any], Any]c                 O  sb   |   }|s|rt|g|R i |g}nzt|}W n ty'   |g}Y nw | j|7  _|S )a  
        Append new task(s) to the pipeline

        Parameters
        ----------
        func : function/callable or sequence of callables
            If a single function is provided, additional positional and keyword
            arguments can be provided and will be curried into the function.

        Returns
        -------
        A new datapipe with the additional task(s) appended.

        )r+   r   r%   	TypeErrorr   )r'   r   argskwargsr*   Zaddonr   r   r   pipe   s   
zMultiplexDataPipe.pipeIterable[Any]c                 C  s"   t t| j| j| j}| || jS )zR
        Run the pipeline

        Output depends on map implementation.

        )r   r    r   r&   r   r$   r"   )r'   Zpipeliner   r   r   r5      s   zMultiplexDataPipe.runcombineCallable[[Iterable], Sequence]Sequence[Any]c                 O  s   |t |  g|R i |S )a  
        Run the pipeline and gather outputs

        Parameters
        ----------
        combine : callable, optional
            Callable to consume the output. Default is builtin list.

        Returns
        -------
        Output of ``combine``

        r3   )r'   r>   r:   r;   r   r   r   gather   s   zMultiplexDataPipe.gatherbinopCallable[[Any, Any], Any]initr   c                 C  s   t |t|  |S )ad  
        Run the pipeline and fold outputs cumulatively as they are returned

        Parameters
        ----------
        binop : binary operation
            A function of two arguments that returns a single value.
        init : object
            The initial value of the accumulation.

        Returns
        -------
        Reduced output

        )r   r4   r5   )r'   rB   rD   r   r   r   r      s   zMultiplexDataPipe.reduceN)r   r   r"   r#   r$   r   )r   r!   )r   r,   )r   r2   )r   r7   r   r!   )r   r8   r   r!   )r   r=   )r>   r?   r   r@   )rB   rC   rD   r   r   r   )__name__
__module____qualname____doc__r(   r+   r1   r6   r   r<   r5   r%   rA   r   r   r   r   r   r!   F   s    
6





r!   c                   @  s(   e Zd Z			ddd	d
ZdddZdS )chunkgetterFTclrr   include_chromsboolinclude_binsuse_lockc                 C  s   || _ || _|| _|| _d S r   )coolerrK   rM   rN   )r'   rJ   rK   rM   rN   r   r   r   r(     s   
zchunkgetter.__init__spantuple[int, int]r   dict[str, Any]c                 C  s   |\}}i }zM| j rt  | jd.}| jr"t|d dd|d< | jr/t|d dd|d< t|d ||dd|d< W d    n1 sEw   Y  W | j rRt  |S | j r\t  w w )NrZchromsT)as_dictZbinsZpixels)	rN   r   acquirerO   openrK   r   rM   release)r'   rP   lohir   grpr   r   r   __call__   s&   
zchunkgetter.__call__N)FTF)rJ   r   rK   rL   rM   rL   rN   rL   )rP   rQ   r   rR   )rE   rF   rG   r(   r[   r   r   r   r   rI     s    rI   i NrJ   r   r$   r   	chunksizeintspans Iterable[tuple[int, int]] | Nonec                 K  s8   |d u rt dt| jd |}tt| fi |||dS )Nr   Znnz)r   r"   r$   )r   r]   infor!   rI   )rJ   r$   r\   r^   r;   r   r   r   r   2  s   r   )
r   r   r   r   r   r   r   r   r   r   )
rJ   r   r$   r   r\   r]   r^   r_   r   r!   )rH   
__future__r   	functoolsr   r   typingr   r   r   r   r	   Zmultiprocessr
   Z_typingr   apir   corer   utilr   __all__r   r   r    r!   rI   r$   r   r   r   r   r   <module>   s*    
 N!