
    tf                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlZd dlmZ d dlmZ d d	lmZmZmZmZ d d
lmZ d dlmZ d dlmZmZmZm Z   G d d      Z! G d de      Z" G d de"      Z# G d de"      Z$ G d de      Z%d$dZ&d Z'd Z( G d de      Z) G d de)      Z* G d de      Z+ G d  d!e      Z, G d" d#e      Z-y)%    annotationsN)defaultdict)Callable)product)Any)map)tokenize)	BlockwiseBlockwiseDepBlockwiseDepDictblockwise_token)flatten)Layer)applycached_cumsumconcreteinsertc                      e Zd ZdZd Zd Zy)CallableLazyImportzFunction Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    c                    || _         y N)function_path)selfr   s     U/var/www/html/software/conda/envs/higlass/lib/python3.12/site-packages/dask/layers.py__init__zCallableLazyImport.__init__"   s
    *    c                >    ddl m}   || j                        |i |S )Nr   )import_term)distributed.utilsr   r   )r   argskwargsr   s       r   __call__zCallableLazyImport.__call__%   s"    1.{4--.???r   N)__name__
__module____qualname____doc__r   r#    r   r   r   r      s    +@r   r   c                  D    e Zd ZU dZded<   ded<   dZded<   dd	Zdd
Zy)ArrayBlockwiseDepzg
    Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    tuple[tuple[int, ...], ...]chunkstuple[int, ...]	numblocksFboolproduces_tasksc                N    || _         t        d |D              | _        d| _        y )Nc              3  2   K   | ]  }t        |        y wr   )len).0chunks     r   	<genexpr>z-ArrayBlockwiseDep.__init__.<locals>.<genexpr>>   s     >es5z>s   F)r,   tupler.   r0   )r   r,   s     r   r   zArrayBlockwiseDep.__init__<   s#    >v>>#r   c                    t        d      )Nz%Subclasses must implement __getitem__)NotImplementedErrorr   idxs     r   __getitem__zArrayBlockwiseDep.__getitem__A   s    !"IJJr   Nr,   r+   r;   r-   )r$   r%   r&   r'   __annotations__r0   r   r<   r(   r   r   r*   r*   2   s*    
 (' ND $
Kr   r*   c                      e Zd ZdZddZy)ArrayChunkShapeDepz(Produce chunk shapes given a chunk indexc                N    t        d t        || j                        D              S )Nc              3  ,   K   | ]  \  }}||     y wr   r(   )r4   ir5   s      r   r6   z1ArrayChunkShapeDep.__getitem__.<locals>.<genexpr>I   s     D(!UU1XDs   )r7   zipr,   r:   s     r   r<   zArrayChunkShapeDep.__getitem__H   s    Dc#t{{.CDDDr   Nr>   )r$   r%   r&   r'   r<   r(   r   r   rA   rA   E   s    2Er   rA   c                  8     e Zd ZU dZded<   d fdZddZ xZS )ArraySliceDepz>Produce slice(s) into the full-sized array given a chunk indexr+   startsc                R    t         |   |       t        d |D              | _        y )Nc              3  6   K   | ]  }t        |d         yw)T)initial_zeroN)r   )r4   cs     r   r6   z)ArraySliceDep.__init__.<locals>.<genexpr>S   s     PAM!$??P   )superr   r7   rH   )r   r,   	__class__s     r   r   zArraySliceDep.__init__Q   s"     PPPr   c                r    t        d t        || j                        D              }t        d |D              S )Nc              3  <   K   | ]  \  }}||   ||d z      f  yw)   Nr(   )r4   rD   starts      r   r6   z,ArraySliceDep.__getitem__.<locals>.<genexpr>V   s%     SEU1XuQU|,S   c              3  6   K   | ]  }t        g |d    y wr   slice)r4   ss     r   r6   z,ArraySliceDep.__getitem__.<locals>.<genexpr>W   s     2U_A_t_2rM   )r7   rE   rH   )r   r;   locs      r   r<   zArraySliceDep.__getitem__U   s.    SSdkk=RSS2c222r   r=   )r;   r7   )r$   r%   r&   r'   r?   r   r<   __classcell__rO   s   @r   rG   rG   L   s    H''Q3r   rG   c                  d     e Zd ZdZ fdZd Zed        Zd Zd Z	d Z
d Zd	 Zd
 ZddZ xZS )ArrayOverlapLayera\  Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for a array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    c                v    t         |           || _        || _        || _        || _        || _        d | _        y r   )rN   r   nameaxesr,   r.   token_cached_keys)r   r_   r`   r,   r.   ra   rO   s         r   r   zArrayOverlapLayer.__init__i   s;     			"
 r   c                "    d| j                    dS )NzArrayOverlapLayer<name=''r_   r   s    r   __repr__zArrayOverlapLayer.__repr__y   s    )$))A66r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S z$Materialize full dict representation_cached_dicthasattrrj   _construct_graphr   dsks     r   _dictzArrayOverlapLayer._dict|   <     4($$$'')C #D   r   c                     | j                   |   S r   rp   r   keys     r   r<   zArrayOverlapLayer.__getitem__       zz#r   c                ,    t        | j                        S r   iterrp   rf   s    r   __iter__zArrayOverlapLayer.__iter__       DJJr   c                ,    t        | j                        S r   r3   rp   rf   s    r   __len__zArrayOverlapLayer.__len__       4::r   c                    t        | d      S Nrj   rl   rf   s    r   is_materializedz!ArrayOverlapLayer.is_materialized       t^,,r   c                "    | j                         S r   )keysrf   s    r   get_output_keysz!ArrayOverlapLayer.get_output_keys   s    yy{r   c                    | j                   | j                   S | j                  | j                  | j                  cfd        x| _         }|S )Nc                     sfgS t        |       }|dz   t              k(  r%t        |         D cg c]  }f| z   |fz    }}|S t        |         D cg c]  } | |fz     }}|S c c}w c c}w NrR   )r3   range)r!   indrD   resultr,   r   r_   r.   s       r   r   z*ArrayOverlapLayer._dask_keys.<locals>.keys   s    y d)CQw#i.(9>y~9NOA4'D.A4/OO M 9>in8MN1$.NNM PNs   A-A2)rb   r_   r,   r.   )r   r   r,   r   r_   r.   s     @@@@r   
_dask_keyszArrayOverlapLayer._dask_keys   sR    ($$$"&))T[[$..fi	 &*V+Fr   c                   | j                   }| j                  }| j                  }| j                         }d| j                  z   }d| j                  z   }|rt        d      }nddlm} t        t        t        |            }	t        j                  t        |	|      }
t        j                  |t         t        |
      t        t               t        j"                  t              }i }i }|D ]N  }t%        |f|z   |      }|f|z   |k7  r
|||f|z   <   &|f|z   ||f|z   <   |t&         |
d|z   |      ff||f|z   <   P t        j(                  ||      }|S )	z/Construct graph for a simple overlap operation.zgetitem-zoverlap-zdask.array.core.concatenate3r   )concatenate3)dimsr`   r   re   )r`   r,   r_   r   ra   r   dask.array.corer   listr	   r3   	functoolspartial_expand_keys_around_centertoolzpiper   concatfractional_slicer   merge)r   deserializingr`   r,   r_   	dask_keysgetitem_nameoverlap_namer   r   expand_key2interior_keysinterior_slicesoverlap_blocksk
frac_slicero   s                    r   rm   z"ArrayOverlapLayer._construct_graph   sO   yyyyOO%	!DJJ.!DJJ. ..LML 5CV$%''&T

 

