
    0Fie                    n   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZmZ d dlmZmZmZmZ d d	lmZ d d
lmZm Z m!Z! d dl"m#Z# d dl$m%Z%m&Z&m'Z'm(Z(  ej)        e*          Z+ e,            Z-	 	 d1d2dZ. G d dej/                  Z0 G d de          Z1 G d de1          Z2d Z3d3d"Z4 G d# d$e          Z5 G d% d&e5          Z6 G d' d(e          Z7 G d) d*e7          Z8 G d+ d,e          Z9 G d- d.e9          Z: G d/ d0          Z;dS )4    )annotationsN)_SelectorSocketTransport)Callable)islice)Any)parse_host_portunparse_host_port)BaseListenerCommCommClosedError	Connector)Backend)ensure_concrete_hostfrom_frames	to_frames
host_array)	ensure_ipensure_memoryviewget_ipget_ipv6      bufferslist[bytes]target_buffer_sizeintsmall_buffer_sizereturnc                .   t          |           dk    r| S g g ddfd}| D ]a}t          |          }||k    r+                    |           |z  |k    r
 |             B |                                 |           b |             S )a  Given a list of buffers, coalesce them into a new list of buffers that
    minimizes both copying and tiny writes.

    Parameters
    ----------
    buffers : list of bytes_like
    target_buffer_size : int, optional
        The target intermediate buffer size from concatenating small buffers
        together. Coalesced buffers will be no larger than approximately this size.
    small_buffer_size : int, optional
        Buffers <= this size are considered "small" and may be copied.
       r   r   Nonec                      rot                     dk    r                     d                    n(                    d                                                                            dd S d S )Nr!   r       )lenappendjoinclear)concatcsizeout_bufferss   <lib/python3.11/site-packages/distributed/comm/asyncio_tcp.pyflushzcoalesce_buffers.<locals>.flush9   sw     	6{{a""6!9----""388F#3#3444LLNNNEEE	 	r$   r   r"   )r%   r&   )	r   r   r   r-   bsizer)   r*   r+   s	         @@@r,   coalesce_buffersr1       s    $ 7||q!KFE         	" 	"1vv$$$MM!TME***EGGGq!!!!	EGGGr$   c                      e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<    ee          Z	 	 dFdG fd#ZedHd&            Z	edHd'            Z
edId(            ZdJd*ZdKd,ZdJd-ZdLd0ZdMd3ZdNd5ZdJd6ZdId7ZdId8ZdId9ZdId:ZdJd;ZdJd<ZdOdPd?ZdJd@ZdJdAZdQdCZdRdEZ xZS )SDaskCommProtocola:  Manages a state machine for parsing the message framing used by dask.

    Parameters
    ----------
    on_connection : callable, optional
        A callback to call on connection, used server side for handling
        incoming connections.
    min_read_size : int, optional
        The minimum buffer size to pass to ``socket.recv_into``. Larger sizes
        will result in fewer recv calls, at the cost of more copying. For
        request-response comms (where only one message may be in the queue at a
        time), a smaller value is likely more performant.
    )Callable[[DaskCommProtocol], None] | Noneon_connectionasyncio.AbstractEventLoop_loopzasyncio.Queue | None_queuez/asyncio.WriteTransport | _ZeroCopyWriter | None
_transportbool_pausedzasyncio.Future | None_drain_waiterzasyncio.Future_closed_waiter_using_default_bufferr   _default_len
memoryview_default_buffer_default_start_default_end
int | None_nframeszlist[int] | None_frame_lengthszlist[memoryview] | None_frames_frame_index_frame_nbytes_needed_frame_nbytes_remainingN   min_read_sizec                   t                                                       || _        t          j                    | _        t          j                    | _        d | _        d| _	        d | _
        | j                                        | _        d| _        t          |d          | _        t!          | j                  | _        d| _        d| _        d | _        d | _        d | _        d | _        d| _        d| _        d S )NFT   r   )super__init__r5   asyncioget_running_loopr7   Queuer8   r9   r;   r<   create_futurer=   r>   maxr?   r   rA   rB   rC   rE   rF   rG   rH   rI   rJ   )selfr5   rL   	__class__s      r,   rP   zDaskCommProtocol.__init__   s    
 	*-//
moo!"j6688 &*"r22)$*;<<
 " $%!'($$$r$   r   strc                    | j         rdS | j        J | j                            d          }|t          |d d          S dS )N<closed>sockname   	<unknown>	is_closedr9   get_extra_infor	   )rV   r[   s     r,   
