
    0Fie                         d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZ  ej        e          Z G d d          Z G d d          ZdS )    )annotationsN)defaultdict)suppress)merge)parse_timedelta)Future)time)TimeoutError
log_errorswait_for)
get_clientc                  N    e Zd ZdZd Zd	dZd Zd	dZd
dZe	dd            Z
dS )VariableExtensionzAn extension for the scheduler to manage Variables

    This adds the following routes to the scheduler

    *  variable-set
    *  variable-get
    *  variable-delete
    c                   || _         t                      | _        t          t                    | _        t          t          j                  | _        t          j                    | _	        | j         j
                            | j        | j        d           | j        | j         j        d<   | j        | j         j        d<   d S )N)variable_setvariable_getvariable-future-releasevariable_delete)	schedulerdict	variablesr   setwaitingasyncio	Conditionwaiting_conditionsstartedhandlersupdategetfuture_releasestream_handlersdelete)selfr   s     4lib/python3.11/site-packages/distributed/variable.py__init__zVariableExtension.__init__   s    ""3''"-g.?"@"@(**&&!Xtx@@	
 	
 	
 EIDW&'@A<@K&'8999    Nc                  K   |&d|d}| j                             |gd|z             nd|d}	 | j        |         }|d         dk    r:|d         |k    r.t          j        |                     |d         |                     n# t          $ r Y nw xY w|| j        vrJ| j        4 d {V  | j                                         d d d           d {V  n# 1 d {V swxY w Y   || j        |<   d S )Nr   )typevaluevariable-%skeysclientmsgpackr)   r*   )	r   client_desires_keysr   r   ensure_futurereleaseKeyErrorr   
notify_all)r$   namekeydatar.   recordolds          r%   r   zVariableExtension.set-   s     ?&55FN..SE-RVBV.WWWW'$77F	H.&C 6{h&&3w<3+>+>%dll3w<&F&FGGG	  	 	 	D	
 t~%%| * * * * * * * *'')))* * * * * * * * * * * * * * * * * * * * * * * * * * *%ts#   B 
