
    tf#                        d Z ddlmZ ddlmZmZ ddlmZmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ g dZ	  e       ZeZ	 	 	 	 	 	 	 	 	 	 ddZ G d d      Z G d d      Zeddf	 	 	 	 	 	 	 	 	 ddZy)zS
Experimental API for developing split-apply-combine style algorithms on
coolers.

    )annotations)partialreduce)AnyCallableIterableIteratorSequence)Lock   )
MapFunctor)Cooler)get)	partition)r   splitlockc                p     ||      }| ||      }| D ]  } |||      } |S |}| D ]
  } ||      } |S N )funcspreparer   keychunkdatafuncs          Y/var/www/html/software/conda/envs/higlass/lib/python3.12/site-packages/cooler/parallel.pyapply_pipeliner   4   sb     HEu~ 	%Dt$D	% K  	D:D	K    c                      e Zd ZdZ	 	 	 	 	 	 ddZddZddZddZddZ	 	 	 	 ddZ	ddZ
ef	 	 	 dd	Z	 	 	 	 	 	 dd
Zy)MultiplexDataPipea*  
    Create an extendable pipeline of callables to be applied independently to
    each of a collection of inputs and produce a collection of outputs.

    New tasks are appended with the ``pipe`` method. Pipeline execution can be
    multiplexed using any ``map`` implementation, e.g. multiprocessing Pool.map
    or ipyparallel view.map for distributed execution.

    Depending on the ``map`` implementation results may be

    * yielded sequentially or online:
        Python 3 ``map`` or ``itertools.imap``, ``Pool.imap``,
        ``Pool.imap_unordered``

    * gathered and returned once all outputs are finished:
        Python 2 ``map``, ``Pool.map``, ``Pool.map_async``

    The pipeline can be run using one of:

    * ``gather``:
        Results are gathered and combined after all pipelines complete.

    * ``reduce``:
        Results are sequentially folded using a binary operator. This can
        save on memory when using a sequential or online ``map``
        implementation.

    Both methods are blocking, regardless of the blocking behavior of the
    ``map`` implementation (e.g., ``Pool.map_async``).

    Notes
    -----
    Python's multiprocessing module uses Pickle for serialization, which has
    several limitations. Consider using a parallel map implementation that uses
    a more versatile serializer, such as dill or cloudpickle.

    See also
    --------
    http://stackoverflow.com/a/26521507 for a discussion of the differences
    between multiprocessing Pool implementations.

    Examples
    --------
    >>> X = np.arange(30)
    >>> spans = [(0, 10), (10, 20), (20, 30)]
    >>> dp = MultiplexDataPipe(lambda span: X[span[0]:span[1]], spans, map)
    >>> dp = dp.pipe(lambda x: x - 1).pipe(sum)
    >>> dp.gather()
    [35, 135, 235]
    >>> dp.reduce(lambda x, y: x + y, 0)
    405

    c                \    || _         t        |      | _        || _        g | _        d| _        y)aN  

        Parameters
        ----------
        get : callable
            Callable used to be used by workers that fetches the data
            corresponding to any of the provided keys

        keys : iterable
            Keys corresponding to input data

        map : callable
            Implementation of a map functor

        N)r   listkeysmapr   _prepare)selfr   r#   r$   s       r   __init__zMultiplexDataPipe.__init__}   s+    * J	
r   c                    | j                  | j                  | j                  | j                        }t	        | j
                        |_        | j                  |_        |S r   )	__class__r   r#   r$   r"   r   r%   )r&   others     r   __copy__zMultiplexDataPipe.__copy__   s@    txxDHH=4::&r   c                ^    | j                   j                         }|j                  dd        |S )Nr$   )__dict__copypop)r&   ds     r   