local_addrzDaskCommProtocol.local_addr   R    > 	:***?11*==$hrrl33{r$   c                    | j         rdS | j        J | j                            d          }|t          |d d          S dS )NrZ   peernamer\   r]   r^   )rV   rd   s     r,   	peer_addrzDaskCommProtocol.peer_addr   rb   r$   c                    | j         d u S N)r9   rV   s    r,   r_   zDaskCommProtocol.is_closed   s    $&&r$   r"   c                f    | j         s)d | j        c| _        }|J |                                 d S d S rg   )r_   r9   abortrV   	transports     r,   _abortzDaskCommProtocol._abort   sH    ~ 	)-t&DOY(((OO	 	r$   	comm_reprc                    | j         sFt                              d| d           	 |                                  d S # t          $ r Y d S w xY wd S )NzClosing dangling comm ``)r_   loggerwarningrm   RuntimeError)rV   rn   s     r,   _close_from_finalizerz&DaskCommProtocol._close_from_finalizer   sl    ~ 	NNAYAAABBB   		 	s   = 
A
Ac                   K   | j         s'd | j        c| _        }|J |                                 | j         d {V  d S rg   )r_   r9   closer=   rk   s     r,   _closezDaskCommProtocol._close   s]      ~ 	)-t&DOY(((OO!!!!!!!!!!r$   rl   asyncio.BaseTransportc                   t          |          t          u rt          | |          | _        n#t	          |t
          j                  sJ || _        | j                            d           | j        |                     |            d S d S )Ni   )high)	typer   _ZeroCopyWriterr9   
isinstancerQ   WriteTransportset_write_buffer_limitsr5   rk   s     r,   connection_madez DaskCommProtocol.connection_made   s    
 	??666-dI>>DOOi)?@@@@@'DO 	//Z/@@@)t$$$$$ *)r$   sizehintobjectc                    | j         | j        | j        k     rd| _        | j        | j        d         S d| _        | j        J | j         | j                 }|| j         d         S )z-Get a buffer to read into for this read eventNTF)rG   rI   r?   r>   rA   rC   rH   )rV   r   frames      r,   
get_bufferzDaskCommProtocol.get_buffer   sx     <4#<t?P#P#P)-D&'(9(;(;<<).D&$000L!23E$335566r$   nbytesc                    |dk    rd S | j         r&| xj        |z  c_        |                                  d S | xj        |z  c_        |                                 s|                                  d S d S Nr   )r>   rC   _parse_default_bufferrI   _frames_check_remaining_message_completed)rV   r   s     r,   buffer_updatedzDaskCommProtocol.buffer_updated   s    Q;;F% 	*'&&(((((%%/%%//11 *'')))))* *r$   c                   	 | j         |                                 snZ| j         J | j        J t          | j                  | j         k     r|                                 sn|                                 snv|                                  dS )z)Parse all messages in the default buffer.TN)rE   _parse_nframesrF   r%   _parse_frame_lengths_parse_frames_reset_default_bufferrh   s    r,   r   z&DaskCommProtocol._parse_default_buffer   s    
	}$**,, =,,,&2224&''$-770022 %%'' 
	 	""$$$$$r$   c                    | j         | j        z
  dk    rHt          j        d| j        | j        dz             d         | _        | xj        dz  c_        g | _        dS dS )zlFill in `_nframes` from the default buffer. Returns True if
        successful, False if more data is neededrN   z<Q   offsetr   TF)rC   rB   structunpack_fromrA   rE   rF   rh   s    r,   r   zDaskCommProtocol._parse_nframes  su     t22b88".d*43F3J  DM 2%"$D4ur$   c                   | j         J | j        J | j         t          | j                  z
  }| j        | j        z
  dz  }t          ||          }| j                            t          j        d| d| j	        | j                             | xj        d|z  z  c_        ||k    r:d | j        D             | _
        d| _        | j        r| j        d         nd| _        dS d	S )
zrFill in `_frame_lengths` from the default buffer. Returns True if
        successful, False if more data is neededNr   <Qr   c                ,    g | ]}t          |          S  r   ).0ns     r,   
