
    0Fie5$                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	 d dl
mZmZmZmZ d dlmZ d dlmZ d dlmZ  ej        d	          Zerd d
lmZ ne	Z ede          Z ed          Z G d dee                   ZdS )    )annotationsN)defaultdict)IteratorSized)TYPE_CHECKINGAnyGenericTypeVar)time)ResourceLimitersizeofzdistributed.shuffle)Buffer	ShardType)boundTc                  ^   e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   	 	 d?d@d"ZdAd$ZdBd)ZdCd*ZdDd,Z	e
dEd-            ZdFd.ZdGd1ZdFd2ZdFd3ZdFd4ZdHd5ZdId:Zej        dJd=            Zd>S )KShardsBuffera  A buffer for P2P shuffle

    The objects to buffer are typically bytes belonging to certain shards.
    Typically the buffer is implemented on sending and receiving end.

    The buffer allows for concurrent writing and buffers shards to reduce overhead of writing.

    The shards are typically provided in a format like::

        {
            "bucket-0": [b"shard1", b"shard2"],
            "bucket-1": [b"shard1", b"shard2"],
        }

    Buckets typically correspond to output partitions.

    If exceptions occur during writing, the buffer is automatically closed. Subsequent attempts to write will raise the same exception.
    Flushing will not raise an exception. To ensure that the buffer finished successfully, please call `ShardsBuffer.raise_on_exception`
    z!defaultdict[str, list[ShardType]]shardszdefaultdict[str, int]sizeszdefaultdict[str, list[int]]sizes_detailintconcurrency_limitr   memory_limiterzdict[str, float]diagnosticsmax_message_sizebytes_totalbytes_memorybytes_written
bytes_readbool_accepts_input_inputs_donezNone | Exception
_exceptionzlist[asyncio.Task]_taskszasyncio.Condition_shards_availablezasyncio.Lock_flush_lock   returnNonec                    d _         t          t                     _        t          t                     _        t          t                     _        d  _        | _        d _	        | _
        t          t                     _         fdt          |          D              _        t          j                     _        t          j                     _        | _        d _        d _        d _        d _        d S )NTFc                \    g | ](}t          j                                                  )S  )asynciocreate_task_background_task).0_selfs     ;lib/python3.11/site-packages/distributed/shuffle/_buffer.py
<listcomp>z)ShardsBuffer.__init__.<locals>.<listcomp>R   s@     
 
 
  5 5 7 788
 
 
    r   )r"   r   listr   r   r   r   r$   r   r#   r   floatr   ranger%   r/   	Conditionr&   Lockr'   r   r   r   r   r    )r4   r   r   r   s   `   r5   __init__zShardsBuffer.__init__C   s     #!$'' %%
'--!2!,-8-?-?
 
 
 
,--
 
 
 ")!2!4!4"<>> 0r7   dict[str, Any]c                    | j         | j        t          | j                  | j        | j        | j        | j        j        dS )N)memorytotalbucketswrittenreadr   memory_limit)	r   r   lenr   r   r    r   r   limitr4   s    r5   	heartbeatzShardsBuffer.heartbeat_   sD    '%4;'')O+ /5
 
 	
