o
    Nrf                     @  sx  d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlZd dlmZ d dlmZ d d	lmZmZmZmZ d d
lmZ d dlmZ d dlmZmZmZm Z  G dd dZ!G dd deZ"G dd de"Z#G dd de"Z$G dd deZ%d'ddZ&dd Z'dd Z(G dd deZ)G dd  d e)Z*G d!d" d"eZ+G d#d$ d$eZ,G d%d& d&eZ-dS )(    annotationsN)defaultdict)Callable)product)Any)map)tokenize)	BlockwiseBlockwiseDepBlockwiseDepDictblockwise_token)flatten)Layer)applycached_cumsumconcreteinsertc                   @  s    e Zd ZdZdd Zdd ZdS )CallableLazyImportzFunction Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    c                 C  s
   || _ d S N)function_path)selfr    r   T/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/dask/layers.py__init__"      
zCallableLazyImport.__init__c                 O  s    ddl m} || j|i |S )Nr   )import_term)Zdistributed.utilsr   r   )r   argskwargsr   r   r   r   __call__%   s   zCallableLazyImport.__call__N)__name__
__module____qualname____doc__r   r   r   r   r   r   r      s    r   c                   @  sB   e Zd ZU dZded< ded< dZded< dd	d
ZdddZdS )ArrayBlockwiseDepzg
    Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    tuple[tuple[int, ...], ...]chunkstuple[int, ...]	numblocksFboolproduces_tasksc                 C  s$   || _ tdd |D | _d| _d S )Nc                 s  s    | ]}t |V  qd S r   )len).0chunkr   r   r   	<genexpr>>   s    z-ArrayBlockwiseDep.__init__.<locals>.<genexpr>F)r&   tupler(   r*   r   r&   r   r   r   r   <   s   
zArrayBlockwiseDep.__init__idxc                 C  s   t d)Nz%Subclasses must implement __getitem__)NotImplementedErrorr   r1   r   r   r   __getitem__A      zArrayBlockwiseDep.__getitem__Nr&   r%   r1   r'   )r    r!   r"   r#   __annotations__r*   r   r4   r   r   r   r   r$   2   s   
 
r$   c                   @  s   e Zd ZdZdddZdS )ArrayChunkShapeDepz(Produce chunk shapes given a chunk indexr1   r'   c                 C  s   t dd t|| jD S )Nc                 s  s    | ]	\}}|| V  qd S r   r   )r,   ir-   r   r   r   r.   I       z1ArrayChunkShapeDep.__getitem__.<locals>.<genexpr>)r/   zipr&   r3   r   r   r   r4   H   s   zArrayChunkShapeDep.__getitem__Nr7   )r    r!   r"   r#   r4   r   r   r   r   r9   E   s    r9   c                      s6   e Zd ZU dZded< d fddZdd	d
Z  ZS )ArraySliceDepz>Produce slice(s) into the full-sized array given a chunk indexr%   startsr&   c                   s$   t  | tdd |D | _d S )Nc                 s  s    | ]	}t |d dV  qdS )T)Zinitial_zeroN)r   )r,   cr   r   r   r.   S   r;   z)ArraySliceDep.__init__.<locals>.<genexpr>)superr   r/   r>   r0   	__class__r   r   r   Q   s   zArraySliceDep.__init__r1   r/   c                 C  s,   t dd t|| jD }t dd |D S )Nc                 s  s(    | ]\}}|| ||d   fV  qdS    Nr   )r,   r:   startr   r   r   r.   V   s   & z,ArraySliceDep.__getitem__.<locals>.<genexpr>c                 s  s"    | ]}t g |d R  V  qd S r   slicer,   sr   r   r   r.   W   s     )r/   r<   r>   )r   r1   locr   r   r   r4   U   s   zArraySliceDep.__getitem__r6   )r1   r/   )r    r!   r"   r#   r8   r   r4   __classcell__r   r   rA   r   r=   L   s
   
 r=   c                      sn   e Zd ZdZ fddZdd Zedd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd ZdddZ  ZS )ArrayOverlapLayera\  Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for a array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    c                   s2   t    || _|| _|| _|| _|| _d | _d S r   )r@   r   nameaxesr&   r(   token_cached_keys)r   rM   rN   r&   r(   rO   rA   r   r   r   i   s   

zArrayOverlapLayer.__init__c                 C  s   d| j  dS )NzArrayOverlapLayer<name=''rM   r   r   r   r   __repr__y   s   zArrayOverlapLayer.__repr__c                 C  $   t | dr| jS |  }|| _| jS z$Materialize full dict representation_cached_dicthasattrrW   _construct_graphr   dskr   r   r   _dict|   
   