<listcomp>z9DaskCommProtocol._parse_frame_lengths.<locals>.<listcomp>.  s    GGGaJqMMGGGr$   r   TF)rE   rF   r%   rC   rB   minextendr   r   rA   rG   rH   rI   )rV   needed	availablen_reads       r,   r   z%DaskCommProtocol._parse_frame_lengths  s    }((("...T%8!9!99&)<<B	Y''""Ft3D<O  	
 	
 	

 	q6z)VGG43FGGGDL !D*.*=D#A&&1 % 4ur$   c                    | j         rdS | j        J | j        J | j        J 	 | xj        dz  c_        | j        | j        k     r!| j        | j                 | _         | j         rdS ndS D)NTr!   F)rI   rH   rE   rF   rh   s    r,   r   z(DaskCommProtocol._frames_check_remaining6  s    $ 	4 ,,,}((("...	" 4=00,0,?@Q,R),  4  u	r$   c                   	 | j         | j        z
  }|                                 s#|                                  t	          |          S |sdS | j        J | j        J | j        | j                 }t          | j        |          }| j	        | j        | j        |z            || j         || j        z
  pd<   | xj        |z  c_        | xj        |z  c_        )zkFill in `_frames` from the default buffer. Returns True if
        successful, False if more data is neededTFN)
