
    0FieB                       U d dl mZ d dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d dlZd d
lmZ d dlZd dlmZ d dlmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z?  ej@        d          ZAerd dlBZCd dlDZEd dlFmGZG d dlHmIZI dId)ZJdJd,ZKdKd/ZL	 dLdMd3ZMeNe"eOf         ZPd4eQd5<    G d6 d7e           ZRdNd;ZSdOd?ZT G d@ dAe2eUdf                   ZV edBC           G dD dEe3eU                               ZWdPdHZXdS )Q    annotationsN)defaultdict)Callable
CollectionIterableIteratorSequence)ThreadPoolExecutor)	dataclass)partial)Path)TYPE_CHECKINGAny)IOLoop)tokenize)HighLevelGraph)Layer)Key)PooledRPCCall)
Reschedule)check_dtype_supportcheck_minimal_arrow_versionconvert_shardsdeserialize_tablelist_of_buffers_to_tableread_from_diskserialize_table)NDIndex	ShuffleId
ShuffleRunShuffleSpecbarrier_keyget_worker_pluginhandle_transfer_errorshandle_unpack_errors)ResourceLimiter)ShuffleSchedulerPlugin)ShuffleWorkerPlugin)sizeofzdistributed.shuffle)	TypeAlias)	DataFrameinputpd.DataFrameidr    input_partitionintnpartitionscolumnstrmeta	parts_outset[int]diskboolreturnc                    t          |          5  t                                          | |t          ||||||                    cd d d            S # 1 swxY w Y   d S )N)r/   r2   r3   r5   r6   r8   )spec)r%   r$   add_partitionDataFrameShuffleSpec)r-   r/   r0   r2   r3   r5   r6   r8   s           <lib/python3.11/site-packages/distributed/shuffle/_shuffle.pyshuffle_transferr@   ;   s     
 	#	# 
 
 ""00%'#   1 
 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
s   7AAAoutput_partitionbarrier_run_idc                    t          |           5  t                                          | ||          cd d d            S # 1 swxY w Y   d S N)r&   r$   get_output_partition)r/   rA   rB   s      r?   shuffle_unpackrF   T   s     
b	!	! 
 
 ""77 0
 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
s   #A  AArun_ids	list[int]c                    	 t                                          | |          S # t          $ r}|d }~wt          $ r}t	          d|            |d }~ww xY w)Nz&shuffle_barrier failed during shuffle )r$   barrierr   	ExceptionRuntimeError)r/   rG   es      r?   shuffle_barrierrN   ]   s{    Q ""**2w777    Q Q QHBHHIIqPQs   !$ 
A0AAAdfr,   
int | Nonec           	     j   dd l }ddlm} | j        }|j        j                            ||         j                  s!t          d|d||         j         d          t          |           |p| j
        }t          | ||          }t          d |j        D                       r#d |j        D             }t          d|           d	| }t          j                            d
          }	t#          |||| j
        | j        ||	          }
 |t'          j        ||
| g          ||d g|dz   z            S )Nr   )new_dd_objectzExpected meta column=z to be an integer column, is .c              3  B   K   | ]}t          |t                     V  d S rD   )
isinstancer4   .0cs     r?   	<genexpr>z*rearrange_by_column_p2p.<locals>.<genexpr>x   s/      
8
8az!S!!!
8
8
8
8
8
8    c                X    i | ]'}t          |t                    |t          |          (S  )rU   r4   typerV   s     r?   
<dictcomp>z+rearrange_by_column_p2p.<locals>.<dictcomp>y   s/    RRRaz!S?Q?QRq$q''RRRrZ   z0p2p requires all column names to be str, found: zshuffle_p2p-zdistributed.p2p.disk)npartitions_input
name_input
meta_inputr8      )pandasdask.dataframe.corerR   _metaapitypesis_integer_dtypedtype	TypeErrorr   r2   r   anycolumnsdaskconfiggetP2PShuffleLayer_namer   from_collections)rO   r3   r2   pdrR   r5   tokenunsupportednamer8   layers              r?   rearrange_by_column_p2prx   f   s   
 1111118D6<((f);<< 
XVXX4<CUXXX
 
 	
 /KR--E

8
84<
8
8
888 
RR4<RRRL{LL
 
 	
 "%!!D!788D.8  E ='ebT::	+/"	  rZ   r+   _T_LowLevelGraphc                       e Zd ZU ded<   ded<   ded<   ded<   ded<   ded	<   d
ed<   ded<   	 	 d+d, fdZd-dZd.dZd/dZed0d            Z	d1dZ
d2dZd3d Zd4d"Zd5d%Zd6d)Zd0d*Z xZS )7rp   r4   rv   r3   r1   r2   r_   r`   r.   ra   r9   r8   r7   r6   NIterable[int] | Noner   dict | Nonec
                D   t                       || _        || _        || _        || _        || _        || _        |rt          |          | _        n&t          t          | j                            | _        || _
        t                                          |	           d S )Nr   )r   rv   r3   r2   r`   ra   r8   setr6   ranger_   super__init__)selfrv   r3   r2   r_   r`   ra   r8   r6   r   	__class__s             r?   r   zP2PShuffleLayer.__init__   s     	$%%%	&$$	 	: ^^DNN t'7!8!899DN!2[11111rZ   r:   c                P    t          |           j         d| j         d| j         dS )Nz<name='z', npartitions=>)r]   __name__rv   r2   r   s    r?   __repr__zP2PShuffleLayer.__repr__   s1    Dzz"XX49XXTEUXXX	