zArrayOverlapLayer._dictc                 C  
   | j | S r   r]   r   keyr   r   r   r4      r   zArrayOverlapLayer.__getitem__c                 C  
   t | jS r   iterr]   rS   r   r   r   __iter__   r   zArrayOverlapLayer.__iter__c                 C  rc   r   r+   r]   rS   r   r   r   __len__   r   zArrayOverlapLayer.__len__c                 C  
   t | dS NrW   rY   rS   r   r   r   is_materialized   r   z!ArrayOverlapLayer.is_materializedc                 C  s   |   S r   )keysrS   r   r   r   get_output_keys   r5   z!ArrayOverlapLayer.get_output_keysc                   sH   | j d ur| j S | j| j| j  fdd  | _ }|S )Nc                    sd   sfgS t  }|d t kr" fddt| D }|S  fddt| D }|S )NrD   c                   s   g | ]
}f  |f qS r   r   r,   r:   )r   rM   r   r   
<listcomp>   s    z>ArrayOverlapLayer._dask_keys.<locals>.keys.<locals>.<listcomp>c                   s   g | ]	} |f  qS r   r   ro   )r   rm   r   r   rp      s    )r+   range)r   indresultr&   rm   rM   r(   )r   r   rm      s   z*ArrayOverlapLayer._dask_keys.<locals>.keys)rP   rM   r&   r(   )r   rs   r   rt   r   
_dask_keys   s   

zArrayOverlapLayer._dask_keysFc                 C  s
  | j }| j}| j}|  }d| j }d| j }|rtd}nddlm} tt	t
|}	tjt|	|d}
t|tt	|
t	ttjt}i }i }|D ]4}t|f| |}|f| |kra|||f| < qH|f| ||f| < |t|
d| |dff||f| < qHt||}|S )	z/Construct graph for a simple overlap operation.zgetitem-zoverlap-zdask.array.core.concatenate3r   )concatenate3)dimsrN   r   rR   )rN   r&   rM   ru   rO   r   Zdask.array.corerv   listr   r+   	functoolspartial_expand_keys_around_centertoolzpiper   concatfractional_slicer   merge)r   deserializingrN   r&   rM   Z	dask_keysZgetitem_nameZoverlap_namerv   rw   Zexpand_key2Zinterior_keysZinterior_slicesZoverlap_blockskZ
frac_slicer\   r   r   r   rZ      s8   


z"ArrayOverlapLayer._construct_graphF)r    r!   r"   r#   r   rT   propertyr]   r4   rf   rh   rl   rn   ru   rZ   rK   r   r   rA   r   rL   Z   s    
	rL   c                   s   fddg }t | dd D ]\}}d}|dkr|d7 }|| d k r*|d7 }|| q fddt | dd D }|durJ|gg| }tt| }	 fddt |D }
t|
|	}|S )	a  Get all neighboring keys around center

    Parameters
    ----------
    k: Key
        The key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Option[str]
        The name to include in the output keys, or none to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    c                   sN   g }|d dkr| |d  | | |d  |  d k r%| |d  |S )Ng?r   rD   )append)r:   rr   rv)rw   r   r   inds   s   