rC   rB   r   r   r:   rG   rH   r   rI   rA   )rV   r   r   r   s       r,   r   zDaskCommProtocol._parse_framesH  s   	0)D,??I//11 ''')))I& u<+++$000L!23E2I>>F $T%84;NQW;W%WX **ft7P.P . 6)%%/%%'	0r$   c                    | j         }| j        }||k     r5|dk    r/| j        ||         | j        d||z
  <   d| _         ||z
  | _        dS ||k    rd| _         d| _        dS dS )z0Reset the default buffer for the next read eventr   N)rB   rC   rA   )rV   startends      r,   r   z&DaskCommProtocol._reset_default_buffer`  s    #3;;5A::262FuSy2QD 3;/"#D #eDc\\"#D !D \r$   c                    | j         J | j                             | j                   d| _        d| _        d| _        d| _        dS )zAPush a completed message to the queue and reset per-message stateNr   )r8   
put_nowaitrG   rE   rF   rJ   rh   s    r,   r   z#DaskCommProtocol._message_completedo  sL    {&&&t|,,,"'($$$r$   excBaseException | Nonec                >   d | _         | j                            d            | j        J | j                            t
                     | j        rH| j        }|Ad | _        |                                s(|	                    t          d                     d S d S d S d S )NConnection closed)r9   r=   
set_resultr8   r   _COMM_CLOSEDr;   r<   doneset_exceptionr   )rV   r   waiters      r,   connection_lostz DaskCommProtocol.connection_lostx  s    &&t,,, {&&&|,,, < 	O'F!%)"{{}} O((9L)M)MNNNNN	O 	O!!O Or$   c                    d| _         d S )NT)r;   rh   s    r,   pause_writingzDaskCommProtocol.pause_writing  s    r$   c                    d| _         | j        }|2d | _        |                                s|                    d            d S d S d S )NF)r;   r<   r   r   )rV   r   s     r,   resume_writingzDaskCommProtocol.resume_writing  s[    #!%D;;== (!!$''''' ( (r$   r   c                   K   | j         1| j                                          d{V }|t          ur|S d| _         t          d          )z$Read a single message from the comm.Nr   )r8   getr   r   )rV   outs     r,   readzDaskCommProtocol.read  sY       ;"))))))))C,&&
DK1222r$   framesc                X  K   | j         rt          d          | j        r(| j                                        x}| _        | d{V  d |D             }t          |          }d |D             }t          |          |dz   dz  z   }t          j	        |dz    d||g|R  }|d	k     rd

                    |g|          g}nt          |g|          }| j        J t          |          dk    r| j                            |           n | j                            |d                    |S )zWrite a message to the comm.r   Nc                Z    g | ](}t          |t                    rt          |          n|)S r   )r}   r@   r   r   fs     r,   r   z*DaskCommProtocol.write.<locals>.<listcomp>  sB     
 
 
IJJq*$=$=Da   1
 
 
r$   c                ,    g | ]}t          |          S r   )r%   r   s     r,   r   z*DaskCommProtocol.write.<locals>.<listcomp>  s    000AQ000r$   r!   r   r\   r   i   r$   r   )r_   r   r;   r7   rT   r<   r%   sumr   packr'   r1   r9   
writelineswrite)rV   r   drain_waiternframesframes_nbytes
msg_nbytesheaderr   s           r,   r   zDaskCommProtocol.write  su     > 	!"5666\ 	04
0H0H0J0JJL4-
 
NT
 
 
 f++00000
 ''7Q;!*;;
!...
GTmTTT  xx 1& 1223GG&'8'899G***w<<!O&&w////O!!'!*---r$   )NrK   )r5   r4   rL   r   r   rX   r   r:   r.   )rn   rX   r   r"   )rl   rx   r   r"   )r   r   r   r@   )r   r   r   r"   rg   )r   r   r   r"   )r   r   )r   r   r   r   )__name__
__module____qualname____doc____annotations__tuple	__slots__rP   propertyra   re   r_   rm   rt   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__rW   s   @r,   r3   r3   R   s          =<<<$$$$    ????MMM((((""""$$$$$$$$    o&&I DH'$) $) $) $) $) $) $)L    X    X ' ' ' X'      " " " "% % % %"7 7 7 7
* 
* 
* 
*% % % %       0   $0 0 0 00" " " ") ) ) )O O O O O    ( ( ( (	3 	3 	3 	3# # # # # # # #r$   r3   c                       e Zd Zej                            ej                            d                    Z	 dd fd
Z	ddZ
edd            Zedd            ZddZddZddZddZd dZed             Z xZS )!TCPzdistributed.comm.shardTprotocolr3   ra   rX   re   deserializer:   c                2   || _         || _        || _        d| _        t	                                          |           t          j        | | j         j        t          |                     | _
        d| j
        _        |                                 | _        d S )NFr   )	_protocol_local_addr
_peer_addr_closedrO   rP   weakreffinalizert   repr
_finalizeratexit_get_extra_info_extra_info)rV   r   ra   re   r   rW   s        r,   rP   zTCP.__init__  s     "%#[111 "*$.6T


 
 "'  //11r$   r   dict[str, Any]c                    i S rg   r   rh   s    r,   r   zTCP._get_extra_info  s    	r$   c                    | j         S rg   )r   rh   s    r,   local_addresszTCP.local_address      r$   c                    | j         S rg   )r   rh   s    r,   peer_addresszTCP.peer_address  s
    r$   Nc                   K   | j                                          d {V }	 t          || j        || j                   d {V S # t
          $ r$ |                                  t          d          w xY w)N)r   deserializersallow_offloadz aborted stream on truncated data)r   r   r   r   r   EOFErrorrj   r   )rV   r   r   s      r,   r   zTCP.read  s      ~**,,,,,,,,
	F$ ,+"0	           	F 	F 	FJJLLL!"DEEE	Fs   "A .A4messagec           	        K   t          || j        ||| j        | j        d| j        | j                   d {V }| j                            |           d {V }|S )N)sender	recipient)r   serializerson_errorcontextframe_split_size)r   r   
local_inforemote_infohandshake_optionsmax_shard_sizer   r   )rV   msgr   r   r   r   s         r,   r   z	TCP.write  s       ,#/!-  (
 "0
 
 
 
 
 
 
 
 
 ~++F33333333r$   r"   c                z   K   | j                                          d{V  | j                                         dS )zFlush and close the commN)r   rw   r   detachrh   s    r,   rv   z	TCP.close  sF      n##%%%%%%%%%     r$   c                j    | j                                          | j                                         dS )zHard close the commN)r   rm   r   r  rh   s    r,   rj   z	TCP.abort  s0         r$   c                    | j         j        S rg   )r   r_   rh   s    r,   closedz
TCP.closed  s    ~''r$   c                    | j         S rg   )r   rh   s    r,   
extra_infozTCP.extra_info  r   r$   T)r   r3   ra   rX   re   rX   r   r:   r   r   r   rg   )Nr   r.   r   )r   r   r   daskutilsparse_bytesconfigr   r   rP   r   r   r   r   r   r   rv   rj   r  r  r   r   s   @r,   r   r     sB       Z++DKOO<T,U,UVVN !2 2 2 2 2 2 2,          X     XF F F F    ! ! ! !
! ! ! !
( ( ( (     X         r$   r   c                      e Zd ZddZdS )TLSr   r   c                n    | j         j        J | j         j        j        } |d           |d          dS )Npeercertcipher)r  r  )r   r9   r`   )rV   r   s     r,   r   zTLS._get_extra_info  s>    ~(444n'6C
OOss8}}EEEr$   Nr
  )r   r   r   r   r   r$   r,   r  r    s.        F F F F F Fr$   r  c                    |                      d          }t          |t          j                  st	          d|          |S )Nssl_contextzpTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got )r   r}   ssl
SSLContext	TypeError)connection_argsctxs     r,   _expect_tls_contextr     sT    


m
,
,Cc3>** 
$$ $
 
 	

 Jr$   addressrX   kwargsr   r"   c                Z    |                     d          rt          dd| z             d S )Nrequire_encryptionzJencryption required by Dask configuration, refusing communication from/to tcp://)r   rs   )r  r  s     r,   _error_if_require_encryptionr!  +  sF    zz&'' 
l3;g3E3EH
 
 	

 
r$   c                  &    e Zd ZdZeZddZdd
ZdS )TCPConnectorr   Tc                  K   t          j                    }t          |          \  }} | j        |fi |} |j        t
          ||fi | d {V \  }}| j        |j        z   }	| j        |z   }