wK 0#g,d
  		A)4'A+t<Jw{j(7A! 348<w{! 34 {7Q;\JK723		 kk/>:
r   F)r$   r%   r&   r'   r   rg   propertyrp   r<   rz   r~   r   r   r   rm   rZ   r[   s   @r   r]   r]   Z   sI    ! 7 ! ! -&)r   r]   c           
        fd}g }t        | dd       D ]2  \  }}d}|dkD  r|dz  }||   dz
  k  r|dz  }|j                  |       4 t        | dd       D cg c]-  \  }}t        |j                  |d      f      r	 |||      n|g/ }	}}||gg|	z   }	t	        t        |	       }
t        |      D cg c]%  \  }}t        |j                  |d      f      r|nd' }}}t        ||
      }|S c c}}w c c}}w )a  Get all neighboring keys around center

    Parameters
    ----------
    k: Key
        The key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Option[str]
        The name to include in the output keys, or none to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    c                    g }|dz
  dkD  r|j                  |dz
         |j                  |       |dz   |    dz
  k  r|j                  |dz          |S )Ng?r   rR   )append)rD   r   rvr   s      r   indsz(_expand_keys_around_center.<locals>.inds   sW    9q=IIcCi 
		#9tAw{"IIcCi 	r   rR   Nr   )	enumerater   anygetr   r   reshapelist)r   r   r_   r`   r   shaperD   r   numr!   seqdshape2r   s    `            r   r   r      s-   4 EAabE" 371HCa11HCS IRRSTUTVRWHX>DaTXXa^-.QSE9D  x$
w~
C=Fu=MNTQ3A()aq0NFN%FM Os   2C5:*C;c                    t        |       dk(  rt        |      S t        t        |      | d   z        }t        j                  ||      D cg c]  }t        | dd |       c}S c c}w )zgReshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    rR   r   N)r3   r   intr   	partitionr   )r   r   nparts       r   r   r     s\     5zQCyC58#$9>C9PQE!"It,QQQs   A%c                X   | d   ft        d | dd D              z   }g }t        t        | dd |dd             D ]  \  }\  }}|j                  |d      }t	        |t               r|d   }|d   }	n|}|}	||k(  r|j                  t        ddd             \||k  r|	r|j                  t        d|	             ||kD  r|r|j                  t        | d             |j                  t        dd              t        |      }t        d |D              r| S t        j                  ||fS )a  

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    r   c              3  D   K   | ]  }t        t        |              y wr   )r   round)r4   rD   s     r   r6   z#fractional_slice.<locals>.<genexpr>$  s      A1U1X As    rR   Nc              3  <   K   | ]  }|t        d d d       k(    y wr   rV   )r4   r   s     r   r6   z#fractional_slice.<locals>.<genexpr>:  s     
;c3%dD))
;rT   )
r7   r   rE   r   
isinstancer   rW   alloperatorgetitem)
taskr`   roundedindexrD   trdepth
left_depthright_depths
             r   r   r     s'    Awj5 AQR AAAGEs48WQR[9: &	6AqAeU#qJ(KJK6LLtT401U{LLq+./UzLL
{D12LLq!%!&" %LE

;U
;;  '511r   c                       e Zd ZdZ	 	 d fd	Zd Zd Zd Zd Ze	d        Z
d Zd	 Zd
 Zd ZddZd Zd ZddZ xZS )SimpleShuffleLayera  Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    c
                   || _         || _        || _        || _        || _        || _        || _        |xs t        |      | _        d| j                   z   | _	        |	xs i }	d | _
        d|	vr| j                  |	d<   t        
| 5  |	       y )Nsplit-priorityr   )r_   columnnpartitionsnpartitions_inputignore_index
name_input
meta_inputr   	parts_out
split_name_split_keys_key_priorityrN   r   )r   r_   r   r   r   r   r   r   r   r   rO   s             r   r   zSimpleShuffleLayer.__init__d  s     	&!2($$"8eK&8"TYY. "'R[(&*&8&8K
#[1r   c                N    t        |t              sJ |d   | j                  k(  ryy)Nr   rR   )r   r7   r   rt   s     r   r   z SimpleShuffleLayer._key_priority  s(    #u%%%q6T__$r   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   r   r_   r   r   s     r   r   z"SimpleShuffleLayer.get_output_keys  #    .2nn=dD!===   'c                N    dj                  | j                  | j                        S )Nz-SimpleShuffleLayer<name='{}', npartitions={}>)formatr_   r   rf   s    r   rg   zSimpleShuffleLayer.__repr__  s$    >EEIIt''
 	
r   c                    t        | d      S r   r   rf   s    r   r   z"SimpleShuffleLayer.is_materialized  r   r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S ri   rk   rn   s     r   rp   zSimpleShuffleLayer._dict  rq   r   c                     | j                   |   S r   rs   rt   s     r   r<   zSimpleShuffleLayer.__getitem__  rv   r   c                ,    t        | j                        S r   rx   rf   s    r   rz   zSimpleShuffleLayer.__iter__  r{   r   c                ,    t        | j                        S r   r}   rf   s    r   r~   zSimpleShuffleLayer.__len__  r   r   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY wz4Simple utility to convert keys to partition indices.set
ValueErrorr_   addr   r   partsru   _name_parts         r   _keys_to_partsz!SimpleShuffleLayer._keys_to_parts  ^     	C"u 		!IIe	      ;	AAc           	         t        t              }|xs | j                  |      }|D ]H  }|| j                  |fxx   t	        | j
                        D ch c]  }| j                  |f c}z  cc<   J |S c c}w )zDetermine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        )r   r   r   r_   r   r   r   )r   r   r   depsr   rD   s         r   _cull_dependenciesz%SimpleShuffleLayer._cull_dependencies  s~     3:!4!4T!:	 	D$))T"#.3D4J4J.K()*!$( #	 (s   A4c           
         t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  |      S Nr   )r   r_   r   r   r   r   r   r   r   r   s     r   _cullzSimpleShuffleLayer._cull  sI    !IIKK""OOOO	
 		
r   c                    | j                  |      }| j                  ||      }|t        | j                        k7  r| j	                  |      }||fS | |fS )a  Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r   r   r   r   r   r   r   all_keysr   culled_depsculled_layers         r   cullzSimpleShuffleLayer.cull  ^     ''-	--di-HDNN++::i0L,,$$r   c           
     @   d| j                   z   }|rt        d      }t        d      }nddlm} ddlm} i }| j                  D ]  }t        | j                        D cg c]  }| j                  ||f }}||| j                  f|| j                   |f<   |D ]  \  }	}
}t        j                  ||f|
f|| j                  |
|f<   ||f|vs3|| j                  |f| j                  d| j                  | j                  | j                  | j                  f|||f<     |S c c}w )z/Construct graph for a simple shuffle operation.group-dask.dataframe.core._concat$dask.dataframe.shuffle.shuffle_groupr   _concatshuffle_group)r_   r   dask.dataframe.corer	  dask.dataframe.shuffler  r   r   r   r   r   r   r   r   r   r   )r   r   shuffle_group_nameconcat_funcshuffle_group_funcro   part_outpart_in_concat_list_	_part_out_part_ins               r   rm   z#SimpleShuffleLayer._construct_graph  sZ    &		1 --JKK!36"
 CR 	H  %T%;%;< (G4L 
 !!*CH%&
 +7 &9h$$'2?T__i:;
 '1<*(3(((())((	;C+X67	8 
7s   DNNr   r   )r$   r%   r&   r'   r   r   r   rg   r   r   rp   r<   rz   r~   r   r   r   r  rm   rZ   r[   s   @r   r   r   G  sg    J (2T>

- ! ! 

% .r   r   c                  D     e Zd ZdZ	 	 d fd	Zd ZddZd Zd	dZ xZ	S )
ShuffleLayera"  Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    c                    || _         || _        || _        t        |   ||||||	|
|xs t        t        |            |	       y )N)r   r   )inputsstagensplitsrN   r   r   r3   )r   r_   r   r  r  r   r   r  r   r   r   r   r   rO   s                r   r   zShuffleLayer.__init__A  sU     
55V#5# 	 
	
r   c                z    dj                  | j                  | j                  | j                  | j                        S )Nz=ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>)r   r_   r  r  r   rf   s    r   rg   zShuffleLayer.__repr___  s0    NUUIItzz4<<1A1A
 	
r   c                8   t        t              }|xs | j                  |      }t        | j                        D ci c]  \  }}||
 }}}|D ]  }| j                  |   }t        | j                        D ]  }	t        || j                  |	      }
||
   }| j                  dk(  r@|| j                  k\  r1|| j                  |f   j                  d| j                  z   |
df       n|| j                  |f   j                  | j                  |f         |S c c}}w )zqDetermine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        r   r  empty)r   r   r   r   r  r   r  r   r  r   r_   r   r   )r   r   r   r   rD   inpinp_part_mapr   outr   _inpr   s               r   r   zShuffleLayer._cull_dependenciesd  s   
 3:!4!4T!:	-6t{{-CD61cQDD 	JD++d#C4<<( Jc4::q1$T*::?u0F0F'F$))T*+//DII1EtW0UV$))T*+//%0HIJ	J  Es   Dc                    t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  | j                  | j                  | j                  |      S r   )r  r_   r   r  r  r   r   r  r   r   r   r   s     r   r   zShuffleLayer._cullw  s^    IIKKKKJJ""LLOOOO
 	
r   c           
        d| j                   z   }|rt        d      }t        d      }nddlm} ddlm} i }t        | j                        D ci c]  \  }}||
 }}}| j                  D ]g  }	| j                  |	   }
g }t        | j                        D ]F  }t        |
| j                  |      }|
| j                     }|j                  | j                  ||f       H ||| j                  f|| j                   |	f<   |D ]  \  }}}t         j"                  ||f|f|| j                  ||f<   ||f|vs3||   }| j                  dk(  r3|| j$                  k  r| j&                  |f}n#||df}| j(                  ||<   n| j&                  |f}||| j*                  | j                  | j                  | j$                  | j                  | j,                  f|||f<    j |S c c}}w )z2Construct graph for a "rearrange-by-column" stage.r  r  r  r   r  r
  r   )r_   r   r  r	  r  r  r   r  r   r   r  r   r  r   r   r   r   r   r   r   r   r   r   )r   r   r  r  r  ro   rD   r!  r"  r   r#  r  r$  _idxr  r   	input_keys                    r   rm   zShuffleLayer._construct_graph  s    &		1 --JKK!36"
 CR-6t{{-CD61cQDDNN 0	D++d#CL4<<( Cc4::q14::##T__dD$AB	C !!&CD!" ". 4$$'.6T__dD12 '-S8(.EzzQ 4#9#99)-%(@I *<T7(KI-1__C	N%)__e$<	 +!

..))((	7C+T23-#0	d 
g Es   Gr  r   r   )
r$   r%   r&   r'   r   rg   r   r   rm   rZ   r[   s   @r   r  r    s+    "` 
<

