
    DUf#                        d Z ddlmZ ddlmZmZ ddlmZmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ g dZ	  e            ZeZd&dZ G d d          Z G d d          Zeddfd'd%ZdS )(zS
Experimental API for developing split-apply-combine style algorithms on
coolers.

    )annotations)partialreduce)AnyCallableIterableIteratorSequence)Lock   )
MapFunctor)Cooler)get)	partition)r   splitlockfuncslist[Callable]prepareCallable[[Any], Any] | Noner   Callable[[KeyType], Any]keyKeyTypereturnr   c                ~     ||          }| ||          }| D ]} |||          }n|}| D ]} ||          }|S N )r   r   r   r   chunkdatafuncs          L/var/www/html/software/conda/lib/python3.11/site-packages/cooler/parallel.pyapply_pipeliner"   4   su     CHHEwu~~ 	% 	%D4t$$DD	%  	 	D4::DDK    c                  ^    e Zd ZdZd dZd!d
Zd"dZd#dZd$dZd%dZ	d&dZ
efd'dZd(dZdS ))MultiplexDataPipea*  
    Create an extendable pipeline of callables to be applied independently to
    each of a collection of inputs and produce a collection of outputs.

    New tasks are appended with the ``pipe`` method. Pipeline execution can be
    multiplexed using any ``map`` implementation, e.g. multiprocessing Pool.map
    or ipyparallel view.map for distributed execution.

    Depending on the ``map`` implementation results may be

    * yielded sequentially or online:
        Python 3 ``map`` or ``itertools.imap``, ``Pool.imap``,
        ``Pool.imap_unordered``

    * gathered and returned once all outputs are finished:
        Python 2 ``map``, ``Pool.map``, ``Pool.map_async``

    The pipeline can be run using one of:

    * ``gather``:
        Results are gathered and combined after all pipelines complete.

    * ``reduce``:
        Results are sequentially folded using a binary operator. This can
        save on memory when using a sequential or online ``map``
        implementation.

    Both methods are blocking, regardless of the blocking behavior of the
    ``map`` implementation (e.g., ``Pool.map_async``).

    Notes
    -----
    Python's multiprocessing module uses Pickle for serialization, which has
    several limitations. Consider using a parallel map implementation that uses
    a more versatile serializer, such as dill or cloudpickle.

    See also
    --------
    http://stackoverflow.com/a/26521507 for a discussion of the differences
    between multiprocessing Pool implementations.

    Examples
    --------
    >>> X = np.arange(30)
    >>> spans = [(0, 10), (10, 20), (20, 30)]
    >>> dp = MultiplexDataPipe(lambda span: X[span[0]:span[1]], spans, map)
    >>> dp = dp.pipe(lambda x: x - 1).pipe(sum)
    >>> dp.gather()
    [35, 135, 235]
    >>> dp.reduce(lambda x, y: x + y, 0)
    405

    r   r   keysIterable[KeyType]mapr   c                f    || _         t          |          | _        || _        g | _        d| _        dS )aN  

        Parameters
        ----------
        get : callable
            Callable used to be used by workers that fetches the data
            corresponding to any of the provided keys

        keys : iterable
            Keys corresponding to input data

        map : callable
            Implementation of a map functor

        N)r   listr&   r(   r   _prepare)selfr   r&   r(   s       r!   __init__zMultiplexDataPipe.__init__}   s1    * JJ	