rZ   set[Key]c                *      fd j         D             S )Nc                "    h | ]}j         |fS r\   rv   )rW   partr   s     r?   	<setcomp>z2P2PShuffleLayer.get_output_keys.<locals>.<setcomp>   s     ===dD!===rZ   r6   r   s   `r?   get_output_keyszP2PShuffleLayer.get_output_keys   s    ====dn====rZ   c                "    t          | d          S )N_cached_dict)hasattrr   s    r?   is_materializedzP2PShuffleLayer.is_materialized   s    t^,,,rZ   ry   c                x    |  t          | d          r| j        S |                                 }|| _        | j        S )z$Materialize full dict representationr   )r   r   _construct_graph)r   dsks     r?   _dictzP2PShuffleLayer._dict   sG     	+4(( 	$$$''))C #D  rZ   keyr   tuplec                    | j         |         S rD   )r   )r   r   s     r?   __getitem__zP2PShuffleLayer.__getitem__   s    z#rZ   Iterator[Key]c                *    t          | j                  S rD   )iterr   r   s    r?   __iter__zP2PShuffleLayer.__iter__   s    DJrZ   c                *    t          | j                  S rD   )lenr   r   s    r?   __len__zP2PShuffleLayer.__len__   s    4:rZ   Iterable[int]c           
     v    t          | j        | j        | j        | j        | j        | j        | j        |          S )Nr   )rp   rv   r3   r2   r_   r`   ra   r8   )r   r6   s     r?   _cullzP2PShuffleLayer._cull   sA    IK"OOI	
 	
 	
 		
rZ   keysIterable[Key]c                    t                      }|D ]f}t          |t                    rOt          |          dk    r<|\  }}|| j        k    r,t          |t
                    sJ |                    |           g|S )z4Simple utility to convert keys to partition indices.   )r~   rU   r   r   rv   r1   add)r   r   partsr   rv   r   s         r?   _keys_to_partszP2PShuffleLayer._keys_to_parts   s{     	$ 	$C#u%% $#c((a-- 
d49$$%dC00000IIdOOOrZ   all_keysCollection[Key]tuple[P2PShuffleLayer, dict]c                                          |          }t           fdt           j                  D                        fd|D             }|t	           j                  k    r                     |          }||fS  |fS )a  Cull a P2PShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        c                "    h | ]}j         |fS r\   )r`   )rW   ir   s     r?   r   z'P2PShuffleLayer.cull.<locals>.<setcomp>   s     IIIadoq!IIIrZ   c                $    i | ]}j         |fS r\   r   )rW   r   input_partsr   s     r?   r^   z(P2PShuffleLayer.cull.<locals>.<dictcomp>   s"    LLL$	4(+LLLrZ   )r   	frozensetr   r_   r~   r6   r   )r   r   r   r6   culled_depsculled_layerr   s   `     @r?   cullzP2PShuffleLayer.cull   s     ''--	IIII51G+H+HIII
 
 MLLLL)LLLDN++++::i00L,,$$rZ   c           
        t          | j        | j        | j        | j                  }i }t          t          |                    }d|z   }t                      }t          | j	                  D ]N}|
                    ||f           t          | j        |f||| j        | j        | j        | j        | j        f	|||f<   Ot          ||f||<   | j        }| j        D ]}t           |||f|||f<   |S )Nzshuffle-transfer-)r   r`   r3   r2   r6   r#   r    listr   r_   appendr@   ra   r8   rN   rv   rF   )r   rt   r   _barrier_keyrv   transfer_keysr   part_outs           r?   r   z P2PShuffleLayer._construct_graph  s   $+t7GXX ""9U#3#344"U*t-.. 	 	A  $+++ !$ 	
Cq	NN -e]CLy 	 	H	%Cx !! 
rZ   )NN)rv   r4   r3   r4   r2   r1   r_   r1   r`   r4   ra   r.   r8   r9   r6   r{   r   r|   )r:   r4   )r:   r   )r:   r9   )r:   ry   )r   r   r:   r   )r:   r   )r:   r1   )r6   r   r:   rp   )r   r   r:   r7   )r   r   r   r   r:   r   )r   
__module____qualname____annotations__r   r   r   r   propertyr   r   r   r   r   r   r   r   __classcell__r   s   @r?   rp   rp      s        IIIKKKOOOJJJ +/#'2 2 2 2 2 2 24
 
 
 