&
Er   r  c                       e Zd ZdZ	 	 	 	 d fd	Zd Zd Zd Zed        Z	d Z
d Zd	 Zd
 Zed        ZddZd Zd ZddZ xZS )BroadcastJoinLayera;  Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    c                   t         |   |       || _        || _        || _        || _        || _        || _        |xs t        t        | j                              | _
        t        |	t              rt        |	      n|	| _        t        |
t              rt        |
      n|
| _        || _        | j                   j#                  d      | _        | j                  | j                   d<   | j                  | j                   d<   y )Nr   howleft_onright_on)rN   r   r_   r   lhs_namelhs_npartitionsrhs_namerhs_npartitionsr   r   r   r   r   r7   r-  r.  merge_kwargsr   r,  )r   r_   r   r/  r0  r1  r2  r   r   r-  r.  r3  rO   s               r   r   zBroadcastJoinLayer.__init__  s     	[1	& . ."Bc%0@0@*A&B)3GT)BuW~+5h+Eh8($$((/'+||)$(,*%r   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   r   r   s     r   r   z"BroadcastJoinLayer.get_output_keys  r   r   c                z    dj                  | j                  | j                  | j                  | j                        S )Nz5BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>)r   r_   r,  r/  r1  rf   s    r   rg   zBroadcastJoinLayer.__repr__	  s.    FMMIItxx
 	