r7   idstrlist[ShardType]sizec                2  K   	 t                      }	 |                     ||           d {V  | xj        |z  c_        n%# t          $ r}|| _        d| _        Y d }~nd }~ww xY wt                      }d| j        d         z  d|z  z   | j        d<   d| j        d         z  d||z
  z  z   | j        d<   | j                            |           d {V  | xj	        |z  c_	        d S # | j                            |           d {V  | xj	        |z  c_	        w xY w)NTg\(\?avg_sizeg{Gz?avg_duration)
r   _processr   	Exceptionr$   r#   r   r   decreaser   )r4   rJ   r   rM   startestops          r5   processzShardsBuffer.processj   s     	&FFE)mmB/////////""d*""" ) ) )"#$(!!!!!!) 66D t'
33dTkA Z( 04d6F7 0u%0&D^, %..t444444444% %..t444444444%s4   C$ ,A  C$  
A"
AC$ A""AC$ $2Dc                "   K   t                      NNotImplementedError)r4   rJ   r   s      r5   rQ   zShardsBuffer._process   s      !###r7   r   c                    t                      rY   rZ   )r4   rJ   s     r5   rD   zShardsBuffer.read   s    !###r7   c                    | j          S rY   )r   rH   s    r5   emptyzShardsBuffer.empty   s    ;r7   c                   K   d fd}	  j         4 d {V   j                             |           d {V   j        r j        s	 d d d           d {V  d S t	           j         j        j                  } j        dk    rld}g }| j        k     r[	  j        |                                         }|	                    |            j
        |                                         }||z  } j        |xx         |z  cc<   nS# t          $ rF Y  j        |         s6 j        |=  j        |         rJ  j        |=  j
        |         rJ  j
        |= nw xY w	  j        |         s6 j        |=  j        |         rJ  j        |=  j
        |         rJ  j
        |= nH#  j        |         s6 j        |=  j        |         rJ  j        |=  j
        |         rJ  j
        |= w xY w| j        k     [n4 j                            |          } j                            |          } j                                          d d d           d {V  n# 1 d {V swxY w Y                        |||           d {V  w)Nr*   r!   c                 :    t           j        p j                  S rY   )r!   r   r#   rH   s   r5   	_continuez0ShardsBuffer._background_task.<locals>._continue   s    8t'8999r7   T)keyr   r*   r!   )r&   wait_forr#   r   maxr   __getitem__r   popappendr   
IndexError
notify_allrW   )r4   ra   part_idrM   r   shardss   `      r5   r1   zShardsBuffer._background_task   s     	: 	: 	: 	: 	: 	:	6- 4 4 4 4 4 4 4 4,55i@@@@@@@@@$ T[ 4 4 4 4 4 4 4 4 4 4 4 4 4 4 djdj.DEEE(1,,DF!666?$(K$8$<$<$>$>E"MM%000 $ 1' : > > @ @A AID Jw///14////) " " "!#';w#7 ?$(K$8+/:g+> > > >$(Jw$7+/+<W+E E E E$($5g$>" 0 $(;w#7 ?$(K$8+/:g+> > > >$(Jw$7+/+<W+E E E E$($5g$> $(;w#7 ?$(K$8+/:g+> > > >$(Jw$7+/+<W+E E E E$($5g$> > > > > !666" "[__W55F:>>'22D&1133374 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 48 ,,w555555555;	6sX   0I<IA-DF
EFAIEFAIAG##AI
IIdatadict[str, ShardType]c                4  K   | j         r| j         | j        r| j        rt          d|  d          |sdS d |                                D             }t          |                                          }| xj        |z  c_        | xj        |z  c_        | j	        
                    |           | j        4 d{V  |                                D ]f\  }}| j        |                             |           | j        |                             ||                    | j        |xx         ||         z  cc<   g| j                                         ddd          d{V  n# 1 d{V swxY w Y   | j	                                         d{V  ~|sJ dS )a  
        Writes objects into the local buffers, blocks until ready for more

        Parameters
        ----------
        data: dict
            A dictionary mapping destinations to the object that should
            be written to that destination

        Notes
        -----
        If this buffer has a memory limiter configured, then it will
        apply back-pressure to the sender (blocking further receives)
        if local resource usage hits the limit, until such time as the
        resource usage drops.

        zTrying to put data in closed .Nc                4    i | ]\  }}|t          |          S r.   r   )r2   workerrl   s      r5   
<dictcomp>z&ShardsBuffer.write.<locals>.<dictcomp>   s$    III=65IIIr7   )r$   r"   r#   RuntimeErroritemssumvaluesr   r   r   increaser&   r   rh   r   r   notifywait_for_available)r4   rn   r   total_batch_sizers   rl   s         r5   writezShardsBuffer.write   sQ     & ? 	"/!" 	Hd&7 	HFtFFFGGG 	FIIDJJLLIIIu||~~..--,,$$%5666) 	, 	, 	, 	, 	, 	, 	, 	,!% 4 4F#**5111!&)00v???
6"""eFm3"""""))+++	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, !44666666666s   <BE##
E-0E-c                "    | j         r| j         dS )z:Raises an exception if something went wrong during writingN)r$   rH   s    r5   raise_on_exceptionzShardsBuffer.raise_on_exception   s    ? 	"/!	" 	"r7   c                $   K    j         4 d{V  d _         j        4 d{V   j                                          j                             fd           d{V  d _         j                                         ddd          d{V  n# 1 d{V swxY w Y   t          j         j          d{V   j	        s% j
        rJ t                      j
        f            	 ddd          d{V  dS # 1 d{V swxY w Y   dS )zpWait until all writes are finished.

        This closes the buffer such that no new writes are allowed
        NFc                 0     j          p j        p j        S rY   )r   r$   r#   rH   s   r5   <lambda>z$ShardsBuffer.flush.<locals>.<lambda>   s    OStS$BS r7   T)r'   r"   r&   rj   rd   r#   r/   gatherr%   r$   r   typerH   s   `r5   flushzShardsBuffer.flush   s     
 # 	N 	N 	N 	N 	N 	N 	N 	N"'D- 4 4 4 4 4 4 4 4&11333,55SSSS         %)!&113334 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 .$+........? N,MMtDzz4;L.MMMMM	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	Ns6   C?ABC?