__reduce__zMultiplexDataPipe.__reduce__   s'    MM 	eTr   c                4    t        | j                               S r   iterrun)r&   s    r   __iter__zMultiplexDataPipe.__iter__   s    DHHJr   c                    || _         | S )ae  
        Prepend a task that initializes the data for transformation.

        This optional step allows one to keep the original data chunk pristine.
        The callable ``func`` should return an initial object to pass along the
        pipeline for transformation. Subsequent callables in the pipeline will
        take two arguments instead of one:

        * chunk: original data chunk
        * data: transformed data passed along the pipeline

        Parameters
        ----------
        func : function/callable

        Returns
        -------
        A new datapipe with the initializer set.

        )r%   )r&   r   s     r   r   zMultiplexDataPipe.prepare   s    * r   c                    | j                         }|s|rt        |g|i |g}n	 t        |      }|xj                  |z  c_        |S # t        $ r |g}Y %w xY w)a  
        Append new task(s) to the pipeline

        Parameters
        ----------
        func : function/callable or sequence of callables
            If a single function is provided, additional positional and keyword
            arguments can be provided and will be curried into the function.

        Returns
        -------
        A new datapipe with the additional task(s) appended.

        )r+   r   r"   	TypeErrorr   )r&   r   argskwargsr*   addons         r   pipezMultiplexDataPipe.pipe   sf    ( 6T3D3F34ET
 	u  s   A	 	AAc                    t        t        | j                  | j                  | j                        }| j                  || j                        S )zR
        Run the pipeline

        Output depends on map implementation.

        )r   r   r   r%   r   r$   r#   )r&   pipelines     r   r5   zMultiplexDataPipe.run   s5     >4::t}}dhhOxx$)),,r   c                H     |t        | j                               g|i |S )a  
        Run the pipeline and gather outputs

        Parameters
        ----------
        combine : callable, optional
            Callable to consume the output. Default is builtin list.

        Returns
        -------
        Output of ``combine``

        r3   )r&   combiner:   r;   s       r   gatherzMultiplexDataPipe.gather   s$    & tDHHJ'9$9&99r   c                J    t        |t        | j                               |      S )ad  
        Run the pipeline and fold outputs cumulatively as they are returned

        Parameters
        ----------
        binop : binary operation
            A function of two arguments that returns a single value.
        init : object
            The initial value of the accumulation.

        Returns
        -------
        Reduced output

        )r   r4   r5   )r&   binopinits      r   r   zMultiplexDataPipe.reduce   s    ( eT$((*-t44r   N)r   Callable[[KeyType], Any]r#   zIterable[KeyType]r$   r   )returnr    )rG   dict)rG   zIterator[Any])r   zCallable[[Any], Any]rG   r    )r   z0Callable[[Any], Any] | Callable[[Any, Any], Any]rG   r    )rG   zIterable[Any])rA   zCallable[[Iterable], Sequence]rG   zSequence[Any])rD   zCallable[[Any, Any], Any]rE   r   rG   r   )__name__
__module____qualname____doc__r'   r+   r1   r6   r   r=   r5   r"   rB   r   r   r   r   r    r    F   s    4l%   	6
 0>
 
>- 37:/:
 
:*5(5 5 
	5r   r    c                  2    e Zd Z	 	 	 d	 	 	 	 	 	 	 ddZddZy)chunkgetterc                <    || _         || _        || _        || _        y r   )coolerinclude_chromsinclude_binsuse_lock)r&   clrrQ   rR   rS   s        r   r'   zchunkgetter.__init__  s"     ,( r   c                   |\  }}i }	 | j                   rt        j                          | j                  j	                  d      5 }| j
                  rt        |d   d      |d<   | j                  rt        |d   d      |d<   t        |d   ||d      |d<   d d d        | j                   rt        j                          |S # 1 sw Y   +xY w# | j                   rt        j                          w w xY w)NrchromsT)as_dictbinspixels)	rS   r   acquirerP   openrQ   r   rR   release)r&   spanlohir   grps         r   __call__zchunkgetter.__call__   s    B	}}!!#& K#&&&)#h-&FE(O$$$'FT$BE&M"%c(mRT"JhK }}K K }} s$   ;C ACC CC #C1N)FTF)rT   r   rQ   boolrR   rc   rS   rc   )r^   ztuple[int, int]rG   zdict[str, Any])rI   rJ   rK   r'   rb   r   r   r   rN   rN     s=      %!
!
! 
! 	
!
 
!r   rN   i Nc                |    |#t        dt        | j                  d         |      }t        t	        | fi |||      S )Nr   nnz)r   r#   r$   )r   intinfor    rN   )rT   r$   	chunksizespansr;   s        r   r   r   2  sD     }!S%19=&v& r   )
r   zlist[Callable]r   zCallable[[Any], Any] | Noner   rF   r   KeyTyperG   r   )
rT   r   r$   r   rh   rf   ri   z Iterable[tuple[int, int]] | NonerG   r    )rL   
__future__r   	functoolsr   r   typingr   r   r   r	   r
   multiprocessr   _typingr   apir   corer   utilr   __all__r   rj   r   r    rN   r$   r   r   r   r   <module>rt      s   
 # % > >     
(6 v
( 
" 
	
 	$J5 J5Z B .2			  ,	 r   