
    0Fie4                       d dl mZ d dlmZmZ d dlmZmZ d dlZd dl	m
Z
mZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZmZ erd dlZd dlmZmZmZ d dlm Z  dZ!d;dZ"d<dZ#	 	 	 	 d=d>d&Z$e$Z%dZ!d?d/Z&d@d8Z' G d9 d:e          Z(dS )A    annotations)IterableSequence)TYPE_CHECKINGAnyN)is_dask_collectiontokenize)HighLevelGraph)Layer)check_minimal_arrow_version)	ShuffleIdbarrier_keyget_worker_plugin)shuffle_barriershuffle_transfer)
IndexLabelMergeHowSuffixes_Frame__hash_partitiondfpd.DataFrameindexr   c                .   dd l }ddlm} |j        j                            |          ot          |           }t          ||          s'|rt          |          }| 	                    |          }n$t          |d          r|                                }|S )Nr   r   to_frame)pandasdask.dataframe.corer   apitypesis_list_liker	   
isinstancelist_select_columns_or_indexhasattrr   )r   r   pdr   	list_likes        :lib/python3.11/site-packages/distributed/shuffle/_merge.py_prepare_index_for_partitioningr*      s    ******))%00R9KE9R9R5RIeV$$ 	! 	 KKE++E22	
	#	# !   L    npartitionsintc                B   t          | |          }ddlm} | j                            dg          }| j        j        d d         |_        |                    ||p| j        |d          } | j	        di t          |i}| j        j        j        |j        j        _        |S )Nr   )partitioning_index   F)r,   metatransform_divisions )r*   dask.dataframe.shuffler/   _meta_constructor_sliced_meta_nonemptyr   map_partitionsr,   assign_HASH_COLUMN_NAMEname)r   r   r,   r/   r1   
partitionsdf2s          r)   _calculate_partitionsr>   .   s    +B66E9999998'',,D "(!,DJ%%12>!	 &  J ")
6
6(*5
6
6C8>.CIOJr+   inner_x_yFlhsr   left_onIndexLabel | Nonerhsright_onhowr   
int | Nonesuffixesr   	indicatorboolc           	     4   ddl m}m}	 |t          | j        |j                  }t          ||          rd }
d}nd}|}
t          ||          rd }d}nd}|}t          ||
|||||          }t          | j                  r| j	        n| j
        }t          |j                  r|j	        n|j
        } |j        |fi |}t          | ||          } t          |||          }dt          | |fi |z   }t          j                            d          }t#          di d|d	| j        d
| j
        d|
d| j        d|j        d|j
        d|d|j        d|d|d|d|d|d|d|d|}t'          j        ||| |g          } |	|||d g|dz   z            S )Nr   )Indexnew_dd_objectTF)rH   rD   rG   
left_indexright_indexrJ   rK   z
hash-join-zdistributed.p2p.diskr;   name_input_leftmeta_input_leftrD   n_partitions_leftname_input_rightmeta_input_rightrG   n_partitions_rightmeta_outputrH   r,   rJ   rK   rP   rQ   disk)dependenciesr0   r3   )r   rN   rO   maxr,   r#   dictlencolumnsr7   r5   merger>   r
   daskconfiggetHashJoinP2PLayer_namer   from_collections)rC   rD   rF   rG   rH   r,   rJ   rK   rN   rO   _left_onrP   	_right_onrQ   merge_kwargs	_lhs_meta	_rhs_metar1   
merge_namerY   
join_layergraphs                         r)   hash_join_p2prn   A   s    98888888#/3?;;'5!! 


(E"" 		  L '*#+&6&6E""CII&)#+&6&6E""CII9?95555D
Wk
:
:C
X{
;
;Cc B B\ B BBJ!788D!   Z		 		 	
 //    ?? D C  K  ) :   K!" T#J& +Jc3Z  E =
D4&K!O2LMMMr+   inputidr   input_partitionr1   	parts_outset[int]rY   c           
     :    t          | |||t          |||          S )N)ro   rp   rq   r,   columnr1   rr   rY   )r   r:   )ro   rp   rq   r,   r1   rr   rY   s          r)   merge_transferrv      s3     ' 	 	 	 	r+   shuffle_id_leftshuffle_id_rightoutput_partitionbarrier_leftbarrier_rightresult_metarP   rQ   c                   ddl m} t                      }|                    | ||                              t
          d          }|                    |||                              t
          d          } ||||||||	|