B	C?"B	#AC??
D	D	c                  K   |                                   d{V  | j        s%| j        rJ t          |           | j        f            | j        D ]}|                                 d| _        d| _        | j        	                                 d| _        | j
        4 d{V  | j
                                         ddd          d{V  n# 1 d{V swxY w Y   t          j        | j          d{V  dS )zUFlush and close the buffer.

        This cleans up all allocated resources.
        NFTr   )r   r$   r   r   r%   cancelr"   r#   r   clearr&   rj   r/   r   )r4   ts     r5   closezShardsBuffer.close   s     
 jjll 	J(II4::t7H*IIII 	 	AHHJJJJ# ) 	0 	0 	0 	0 	0 	0 	0 	0"--///	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0ndk**********s   "C
CCc                
   K   | S rY   r.   rH   s    r5   
__aenter__zShardsBuffer.__aenter__  s      r7   excr   typ	tracebackc                >   K   |                                   d {V  d S rY   )r   )r4   r   r   r   s       r5   	__aexit__zShardsBuffer.__aexit__  s,      jjllr7   nameIterator[None]c              #  z   K   t                      }d V  t                      }| j        |xx         ||z
  z  cc<   d S rY   )r   r   )r4   r   rT   rV   s       r5   r   zShardsBuffer.time  sJ      vv$,.r7   N)r(   r)   )r   r   r   r   r   r   r*   r+   )r*   r>   )rJ   rK   r   rL   rM   r   r*   r+   )rJ   rK   r   rL   r*   r+   )rJ   rK   r*   r   rc   )r*   r+   )rn   ro   r*   r+   )r*   r   )r   r   r   r   r   r   r*   r+   )r   rK   r*   r   )__name__
__module____qualname____doc____annotations__r=   rI   rW   rQ   rD   propertyr^   r1   r}   r   r   r   r   r   
contextlibcontextmanagerr   r.   r7   r5   r   r      s2         ( .---    ----####!!!!OOO    ((((
 "# "	    8	
 	
 	
 	
& & & &,$ $ $ $$ $ $ $    X!6 !6 !6 !6F)  )  )  ) V" " " "
N N N N&+ + + +$       / / / / / /r7   r   )
__future__r   r/   r   loggingcollectionsr   collections.abcr   r   typingr   r   r	   r
   distributed.metricsr   distributed.shuffle._limiterr   distributed.sizeofr   	getLoggerloggertyping_extensionsr   r   r   r   r.   r7   r5   <module>r      sR   " " " " " "       # # # # # # + + + + + + + + 7 7 7 7 7 7 7 7 7 7 7 7 $ $ $ $ $ $ 8 8 8 8 8 8 % % % % % %		0	1	1 (((((((FGKv...	GCLLs/ s/ s/ s/ s/79% s/ s/ s/ s/ s/r7   