
    0Fie	                    z    d dl mZ d dlmZmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ  G d d	e
          Zd
S )    )annotations)	AwaitableCallable)Any)parse_bytes)ShardsBuffer)ResourceLimiter)
log_errorsc                  F     e Zd ZdZ ed          Z	 dd fd
ZddZ xZS )CommShardsBuffera%  Accept, buffer, and send many small messages to many workers

    This takes in lots of small messages destined for remote workers, buffers
    those messages in memory, and then sends out batches of them when possible
    to different workers.  This tries to send larger messages when possible,
    while also respecting a memory bound

    **State**

    -   shards: dict[str, list[ShardType]]

        This is our in-memory buffer of data waiting to be sent to other workers.

    -   sizes: dict[str, int]

        The size of each list of shards.  We find the largest and send data from that buffer

    State
    -----
    max_message_size: int
        The maximum size in bytes of a single message that we want to send

    Parameters
    ----------
    send : callable
        How to send a list of shards to a worker
        Expects an address of the target worker (string)
        and a payload of shards (list of bytes) to send to that worker
    memory_limiter : ResourceLimiter
        Limiter for memory usage (in bytes). If the incoming data that
        has yet to be processed exceeds this limit, then the buffer will
        block until below the threshold. See :meth:`.write` for the
        implementation of this scheme.
    concurrency_limit : int
        Number of background tasks to run.
    z2 MiB
   send7Callable[[str, list[tuple[Any, Any]]], Awaitable[None]]memory_limiterr	   concurrency_limitintc                r    t                                          ||t          j                   || _        d S )N)r   r   max_message_size)super__init__r   r   r   )selfr   r   r   	__class__s       :lib/python3.11/site-packages/distributed/shuffle/_comms.pyr   zCommShardsBuffer.__init__5   s?     	)/-> 	 	
 	
 	

 			    addressstrshardslist[tuple[Any, Any]]returnNonec                   K   t                      5  |                     d          5  |                     ||           d{V  ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS )z,Send one message off to a neighboring workerr   N)r
   timer   )r   r   r   s      r   _processzCommShardsBuffer._processB   s     \\ 	1 	16"" 1 1ii0000000001 1 1 1 1 1 1 1 1 1 1 1 1 1 1	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1s4   A(AA(A	A(A	A((A,/A,)r   )r   r   r   r	   r   r   )r   r   r   r   r   r    )	__name__
__module____qualname____doc__r   r   r   r#   __classcell__)r   s   @r   r   r      sx        # #J #{7++ "$	      1 1 1 1 1 1 1 1r   r   N)
__future__r   collections.abcr   r   typingr   
dask.utilsr   distributed.shuffle._diskr   distributed.shuffle._limiterr	   distributed.utilsr
   r    r   r   <module>r1      s    " " " " " " / / / / / / / /       " " " " " " 2 2 2 2 2 2 8 8 8 8 8 8 ( ( ( ( ( (:1 :1 :1 :1 :1| :1 :1 :1 :1 :1r   