
    0Fie1                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZmZ d dlZd dlmZ d dlmZ d dlmZmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ  ej        e           Z! G d de"          Z# G d de#          Z$ G d de	          Z% G d de	          Z& G d de&          Z' G d de	          Z(	 ddZ)ddZ*dS )    )annotationsN)ABCabstractmethod)AnyClassVar)parse_timedelta)registry)get_address_hostparse_addressresolve_address)time)get_compression_settings)HIGHEST_PROTOCOL)wait_forc                      e Zd ZdS )CommClosedErrorN__name__
__module____qualname__     5lib/python3.11/site-packages/distributed/comm/core.pyr   r              Dr   r   c                      e Zd ZdS )FatalCommClosedErrorNr   r   r   r   r   r      r   r   r   c                     e Zd ZU dZ ej                    Zded<   ded<   ded<   ded<   ded	<   d
ed<   d d!dZe	d"d            Z
e	d#d            Ze	d             Ze	d             Ze	d             Zee	d$d                        Zee	d$d                        Zed%d            Zed             Zd&dZed'd            Zd ZdS )(Comma  
    A message-oriented communication object, representing an established
    communication channel.  There should be only one reader and one
    writer at a time: to manage current communications, even with a
    single peer, you must create distinct ``Comm`` objects.

    Messages are arbitrary Python objects.  Concrete implementations
    of this class can implement different serialization mechanisms
    depending on the underlying transport's characteristics.
    zClassVar[weakref.WeakSet[Comm]]
_instancesz
str | Nonenamedict
local_inforemote_infohandshake_optionsbooldeserializeTc                    | j                             |            d| _        d | _        i | _        i | _        i | _        || _        d S )NT)r   addallow_offloadr    r"   r#   r$   r&   )selfr&   s     r   __init__zComm.__init__4   sL    D!!!!	!#&r   Nc                
   K   dS )aW  
        Read and return a message (a Python object).

        This method returns a coroutine.

        Parameters
        ----------
        deserializers : dict[str, tuple[Callable, Callable, bool]] | None
            An optional dict appropriate for distributed.protocol.deserialize.
            See :ref:`serialization` for more.
        Nr   )r*   deserializerss     r   readz	Comm.read?   
        r   c                
   K   dS )a(  
        Write a message (a Python object).

        This method returns a coroutine.

        Parameters
        ----------
        msg
        on_error : str | None
            The behavior when serialization fails. See
            ``distributed.protocol.core.dumps`` for valid values.
        Nr   )r*   msgserializerson_errors       r   writez
Comm.writeM   r/   r   c                
   K   dS )z
        Close the communication cleanly.  This will attempt to flush
        outgoing buffers before actually closing the underlying transport.

        This method returns a coroutine.
        Nr   r*   s    r   closez
Comm.close\   r/   r   c                    dS )z
        Close the communication immediately and abruptly.
        Useful in destructors or generators' ``finally`` blocks.
        Nr   r6   s    r   abortz
Comm.aborte         r   c                    dS )z$Return whether the stream is closed.Nr   r6   s    r   closedzComm.closedl   r:   r   returnstrc                    dS )zThe local addressNr   r6   s    r   local_addresszComm.local_addressp   r:   r   c                    dS )zThe peer's addressNr   r6   s    r   peer_addresszComm.peer_addressu   r:   r   c                    t          t          | j                            }t          t          | j                            }||k    S )z8Return True if the peer is on localhost; False otherwise)r
   r   r@   rB   )r*   local_ipaddrpeer_ipaddrs      r   	same_hostzComm.same_hostz   sA     (8J(K(KLL&t7H'I'IJJ
 {**r   c                    i S )z
        Return backend-specific information about the communication,
        as a dict.  Typically, this is information which is initialized
        when the communication is established and doesn't vary afterwards.
        r   r6   s    r   
extra_infozComm.extra_info   s	     	r   dict[str, Any]c                    | j         rd}nt          d          }|t          t          j                  dd         t
          dS )a  Share environment information with the peer that may differ, i.e. compression
        settings.

        Notes
        -----
        By the time this method runs, the "auto" compression setting has been updated to
        an actual compression algorithm. This matters if both peers had compression set
        to 'auto' but only one has lz4 installed. See
        distributed.protocol.compression._update_and_check_compression_settings()

        See also
        --------
        handshake_configuration
        Nzdistributed.comm.compression   )compressionpythonpickle-protocol)rF   r   tuplesysversion_infor   )r*   rL   s     r   handshake_infozComm.handshake_info   sR     > 	SKK23QRRK 'C,--bqb1/
 
 	