|                     ||	|
|          S )Nr   )	rQ   rR   r   _get_extra_kwargscreate_connectionr3   prefixra   
comm_class)rV   r  r   r  loopipportrl   r   ra   re   s              r,   connectzTCPConnector.connect7  s      '))"7++D''::6::$:D$:b$%
 %
*0%
 %
 
 
 
 
 
 
	8 [8#66
K')	xYKXXXr$   r  rX   r  r   r   r   c                     t          |fi | i S rg   r!  rV   r  r  s      r,   r%  zTCPConnector._get_extra_kwargsC      $W77777	r$   Nr	  r  rX   r  r   r   r   )r   r   r   r'  r   r(  r,  r%  r   r$   r,   r#  r#  3  sJ        FJ
Y 
Y 
Y 
Y     r$   r#  c                      e Zd ZdZeZd
dZd	S )TLSConnectortls://r  rX   r  r   r   r   c                (    t          |          }d|iS Nr  r  rV   r  r  r  s       r,   r%  zTLSConnector._get_extra_kwargsL      !&))s|r$   Nr1  r   r   r   r'  r  r(  r%  r   r$   r,   r3  r3  H  2        FJ     r$   r3  c                       e Zd ZdZeZ	 	 	 	 d fd	ZddZd Zd Z	d Z
ddZddZd Zedd            Zedd            Z xZS )TCPListenerr   TNr   c                    t                                                       t          ||          \  | _        | _        || _        || _        || _        || _         | j	        |fi || _
        d | _        d S rg   )rO   rP   r   r*  r+  default_hostcomm_handlerr   r   r%  _extra_kwargsbound_address)	rV   r  r@  r   r   r?  default_portr  rW   s	           r,   rP   zTCPListener.__init__U  s~     	,WlCC((&*3T3GFFvFF!r$   r  rX   r  r   r   r   c                     t          |fi | i S rg   r.  r/  s      r,   r%  zTCPListener._get_extra_kwargsh  r0  r$   c                    |                      || j        |j        z   | j        |j        z   | j                  }| j        |_        t          j        |                     |                     d S )N)ra   re   r   )	r(  r'  ra   re   r   r   rQ   ensure_future_comm_handler)rV   r   comms      r,   _on_connectionzTCPListener._on_connectionl  so    {X%88kH$66(	  
 
 "/d006677777r$   c                   K   	 |                      |           d {V  n+# t          $ r t                              d           Y d S w xY w|                     |           d {V  d S )Nz,Connection closed before handshake completed)r5   r   rq   debugr@  )rV   rH  s     r,   rG  zTCPListener._comm_handlerv  s      	$$T********** 	 	 	LLGHHHFF	 %%%%%%%%%%%s     $AAc           
        K   t          j                    }|                    ddt          j        t          j        t          j        d           d{V }t          |d           }g }d}	 |D ]Q}|\  }}}}	}
	 t          j        |||          }n# t          $ r Y /w xY w|t          t          dd          k    r@t          t          d          r+|                    t          j        t          j        d           ||
d         |g|
d	d         R }
	 |                    |
           nE# t          $ r8}t          |j        d
|
d|j                                                  dd}~ww xY w||                                d         } |j         fdfd|i j         d{V }|                    |           d}Sn># t,          $ r1 |D ]}|                                 ||                                  w xY w|S )a8  Due to a design decision in asyncio, listening on `("", 0)` will
        result in two different random ports being used (one for IPV4, one for
        IPV6), rather than both interfaces sharing the same random port. We
        work around this here. See https://bugs.python.org/issue45693 for more
        info.Nr   )familyr{   flagsprotoc                    | d         j         S r   )name)xs    r,   <lambda>zDTCPListener._start_all_interfaces_with_random_port.<locals>.<lambda>  s    AaDI r$   )keyAF_INET6IPPROTO_IPV6Tr\   z*error while attempting to bind on address z: r!   c                 ,    t           j                  S rg   r3   rI  rh   s   r,   rS  zDTCPListener._start_all_interfaces_with_random_port.<locals>.<lambda>      ,T-@AA r$   sock)rQ   rR   getaddrinfosocket	AF_UNSPECSOCK_STREAM
