
    0Fie@                       U d Z ddlmZ ddlZddlZddlmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZmZmZ ddlmZ ddlZddlmZ ddlmZmZ ddlmZ ddl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z- ddl.m/Z/m0Z0 ddl1m2Z2 ddl3m4Z4 erddl5Z6ddl7m8Z8 ddl9m:Z; e<e=df         Z>de?d<   e<e>df         Z@de?d<   e<eAdf         ZBde?d<   dDd(ZCdEd+ZDdFd/ZE G d0 d1e          ZFeGeF         ZHde?d2<   eGeH         ZIde?d3<   eGeI         ZJde?d4<   dGd5ZKdHd8ZL G d9 d:e#e!df                   ZM ed;<           G d= d>e$e!                               ZNdIdCZOdS )Ju[  
Utilities for rechunking arrays through p2p shuffles
====================================================

Tensors (or n-D arrays) in dask are split up across the workers as
regular n-D "chunks" or bricks. These bricks are stacked up to form
the global array.

A key algorithm for these tensors is to "rechunk" them. That is to
reassemble the same global representation using differently shaped n-D
bricks.

For example, to take an FFT of an n-D array, one uses a sequence of 1D
FFTs along each axis. The implementation in dask (and indeed almost
all distributed array frameworks) requires that 1D
axis along which the FFT is taken is local to a single brick. So to
perform the global FFT we need to arrange that each axis in turn is
local to bricks.

This can be achieved through all-to-all communication between the
workers to exchange sub-pieces of their individual bricks, given a
"rechunking" scheme.

To perform the redistribution, each input brick is cut up into some
number of smaller pieces, each of which contributes to one of the
output bricks. The mapping from input brick to output bricks
decomposes into the Cartesian product of axis-by-axis mappings. To
see this, consider first a 1D example.

Suppose our array is split up into three equally sized bricks::

    |----0----|----1----|----2----|

And the requested output chunks are::

    |--A--|--B--|----C----|---D---|

So brick 0 contributes to output bricks A and B; brick 1 contributes
to B and C; and brick 2 contributes to C and D.

Now consider a 2D example of the same problem::

    +----0----+----1----+----2----+
    |         |         |         |
    α         |         |         |
    |         |         |         |
    +---------+---------+---------+
    |         |         |         |
    β         |         |         |
    |         |         |         |
    +---------+---------+---------+
    |         |         |         |
    γ         |         |         |
    |         |         |         |
    +---------+---------+---------+

Each brick can be described as the ordered pair of row and column
1D bricks, (0, α), (0, β), ..., (2, γ). Since the rechunking does
not also reshape the array, axes do not "interfere" with one another
when determining output bricks::

    +--A--+--B--+----C----+---D---+
    |     |     |         |       |
    Σ     |     |         |       |
    |     |     |         |       |
    +-----+ ----+---------+-------+
    |     |     |         |       |
    |     |     |         |       |
    |     |     |         |       |
    Π     |     |         |       |
    |     |     |         |       |
    |     |     |         |       |
    |     |     |         |       |
    +-----+-----+---------+-------+

Consider the output (B, Σ) brick. This is contributed to by the
input (0, α) and (1, α) bricks. Determination of the subslices is
just done by slicing the the axes separately and combining them.

The key thing to note here is that we never need to create, and
store, the dense 2D mapping, we can instead construct it on the fly
for each output brick in turn as necessary.

The implementation here uses :func:`split_axes` to construct these
1D rechunkings. The output partitioning in
:meth:`~.ArrayRechunkRun.add_partition` then lazily constructs the
subsection of the Cartesian product it needs to determine the slices
of the current input brick.

This approach relies on the generic p2p buffering machinery to
ensure that there are not too many small messages exchanged, since
no special effort is made to minimise messages between workers when
a worker might have two adjacent input bricks that are sliced into
the same output brick.
    )annotationsN)defaultdict)CallableSequence)ThreadPoolExecutor)	dataclass)product)Path)TYPE_CHECKINGAny