r   c                    t        | d      S r   r   rf   s    r   r   z"BroadcastJoinLayer.is_materialized  r   r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S ri   rk   rn   s     r   rp   zBroadcastJoinLayer._dict  rq   r   c                     | j                   |   S r   rs   rt   s     r   r<   zBroadcastJoinLayer.__getitem__  rv   r   c                ,    t        | j                        S r   rx   rf   s    r   rz   zBroadcastJoinLayer.__iter__  r{   r   c                ,    t        | j                        S r   r}   rf   s    r   r~   zBroadcastJoinLayer.__len__!  r   r   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY wr   r   r   s         r   r   z!BroadcastJoinLayer._keys_to_parts$  r   r   c                    | j                   | j                  k  r.| j                  | j                   | j                  | j                  fS | j                  | j                  | j                  | j
                  fS r   )r0  r2  r/  r1  r.  r-  rf   s    r   _broadcast_planz"BroadcastJoinLayer._broadcast_plan1  sl     $"6"66 $$	  $$	 r   c           	     (   | j                   dd \  }}}t        t              }|xs | j                  |      }|D ]P  }|| j                  |fxx   t        |      D ch c]  }||f c}z  cc<   || j                  |fxx   ||fhz  cc<   R |S c c}w )zDetermine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        N   )r=  r   r   r   r_   r   )	r   r   r   