AI_PASSIVEsortedOSErrorgetattrhasattr
setsockoptrV  IPV6_V6ONLYbinderrnostrerrorlowergetsocknamecreate_serverrA  r&   BaseExceptionrv   )rV   r)  infosserversr+  resafsocktyperO  	canonnamesarZ  errservers   `             r,   &_start_all_interfaces_with_random_portz2TCPListener._start_all_interfaces_with_random_port~  s      '))&&### ' 
 
 
 
 
 
 
 
 u"5"5666 1	 ' '582HeY!=Xu==DD   H T::::wN@ @: OOF$79KTRRR #Q%/122//B IIbMMMM      !		79rr3<;M;M;O;O;OQ   	   <++--a0D  2t1AAAA     (         
 v&&&O'P  	 	 	!   

	 sV   -G ;BG 
BG BA4G D*)G *
E,43E''E,,AG ;G>r"   c                    K   t          j                    } j        s" j        s                                  d {V }n* |j         fdf j         j        d j         d {V g}| _        d S )Nc                 ,    t           j                  S rg   rX  rh   s   r,   rS  z#TCPListener.start.<locals>.<lambda>  rY  r$   )hostr+  )rQ   rR   r*  r+  rv  rk  rA  _servers)rV   r)  rn  s   `  r,   r   zTCPListener.start  s      '))w 
	ty 
	 GGIIIIIIIIGG )d(AAAA  (	       G  r$   c                B    | j         D ]}|                                 d S rg   )rz  rv   )rV   ru  s     r,   stopzTCPListener.stop  s,    m 	 	FLLNNNN	 	r$   c                    | j         :d } || j        d                   }|                                dd         | _         | j         S )z@
        The listening address as a (host, port) tuple.
        Nc                    t           j        t           j        fD ]}| j        D ]}|j        |k    r|c c S t          d          )NzNo active INET socket found?)r\  AF_INETrU  socketsrM  rs   )ru  rM  rZ  s      r,   
get_socketz-TCPListener.get_host_port.<locals>.get_socket  s`    %~v? ( (F & ( (;&00#'KKKKK 1( ##ABBBr$   r   r\   )rB  rz  rj  )rV   r  rZ  s      r,   get_host_portzTCPListener.get_host_port  s]     %C C C :dmA.//D!%!1!1!3!3BQB!7D!!r$   c                H    | j         t          |                                  z   S )z4
        The listening address as a string.
        )r'  r	   r  rh   s    r,   listen_addresszTCPListener.listen_address  s#    
 {.0B0B0D0DEEEr$   c                    |                                  \  }}t          || j                  }| j        t	          ||          z   S )z2
        The contact address as a string.
        )r?  )r  r   r?  r'  r	   )rV   ry  r+  s      r,   contact_addresszTCPListener.contact_address  sF    
 ''))
d#Dt7HIII{.tT::::r$   )TTNr   r1  r.   r   )r   r   r   r'  r   r(  rP   r%  rI  rG  rv  r   r|  r  r   r  r  r   r   s   @r,   r=  r=  Q  s       FJ " " " " " "&   8 8 8& & &J J JX          
" " "" F F F XF ; ; ; X; ; ; ; ;r$   r=  c                      e Zd ZdZeZd
dZd	S )TLSListenerr4  r  rX   r  r   r   r   c                (    t          |          }d|iS r6  r7  r8  s       r,   r%  zTLSListener._get_extra_kwargs  r9  r$   Nr1  r:  r   r$   r,   r  r     r;  r$   r  c                  <    e Zd ZeZeZd
dZd Zd Z	d Z
d Zd Zd	S )
TCPBackendr   r   c                *    |                                  S rg   )_connector_classrh   s    r,   get_connectorzTCPBackend.get_connector  s    $$&&&r$   c                "     | j         |||fi |S rg   )_listener_class)rV   lochandle_commr   r  s        r,   get_listenerzTCPBackend.get_listener  s!    #t#CkUU_UUUr$   c                ,    t          |          d         S r   r   rV   r  s     r,   get_address_hostzTCPBackend.get_address_host  s    s##A&&r$   c                     t          |          S rg   r  r  s     r,   get_address_host_portz TCPBackend.get_address_host_port  s    s###r$   c                `    t          |          \  }}t          t          |          |          S rg   )r   r	   r   )rV   r  ry  r+  s       r,   resolve_addresszTCPBackend.resolve_address  s)    $S))
d 4$777r$   c                    t          |          \  }}t          |          }d|v rt          |          }nt          |          }t	          |d           S )N:)r   r   r   r   r	   )rV   r  ry  r+  
local_hosts        r,   get_local_address_forz TCPBackend.get_local_address_for  sO    $S))
d$;;!$JJJ T222r$   N)r   r   )r   r   r   r#  r  r=  r  r  r  r  r  r  r  r   r$   r,   r  r  	  s        #!O' ' ' 'V V V' ' '$ $ $8 8 83 3 3 3 3r$   r  c                      e Zd ZeZeZdS )
TLSBackendN)r   r   r   r3  r  r  r  r   r$   r,   r  r  '  s        #!OOOr$   r  c                  N   e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<    eej        d          r-ej        dk    rdZ	n!	  e
j        d          Z	n# e$ r dZ	Y nw xY wdZ	d1dZ	 	 d2d3dZd4dZd4dZd4dZd5d Zd6d"Zd7d$Zd5d%Zd8d(Zd4d)Zd4d*Zd9d.Zd4d/Zd4d0ZdS ):r|   zThe builtin socket transport in asyncio makes a bunch of copies, which
    can make sending large amounts of data much slower. This hacks around that.

    Note that this workaround isn't used with the windows ProactorEventLoop or
    uvloop.r3   r   r   rl   r6   r7   zcollections.deque[memoryview]_buffersr   _sizer:   _protocol_pausedsendmsgwin32rN   