r   localremotec                    	 dt          | d         |d                   i}n"# t          $ r}t          d          |d}~ww xY w| d         |d         k    r| d         |d<   nd|d<   |S )a7  Find a configuration that is suitable for both local and remote

        Parameters
        ----------
        local
            Output of handshake_info() in this process
        remote
            Output of handshake_info() on the remote host

        See also
        --------
        handshake_info
        rN   zYour Dask versions may not be in sync. Please ensure that you have the same version of dask and distributed on your client, scheduler, and worker machinesNrL   )minKeyError
ValueError)rS   rT   outes       r   handshake_configurationzComm.handshake_configuration   s    "	!3+,f5F.G$ $CC
  	 	 	Q  		 6-#888!&}!5C!%C
s   ! 
A ;A c                    d                     | j        j        |                                 rdnd| j        pd| j        | j                  S )Nz<{}{} {} local={} remote={}>z	 (closed) )format	__class__r   r<   r    r@   rB   r6   s    r   __repr__zComm.__repr__   sK    -44N#;;==0KKbIO
 
 	
r   T)r&   r%   N)NN)r=   r>   )r=   r%   )r=   rI   )rS   rI   rT   rI   r=   rI   )r   r   r   __doc__weakrefWeakSetr   __annotations__r+   r   r.   r4   r7   r9   r<   propertyr@   rB   rF   rH   rR   staticmethodr[   r`   r   r   r   r   r   !   s        	 	 3B'/2C2CJCCCC' ' ' ' '    ^    ^   ^   ^ 3 3 ^3       ^ X  ! ! ! ^ X! + + + X+   X
 
 
 
4 " " " \"H
 
 
 
 
r   r   c                      e Zd Zed             Zed             Zeed                         Zeed                         Zd Z	d Z
d Z	 dddZdS )Listenerc                
   K   dS )z;
        Start listening for incoming connections.
        Nr   r6   s    r   startzListener.start   r/   r   c                    dS )z
        Stop listening.  This does not shutdown already established
        communications, but prevents accepting new ones.
        Nr   r6   s    r   stopzListener.stop   r:   r   c                    dS )z8
        The listening address as a URI string.
        Nr   r6   s    r   listen_addresszListener.listen_address   r:   r   c                    dS )z
        An address this listener can be contacted on.  This can be
        different from `listen_address` if the latter is some wildcard
        address such as 'tcp://0.0.0.0:123'.
        Nr   r6   s    r   contact_addresszListener.contact_address   r:   r   c                >   K   |                                   d {V  | S rb   rl   r6   s    r   
__aenter__zListener.__aenter__   s+      jjllr   c                n   K   |                                  }t          j        |          r
| d {V  d S d S rb   )rn   inspectisawaitable)r*   exc_type	exc_value	tracebackfutures        r   	__aexit__zListener.__aexit__   sE      v&& 	LLLLLLLLL	 	r   c                F      fd} |                                             S )Nc                 @   K                                      d {V   S rb   rt   r6   s   r   _zListener.__await__.<locals>._   s,      **,,Kr   )	__await__)r*   r   s   ` r   r   zListener.__await__   s2    	 	 	 	 	 qss}}r   Ncommr   handshake_overridesdict[str, Any] | Noner=   Nonec                J  K   i |                                 |pi }|                    |           d {V  |                                 d {V }||_        |j        |j        d<   ||_        |j        |j        d<   |                    |j        |j                  |_        d S )Naddress)	rR   r4   r.   r#   rB   r"   r@   r[   r$   )r*   r   r   r"   	handshakes        r   on_connectionzListener.on_connection  s       N++--M2E2KM
jj$$$$$$$$$))++%%%%%%	$&*&7#$%)%7	"!%!=!=OT-"
 "
r   rb   r   r   r   r   r=   r   )r   r   r   r   rl   rn   rg   rp   rr   ru   r}   r   r   r   r   r   rj   rj      s          ^
   ^   ^ X
   ^ X    
   HL
 
 
 
 
 
 