bcast_name
bcast_size
other_namer   r   rD   s	            r   r   z%BroadcastJoinLayer._cull_dependenciesK  s     .2-A-A"1-E*
J
3:!4!4T!:	 	D$))T"#zAR'SAQ'SS#$))T"#T"( #	
 	 (Ts   Bc                    t        | j                  | j                  | j                  | j                  | j
                  | j                  f| j                  |d| j                  S )N)r   r   )	r*  r_   r   r/  r0  r1  r2  r   r3  r   s     r   r   zBroadcastJoinLayer._cull^  sb    !IIMM  MM  

 ((

 

 
	
r   c                    | j                  |      }| j                  ||      }|t        | j                        k7  r| j	                  |      }||fS | |fS )a  Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r   r   s         r   r  zBroadcastJoinLayer.cullk  r  r   c                   d| j                   z   }d| j                   z   }|r"t        d      }t        d      }t        d      }nddlm} ddlm} dd	lm} | j                  \  }}}	}
| j                  | j                  k  rd
nd}i }| j                  D ]  }| j                  dk7  r||	|f|
|f|||f<   g }t        |      D ]p  }| j                  dk7  rt        j                  ||f|fn|	|f||fg}|d
k(  r|j                          |||f}t        ||| j                   f||<   |j#                  |       r ||f|| j                   |f<    |S )z/Construct graph for a broadcast join operation.zinter-r   z%dask.dataframe.multi._split_partitionz$dask.dataframe.multi._concat_wrapperz)dask.dataframe.multi._merge_chunk_wrapperr   )_concat_wrapper)_merge_chunk_wrapper)_split_partitionleftrightinner)r_   r   dask.dataframe.multirF  rG  rH  r=  r0  r2  r   r,  r   r   r   reverser   r3  r   )r   r   
inter_namer   split_partition_funcr  merge_chunk_funcr@  rA  rB  other_on
bcast_sidero   rD   r  j_merge_args	inter_keys                     r   rm   z#BroadcastJoinLayer._construct_graph{  s    		)
		)
 $67$  --STK1; 
 LUU 8<7K7K4
