
    0Fieh%                    F   d dl mZ d dlZd dlZd dlZd dlmZmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZmZ d dlmZ d dlmZm Z   ed          Z! G d de          Z" G d d          Z# G d dej$        ee!                   Z% e	dd           G d de%e!                               Z& e	dd           G d dee!                               Z' e	dd           G d d                      Z( G d de%e!                   Z)dS )    )annotationsN)	Awaitable	Generator)	dataclass)	timedelta)GenericLiteralNoReturnTypeVar)IOLoop)Futureto_serialize)LateLoopEventiscoroutinefunctionsyncthread_state)
WrappedKey)
get_client
get_worker_Tc                       e Zd ZdZd fd	Zd Zd Zd Zed             Z	ed             Z
ed	             Zed
             Zd Zd Zd Zed             Z xZS )ActoraM  Controls an object on a remote worker

    An actor allows remote control of a stateful object living on a remote
    worker.  Method calls on this object trigger operations on the remote
    object and return BaseActorFutures on which we can block to get results.

    Examples
    --------
    >>> class Counter:
    ...    def __init__(self):
    ...        self.n = 0
    ...    def increment(self):
    ...        self.n += 1
    ...        return self.n

    >>> from dask.distributed import Client
    >>> client = Client()

    You can create an actor by submitting a class with the keyword
    ``actor=True``.

    >>> future = client.submit(Counter, actor=True)
    >>> counter = future.result()
    >>> counter
    <Actor: Counter, key=Counter-1234abcd>

    Calling methods on this object immediately returns deferred ``BaseActorFuture``
    objects.  You can call ``.result()`` on these objects to block and get the
    result of the function call.

    >>> future = counter.increment()
    >>> future.result()
    1
    >>> future = counter.increment()
    >>> future.result()
    2
    Nc                    t                                          |           || _        || _        || _        d | _        || _        d | _        |                                  d S N)	super__init___cls_address_key_future_worker_client_try_bind_worker_client)selfclsaddresskeyworker	__class__s        1lib/python3.11/site-packages/distributed/actor.pyr   zActor.__init__=   s]    		$$&&&&&    c                   | j         s,	 t                      | _         n# t          $ r
 d | _         Y nw xY w| j        sI	 t	                      | _        t          | j        d          | _        d S # t          $ r d | _        Y d S w xY wd S )NF)inform)r"   r   
ValueErrorr#   r   r   r    r!   r%   s    r+   r$   zActor._try_bind_worker_clientG   s    | 	$$)|| $ $ $#$| 	$$)||%di>>> $ $ $#$	$ 	$s    11.A, ,B Bc                2    d| j         j         d| j         dS )Nz<Actor: z, key=>)r   __name__r(   r0   s    r+   __repr__zActor.__repr__U   s"    ?$),??DH????r,   c                8    t           | j        | j        | j        ffS r   )r   r   r   r(   r0   s    r+   
__reduce__zActor.__reduce__X   s    	4=$(;<<r,   c                    | j         | j        |                                  | j         r| j         j        S | j        j        S r   )r"   r#   r$   loopr0   s    r+   _io_loopzActor._io_loop[   sC    <DL$8((***< 	%<$$<$$r,   c                    | j         | j        |                                  | j         r| j         j        S | j        j        S r   )r"   r#   r$   	schedulerr0   s    r+   _scheduler_rpczActor._scheduler_rpcd   sC    <DL$8((***< 	*<))<))r,   c                &   | j         | j        |                                  | j         r| j                             | j                  S | j        j        r| j                            | j                  S t          | j        j        | j                  S r   )r"   r#   r$   rpcr   direct_to_workersProxyRPCr;   r0   s    r+   _worker_rpczActor._worker_rpcm   s    <DL$8((***< 	G<##DM222|- G|''666 6FFFr,   c                j    | j         r| j         j        S t          j                    | j        j        k    S r   )r#   asynchronous	threading	get_identr"   	thread_idr0   s    r+   _asynchronouszActor._asynchronousy   s1    < 	C<,,&((DL,BBBr,   c                    | j         r | j         j        |g|R i |S | j        r ||i |S t          | j        j        |g|R i |S r   )r#   r   rG   r"   r8   )r%   funcargskwargss       r+   _synczActor._sync   sw    < 	B$4<$T;D;;;F;;;! -tT,V,,,)4A$AAA&AAAr,   c                    t          t          t          |                               }|                    d t          | j                  D                        t          |          S )Nc              3  D   K   | ]}|                     d           |V  dS )_N)
startswith).0attrs     r+   	<genexpr>z Actor.__dir__.<locals>.<genexpr>   s3      MM$8L8LMMMMMMMr,   )setdirtypeupdater   sorted)r%   os     r+   __dir__zActor.__dir__   sP    DJJ  	MM#di..MMMMMMayyr,   c                T     j         r* j         j        dvrt          d j         j        z                                                j        ry j        j         j        k    rdt          t          dd          rN j        j	         j
                 }t          |          t                    rS t                    rfdS S t           j                  t                    r"t          j                   fd            }|S  fd}                     |          S )N)finishedpendingz(Worker holding Actor was lost.  Status: actorFc                 .    t           | i |          S r   )EagerActorFuture)rJ   rK   rR   s     r+   <lambda>z#Actor.__getattr__.<locals>.<lambda>   s    /?d@Uf@U@U/V/V r,   c                       fdt          j                  fd}j                            |           S )Nc                   K   	 j                             j        d D             d                                 D                        d {V } nm# t          $ r` j        r8j                                        sj         d {V                d {V cY S t	          d          }t          |          cY S w xY w| d         dk    rt          | d                   S t          | d                   S )	Nc                ,    g | ]}t          |          S  r   )rQ   args     r+   