> > > >- - - - 	! 	! 	! X	!             

 

 

 

	 	 	 	% % % %.       rZ   rp   
worker_for	pd.Seriesdict[Any, pa.Table]c                   ddl }ddlm} | j        }t	          |t
                    sJ  |          |                     j        j        	                    d          |dd          } t          |           }|si S  || d                              d          |                    d                   }                    dg          ~ |                    |d	d         |dd
         k              d         d	z   }	|                    dg|	g          }	fdt!          j        d|	          D             }
|
                                        |	d
         d                     ||	         }fdt)          ||
          D             }t+          t-          t          |                                                    |k    sJ |S )zO
    Split data into many arrow batches, partitioned by destination worker
    r   N)to_pyarrow_table_dispatch_workerTinner)rightleft_onright_indexhow)preserve_indexrb   c                J    g | ]\  }}                     |||z
              S offsetlengthslicerW   abts      r?   
<listcomp>z#split_by_worker.<locals>.<listcomp>N  >       ,0AqqQ''  rZ   r   r   c                <    i | ]\  }}j         j        |         |S r\   )cat
categories)rW   codeshardr   s      r?   r^   z#split_by_worker.<locals>.<dictcomp>T  s;        D% 	!$'  rZ   )numpydask.dataframe.dispatchr   _constructor_slicedrU   r]   merger   codesrenamer   sort_byasarraydropwhereconcatenatetoolzsliding_windowr   r   zipsummapvalues)rO   r3   r5   r   npr   constructornrowsr   splitsshardsunique_codesoutr   s      `         @r?   split_by_workerr   (  s
    AAAAAA (Kk4(((((Z((J	n")))44	 
 
 
B GGE 	 	"!"T:::A			)AJJq|$$E		{A
XXeABBi5":-..q1A5F^^aS&M**F   494HF4S4S  F MM!''D'99:::=L    |V44  C
 s3

%%&&%////JrZ   r   pa.Tabledict[int, pa.Table]c                2    ddl }                     |g                                          |                                         }|                                                      |           |                     |                   }|                    |dd         |dd         k              d         dz   }|                    dg|g          } fdt          j
        d|          D             }|                                         |d         d                     t                     t          t          t          |                    k    sJ t          |          t          |          k    sJ t!          t#          ||                    S )zL
    Split data into many arrow batches, partitioned by final partition
    r   Nrb   r   c                J    g | ]\  }}                     |||z
              S r   r   r   s      r?   r   z&split_by_partition.<locals>.<listcomp>k  r   rZ   r   r   )r   select	to_pandasuniquesortr   r   r   r   r   r   r   r   r   r   r   dictr   )r   r3   r   
partitions	partitionr   r   s   `      r?   split_by_partitionr	  ]  sy    6(##--//7>>@@JOO			&A

1V9%%IXXimy"~566q9A=F^^aS&M**F   494HF4S4S  F MM!''D'99:::q66SS&))******z??c&kk))))J''(((rZ   c                       e Zd ZU dZded<   ded<   ded<   ded	<   d5 fdZd6d#Zd7d&Zd8d+Zd9d-Z	d:d.Z
d;d2Zd<d4Z xZS )=DataFrameShuffleRuna  State for a single active shuffle execution

    This object is responsible for splitting, sending, receiving and combining
    data shards.

    It is entirely agnostic to the distributed system and can perform a shuffle
    with other run instances using `rpc`.

    The user of this needs to guarantee that only `DataFrameShuffleRun`s of the
    same unique `ShuffleID` and `run_id` interact.

    Parameters
    ----------
    worker_for:
        A mapping partition_id -> worker_address.
    column:
        The data column we split the input partition by.
    id:
        A unique `ShuffleID` this belongs to.
    run_id:
        A unique identifier of the specific execution of the shuffle this belongs to.
    local_address:
        The local address this Shuffle can be contacted by using `rpc`.
    directory:
        The scratch directory to buffer data in.
    executor:
        Thread pool to use for offloading compute.
    rpc:
        A callable returning a PooledRPCCall to contact other Shuffle instances.
        Typically a ConnectionPool.
    scheduler:
        A PooledRPCCall to to contact the scheduler.
    memory_limiter_disk:
    memory_limiter_comm:
        A ``ResourceLimiter`` limiting the total amount of memory used in either
        buffer.
    r4   r3   r.   r5   zdict[str, list[int]]partitions_ofr   r   dict[int, str]r/   r    run_idr1   local_address	directoryexecutorr   rpcCallable[[str], PooledRPCCall]	schedulerr   memory_limiter_diskr'   memory_limiter_commsr8   r9   loopr   c                   dd l }t                                          ||||||	|
||||           || _        || _        t          t                    }|                                D ] \  }}||                             |           !t          |          | _
         |j        |d                              d          | _        d S )Nr   )r/   r  r  r  r  r  r  r  r  r8   r  _workersr   category)rc   r   r   r3   r5   r   r   itemsr   r  r  Seriesastyper   )r   r   r3   r5   r/   r  r  r  r  r  r  r  r  r8   r  rs   r  r   addrr   s                      r?   r   zDataFrameShuffleRun.__init__  s    " 	'!5 3 	 	
 	
 	
 	#D))$**,, 	- 	-JD$$&&t,,,,!-00#")JZ@@@GG
SSrZ   datalist[tuple[int, bytes]]r:   Nonec                  K   |                                   g }|D ]i}|d         | j        vrX|                    |d                    | j                            |d                    | xj        t          |          z  c_        j~|sd S 	 |                     | j        |           d {V }~|                     |           d {V  d S # t          $ r}|| _
         d }~ww xY w)Nr   rb   )raise_if_closedreceivedr   r   total_recvdr*   offload_repartition_buffers_write_to_diskrK   
_exception)r   r  filtereddgroupsrM   s         r?   _receivezDataFrameShuffleRun._receive  s      	. 	.At4=((!%%%!!!A$'''  F1II-   	F	<<(A8LLLLLLLLF%%f----------- 	 	 	DO	s   =C
 

C!CC!list[bytes]dict[NDIndex, bytes]c                   t          |          }t          || j                  }t          |          t	          t          t          |                                                    k    sJ ~d |                                D             S )Nc                6    i | ]\  }}|ft          |          S r\   r   )rW   kvs      r?   r^   z<DataFrameShuffleRun._repartition_buffers.<locals>.<dictcomp>  s(    DDDTQoa((DDDrZ   )r   r	  r3   r   r   r   r   r  )r   r  tabler,  s       r?   r'  z(DataFrameShuffleRun._repartition_buffers  sp    (..#E4;775zzSS&--//!:!:;;;;;;DDV\\^^DDDDrZ   partition_idkwargsr   dict[str, tuple[int, bytes]]c                    t          || j        | j        | j                  }fd|                                D             }|S )Nc                :    i | ]\  }}|t          |          fS r\   r2  )rW   r3  r   r6  s      r?   r^   z8DataFrameShuffleRun._shard_partition.<locals>.<dictcomp>  s,    MMMAq<!3!34MMMrZ   )r   r3   r5   r   r  )r   r  r6  r7  r   s     `  r?   _shard_partitionz$DataFrameShuffleRun._shard_partition  sO     KIO	
 
 NMMMMMM
rZ   r   c                    	 |                      |f          }t          || j                  S # t          $ r | j                                        cY S w xY wrD   )_read_from_diskr   r5   KeyErrorcopy)r   r6  r   r7  r  s        r?   _get_output_partitionz)DataFrameShuffleRun._get_output_partition  sa    	$''88D!$	222 	$ 	$ 	$9>>#####	$s   *- #AAc                    | j         |         S rD   )r   )r   r/   s     r?   _get_assigned_workerz(DataFrameShuffleRun._get_assigned_worker  s    r""rZ   pathr   tuple[pa.Table, int]c                     t          |          S rD   )r   )r   rC  s     r?   readzDataFrameShuffleRun.read  s    d###rZ   bufferc                     t          |          S rD   )r   )r   rG  s     r?   deserializezDataFrameShuffleRun.deserialize  s     (((rZ   )r   r  r3   r4   r5   r.   r/   r    r  r1   r  r4   r  r4   r  r   r  r  r  r   r  r'   r  r'   r8   r9   r  r   )r  r   r:   r!  )r  r.  r:   r/  )r  r.   r6  r1   r7  r   r:   r8  )r6  r1   r   r4   r7  r   r:   r.   )r/   r1   r:   r4   )rC  r   r:   rD  )rG  r   r:   r   )r   r   r   __doc__r   r   r-  r'  r;  r@  rB  rF  rI  r   r   s   @r?   r  r  t  s        $ $L KKK''''&T &T &T &T &T &TP   (E E E E   
$ 
$ 
$ 
$# # # #$ $ $ $) ) ) ) ) ) ) )rZ   r  T)frozenc                  H    e Zd ZU ded<   ded<   ded<   ded<   ddZddZdS )r>   r1   r2   r4   r3   r.   r5   r7   r6   pluginr(   r:   r  c                x    t          t          | j                  }|                    | j        | j        |          S rD   )r   _get_worker_for_range_shardingr2   _pin_output_workersr/   r6   )r   rM  pick_workers      r?   rP  z(DataFrameShuffleSpec._pin_output_workers  s1    <d>NOO))$'4>;OOOrZ   r  r   r)   r!   c                d   t          | j        | j        || j        |t          j                            |j        j        d| j         d|           |j	        |j        j
        |j        j        |j        j        | j        r|j        nt          d           |j        | j        |j        j                  S )Nzshuffle--)r3   r5   r   r/   r  r  r  r  r  r  r  r  r8   r  )r  r3   r5   r/   osrC  joinworkerlocal_directory	_executoraddressr  r  r8   r  r'   r  r  )r   r  r   rM  s       r?   create_run_on_workerz)DataFrameShuffleSpec.create_run_on_worker  s     #;!wgll--47--V--  % -/!m-y!' : : &&!'!<#'
 
 
 	