NamedTuple)IOLoop)tokenize)HighLevelGraphMaterializedLayer)PooledRPCCall)NDIndex	ShuffleId
ShuffleRunShuffleSpecget_worker_pluginhandle_transfer_errorshandle_unpack_errors)ResourceLimiter)unpickle_bytestream)ShuffleSchedulerPlugin)barrier_keyshuffle_barrier)ShuffleWorkerPlugin)sizeof)	TypeAlias.r!   ChunkedAxisChunkedAxesNDSliceinput
np.ndarrayidr   input_chunkr   newolddiskboolreturnintc                    t          |          5  t                                          | |t          ||||                    cd d d            S # 1 swxY w Y   d S )N)r'   r)   r*   r+   )partition_idspec)r   r   add_partitionArrayRechunkSpec)r%   r'   r(   r)   r*   r+   s         <lib/python3.11/site-packages/distributed/shuffle/_rechunk.pyrechunk_transferr5      s     
 	#	# 
 
 ""00$!RScEEE 1 
 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
s   5AAAoutput_chunkbarrier_run_idc                    t          |           5  t                                          | ||          cd d d            S # 1 swxY w Y   d S N)r   r   get_output_partition)r'   r6   r7   s      r4   rechunk_unpackr;      s     
b	!	! 
 
 ""77
 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
s   #A  AAxda.Arraychunksc                V   dd l }dd lm} | j        dk    r |j        | j        || j                  S i }t          | |          }t          t          |                    }d| }t          j                            d          }g }	 |j        t          d | j        D                                 D ]>}
|	                    |f|
z              t"          | j        f|
z   ||
|| j        |f||f|
z   <   ?t&          ||	f||<   d| } |j        t          d |D                                 D ]}
t(          ||
|f||f|
z   <   t          j        d 	          5  t-          |          }t/          j        ||| g
          } |j        ||||           cd d d            S # 1 swxY w Y   d S )Nr   )r>   dtypezrechunk-transfer-zdistributed.p2p.diskc              3  4   K   | ]}t          |          V  d S r9   len.0dims     r4   	<genexpr>zrechunk_p2p.<locals>.<genexpr>   s(      !?!?s#c((!?!?!?!?!?!?    zrechunk-p2p-c              3  4   K   | ]}t          |          V  d S r9   rB   rD   s     r4   rG   zrechunk_p2p.<locals>.<genexpr>   s(      !=!=s#c((!=!=!=!=!=!=rH   c                    | dd          S )N    keys    r4   <lambda>zrechunk_p2p.<locals>.<lambda>   s    3qrr7 rH   )shuffle)dependencies)meta)numpy
dask.arrayarraysizeemptyshaper@   r   r   r   daskconfiggetndindextupler>   appendr5   namer   r;   annotater   r   from_collectionsArray)r<   r>   npdadsktoken_barrier_keyr_   r+   transfer_keysindexlayergraphs                r4   rechunk_p2prl      sC   v{{rxag>>>>CQEy//00L&u&&D!788DME!?!?ah!?!?!???@@ 

 

dWu_---VIH 
TGeO )%?C!%!!DE!=!=f!=!=!===>> L L .ulKTGeO	22	3	3	3 5 5!#&&/e1#NNNrxtV!444	5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5s   ;FF"%F"c                  2    e Zd ZU dZded<   ded<   ded<   dS )Splita"  Slice of a chunk that is concatenated with other splits to create a new chunk

    Splits define how to slice an input chunk on a single axis into small pieces
    that can be concatenated together with splits from other input chunks to create
    output chunks of a rechunk operation.
    r.   chunk_indexsplit_indexsliceN)__name__
__module____qualname____doc____annotations__rL   rH   r4   rn   rn      sA             LLLLLrH   rn   
SplitChunk	SplitAxis	SplitAxesc           
        ddl m}  || |          }g }t          |          D ]\  }}d | |         D             }t          |          D ]G\  }}	t          |	          D ]2\  }