r   rj   c                  4     e Zd ZddZ	 dd fd	Zdd
Z xZS )BaseListenerr=   r   c                ,    t                      | _        d S rb   )set_BaseListener__commsr6   s    r   r+   zBaseListener.__init__  s    "%%%r   Nr   r   r   r   c                  K   | j                             |           	 t                                          ||           d {V 	 | j                             |           S # | j                             |           w xY wrb   )r   r(   superr   discard)r*   r   r   r_   s      r   r   zBaseListener.on_connection  s       		'..t5HIIIIIIIIIL  &&&&DL  &&&&s   'A" "A>c                n    | j         t                      c}| _         |D ]}|                                 d S rb   )r   r   r9   )r*   commsr   s      r   abort_handshaking_commsz$BaseListener.abort_handshaking_comms$  s>    "lCEEt| 	 	DJJLLLL	 	r   )r=   r   rb   r   )r   r   r   r+   r   r   __classcell__)r_   s   @r   r   r     sq        ( ( ( ( HL' ' ' ' ' ' '       r   r   c                  &    e Zd Zedd            ZdS )	ConnectorTc                
   K   dS )a  
        Connect to the given address and return a Comm object.
        This function returns a coroutine. It may raise EnvironmentError
        if the other endpoint is unreachable or unavailable.  It
        may raise ValueError if the address is malformed.
        Nr   )r*   r   r&   s      r   connectzConnector.connect+  r/   r   Nra   )r   r   r   r   r   r   r   r   r   r   *  s2           ^  r   r   Tc                  K   t           j                            d          t          d          t	          |           \  }}t          j        |          }|                                }d}	t                      fd}
d}d}t          
                    d|           d	z  }d} |
            dk    r	 t           |j        |fd
|i|t          | |
                                 d{V }	n# t          $ r  t          j        t"          f$ r|}|}}t           |
            |d|z  z            }t%          j        d|          }|dz  }t          
                    d||           t          j        |           d{V  Y d}~nd}~ww xY w |
            dk    t#          d|  d d          |i |	                                |pi }|	                    |           d{V  |	                                 d{V }||	_        |	j        |	j        d<   ||	_        |	j        |	j        d<   |	                    |	j        |	j                  |	_        t          
                    d|           |	S )z
    Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
    and yield a ``Comm`` object.  If the connection attempt fails, it is
    retried until the *timeout* is expired.
    Nz!distributed.comm.timeouts.connectseconds)defaultc                 L    z   } t          d| t                      z
            S )Nr   )maxr   )deadlinerl   timeouts    r   	time_leftzconnect.<locals>.time_leftH  s%    7?1h'(((r   g{Gz?r   zEstablishing connection to %s   r&   )r         z7Could not connect to %s, waiting for %s before retryingzTimed out trying to connect to z after z sr   zConnection to %s established)daskconfiggetr   r   r	   get_backendget_connectorr   loggerdebugr   r   rV   r   asyncioTimeoutErrorOSErrorrandomuniformsleeprR   r4   r.   r#   
_peer_addrr"   _local_addrr[   r$   )addrr   r&   r   connection_argsschemelocbackend	connectorr   r   backoff_baseattemptintermediate_capactive_exceptionexc	upper_capbackoffr"   r   rl   s    `                  @r   r   r   5  s-      +//"EFFgy999G%%KFC"6**G%%''IDFFE) ) ) ) ) ) LG
LL0#666 {
)++//	)!!	!#RR;R/RR,iikk::        D # 	 	 	$g. 	) 	) 	)"  ' IIKKG)DEEInQ	22GqLGLLI3PW   -((((((((((((((!	) )++//8 FdFF7FFF
 
	 




$"J **Z
 
        iikk!!!!!!I D"&/DY DO!%!1DOI!99) D LL/555Ks   <;C8 8 FA2FFc                    	 t          | d          \  }}nD# t          $ r7 |                    d          rd| z   } nd| z   } t          | d          \  }}Y nw xY wt          j        |          } |j        |||fi |S )aJ  
    Create a listener object with the given parameters.  When its ``start()``
    method is called, the listener will listen on the given address
    (a URI such as ``tcp://0.0.0.0``) and call *handle_comm* with a
    ``Comm`` object for each incoming connection.

    *handle_comm* can be a regular function or a coroutine.
    T)strictssl_contextztls://ztcp://)r   rX   r   r	   r   get_listener)r   handle_commr&   kwargsr   r   r   s          r   listenr     s    7#D666 7 7 7::m$$ 	#d?DDd?D#D6667 "6**G7[+HHHHHs    >AA)NTNra   )+
__future__r   r   rw   loggingr   rP   rd   abcr   r   typingr   r   r   
dask.utilsr   distributed.commr	   distributed.comm.addressingr
   r   r   distributed.metricsr    distributed.protocol.compressionr   distributed.protocol.pickler   distributed.utilsr   	getLoggerr   r   IOErrorr   r   r   rj   r   r   r   r   r   r   r   <module>r      sk   " " " " " "     



  # # # # # # # #                  & & & & & & % % % % % % X X X X X X X X X X $ $ $ $ $ $ E E E E E E 8 8 8 8 8 8 & & & & & &		8	$	$	 	 	 	 	g 	 	 		 	 	 	 	? 	 	 	s
 s
 s
 s
 s
3 s
 s
 s
l=
 =
 =
 =
 =
s =
 =
 =
@    8   &        ?CO O O OdI I I I I Ir   