BB+C
C!$C!c                R  K   | j         ||f         rk| j        |         4 d {V  | j        |                                          d {V  d d d           d {V  n# 1 d {V swxY w Y   | j         ||f         k| j                            |gd|z             | j         ||f= d S )Nr+   r,   )r   r   waitr   client_releases_keys)r$   r6   r5   s      r%   r2   zVariableExtension.release?   sO     l39% 	;.t4 ; ; ; ; ; ; ; ;-d388:::::::::; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; l39% 	; 	++#}t?S+TTTLd###s   &A
A'*A'c                  K   | j         ||f                             |           | j         ||f         sY| j        |         4 d {V  | j        |                                          d d d           d {V  d S # 1 d {V swxY w Y   d S d S N)r   remover   r4   )r$   r5   r6   tokenr.   s        r%   r!   z VariableExtension.future_releaseG   s(     S$Y&&u---|CI& 	;.t4 ; ; ; ; ; ; ; ;'-88:::; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ;	; 	;s    A::
BBc                   K   t                      }| j        vr||t                      |z
  z
  }nd }|r|dk     rt                      	  fd}t           |            |           d {V   j                                         n#  j                                         w xY w| j        v j        |         }|d         dk    r|d         }t          j                    j        }	 j	        j
                            |          }
|
|
j        nd}|	|d}|d	k    r|
j        j        |d
<   |
j        j        |d<   t!          ||          } j        ||f                             |	           |S )Nr   c                    K    j                                          d {V   j                                          d {V  d S r>   )r   acquirer;   r$   s   r%   _z VariableExtension.get.<locals>._X   sY      ,..000000000,++-----------r'   )timeoutr)   r   r*   lost)r@   stateerred	exception	traceback)r	   r   r
   r   r   r2   uuiduuid4hexr   tasksr    rH   exception_blamerJ   rK   r   r   add)r$   r5   r.   rF   startleftrE   r8   r6   r@   tsrH   msgs   `            r%   r    zVariableExtension.getM   s     $.(("$&&5.1 %q"nn$'. . . . . qqssD1111111111$$&&&&$$&&&& $.((" %&>X%%/CJLL$E%))#..B "BHHFE!E22C#%#5#?K #%#5#?K 63''FLd#''...s   $B	 	B$c                  K   	 | j         |         }|d         dk    r"|                     |d         |           d {V  n# t          $ r Y nw xY wt          t                    5  | j        |= d d d            n# 1 swxY w Y   t          t                    5  | j         |= d d d            n# 1 swxY w Y   | j                            d|z             d S )Nr)   r   r*   r+   )r   r2   r3   r   r   r   remove_client)r$   r5   r.   r9   s       r%   r#   zVariableExtension.deleten   su     	7.&C 6{h&&ll3w<666666666	  	 	 	D	
 h 	. 	.'-	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.h 	% 	%t$	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	$$]T%9:::::s5   A   
AA$	A99A= A=	B--B14B1)NNNN)NNNNN)__name__
__module____qualname____doc__r&   r   r2   r!   r    r   r#    r'   r%   r   r      s         H H H& & & &$$ $ $; ; ; ;   B ; ; ; Z; ; ;r'   r   c                  ^    e Zd ZdZddZed             Zd Zd Zd Z	ddZ
dd	Zd
 Zd ZdS )Variablea-  Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through the
    scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    Parameters
    ----------
    name: string (optional)
        Name used by other clients and the scheduler to identify the variable.
        If not given, a random name will be generated.
    client: Client (optional)
        Client used for communication with the scheduler.
        If not given, the default global client will be used.

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # docttest: +SKIP
    >>> x.get()  # docttest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    Nc                X    || _         |pdt          j                    j        z   | _        d S )Nz	variable-)_clientrL   rM   rN   r5   )r$   r5   r.   s      r%   r&   zVariable.__init__   s'    :K$*,,*::			r'   c                h    | j         s%	 t                      | _         n# t          $ r Y nw xY w| j         S r>   )ra   r   
ValueErrorrD   s    r%   r.   zVariable.client   sF    | 	)||   |s    
**c                R    | j         st          t          |            d          d S )Nz object not properly initialized. This can happen if the object is being deserialized outside of the context of a Client or Worker.)r.   RuntimeErrorr)   rD   s    r%   _verify_runningzVariable._verify_running   s@    { 	:: ' ' '  	 	r'   c                   K   t          |t                    r3| j        j                            |j        | j                   d {V  d S | j        j                            || j                   d {V  d S )N)r6   r5   )r7   r5   )
isinstancer   r.   r   r   r6   r5   )r$   r*   s     r%   _setzVariable._set   s      eV$$ 	Q+'444SSSSSSSSSSS+'44%di4PPPPPPPPPPPr'   c                \    |                                    | j        j        | j        |fi |S )zSet the value of this variable

        Parameters
        ----------
        value : Future or object
            Must be either a Future or a msgpack-encodable value
        )rf   r.   syncri   )r$   r*   kwargss      r%   r   zVariable.set   s7     	t{	5;;F;;;r'   c                  K   | j         j                            || j        | j         j                   d {V }|d         dk    rt          |d         | j         d|d                   }|d         dk    r'|j                            |d	         |d
                    | j                             d| j        |d         |d         d           n|d         }|S )N)rF   r5   r.   r)   r   r*   TrH   )informrH   rI   rJ   rK   r   r@   )opr5   r6   r@   )	r.   r   r   r5   idr   _state	set_error_send_to_scheduler)r$   rF   dr*   s       r%   _getzVariable._get   s      +'44$)DKN 5 
 
 
 
 
 
 
 
 V9  1W:t{4qzRRREzW$$&&q~q~FFFK**3 IW:wZ	     gJEr'   c                |    |                                   t          |          } | j        j        | j        fd|i|S )a@  Get the value of this variable

        Parameters
        ----------
        timeout : number or string or timedelta, optional
            Time in seconds to wait before timing out.
            Instead of number of seconds, it is also possible to specify
            a timedelta in string format, e.g. "200ms".
        rF   )rf   r   r.   rk   ru   )r$   rF   rl   s      r%   r    zVariable.get   sG     	!'**t{	EE7EfEEEr'   c                    |                                   | j        j        dk    r$| j                            d| j        d           dS dS )zmDelete this variable

        Caution, this affects all clients currently pointing to this variable.
        runningr   )ro   r5   N)rf   r.   statusrs   r5   rD   s    r%   r#   zVariable.delete   sU    
 	;**K**2CTY+W+WXXXXX +*r'   c                     t           | j        ffS r>   )r_   r5   rD   s    r%   
__reduce__zVariable.__reduce__   s    $)%%r'   rX   r>   )rY   rZ   r[   r\   r&   propertyr.   rf   ri   r   ru   r    r#   r{   r]   r'   r%   r_   r_      s        " "H; ; ; ;   X  Q Q Q	< 	< 	<   (F F F FY Y Y& & & & &r'   r_   )
__future__r   r   loggingrL   collectionsr   
contextlibr   tlzr   
dask.utilsr   distributed.clientr   distributed.metricsr	   distributed.utilsr
   r   r   distributed.workerr   	getLoggerrY   loggerr   r_   r]   r'   r%   <module>r      sM   " " " " " "    # # # # # #             & & & & & & % % % % % % $ $ $ $ $ $ @ @ @ @ @ @ @ @ @ @ ) ) ) ) ) )		8	$	$g; g; g; g; g; g; g; g;Tw& w& w& w& w& w& w& w& w& w&r'   