J
H#33d6J6JJVPW
  ,	>Axx7"(O	(ZO$ L:& / xx7*	 !((#Q %aO	 '  '')'A.	$%%	"I ##I.//4 $/"=CAY,	>\ 
r   )NNNNr   r   )r$   r%   r&   r'   r   r   rg   r   r   rp   r<   rz   r~   r   r=  r   r   r  rm   rZ   r[   s   @r   r*  r*    s|    D 6:>

- ! !   2&
% Mr   r*  c                  H     e Zd ZdZ	 	 	 	 d fd	Zed        Zd Zd Z xZ	S )DataFrameIOLayera  DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument in only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    c	                   || _         || _        || _        || _        || _        || _        || _        || _        t        |t              s@t        t        | j                        D 	
ci c]	  \  }	}
|	f|
 c}
}	| j
                        }n|}| j                   |t        d      fi}t        | 9  | j                   d||dfgi |       y c c}
}	w )N)r0   r   rD   )outputoutput_indicesro   indicesr.   r   )r_   _columnsr  io_funclabelr0   r   creation_infor   r   r   r   r   rN   r   )r   r_   columnsr  r]  r^  r0   r_  r   rD   r!  
io_arg_mapro   rO   s                r   r   zDataFrameIOLayer.__init__  s     	
,&*&,/))24;;)?@vq#!s@#22J
  J yy7OA$67899 #&'# 	 	
 As   'C