r#   r   c                    |                      | j        | j        | j                  }t	          | j                  |_        | j        |_        |S r   )	__class__r   r&   r(   r*   r   r+   )r,   others     r!   __copy__zMultiplexDataPipe.__copy__   s<    txDH==4:&&r#   dictc                d    | j                                         }|                    dd            |S )Nr(   )__dict__copypop)r,   ds     r!   
__reduce__zMultiplexDataPipe.__reduce__   s.    M  	eTr#   Iterator[Any]c                D    t          |                                           S r   iterrun)r,   s    r!   __iter__zMultiplexDataPipe.__iter__   s    DHHJJr#   r    Callable[[Any], Any]c                    || _         | S )ae  
        Prepend a task that initializes the data for transformation.

        This optional step allows one to keep the original data chunk pristine.
        The callable ``func`` should return an initial object to pass along the
        pipeline for transformation. Subsequent callables in the pipeline will
        take two arguments instead of one:

        * chunk: original data chunk
        * data: transformed data passed along the pipeline

        Parameters
        ----------
        func : function/callable

        Returns
        -------
        A new datapipe with the initializer set.

        )r+   )r,   r    s     r!   r   zMultiplexDataPipe.prepare   s    * r#   0Callable[[Any], Any] | Callable[[Any, Any], Any]c                    |                                  }|s|rt          |g|R i |g}n$	 t          |          }n# t          $ r |g}Y nw xY w|xj        |z  c_        |S )a  
        Append new task(s) to the pipeline

        Parameters
        ----------
        func : function/callable or sequence of callables
            If a single function is provided, additional positional and keyword
            arguments can be provided and will be curried into the function.

        Returns
        -------
        A new datapipe with the additional task(s) appended.

        )r1   r   r*   	TypeErrorr   )r,   r    argskwargsr0   addons         r!   pipezMultiplexDataPipe.pipe   s    (  	6 	T3D333F334EET

   us   < AAIterable[Any]c                    t          t          | j        | j        | j                  }|                     || j                  S )zR
        Run the pipeline

        Output depends on map implementation.

        )r   r"   r   r+   r   r(   r&   )r,   pipelines     r!   r=   zMultiplexDataPipe.run   s3     >4:t}dhOOxx$),,,r#   combineCallable[[Iterable], Sequence]Sequence[Any]c                X     |t          |                                           g|R i |S )a  
        Run the pipeline and gather outputs

        Parameters
        ----------
        combine : callable, optional
            Callable to consume the output. Default is builtin list.

        Returns
        -------
        Output of ``combine``

        r;   )r,   rK   rD   rE   s       r!   gatherzMultiplexDataPipe.gather   s4    & wtDHHJJ''9$999&999r#   binopCallable[[Any, Any], Any]initr   c                b    t          |t          |                                           |          S )ad  
        Run the pipeline and fold outputs cumulatively as they are returned

        Parameters
        ----------
        binop : binary operation
            A function of two arguments that returns a single value.
        init : object
            The initial value of the accumulation.

        Returns
        -------
        Reduced output

        )r   r<   r=   )r,   rP   rR   s      r!   r   zMultiplexDataPipe.reduce   s&    ( eT$((**--t444r#   N)r   r   r&   r'   r(   r   )r   r%   )r   r2   )r   r9   )r    r?   r   r%   )r    rA   r   r%   )r   rH   )rK   rL   r   rM   )rP   rQ   rR   r   r   r   )__name__
__module____qualname____doc__r-   r1   r8   r>   r   rG   r=   r*   rO   r   r   r#   r!   r%   r%   F   s        4 4l   6      
          0   >- - - - 37: : : : :*5 5 5 5 5 5r#   r%   c                  &    e Zd Z	 	 	 ddd	ZddZdS )chunkgetterFTclrr   include_chromsboolinclude_binsuse_lockc                >    || _         || _        || _        || _        d S r   )coolerr[   r]   r^   )r,   rZ   r[   r]   r^   s        r!   r-   zchunkgetter.__init__  s&     ,( r#   spantuple[int, int]r   dict[str, Any]c                   |\  }}i }	 | j         rt                                           | j                            d          5 }| j        rt          |d         d          |d<   | j        rt          |d         d          |d<   t          |d         ||d          |d<   d d d            n# 1 swxY w Y   | j         rt                                           n&# | j         rt                                           w w xY w|S )NrchromsT)as_dictbinspixels)	r^   r   acquirer`   openr[   r   r]   release)r,   ra   lohir   grps         r!   __call__zchunkgetter.__call__   sQ   B	} !!#&& K#& G&)#h-&F&F&FE(O$ C$'FT$B$B$BE&M"%c(mRT"J"J"JhK K K K K K K K K K K K K K K }  } s0   :C AB."C .B22C 5B26C #C=N)FTF)rZ   r   r[   r\   r]   r\   r^   r\   )ra   rb   r   rc   )rT   rU   rV   r-   rp   r   r#   r!   rY   rY     sL          %!
! 
! 
! 
! 
!     r#   rY   i NrZ   r   r(   r   	chunksizeintspans Iterable[tuple[int, int]] | Nonec                    |)t          dt          | j        d                   |          }t          t	          | fi |||          S )Nr   nnz)r   r&   r(   )r   rr   infor%   rY   )rZ   r(   rq   rs   rE   s        r!   r   r   2  sX     }!S%119==&&v&&   r#   )
r   r   r   r   r   r   r   r   r   r   )
rZ   r   r(   r   rq   rr   rs   rt   r   r%   )rW   
__future__r   	functoolsr   r   typingr   r   r   r	   r
   multiprocessr   _typingr   apir   corer   utilr   __all__r   r   r"   r%   rY   r(   r   r   r#   r!   <module>r      s}   
 # " " " " " % % % % % % % % > > > > > > > > > > > > > >                              
(
(
(6 tvv
   $J5 J5 J5 J5 J5 J5 J5 J5Z       B .2	      r#   