\  }}||                             t	          ||
|                     3H|D ]}|                    d            |                    |           |S )a  Calculate how to split the old chunks on each axis to create the new chunks

    Parameters
    ----------
    old : ChunkedAxes
        Chunks along each axis of the old array
    new : ChunkedAxes
        Chunks along each axis of the new array

    Returns
    -------
    SplitAxes
        Splits along each axis that determine how to slice the input chunks to create
        the new chunks by concatenating the resulting shards.
    r   )
old_to_newc                    g | ]}g S rL   rL   )rE   _s     r4   
<listcomp>zsplit_axes.<locals>.<listcomp>   s    ;;;ar;;;rH   c                    | j         j        S r9   )rq   start)splits    r4   rO   zsplit_axes.<locals>.<lambda>  s    U[-> rH   rM   )dask.array.rechunkr{   	enumerater^   rn   sort)r*   r)   r{   _old_to_newaxes
axis_indexnew_axisold_axisnew_chunk_index	new_chunkrp   old_chunk_indexrq   	old_chunks                 r4   
split_axesr      s     .-----*S#&&KD )+ 6 6 	 	
H;;3z?;;;*3H*=*= 	 	&OY9B99M9M  55ou)00/;>>    " 	@ 	@INN>>N????HKrH   shards&list[list[tuple[NDIndex, np.ndarray]]]c                   dd l }ddlm} i }| D ]}|D ]
\  }}|||<   d t          |                                 D             }t          |           |j        |          k    sJ  |j        |d          }|                                D ]\  }}||t          |          <   |
                                }	 ||	          S )Nr   )concatenate3c                2    g | ]}t          |          d z   S )rK   )maxrD   s     r4   r~   z!convert_chunk.<locals>.<listcomp>  s"    ===C1===rH   O)r@   )rS   dask.array.corer   zipkeysrC   prodrW   itemsr]   tolist)