z(_expand_keys_around_center.<locals>.indsrD   Nr   c                   s2   g | ]\}}t  |d fr||n|gqS )r   anyget)r,   r:   rr   )rN   r   r   r   rp      s    &z._expand_keys_around_center.<locals>.<listcomp>c                   s*   g | ]\}}t  |d fr|ndqS )r   rD   r   )r,   r:   d)rN   r   r   rp     s   * )	enumerater   rx   r   reshapelist)r   rw   rM   rN   shaper:   rr   numr   seqZshape2rs   r   )rN   rw   r   r   r{      s$   	
r{   c                   sB   t  dkr
t|S tt | d  } fddt||D S )zgReshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    rD   r   c                   s   g | ]}t  d d |qS rC   )r   r,   partr   r   r   rp     s    zreshapelist.<locals>.<listcomp>)r+   rx   intr|   	partition)r   r   nr   r   r   r     s   r   c           
      C  s  | d ft dd | dd D  }g }tt| dd |dd D ]S\}\}}||d}t|t r=|d }|d }	n|}|}	||krO|tddd q#||k r^|	r^|td|	 q#||krn|rn|t| d q#|tdd q#t |}tdd |D r| S tj	||fS )a  

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    r   c                 s  s    | ]	}t t|V  qd S r   )r   roundro   r   r   r   r.   $  r;   z#fractional_slice.<locals>.<genexpr>rD   Nc                 s  s     | ]}|t d d d kV  qd S r   rF   )r,   rr   r   r   r   r.   :  s    )
r/   r   r<   r   
isinstancer   rG   alloperatorgetitem)
taskrN   Zroundedindexr:   trdepthZ
left_depthZright_depthr   r   r   r     s(   $*

r   c                      s   e Zd ZdZ		d  fdd	Zdd Zdd Zd	d
 Zdd Ze	dd Z
dd Zdd Zdd Zdd Zd!ddZdd Zdd Zd"ddZ  ZS )#SimpleShuffleLayera  Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    Nc
           
        sv   || _ || _|| _|| _|| _|| _|| _|pt|| _d| j  | _	|	p%i }	d | _