c                    | j                   S )z(Current column projection for this layer)r\  rf   s    r   r`  zDataFrameIOLayer.columns  s     }}r   c           	        ddl m} t        |      }| j                  $t	        | j                        j                  |      rt        | j                  |      r| j                  j                  |      }n| j                  }t        | j                  xs ddz   t        | j                  |      z   || j                  || j                  | j                  | j                        }|S | S )zProduce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        r   )DataFrameIOFunctionsubset-)r^  r0   r   )dask.dataframe.io.utilsrd  r   r`  r   
issupersetr   r]  project_columnsrW  r^  r
   r_   r  r0   r   )r   r`  rd  r]  layers        r   ri  z DataFrameIOLayer.project_columns  s    
 	@w-<<3t||#4#?#?#H $,,(;<,,66w?,,$'x3.$))W1MMjj#22 ,,E L Kr   c                v    dj                  | j                  t        | j                        | j                        S )Nz3DataFrameIOLayer<name='{}', n_parts={}, columns={}>)r   r_   r3   r  r`  rf   s    r   rg   zDataFrameIOLayer.__repr__<  s-    DKKIIs4;;'
 	
r   )NFNN)
r$   r%   r&   r'   r   r   r`  ri  rg   rZ   r[   s   @r   rW  rW    s;    "T &
P  >
r   rW  c                  <    e Zd ZU dZded<   ded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   	 	 	 	 	 	 d$	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d% fdZdddZd&dZd Zd Z	d Z
d Zd Zed        Zd Zd Zd  Zd! Zd" Zd# Z xZS )'DataFrameTreeReductionag  DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : str
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    strr_   r   r   r   r   r  tree_node_funcCallable | Nonefinalize_funcsplit_every	split_outz	list[int]output_partitionstree_node_namewidthsheightc                   t         |   |       || _        || _        || _        || _        || _        || _        || _        || _	        |	"t        t        | j                  xs d            n|	| _        |
xs d| j                  z   | _        | j                  }|g| _        |dkD  rLt        j                   || j                  z        }| j                  j#                  t%        |             |dkD  rLt'        | j                        | _        y )Nr   rR   z
tree_node-)rN   r   r_   r   r   r  ro  rq  rr  rs  r   r   rt  ru  rv  mathceilr   r   r3   rw  )r   r_   r   r   r  ro  rq  rr  rs  rt  ru  r   r   rO   s                r   r   zDataFrameTreeReduction.__init__u  s     	[1	$!2&,*&" !( t~~*+," 	
 -Htyy0H &&gaiIIed&6&667EKKs5z* ai $++&r   r   splitc               *    | j                   r||fz   S |S r   )rs  )r   r|  
name_partss      r   	_make_keyz DataFrameTreeReduction._make_key  s     )-zUH$FJFr   c                    |r| j                   r| j                   }n| j                  }t        j                  || j                  |fS r   )rq  ro  r   r   r  )r   
input_keys
final_task
outer_funcs       r   _define_taskz#DataFrameTreeReduction._define_task  s<    $,,++J,,J