rZ   N)rM  r(   r:   r  )r  r1   r   r  rM  r)   r:   r!   )r   r   r   r   rP  rZ  r\   rZ   r?   r>   r>     so         KKKP P P P
 
 
 
 
 
rZ   r>   workersSequence[str]c                <    t          |          |z  | z  }||         S )zKGet address of target worker for this output partition using range sharding)r   )r2   rA   r[  r   s       r?   rO  rO  -  s$     	G'';6A1:rZ   )r-   r.   r/   r    r0   r1   r2   r1   r3   r4   r5   r.   r6   r7   r8   r9   r:   r1   )r/   r    rA   r1   rB   r1   r:   r.   )r/   r    rG   rH   r:   r1   rD   )rO   r,   r3   r4   r2   rP   r:   r,   )
rO   r.   r3   r4   r5   r.   r   r   r:   r   )r   r   r3   r4   r:   r   )r2   r1   rA   r1   r[  r\  r:   r4   )Y
__future__r   loggingrT  collectionsr   collections.abcr   r   r   r	   r
   concurrent.futuresr   dataclassesr   	functoolsr   pathlibr   typingr   r   r   tornado.ioloopr   rm   	dask.baser   dask.highlevelgraphr   dask.layersr   dask.typingr   distributed.corer   distributed.exceptionsr   distributed.shuffle._arrowr   r   r   r   r   r   r   distributed.shuffle._corer   r    r!   r"   r#   r$   r%   r&   distributed.shuffle._limiterr'   %distributed.shuffle._scheduler_pluginr(   "distributed.shuffle._worker_pluginr)   distributed.sizeofr*   	getLoggerloggerrc   rs   pyarrowpatyping_extensionsr+   dask.dataframer,   r@   rF   rN   rx   r  r   ry   r   rp   r   r	  r1   r  r>   rO  r\   rZ   r?   <module>rz     s6   " " " " " " "  				 # # # # # # N N N N N N N N N N N N N N 1 1 1 1 1 1 ! ! ! ! ! !             % % % % % % % %  ! ! ! ! ! !        . . . . . .             * * * * * * - - - - - -                 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 9 8 8 8 8 8 H H H H H H B B B B B B % % % % % %		0	1	1 ) ,+++++((((((
 
 
 
2
 
 
 
Q Q Q Q #) ) ) ) )X #3:.  . . . .P P P P Pe P P Pf2 2 2 2j) ) ) ).Q) Q) Q) Q) Q)*S.%89 Q) Q) Q)h $!
 !
 !
 !
 !
;s+ !
 !
 !
H     rZ   