r   rc   r   indexedsublistri   shardsubshaperec_cat_argarrss
             r4   convert_chunkr     s   ,,,,,,)+G # ## 	# 	#LE5"GENN	# >=W\\^^(<===Hw<<7278,,,,,,"(83///K * *u$)E%LL!!D <rH   c                  T     e Zd ZdZd1 fdZd2d Zd3d%Zd4d)Zd5d+Zd6d/Z	d7d0Z
 xZS )8ArrayRechunkRuna4  State for a single active rechunk execution

    This object is responsible for splitting, sending, receiving and combining
    data shards.

    It is entirely agnostic to the distributed system and can perform a rechunk
    with other run instances using `rpc``.

    The user of this needs to guarantee that only `ArrayRechunkRun`s of the same unique
    `ShuffleID` and `run_id` interact.

    Parameters
    ----------
    worker_for:
        A mapping partition_id -> worker_address.
    old:
        Existing chunking of the array per dimension.
    new:
        Desired chunking of the array per dimension.
    id:
        A unique `ShuffleID` this belongs to.
    run_id:
        A unique identifier of the specific execution of the shuffle this belongs to.
    local_address:
        The local address this Shuffle can be contacted by using `rpc`.
    directory:
        The scratch directory to buffer data in.
    executor:
        Thread pool to use for offloading compute.
    rpc:
        A callable returning a PooledRPCCall to contact other Shuffle instances.
        Typically a ConnectionPool.
    scheduler:
        A PooledRPCCall to to contact the scheduler.
    memory_limiter_disk:
    memory_limiter_comm:
        A ``ResourceLimiter`` limiting the total amount of memory used in either
        buffer.
    
worker_fordict[NDIndex, str]r*   r#   r)   r'   r   run_idr.   local_addressstr	directoryexecutorr   rpcCallable[[str], PooledRPCCall]	schedulerr   memory_limiter_diskr   memory_limiter_commsr+   r,   loopr   c                n   t                                          ||||||	|
||||           || _        || _        t	          t
                    }|                                D ] \  }}||                             |           !t          |          | _	        || _
        t          ||          | _        d S )N)r'   r   r   r   r   r   r   r   r   r+   r   )super__init__r*   r)   r   listr   r^   dictpartitions_ofr   r   )selfr   r*   r)   r'   r   r   r   r   r   r   r   r   r+   r   r   partaddr	__class__s                     r4   r   zArrayRechunkRun.__init__K  s    " 	'!5 3 	 	
 	
 	
 #D))$**,, 	- 	-JD$$&&t,,,,!-00$$S#..rH   dataFlist[tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]]r-   Nonec                  K   |                                   t          t                    }|D ]k}|\  }}|| j        v r| j                            |           |D ] \  }}||                             |           !| xj        t          |          z  c_        l~|sd S 	 |                     |           d {V  d S # t          $ r}|| _
         d }~ww xY wr9   )raise_if_closedr   r   receivedaddr^   total_recvdr    _write_to_disk	Exception
_exception)	r   r   r   did1payloadid2r   es	            r4   _receivezArrayRechunkRun._receiver  s      	 T"" 	* 	*ALCdm##Mc"""% * *
Us""5))))q		) 	F	%%f----------- 	 	 	DO	s   B< <
CCCr&   r0   r   dict[str, tuple[NDIndex, Any]]c                v   t          t                    }t          d t          | j                  D              }|D ]\}t          | \  }}}||         }	|	j        |	                                }	|| j        |                                      |||	ff           ]fd|	                                D             S )Nc              3  ,   K   | ]\  }}||         V  d S r9   rL   )rE   axisis      r4   rG   z3ArrayRechunkRun._shard_partition.<locals>.<genexpr>  s*      WWqT!WWWWWWWrH   c                     i | ]
\  }}||fS rL   rL   )rE   kvr0   s      r4   
<dictcomp>z4ArrayRechunkRun._shard_partition.<locals>.<dictcomp>  s$    ===AL!$===rH   )
r   r   r	   r   r   basecopyr   r^   r   )
r   r   r0   outndsplitsndsplitro   shard_indexndslicer   s
     `       r4   _shard_partitionz ArrayRechunkRun._shard_partition  s     LWL
 L
 WWC4V4VWWWX 	 	G03W-KgME z%

,-44{E23    >=======rH   rN   kwargsr   c                    |                      |          }| j                            d          5  t          |          cd d d            S # 1 swxY w Y   d S )Nread)_read_from_disk_disk_buffertimer   )r   r0   rN   r   r   s        r4   _get_output_partitionz%ArrayRechunkRun._get_output_partition  s    
 ##L11 ##F++ 	' 	' &&	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	's   AAAbufferc                    |S r9   rL   )r   r   s     r4   deserializezArrayRechunkRun.deserialize  s    rH   pathr
   2tuple[list[list[tuple[NDIndex, np.ndarray]]], int]c                   |                     d          5 }t          t          j        |                                d                    }ddd           n# 1 swxY w Y   t	          t          |                    }||j        fS )a  Open a memory-mapped file descriptor to disk, read all metadata, and unpickle
        all arrays. This is a fast sequence of short reads interleaved with seeks.
        Do not read in memory the actual data; the arrays' buffers will point to the
        memory-mapped area.

        The file descriptor will be automatically closed by the kernel when all the
        returned arrays are dereferenced, which will happen after the call to
        concatenate3.
        zr+b)moder   N)open