SC_IOV_MAXr!   c                ,   || _         || _        t          j                    | _        dD ]}t          ||          sJ dD ]}t          | j        |          sJ t          j                    | _        d| _	        d| _
        |                                  d S )N)_sock_sock_fd_fatal_error_eof_closing
_conn_lost_call_connection_lost)_add_writer_remove_writerr   F)r   rl   rQ   rR   r7   rc  collectionsdequer  r  r  r   )rV   r   rl   attrs       r,   rP   z_ZeroCopyWriter.__init__P  s     "-//

 		, 		,D 9d++++++ 6 	- 	-D4:t,,,,,, $)++
 %$$&&&&&r$   Nrz   rD   lowr   r"   c                p    |
|d}nd|z  }||dz  }|| _         || _        |                                  dS )zSet the write buffer limitsNr      )_high_water
_low_water_maybe_pause_protocol)rV   rz   r  s      r,   r   z'_ZeroCopyWriter.set_write_buffer_limitso  sS     <{ 3w;!)C""$$$$$r$   c                |    | j         s2| j        | j        k    r$d| _         | j                                         dS dS dS )z;If the high water mark has been reached, pause the protocolTN)r  r  r  r   r   rh   s    r,   r  z%_ZeroCopyWriter._maybe_pause_protocol  sN    $ 	*d6F)F)F$(D!M'')))))	* 	*)F)Fr$   c                |    | j         r2| j        | j        k    r$d| _         | j                                         dS dS dS )z<If the low water mark has been reached, unpause the protocolFN)r  r  r  r   r   rh   s    r,   _maybe_resume_protocolz&_ZeroCopyWriter._maybe_resume_protocol  sM      	+TZ4?%B%B$)D!M((*****	+ 	+%B%Br$   c                F    | j                                          d| _        dS )zClear the send bufferr   N)r  r(   r  rh   s    r,   _buffer_clearz_ZeroCopyWriter._buffer_clear  s!    


r$   databytesc                    t          |          }| xj        t          |          z  c_        | j                            |           dS )z"Append new data to the send bufferN)r   r  r%   r  r&   )rV   r  mvs      r,   _buffer_appendz_ZeroCopyWriter._buffer_append  sA    t$$

c"gg

R     r$   list[memoryview]c                P    t          t          | j        | j                            S )z.Get one or more buffers to write to the socket)listr   r  SENDMSG_MAX_COUNTrh   s    r,   _buffer_peekz_ZeroCopyWriter._buffer_peek  s    F4=$*@AABBBr$   r0   c                    | xj         |z  c_         | j        }|rJ|d         }t          |          }||k    r|                                 ||z  }n||d         |d<   dS |HdS dS )z*Advance the buffer index forward by `size`r   N)r  r  r%   popleft)rV   r0   r   r/   b_lens        r,   _buffer_advancez_ZeroCopyWriter._buffer_advance  s    

d

- 	
AFFE}}!!!tuuX
  	 	 	 	 	r$   c                   | j         }|j        rt          d          |sd S |j        rd S | j        s	 |j                            |          }||d          }|sd S nQ# t          t          f$ r Y n>t          t          f$ r  t          $ r!}|                    |d           Y d }~d S d }~ww xY w| j                            |j        | j                   |                     |           |                                  d S )Nz%Cannot call write() after write_eof()%Fatal write error on socket transport)rl   r  rs   r  r  r  sendBlockingIOErrorInterruptedError