|	  	        S )Nr   )merge_chunkignore)r^   errors)rH   r|   rD   rG   rJ   rP   rQ   )dask.dataframe.multir~   r   get_output_partitiondropr:   )rw   rx   ry   rz   r{   rH   rD   rG   r|   rJ   rP   rQ   r~   extleftrights                   r)   merge_unpackr      s     100000


C##'7 
d$Xd66 	 $$-)9 
d$Xd66 
 ;
 
 
 
r+   c                  >    e Zd ZU ded<   ded<   ded<   ded<   d	ed
<   ded<   ded<   ded<   ded<   ded<   ded<   d	ed<   ded<   ded<   ded<   ded<   d	ed<   	 	 	 	 	 d:d; fd$Z	 d<d=d(Zd>d*Zd+ Zd, Zd- Z	d. Z
d/ Zd0 Zed1             Zd?d3Zd@d7ZdAd9Z xZS )Brc   strr;   r-   r,   r   rH   r   rJ   rL   rK   r   rX   zSequence[int]rr   rR   rS   rT   rE   rD   rP   rU   rV   rW   rG   rQ   r?   r@   FNrY   Sequence | Noner   dict | NonereturnNonec                   t                       || _        || _        || _        || _        || _        || _        |	| _        || _        || _	        || _
        || _        |
| _        |pt          t          |                    | _        || _        || _        || _        || _        || _        t+                                          |           d S )Nr   )r   r;   rR   rS   rD   rU   rV   rG   rH   r,   rJ   rK   rX   r$   rangerr   rT   rW   rP   rQ   rY   super__init__)selfr;   rR   rS   rD   rT   rW   rU   rV   rG   rX   rP   rQ   r,   rY   rH   rJ   rK   rr   r   	__class__s                       r)   r   zHashJoinP2PLayer.__init__   s    , 	$%%%	.. 0 0 & "&">d5+=+=&>&>!2"4$&	[11111r+   keysIterable[str]Iterable[str] | Nonec                     i }|p                      |          } fdt           j                  D             }| fdt           j                  D             z  }t          |          }|D ]}|| j        |f<   |S )zDetermine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        c                "    h | ]}j         |fS r3   )rR   .0ir   s     r)   	<setcomp>z6HashJoinP2PLayer._cull_dependencies.<locals>.<setcomp>  s!    KKKa%q)KKKr+   c                "    h | ]}j         |fS r3   )rU   r   s     r)   r   z6HashJoinP2PLayer._cull_dependencies.<locals>.<setcomp>  s!    MMM$'+MMMr+   )_keys_to_partsr   r,   	frozensetr;   )r   r   rr   depsparts   `    r)   _cull_dependenciesz#HashJoinP2PLayer._cull_dependencies  s     :!4!4T!:!:	KKKK59I3J3JKKKMMMMU4;K5L5LMMMM 	+ 	+D&*D$)T"##r+   set[str]c                    t                      }|D ]:}	 |\  }}n# t          $ r Y w xY w|| j        k    r%|                    |           ;|S )z4Simple utility to convert keys to partition indices.)set
ValueErrorr;   add)r   r   partskeyrd   _parts         r)   r   zHashJoinP2PLayer._keys_to_parts  st     	 	C"uu   	!!IIes   
''c                *      fd j         D             S )Nc                "    h | ]}j         |fS r3   )r;   )r   r   r   s     r)   r   z3HashJoinP2PLayer.get_output_keys.<locals>.<setcomp>(  s     ===dD!===r+   rr   r   s   `r)   get_output_keysz HashJoinP2PLayer.get_output_keys'  s    ====dn====r+   c                (    d| j          d| j         dS )NzHashJoin<name='z', npartitions=>)r;   r,   r   s    r)   __repr__zHashJoinP2PLayer.__repr__*  s     NNN4;KNNNNr+   c                "    t          | d          S )N_cached_dict)r&   r   s    r)   is_materializedz HashJoinP2PLayer.is_materialized-  s    t^,,,r+   c                    | j         |         S N)_dict)r   r   s     r)   __getitem__zHashJoinP2PLayer.__getitem__0  s    z#r+   c                *    t          | j                  S r   )iterr   r   s    r)   __iter__zHashJoinP2PLayer.__iter__3  s    DJr+   c                *    t          | j                  S r   )r]   r   r   s    r)   __len__zHashJoinP2PLayer.__len__6  s    4:r+   c                t    t          | d          r| j        S |                                 }|| _        | j        S )z$Materialize full dict representationr   )r&   r   _construct_graph)r   dsks     r)   r   zHashJoinP2PLayer._dict9  sA     4(( 	$$$''))C #D  r+   Sequence[str]c                <   t          di d| j        d| j        d| j        d| j        d| j        d| j        d| j        d| j        d	| j	        d
| j
        d| j        d| j        d|d| j        d| j        d| j        d| j        d| j        d| j        S )Nr;   rR   rS   rD   rU   rV   rG   rH   r,   rJ   rK   rX   rr   rP   rQ   rY   r   rT   rW   r3   )rc   r;   rR   rS   rD   rU   rV   rG   rH   r,   rJ   rK   rX   rP   rQ   rY   r   rT   rW   )r   rr   s     r)   _cullzHashJoinP2PLayer._cullC  s1    
 
 

 00
 !00
 LL	

 "22
 "22
 ]]
 
 ((
 ]]
 nn
 ((
  i
 
 ((
  !
" ((#
$ #44%
&  $66'
 	
r+   all_keysr   tuple[HashJoinP2PLayer, dict]c                    |                      |          }|                     ||          }|t          | j                  k    r|                     |          }||fS | |fS )a  Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        r   )r   r   r   rr   r   )r   r   r   rr   culled_depsculled_layers         r)   cullzHashJoinP2PLayer.cullZ  sk     ''--	--di-HHDN++++::i00L,,$$r+   dict[tuple | str, tuple]c                   t          d| j        | j        | j        | j                  }t          d| j        | j        | j        | j                  }i }d|z   }d|z   }t                      }t                      }t          | j	                  D ]H}|
                    ||f           t          | j        |f||| j        | j        | j        | j        f|||f<   It          | j                  D ]H}|
                    ||f           t          | j        |f||| j        | j        | j        | j        f|||f<   It!          t#          |                    }	t!          t#          |                    }