memoryviewmmapfilenor   r   nbytes)r   r   fhr   r   s        r4   r   zArrayRechunkRun.read  s     YYEY"" 	;b	"))++q 9 9::F	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; )&1122v}$$s   5AAAc                    | j         |         S r9   )r   )r   r'   s     r4   _get_assigned_workerz$ArrayRechunkRun._get_assigned_worker  s    r""rH   )r   r   r*   r#   r)   r#   r'   r   r   r.   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r+   r,   r   r   )r   r   r-   r   )r   r&   r0   r   r-   r   )r0   r   rN   r   r   r   r-   r&   )r   r   r-   r   )r   r
   r-   r   )r'   r   r-   r   )rr   rs   rt   ru   r   r   r   r   r   r   r   __classcell__)r   s   @r4   r   r   "  s        & &P%/ %/ %/ %/ %/ %/N   4> > > >,	' 	' 	' 	'   % % % %"# # # # # # # #rH   r   T)frozenc                  4    e Zd ZU ded<   ded<   ddZddZdS )r3   r#   r)   r*   pluginr   r-   r   c                t    t          d | j        D              }|                    | j        |t                    S )Nc              3  N   K   | ] }t          t          |                    V  !d S r9   )rangerC   )rE   cs     r4   rG   z7ArrayRechunkSpec._pin_output_workers.<locals>.<genexpr>  s.      >>eCFFmm>>>>>>rH   )r	   r)   _pin_output_workersr'   _get_worker_for_hash_sharding)r   r   	parts_outs      r4   r   z$ArrayRechunkSpec._pin_output_workers  s?    >>TX>>>?	))GY =
 
 	
rH   r   r.   r   r   r   c                8   t          || j        | j        | j        |t          j                            |j        j        d| j         d|           |j	        |j        j
        |j        j        |j        j        |j        |j        | j        |j        j                  S )Nzshuffle--)r   r*   r)   r'   r   r   r   r   r   r   r   r   r+   r   )r   r*   r)   r'   osr   joinworkerlocal_directory	_executoraddressr   r   r   r   r+   r   )r   r   r   r   s       r4   create_run_on_workerz%ArrayRechunkSpec.create_run_on_worker  s     !wgll--47--V--  % -/!m- & :!'!<##
 
 
 	
rH   N)r   r   r-   r   )r   r.   r   r   r   r   r-   r   )rr   rs   rt   rv   r   r  rL   rH   r4   r3   r3     sV         
 
 
 

 
 
 
 
 
rH   r3   output_partitionworkersSequence[str]r   c                P    t          |           t          |          z  }||         S )zJGet address of target worker for this output partition using hash sharding)hashrC   )r  r	  r   s      r4   r   r     s'     	W-A1:rH   )r%   r&   r'   r   r(   r   r)   r#   r*   r#   r+   r,   r-   r.   )r'   r   r6   r   r7   r.   r-   r&   )r<   r=   r>   r#   r-   r=   )r*   r#   r)   r#   r-   ry   )r   r   r-   r&   )r  r   r	  r
  r-   r   )Pru   
__future__r   r   r  collectionsr   collections.abcr   r   concurrent.futuresr   dataclassesr   	itertoolsr	   pathlibr
   typingr   r   r   tornado.ioloopr   rY   	dask.baser   dask.highlevelgraphr   r   distributed.corer   distributed.shuffle._corer   r   r   r   r   r   r   distributed.shuffle._limiterr   distributed.shuffle._pickler   %distributed.shuffle._scheduler_pluginr   distributed.shuffle._shuffler   r   "distributed.shuffle._worker_pluginr   distributed.sizeofr    rS   rc   typing_extensionsr!   rT   rU   rd   r]   floatr"   rv   r#   rq   r$   r5   r;   rl   rn   r   rw   rx   ry   r   r   r   r3   r   rL   rH   r4   <module>r"     s  ^ ^ ^@ # " " " " "  				 # # # # # # . . . . . . . . 1 1 1 1 1 1 ! ! ! ! ! !             1 1 1 1 1 1 1 1 1 1 ! ! ! ! ! !        A A A A A A A A * * * * * *                  9 8 8 8 8 8 ; ; ; ; ; ; H H H H H H E E E E E E E E B B B B B B % % % % % % ++++++ucz* * * * *{C/0 0 0 0 05#:& & & & &
 
 
 
 
 
 
 
&5 &5 &5 &5R    J   & U
 # # # #J'	 ' ' ' 'I	 & & & &   D   .`# `# `# `# `#j,!67 `# `# `#F $"
 "
 "
 "
 "
{7+ "
 "
 "
J     rH   