J(8(8*EEr   c                   i }| j                   s|S | j                  }| j                  rd|dz  }| j                   D ]P  }t        | j                        D ]6  }t
        j                  | j                  |f|f|| j                  |||      <   8 R | j                  dk\  rm| j                   D ][  }t        d| j                        D ]>  }t        | j                  |         D ]  }| j                  |dz
     }| j                  |z  }t        || j                  z   |      }	|dk(  r,t        ||	      D cg c]  }| j                  |||       }
}n9t        ||	      D cg c]$  }| j                  | j                  ||dz
  |      & }
}|| j                  dz
  k(  r3|dk(  sJ d| d       | j                  |
d	      || j                  |f<   | j                  |
d
	      || j                  | j                  |||      <   " A ^ |S | j                   D ]9  }| j                  |d|      g}
| j                  |
d	      || j                  |f<   ; |S c c}w c c}w )z%Construct graph for a tree reduction.z-splitr{     rR   r   zgroup = z%, not 0 for final tree reduction taskT)r  F)rt  r   rs  r   r   r   r   r  rw  rv  rr  minru  r  r_   )r   ro   name_input_userX   pr   groupp_maxlstartlstopr  s              r   rm   z'DataFrameTreeReduction._construct_graph  s    %%J >>h&N++ t556 A ((!,GC~qBC ;;!++ 'P"1dkk2 %PE!&t{{5'9!: #P $EAI 6!%!1!1E!9 #FT-=-=$=u E A: */vu)=*$% !%~q J*J * */vu)=	* %& !%$($7$7EAIQ !/ !"*J * !DKK!O3 !&
W!)%0UVW *262C2C *t 3D 3CA/ !% 1 1* 1 O	   $$($7$7Q !/ !"?#P%P'P\ 
	 ++ U"nn^QanHI
&*&7&7
t&7&TTYYN#U 
G**s   0I
)I
c                d    dj                  | j                  | j                  | j                        S )Nz>DataFrameTreeReduction<name='{}', input_name={}, split_out={}>)r   r_   r   rs  rf   s    r   rg   zDataFrameTreeReduction.__repr__  s(    OVVIIt
 	
r   c                X    | j                   D ch c]  }| j                  |f c}S c c}w r   )rt  r_   )r   rX   s     r   _output_keysz#DataFrameTreeReduction._output_keys  s$    (,(>(>?1A???r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S )N_cached_output_keys)rl   r  r  )r   output_keyss     r   r   z&DataFrameTreeReduction.get_output_keys  s;    4./+++++-K'2D$'''r   c                    t        | d      S r   r   rf   s    r   r   z&DataFrameTreeReduction.is_materialized  r   r   c                x    t        | d      r| j                  S | j                         }|| _        | j                  S ri   rk   rn   s     r   rp   zDataFrameTreeReduction._dict  rq   r   c                     | j                   |   S r   rs   rt   s     r   r<   z"DataFrameTreeReduction.__getitem__  rv   r   c                ,    t        | j                        S r   rx   rf   s    r   rz   zDataFrameTreeReduction.__iter__  r{   r   c                    t        | j                  dd        xs d| j                  xs dz  }| j                  r%|| j                  t	        | j
                        z  z   S |S r   )sumrv  rs  r   r3   rt  )r   	tree_sizes     r   r~   zDataFrameTreeReduction.__len__  sW    QR).Q4>>3FQG	>>t55D<R<R8SSSSr   c                    t               }|D ])  }	 |\  }}|| j                  k7  r|j                  |       + |S # t        $ r Y 9w xY w)z;Simple utility to convert keys to output partition indices.r   )r   r   splitsru   r   _splits         r   _keys_to_output_partitionsz1DataFrameTreeReduction._keys_to_output_partitions  s^     	C #v 		!JJv	   r   c                    t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  | j                  || j                  | j                        S )N)rq  rr  rs  rt  ru  r   )rm  r_   r   r   r  ro  rq  rr  rs  ru  r   )r   rt  s     r   r   zDataFrameTreeReduction._cull,  sf    %IIOO"",,((nn/..((
 	
r   c                
   | j                   dft        | j                        D ch c]  }| j                  |f c}i}| j	                  |      }|t        | j                        k7  r| j                  |      }||fS | |fS c c}w )z2Cull a DataFrameTreeReduction HighLevelGraph layerr   )r_   r   r   r   r  r   rt  r   )r   r   r   rD   r   rt  r  s          r   r  zDataFrameTreeReduction.cull;  s     YYN.3D4J4J.K)*!$

 !;;DAD$:$: ;;::&78L%%:s   B )N    NNNN)r_   rn  r   rn  r   r   r  r   ro  r   rq  rp  rr  r   rs  z
int | Nonert  zlist[int] | Noneru  z
str | Noner   zdict[str, Any] | Noner   )r$   r%   r&   r'   r?   r   r  r  rm   rg   r  r   r   r   rp   r<   rz   r~   r  r   r  rZ   r[   s   @r   rm  rm  B  s4   #J IO""N  K *. $.2%)-1%'%' %' 	%'
 %' !%' '%' %' %' ,%' #%' +%'N ,- GFHT

@(- ! ! 
r   rm  r  ).
__future__r   r   ry  r   collectionsr   collections.abcr   	itertoolsr   typingr   tlzr   tlz.curriedr	   	dask.baser
   dask.blockwiser   r   r   r   	dask.corer   dask.highlevelgraphr   
dask.utilsr   r   r   r   r   r*   rA   rG   r]   r   r   r   r   r  r*  rW  rm  r(   r   r   <module>r     s    "    # $      U U  % = =@ @.K K&E* E3% 3w wt4n
R%2^R Rjo% odz zzt
y t
nEU Er   