d|	vr2| j|	d< t j|	d d S )Nsplit-priorityr   )rM   columnnpartitionsnpartitions_inputignore_index
name_input
meta_inputrq   	parts_out
split_nameZ_split_keys_key_priorityr@   r   )
r   rM   r   r   r   r   r   r   r   r   rA   r   r   r   d  s   
zSimpleShuffleLayer.__init__c                 C  s$   t |tsJ |d | jkrdS dS )Nr   rD   )r   r/   r   ra   r   r   r   r     s   z SimpleShuffleLayer._key_priorityc                       fdd j D S )Nc                      h | ]} j |fqS r   rR   r   rS   r   r   	<setcomp>      z5SimpleShuffleLayer.get_output_keys.<locals>.<setcomp>r   rS   r   rS   r   rn        z"SimpleShuffleLayer.get_output_keysc                 C  s   d | j| jS )Nz-SimpleShuffleLayer<name='{}', npartitions={}>)formatrM   r   rS   r   r   r   rT     s   zSimpleShuffleLayer.__repr__c                 C  ri   rj   rk   rS   r   r   r   rl     r   z"SimpleShuffleLayer.is_materializedc                 C  rU   rV   rX   r[   r   r   r   r]     r^   zSimpleShuffleLayer._dictc                 C  r_   r   r`   ra   r   r   r   r4     r   zSimpleShuffleLayer.__getitem__c                 C  rc   r   rd   rS   r   r   r   rf     r   zSimpleShuffleLayer.__iter__c                 C  rc   r   rg   rS   r   r   r   rh     r   zSimpleShuffleLayer.__len__c              	   C  J   t  }|D ]}z|\}}W n	 ty   Y qw || jkrq|| q|S z4Simple utility to convert keys to partition indices.set
ValueErrorrM   addr   rm   partsrb   _name_partr   r   r   _keys_to_parts     
z!SimpleShuffleLayer._keys_to_partsc                   sN   t t}|p
 |}|D ]}| j|f   fddt jD O  < q|S )zDetermine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        c                   r   r   r   ro   rS   r   r   r         
z8SimpleShuffleLayer._cull_dependencies.<locals>.<setcomp>)r   r   r   rM   rq   r   )r   rm   r   depsr   r   rS   r   _cull_dependencies  s   z%SimpleShuffleLayer._cull_dependenciesc              
   C  s&   t | j| j| j| j| j| j| j|dS Nr   )r   rM   r   r   r   r   r   r   r   r   r   r   r   _cull  s   zSimpleShuffleLayer._cullc                 C  @   |  |}| j||d}|t| jkr| |}||fS | |fS )a  Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r   r   r   r   r   r   rm   all_keysr   Zculled_depsculled_layerr   r   r   cull     

zSimpleShuffleLayer.cullFc           
   
     s   dj  }|rtd}td}nddlm} ddlm} i }jD ]L  fddtjD }||j	f|j  f< |D ]/\}}}	t
j||	f|f|j||	f< ||	f|vrl|j|	fjdjjj	jf|||	f< q=q!|S )	z/Construct graph for a simple shuffle operation.group-dask.dataframe.core._concat$dask.dataframe.shuffle.shuffle_groupr   _concatshuffle_groupc                   s   g | ]}j  |fqS r   )r   )r,   Zpart_inZpart_outr   r   r   rp     s    
z7SimpleShuffleLayer._construct_graph.<locals>.<listcomp>)rM   r   dask.dataframe.corer   dask.dataframe.shuffler   r   rq   r   r   r   r   r   r   r   r   )
r   r   shuffle_group_nameconcat_funcshuffle_group_funcr\   _concat_list_Z	_part_outZ_part_inr   r   r   rZ     sF   

z#SimpleShuffleLayer._construct_graphNNr   r   )r    r!   r"   r#   r   r   rn   rT   rl   r   r]   r4   rf   rh   r   r   r   r   rZ   rK   r   r   rA   r   r   G  s&    %*
	
r   c                      sJ   e Zd ZdZ		d fdd	Zdd ZdddZd	d
 ZdddZ  Z	S )ShuffleLayera"  Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    Nc                   s@   || _ || _|| _t j||||||	|
|ptt||d	 d S )N)r   r   )inputsstagensplitsr@   r   rq   r+   )r   rM   r   r   r   r   r   r   r   r   r   r   r   rA   r   r   r   A  s   
zShuffleLayer.__init__c                 C     d | j| j| j| jS )Nz=ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>)r   rM   r   r   r   rS   r   r   r   rT   _     zShuffleLayer.__repr__c           
      C  s   t t}|p
| |}dd t| jD }|D ]B}| j| }t| jD ]5}t|| j|}|| }	| jdkrK|	| j	krK|| j
|f d| j
 |df q#|| j
|f | j|	f q#q|S )zqDetermine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        c                 S     i | ]\}}||qS r   r   r,   r:   inpr   r   r   
<dictcomp>k  r   z3ShuffleLayer._cull_dependencies.<locals>.<dictcomp>r   r   empty)r   r   r   r   r   rq   r   r   r   r   rM   r   r   )
r   rm   r   r   inp_part_mapr   outr   _inpr   r   r   r   r   d  s   
"zShuffleLayer._cull_dependenciesc                 C  s2   t | j| j| j| j| j| j| j| j| j	| j
|dS r   )r   rM   r   r   r   r   r   r   r   r   r   r   r   r   r   r   w  s   zShuffleLayer._cullFc              
   C  sf  d| j  }|rtd}td}nddlm} ddlm} i }dd t| jD }| jD ]}| j| }g }	t	| j
D ]}
t|| j|
}|| j }|	| j||f q9||	| jf|| j |f< |	D ]Q\}}}tj||f|f|| j||f< ||f|vr|| }| jdkr|| jk r| j|f}n||d	f}| j||< n| j|f}||| j| j| j
| j| j| jf|||f< q^q+|S )
z2Construct graph for a "rearrange-by-column" stage.r   r   r   r   r   r   c                 S  r   r   r   r   r   r   r   r     r   z1ShuffleLayer._construct_graph.<locals>.<dictcomp>r   )rM   r   r   r   r   r   r   r   r   rq   r   r   r   r   r   r   r   r   r   r   r   r   r   )r   r   r   r   r   r\   r   r   r   r   r:   r   Z_idxr   r   Z	input_keyr   r   r   rZ     s\   







!zShuffleLayer._construct_graphr   r   r   )
r    r!   r"   r#   r   rT   r   r   rZ   rK   r   r   rA   r   r     s    0
r   c                      s   e Zd ZdZ				d  fdd	Zdd Zdd Zd	d
 Zedd Z	dd Z
dd Zdd Zdd Zedd Zd!ddZdd Zdd Zd"ddZ  ZS )#BroadcastJoinLayera;  Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    Nc                   s   t  j|d || _|| _|| _|| _|| _|| _|p!tt	| j| _
t|	tr,t|	n|	| _t|
tr8t|
n|
| _|| _| jd| _| j| jd< | j| jd< d S )Nr   howleft_onright_on)r@   r   rM   r   lhs_namelhs_npartitionsrhs_namerhs_npartitionsr   rq   r   r   rx   r/   r   r   merge_kwargsr   r   )r   rM   r   r   r   r   r   r   r   r   r   r   rA   r   r   r     s   zBroadcastJoinLayer.__init__c                   r   )Nc                   r   r   rR   r   rS   r   r   r     r   z5BroadcastJoinLayer.get_output_keys.<locals>.<setcomp>r   rS   r   rS   r   rn     r   z"BroadcastJoinLayer.get_output_keysc                 C  r   )Nz5BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>)r   rM   r   r   r   rS   r   r   r   rT   	  r   zBroadcastJoinLayer.__repr__c                 C  ri   rj   rk   rS   r   r   r   rl     r   z"BroadcastJoinLayer.is_materializedc                 C  rU   rV   rX   r[   r   r   r   r]     r^   zBroadcastJoinLayer._dictc                 C  r_   r   r`   ra   r   r   r   r4     r   zBroadcastJoinLayer.__getitem__c                 C  rc   r   rd   rS   r   r   r   rf     r   zBroadcastJoinLayer.__iter__c                 C  rc   r   rg   rS   r   r   r   rh   !  r   zBroadcastJoinLayer.__len__c              	   C  r   r   r   r   r   r   r   r   $  r   z!BroadcastJoinLayer._keys_to_partsc                 C  s4   | j | jk r| j| j | j| jfS | j| j| j| jfS r   )r   r   r   r   r   r   rS   r   r   r   _broadcast_plan1  s   		z"BroadcastJoinLayer._broadcast_planc                   s|   | j dd \ }}tt}|p| |}|D ]$}|| j|f   fddt|D O  < || j|f  ||fhO  < q|S )zDetermine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        N   c                   s   h | ]} |fqS r   r   ro   
bcast_namer   r   r   X  s    z8BroadcastJoinLayer._cull_dependencies.<locals>.<setcomp>)r   r   r   r   rM   rq   )r   rm   r   
bcast_size
other_namer   r   r   r   r   r   K  s   (
z%BroadcastJoinLayer._cull_dependenciesc                 C  s0   t | j| j| j| j| j| jf| j|d| jS )N)r   r   )	r   rM   r   r   r   r   r   r   r   r   r   r   r   r   ^  s   	zBroadcastJoinLayer._cullc                 C  r   )a  Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   r   r   r   r   r   r   k  r   zBroadcastJoinLayer.cullFc                 C  s2  d| j  }d| j  }|rtd}td}td}nddlm} ddlm} dd	lm} | j\}}}	}
| j| jk r:d
nd}i }| j	D ]U}| j
dkrT||	|f|
|f|||f< g }t|D ]2}| j
dkritj||f|fn|	|f||fg}|d
kry|  |||f}t||| jf||< || qZ||f|| j |f< qA|S )z/Construct graph for a broadcast join operation.zinter-r   z%dask.dataframe.multi._split_partitionz$dask.dataframe.multi._concat_wrapperz)dask.dataframe.multi._merge_chunk_wrapperr   )_concat_wrapper)_merge_chunk_wrapper)_split_partitionleftrightinner)rM   r   Zdask.dataframe.multir   r   r   r   r   r   r   r   rq   r   r   reverser   r   r   )r   r   Z
inter_namer   Zsplit_partition_funcr   Zmerge_chunk_funcr   r   r   Zother_onZ
bcast_sider\   r:   r   jZ_merge_argsZ	inter_keyr   r   r   rZ   {  sX   






z#BroadcastJoinLayer._construct_graph)NNNNr   r   )r    r!   r"   r#   r   rn   rT   rl   r   r]   r4   rf   rh   r   r   r   r   r   rZ   rK   r   r   rA   r   r     s,    "
	

r   c                      sF   e Zd ZdZ				d fdd	Zedd Zdd	 Zd
d Z  Z	S )DataFrameIOLayera  DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument in only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    NFc	                   s   || _ || _|| _|| _|| _|| _|| _|| _t|t	s-t
dd t| jD | jd}	n|}	| j |tdfi}
t j| j d|
|	dfgi |d d S )Nc                 S  s   i | ]\}}|f|qS r   r   r   r   r   r   r     s    z-DataFrameIOLayer.__init__.<locals>.<dictcomp>)r*   r   r:   )outputZoutput_indicesr\   indicesr(   r   )rM   _columnsr   io_funclabelr*   r   creation_infor   r   r   r   r   r@   r   )r   rM   columnsr   r  r  r*   r	  r   Z
io_arg_mapr\   rA   r   r   r     s.   