t$          ||f||	<   t$          ||f||
<   | j        }| j        D ]>}t(          ||||	|
| j        | j        | j        | j        | j        | j        | j        f|||f<   ?|S )Nz	hash-joinzhash-join-transfer-)r
   rR   rD   r,   rr   rU   rG   r$   r   rT   appendrv   rS   rY   rW   rV   r   r   r   r;   r   rH   rX   rJ   rP   rQ   )r   
token_lefttoken_rightr   	name_left
name_righttransfer_keys_lefttransfer_keys_rightr   _barrier_key_left_barrier_key_rightr;   part_outs                r)   r   z!HashJoinP2PLayer._construct_graphj  s!    LN
 

 !MN
 
 )+)J6	*[8
!VV"fft-.. 	 	A%%y!n555%q) $		#CA t.// 	 	A&&
A777&* %		$CQ   (	*(=(=>>(;)?)?@@"1:?Q!R#2KAT"Uy 	 	H!"  %Cx !! 
r+   )r?   r@   FNN)(r;   r   rR   r   rS   r   rD   rE   rT   r-   rW   r-   rU   r   rV   r   rG   rE   rX   r   rP   rL   rQ   rL   r,   r-   rY   rL   rH   r   rJ   r   rK   rL   rr   r   r   r   r   r   r   )r   r   rr   r   )r   r   r   r   )rr   r   )r   r   r   r   r   r   )r   r   )__name__
__module____qualname____annotations__r   r   r   r   r   r   r   r   r   propertyr   r   r   r   __classcell__)r   s   @r)   rc   rc      s	        IIIMMMOOO!!!!""""$  )%)#'))2 )2 )2 )2 )2 )2 )2X FJ    &   > > >O O O- - -          ! ! X!
 
 
 
.% % % % C C C C C C C Cr+   rc   )r   r   r   r   )r   r   r   r   r,   r-   )r?   Nr@   F)rC   r   rD   rE   rF   r   rG   rE   rH   r   r,   rI   rJ   r   rK   rL   )ro   r   rp   r   rq   r-   r,   r-   r1   r   rr   rs   rY   rL   )rw   r   rx   r   ry   r-   rz   r-   r{   r-   rH   r   rD   r   rG   r   r|   r   rJ   r   rP   rL   rQ   rL   ))
__future__r   collections.abcr   r   typingr   r   r`   	dask.baser	   r
   dask.highlevelgraphr   dask.layersr   distributed.shuffle._arrowr   distributed.shuffle._corer   r   r   distributed.shuffle._shuffler   r   r   r'   pandas._typingr   r   r   r   r   r:   r*   r>   rn   	hash_joinrv   r   rc   r3   r+   r)   <module>r      s   " " " " " " . . . . . . . . % % % % % % % %  2 2 2 2 2 2 2 2 . . . . . .       B B B B B B O O O O O O O O O O J J J J J J J J +==========****** '    (   0 "%DN DN DN DN DNN 	&    *" " " "Jf f f f fu f f f f fr+   