SystemExitKeyboardInterruptrl  r  r7   r  r  _on_write_readyr  r  )rV   r  rl   r   r   s        r,   r   z_ZeroCopyWriter.write  sQ   N	> 	HFGGG 	F 	F} 	MO((.. ABBx F $%56    12       &&s,STTT J""9#5t7KLLL 	D!!!""$$$$$s   A B*/B*	B%%B*r   r   c                   t          | j                  }|D ]}|                     |           |s	 |                                  nV# t          t
          f$ r Y nCt          t          f$ r  t          $ r&}| j	        
                    |d           Y d }~d S d }~ww xY w| j        sd S | j                            | j	        j        | j                   |                                  d S Nr  )r:   r  r  _do_bulk_writer  r  r  r  rl  rl   r  r7   r  r  r  r  )rV   r   waitingr/   r   s        r,   r   z_ZeroCopyWriter.writelines  s+   t}%% 	# 	#A"""" 	R
##%%%%#%56    12       ++@   	
 =  J""4>#:D<PQQQ""$$$$$s   A BB4BBc                \    |                                   | j                                        S rg   )r  rl   rv   rh   s    r,   rv   z_ZeroCopyWriter.close  (    ~##%%%r$   c                \    |                                   | j                                        S rg   )r  rl   rj   rh   s    r,   rj   z_ZeroCopyWriter.abort  r  r$   rT  rX   r   c                6    | j                             |          S rg   )rl   r`   )rV   rT  s     r,   r`   z_ZeroCopyWriter.get_extra_info  s    ~,,S111r$   c                   |                                  }t          |          dk    r&| j        j                            |d                   }n| j        j                            |          }|                     |           d S )Nr!   r   )r  r%   rl   r  r  r  r  )rV   r   r   s      r,   r  z_ZeroCopyWriter._do_bulk_write  st    ##%%w<<1$))'!*55AA$,,W55AQr$   c                r   | j         }|j        rd S 	 |                                  |                                  | j        sj| j                            |j                   |j        r|	                    d            d S |j
        r(|j                            t          j                   d S d S d S # t          t           f$ r Y d S t"          t$          f$ r  t&          $ rY}| j                            |j                   | j                                         |                    |d           Y d }~d S d }~ww xY wr  )rl   r  r  r  r  r7   r  r  r  r  r  r  shutdownr\  SHUT_WRr  r  r  r  rl  r(   r  )rV   rl   r   s      r,   r  z_ZeroCopyWriter._on_write_ready  s}   N	 	F	=!!! '')))= =
)))*<===% =33D99999^ =O,,V^<<<<<= =
= =!  !12 	 	 	DD-. 	 	 	 	Q 	Q 	QJ%%i&8999M!!!""3(OPPPPPPPPP		Qs   B/ /D6D6AD11D6)r   r3   rl   r   )NN)rz   rD   r  rD   r   r"   r.   )r  r  r   r"   )r   r  )r0   r   r   r"   )r   r   r   r"   )rT  rX   r   r   )r   r   r   r   r   rc  r\  sysplatformr  ossysconf	ExceptionrP   r   r  r  r  r  r  r  r   r   rv   rj   r`   r  r  r   r$   r,   r|   r|   .  s          ''''$$$$++++JJJ wv}i(( 
<7"" "'$.BJ|$<$<!! ' ' '$&!!!' ' ' ' 'B  % % % % %$* * * *+ + + +   
! ! ! !C C C C   % % % %B% % % %2& & & && & & &2 2 2 2       = = = = = =s   A A! A!r|   )r   r   )r   r   r   r   r   r   r   r   )r  rX   r  r   r   r"   )<
__future__r   rQ   r  loggingr  r\  r  r   r  r   asyncio.selector_eventsr   collections.abcr   	itertoolsr   typingr   r  distributed.comm.addressingr   r	   distributed.comm.corer
   r   r   r   distributed.comm.registryr   distributed.comm.utilsr   r   r   distributed.protocol.utilsr   distributed.utilsr   r   r   r   	getLoggerr   rq   r   r   r1   BufferedProtocolr3   r   r  r  r!  r#  r3  r=  r  r  r  r|   r   r$   r,   <module>r     s   " " " " " "       				  



  



  < < < < < < $ $ $ $ $ $              J J J J J J J J P P P P P P P P P P P P - - - - - - O O O O O O O O O O 1 1 1 1 1 1 L L L L L L L L L L L L		8	$	$ vxx
 (!/ / / / /dp p p p pw/ p p pfQ  Q  Q  Q  Q $ Q  Q  Q hF F F F F# F F F  
 
 
 
    9   *    <   l; l; l; l; l;, l; l; l;^    +   3 3 3 3 3 3 3 3<" " " " " " " "b= b= b= b= b= b= b= b= b= b=r$   