zDataFrameIOLayer.__init__c                 C  s   | j S )z(Current column projection for this layer)r  rS   r   r   r   r
    s   zDataFrameIOLayer.columnsc              	   C  s   ddl m} t|}| jdu st| j|rCt| j|r$| j|}n| j}t	| j
p,dd t| j| || j|| j
| j| jd}|S | S )zProduce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        r   )DataFrameIOFunctionNZsubset-)r  r*   r   )Zdask.dataframe.io.utilsr  rx   r
  r   
issupersetr   r  project_columnsr  r  r	   rM   r   r*   r   )r   r
  r  r  layerr   r   r   r    s"   	z DataFrameIOLayer.project_columnsc                 C  s   d | jt| j| jS )Nz3DataFrameIOLayer<name='{}', n_parts={}, columns={}>)r   rM   r+   r   r
  rS   r   r   r   rT   <  r   zDataFrameIOLayer.__repr__)NFNN)
r    r!   r"   r#   r   r   r
  r  rT   rK   r   r   rA   r   r    s    *(
r  c                      s  e Zd ZU dZded< ded< ded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< 						d;d< fddZddddZd=d!d"Zd#d$ Zd%d& Z	d'd( Z
d)d* Zd+d, Zed-d. Zd/d0 Zd1d2 Zd3d4 Zd5d6 Zd7d8 Zd9d: Z  ZS )>DataFrameTreeReductionag  DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : str
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    strrM   r   r   r   r   r   tree_node_funcCallable | Nonefinalize_funcsplit_every	split_outz	list[int]output_partitionstree_node_namewidthsheightN    
int | Nonelist[int] | None
str | Noner   dict[str, Any] | Nonec                   s   t  j|d || _|| _|| _|| _|| _|| _|| _|| _	|	d u r,t
t| j	p)dn|	| _|
p5d| j | _| j}|g| _|dkrVt|| j }| jt| |dksBt| j| _d S )Nr   rD   z
tree_node-)r@   r   rM   r   r   r   r  r  r  r  rx   rq   r  r  r  mathceilr   r   r+   r  )r   rM   r   r   r   r  r  r  r  r  r  r   r   rA   r   r   r   u  s*   zDataFrameTreeReduction.__init__r   splitc                G  s   | j r||f S |S r   )r  )r   r#  Z
name_partsr   r   r   	_make_key  s   z DataFrameTreeReduction._make_keyFc                 C  s(   |r	| j r	| j }n| j}tj|| j|fS r   )r  r  r|   r}   r   )r   
input_keys
final_taskZ
outer_funcr   r   r   _define_task  s   
z#DataFrameTreeReduction._define_taskc                   s  i }j s|S jjr0d7 j D ]tjD ]}tjj|ff|j|d< qqjdkrj D ]{tdjD ]r tj	  D ]h}j	 d  }j
| }t|j
 |} dkrrfddt||D }n fddt||D } jd kr|dksJ d	| d