<listcomp>zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<listcomp>   s     !D!D!D,s"3"3!D!D!Dr,   c                4    i | ]\  }}|t          |          S re   r   )rQ   kvs      r+   
<dictcomp>zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<dictcomp>   s$    #R#R#R41aA|A#R#R#Rr,   )functionr^   rJ   rK   z Unable to contact Actor's workerstatusOKresult	exception)	rA   actor_executer(   itemsOSErrorr!   done_Error_OK)ro   excrJ   r(   rK   run_actor_function_on_workerr%   s     r+   rx   zEActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker   sQ     /'+'7'E'E%("&(!D!Dt!D!D!D#R#R6<<>>#R#R#R	 (F ( ( " " " " " " # / / /< /0A0A0C0C /"&,.......)E)E)G)G#G#G#G#G#G#GGGG")*L"M"MC#)#;;.../ h'4//"6(#3444!&"5666s   AA AB?B?>B?)io_loopc                 R   K                                       d {V            d S r   )_set_result)actor_futurerx   s   r+   wait_then_set_resultz=Actor.__getattr__.<locals>.func.<locals>.wait_then_set_result   s=       ,,3O3O3Q3Q-Q-Q-Q-Q-Q-QRRRRRr,   )ActorFuturer9   add_callback)rJ   rK   r}   r|   rx   r(   r%   s   `` @@r+   rI   zActor.__getattr__.<locals>.func   s    7 7 7 7 7 7 7 7 7&  +4=AAAS S S S S S **+?@@@##r,   c                    K   j                             j                   d {V } | d         dk    r| d         S | d         )N)	attributer^   rm   rn   ro   rp   )rA   actor_attributer(   )xr(   r%   s    r+   get_actor_attribute_from_workerz:Actor.__getattr__.<locals>.get_actor_attribute_from_worker   sn      *::! ;         X;$&&X;&K.(r,   )r!   rm   r/   r$   r"   r'   r   getattrr   actorsr(   r   callabler   	functoolswrapsrL   )r%   r(   r^   rI   r   rR   s   ``   @r+   __getattr__zActor.__getattr__   sg   < 	DL/7NNN:T\=PP   	$$&&&L	$55gu55 6 L'1E5#&&D"4(( $ VVVVVty#&&D>> ,	?_T""$ $ $ $ $ #"$8 K) ) ) ) ) ) ::=>>>r,   c                    | j         j        S r   )r!   clientr0   s    r+   r   zActor.client   s    |""r,   r   )r3   
__module____qualname____doc__r   r$   r4   r6   propertyr9   r<   rA   rG   rL   rZ   r   r   __classcell__)r*   s   @r+   r   r      s9       $ $L' ' ' ' ' '$ $ $@ @ @= = = % % X% * * X* 	G 	G X	G C C XCB B B  
E? E? E?N # # X# # # # #r,   r   c                      e Zd ZdZd Zd ZdS )r@   zQ
    An rpc-like object that uses the scheduler's rpc to connect to a worker
    c                "    || _         || _        d S r   )r>   r   )r%   r>   r'   s      r+   r   zProxyRPC.__init__   s    r,   c                      fd}|S )Nc                 d   K   | d<   j                             j        |            d {V }|S )Nop)r)   msg)r>   proxyr   )r   ro   r(   r%   s     r+   rI   z"ProxyRPC.__getattr__.<locals>.func   s@      CI8>>C>HHHHHHHHFMr,   re   )r%   r(   rI   s   `` r+   r   zProxyRPC.__getattr__   s)    	 	 	 	 	 	
 r,   N)r3   r   r   r   r   r   re   r,   r+   r@   r@      s<                  r,   r@   c                  `    e Zd ZdZej        ddd            Zej        dd	            ZddZdS )BaseActorFuturea  Future to an actor's method call

    Whenever you call a method on an Actor you get a BaseActorFuture immediately
    while the computation happens in the background.  You can call ``.result``
    to block and collect the full result

    See Also
    --------
    Actor
    Ntimeoutstr | timedelta | float | Nonereturnr   c                    d S r   re   r%   r   s     r+   ro   zBaseActorFuture.result       r,   boolc                    d S r   re   r0   s    r+   rt   zBaseActorFuture.done   r   r,   Literal['<ActorFuture>']c                    dS )Nz<ActorFuture>re   r0   s    r+   r4   zBaseActorFuture.__repr__   s    r,   r   r   r   r   r   r   r   )r   r   )	r3   r   r   r   abcabstractmethodro   rt   r4   re   r,   r+   r   r      s        	 	 	     	        r,   r   TF)frozeneqc                  8    e Zd ZU dZded<   ddZddd
ZddZdS )r`   zUFuture to an actor's method call when an actor calls another actor on the same workerr   _resultr   Generator[object, None, _T]c              #     K   | j         S r   r   r0   s    r+   	__await__zEagerActorFuture.__await__	  s      |r,   Nr   objectc                    | j         S r   r   r   s     r+   ro   zEagerActorFuture.result  s
    |r,   Literal[True]c                    dS )NTre   r0   s    r+   rt   zEagerActorFuture.done  s    tr,   r   r   r   )r   r   r   r   )r   r   )r3   r   r   r   __annotations__r   ro   rt   re   r,   r+   r`   r`     sf         __KKK            r,   r`   c                  "    e Zd ZU ded<   ddZdS )rv   r   _vr   c                    | j         S r   )r   r0   s    r+   unwrapz
_OK.unwrap  s	    wr,   Nr   r   r3   r   r   r   r   re   r,   r+   rv   rv     s3         
FFF     r,   rv   c                  "    e Zd ZU ded<   ddZdS )ru   	Exception_er   r
   c                    | j         r   )r   r0   s    r+   r   z_Error.unwrap   s	    gr,   N)r   r
   r   re   r,   r+   ru   ru     s3         MMM     r,   ru   c                  @    e Zd ZddZddZddZdd
ZddZdddZdS )r~   ry   r   c                H    || _         t                      | _        d | _        d S r   )r9   r   _event_out)r%   ry   s     r+   r   zActorFuture.__init__%  s    #oo-1			r,   r   r   c                N    |                                                                  S r   )r   r   r0   s    r+   r   zActorFuture.__await__*  s    ||~~'')))r,   r   c                4    | j                                         S r   )r   is_setr0   s    r+   rt   zActorFuture.done-  s    {!!###r,   r   c                   K   | j                                          d {V  | j        }|J |                                S r   )r   waitr   r   r%   outs     r+   r   zActorFuture._result0  sJ      k         izz||r,   r   _Error | _OK[_T]Nonec                F    || _         | j                                         d S r   )r   r   rT   r   s     r+   r{   zActorFuture._set_result6  s!    	r,   Nr   r   c                :    t          | j        | j        |          S )N)callback_timeout)r   r9   r   r   s     r+   ro   zActorFuture.result:  s    DM4<'JJJJr,   )ry   r   r   r   r   )r   r   r   r   r   r   )	r3   r   r   r   r   rt   r   r{   ro   re   r,   r+   r~   r~   $  s        2 2 2 2
* * * *$ $ $ $      K K K K K K Kr,   r~   )*
__future__r   r   r   rD   collections.abcr   r   dataclassesr   datetimer   typingr   r	   r
   r   tornado.ioloopr   distributed.clientr   distributed.protocolr   distributed.utilsr   r   r   r   distributed.utils_commr   distributed.workerr   r   r   r   r@   ABCr   r`   rv   ru   r~   re   r,   r+   <module>r      s   " " " " " " 



         0 0 0 0 0 0 0 0 ! ! ! ! ! !       6 6 6 6 6 6 6 6 6 6 6 6 ! ! ! ! ! ! % % % % % % - - - - - - T T T T T T T T T T T T - - - - - - 5 5 5 5 5 5 5 5WT]]@# @# @# @# @#J @# @# @#F       $    cgy}   0 $5!!!    r*   "!  $5!!!    '"+   "! $5!!!       "!K K K K K/"% K K K K Kr,   