j|dd|jf< qIj|dd|jj| d< qIq@q8|S j D ]jddg}j|dd|jf< q|S )z%Construct graph for a tree reduction.z-splitr"     rD   c                   s   g | ]
}j  |d qS )r"  )r$  r,   p)name_input_userI   r   r   r   rp     s    z;DataFrameTreeReduction._construct_graph.<locals>.<listcomp>c                   s$   g | ]}j j| d  dqS )rD   r"  )r$  r  r)  )r   rI   r   r   r   rp     s    r   zgroup = z%, not 0 for final tree reduction taskT)r&  F)r  r   r  rq   r   r   r   r$  r  r  r  minr'  rM   r  )r   r\   r*  groupZp_maxZlstartZlstopr%  r   )r   r+  rI   r   r   rZ     s^   





	
,
z'DataFrameTreeReduction._construct_graphc                 C  s   d | j| j| jS )Nz>DataFrameTreeReduction<name='{}', input_name={}, split_out={}>)r   rM   r   r  rS   r   r   r   rT     s   zDataFrameTreeReduction.__repr__c                   r   )Nc                   r   r   rR   rH   rS   r   r   r     r   z6DataFrameTreeReduction._output_keys.<locals>.<setcomp>)r  rS   r   rS   r   _output_keys  r   z#DataFrameTreeReduction._output_keysc                 C  rU   )N_cached_output_keys)rY   r/  r.  )r   Zoutput_keysr   r   r   rn     s
   
z&DataFrameTreeReduction.get_output_keysc                 C  ri   rj   rk   rS   r   r   r   rl     r   z&DataFrameTreeReduction.is_materializedc                 C  rU   rV   rX   r[   r   r   r   r]     r^   zDataFrameTreeReduction._dictc                 C  r_   r   r`   ra   r   r   r   r4     r   z"DataFrameTreeReduction.__getitem__c                 C  rc   r   rd   rS   r   r   r   rf     r   zDataFrameTreeReduction.__iter__c                 C  s>   t | jdd  p
d| jpd }| jr|| jt| j  S |S )NrD   )sumr  r  r   r+   r  )r   Z	tree_sizer   r   r   rh     s    zDataFrameTreeReduction.__len__c              	   C  r   )z;Simple utility to convert keys to output partition indices.r   )r   rm   Zsplitsrb   r   _splitr   r   r   _keys_to_output_partitions  r   z1DataFrameTreeReduction._keys_to_output_partitionsc                 C  s2   t | j| j| j| j| j| j| j| j|| j	| j
dS )N)r  r  r  r  r  r   )r  rM   r   r   r   r  r  r  r  r  r   )r   r  r   r   r   r   ,  s   zDataFrameTreeReduction._cullc                   sT    j df fddt jD i} |}|t jkr& |}||fS  |fS )z2Cull a DataFrameTreeReduction HighLevelGraph layerr   c                   r   r   r   ro   rS   r   r   r   >  r   z.DataFrameTreeReduction.cull.<locals>.<setcomp>)rM   rq   r   r2  r   r  r   )r   rm   r   r   r  r   r   rS   r   r   ;  s   

zDataFrameTreeReduction.cull)Nr  NNNN)rM   r  r   r  r   r   r   r   r  r   r  r  r  r   r  r  r  r  r  r  r   r  r   )r    r!   r"   r#   r8   r   r$  r'  rZ   rT   r.  rn   rl   r   r]   r4   rf   rh   r2  r   r   rK   r   r   rA   r   r  B  sH   
 %	'
J
	r  r   ).
__future__r   ry   r   r   collectionsr   collections.abcr   	itertoolsr   typingr   Ztlzr|   Ztlz.curriedr   Z	dask.baser	   Zdask.blockwiser
   r   r   r   Z	dask.corer   Zdask.highlevelgraphr   Z
dask.utilsr   r   r   r   r   r$   r9   r=   rL   r{   r   r   r   r   r   r  r  r   r   r   r   <module>   s>    	
z7/ V 3 ~w