
    0Fie,                   
   U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZmZ d dlmZmZmZmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZm Z m!Z!m"Z" d dl#m$Z$ d d	l%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z, d d
l-m.Z. d dl/Z/d dl0m1Z1 d dl2m3Z3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> d dl?m@Z@mAZAmBZB d dlCmDZD d dlEmFZFmGZG d dlHmIZJ d dlKmLZL  ejM        d          ZNe&r'd dlOmPZPmQZQmRZR  eQd          ZSd dlTmUZU d dlVmWZW d dlXmYZY e)d         ZZde[d<   h dZ\d e[d!<   d"d#hZ]d e[d$<   h d%Z^d e[d&<   d'Z_d(Z` G d) d*e*          Za G d+ d,eb          Zc G d- d.ec          Zd G d/ d0eb          Ze G d1 d2eb          Zfedd5            Zge	jh        d6k    rd7d8ini Zi edd9d9d:ei G d; d<                      Zje G d= d>                      Zk G d? d@          Zle G dA dBek                      Zme G dC dDek                      Zne G dE dFek                      Zo G dG dHek          Zpe G dI dJep                      Zqe G dK dLep                      Zre G dM dNep                      Zse G dO dPep                      Zte G dQ dRep                      Zue G dS dTep                      Zve G dU dVep                      Zwe G dW dXep                      Zxe G dY dZ                      Zye G d[ d\ey                      Zze G d] d^ey                      Z{e G d_ d`ey                      Z|e G da dbey                      Z}e G dc dde}                      Z~e G de dfe}                      Ze G dg dhe}                      Ze G di dje}                      Ze G dk dley                      Ze G dm dney                      Ze G do dpey                      Ze G dq dre                      Ze G ds dte                      Ze G du dve                      Ze G dw dxey                      Ze G dy dzey                      Ze G d{ d|ey                      Ze G d} d~ey                      Ze G d dey                      Ze G d dey                      Ze G d dey                      Ze G d dey                      Ze G d dey                      Zeeje+eZef         f         Zde[d<   eek         Zde[d<   eeef         Zde[d<   ddZ G d d          Z G d dej                  Z G d d          Z G d d          ZdS )    )annotationsN)Counterdefaultdictdeque)		AwaitableCallable
Collection	ContainerHashableIteratorMappingMutableMappingSet)copy)	dataclassfield)	lru_cachepartialsingledispatchmethodwraps)chain)TYPE_CHECKINGAnyClassVarLiteral	TypedDictUnioncast)peekn)Key)	key_splitparse_bytestypename)worker_story)HeapSet)get_address_host)ErrorMessageerror_message)DelayedMetricsLedger	monotonictime)pickle)	SerializeToPickle)safe_sizeof)recursive_to_dictz distributed.worker.state_machine)NotRequired	ParamSpec	TypeAliasP)WorkerPlugin)	T_runspec)Worker)	cancelledconstrainederror	executingfetchflight	forgottenlong-runningmemorymissingreadyreleasedrescheduledresumedwaitingr3   TaskStateState>   rB   rF   r;   r9   r?   zSet[TaskStateState]
PROCESSINGrB   r9   READY>	   r<   rB   r=   rA   rE   rF   r;   r9   r?   WAITING_FOR_DATAz--no-value-sentinel--c                  8    e Zd ZU ded<   ded<   ded<   ded<   dS )		StartStopzHLiteral['compute', 'transfer', 'disk-read', 'disk-write', 'deserialize']actionfloatstartstopzNotRequired[str]sourceN)__name__
__module____qualname____annotations__     @lib/python3.11/site-packages/distributed/worker_state_machine.pyrM   rM   k   s<         TTTTLLLKKKrX   rM   c                  2    e Zd ZddZddZddZeZddZdS )InvalidTransitionkeyr    rP   rG   finishstorylist[tuple]c                >    || _         || _        || _        || _        d S Nr\   rP   r]   r^   )selfr\   rP   r]   r^   s        rY   __init__zInvalidTransition.__init__s   s$     



rX   returntuple[Callable, tuple]c                T    t          |           | j        | j        | j        | j        ffS ra   )typer\   rP   r]   r^   rc   s    rY   
__reduce__zInvalidTransition.__reduce__   s$    DzzDHdj$+tzJJJrX   strc                    | j         j         d| j        d| j         d| j         dz   dz   d                    t          t          | j                            z   S )N:  :: ->
  Story:
    
    )		__class__rS   r\   rP   r]   joinmaprk   r^   ri   s    rY   __repr__zInvalidTransition.__repr__   se    ~&UU$(UU$*UUUU mmCTZ00112	
rX   tuple[str, dict[str, Any]]c                <    d| j         | j        | j        | j        dfS )Nzinvalid-worker-transitionrb   rb   ri   s    rY   to_eventzInvalidTransition.to_event   s-    'x+	 
 	
rX   N)r\   r    rP   rG   r]   rG   r^   r_   re   rf   re   rk   re   rw   rS   rT   rU   rd   rj   rv   __str__ry   rW   rX   rY   r[   r[   r   so        
 
 
 
K K K K
 
 
 
 G	
 	
 	
 	
 	
 	
rX   r[   c                        e Zd Zd fdZ xZS )TransitionCounterMaxExceededre   rw   c                R    t                                                      \  }}d|fS )Nztransition-counter-max-exceeded)superry   )rc   topicmsgrs   s      rY   ry   z%TransitionCounterMaxExceeded.to_event   s'    WW%%''
s0#55rX   r|   )rS   rT   rU   ry   __classcell__rs   s   @rY   r   r      s=        6 6 6 6 6 6 6 6 6 6rX   r   c                  2    e Zd ZddZdd
ZddZeZddZdS )InvalidTaskStater\   r    staterG   r^   r_   c                0    || _         || _        || _        d S ra   r\   r   r^   )rc   r\   r   r^   s       rY   rd   zInvalidTaskState.__init__   s     



rX   re   rf   c                H    t          |           | j        | j        | j        ffS ra   )rh   r\   r   r^   ri   s    rY   rj   zInvalidTaskState.__reduce__   s     DzzDHdj$*===rX   rk   c                    | j         j         d| j        d| j         dz   dz   d                    t          t          | j                            z   S )Nrm   rn   rp   rq   rr   )rs   rS   r\   r   rt   ru   rk   r^   ri   s    rY   rv   zInvalidTaskState.__repr__   s[    ~&FF$(FF$*FF mmCTZ00112	
rX   rw   c                0    d| j         | j        | j        dfS )Nzinvalid-worker-task-stater   r   ri   s    rY   ry   zInvalidTaskState.to_event   s(    'x 
 	
rX   N)r\   r    r   rG   r^   r_   rz   r{   r|   r}   rW   rX   rY   r   r      sk           > > > >
 
 
 
 G
 
 
 
 
 
rX   r   c                      e Zd ZdZdS )RecommendationsConflictzOTwo or more recommendations for the same task suggested different finish statesN)rS   rT   rU   __doc__rW   rX   rY   r   r      s        YYYYrX   r   re   intc                 Z    t          t          j                            d                    S )Nz'distributed.scheduler.default-data-size)r"   daskconfiggetrW   rX   rY   _default_data_sizer      s    t{'PQQRRRrX   )   
   slotsTF)repreqc                     e Zd ZU dZded<    ed          Zded<   eZded	<   d
Z	ded<    ee
          Zded<    ee
          Zded<    ee
          Zded<    ee
          Zded<   dZded<   d
Zded<   d
Zded<   d
Zded<   d
Zded<    ee
          Zded<   d
Zd ed!<    ee          Zd"ed#<   d
Zd$ed%<   d
Zd$ed&<   d'Zded(<   d'Zded)<   d
Zd*ed+<   d,Zded-<    ee          Zd.ed/<   d
Z ded0<   d
Z!ded1<    ee          Z"d2ed3<   d
Z#d4ed5<   d
Z$d6ed7<   d
Z%d ed8<   dZ&d9ed:<    e'j(                    Z)d;ed<<    ed          Z*d=ed><   dJdAZ+dKdBZ,dLdCZ-dLdDZ.dEdFdMdIZ/d
S )N	TaskStatezHolds volatile state relating to an individual Dask task.

    Not to be confused with :class:`distributed.scheduler.TaskState`, which holds
    similar information on the scheduler side.
    r    r\   F)initrk   prefixr   run_idNT_runspec | Nonerun_spec)default_factoryset[TaskState]dependencies
dependentswaiting_for_datawaitersrC   rG   r   z4Literal['executing', 'long-running', 'flight', None]previousz!Literal['fetch', 'waiting', None]nextfloat | Nonedurationztuple[int, ...] | Nonepriorityset[str]who_has
str | Nonecoming_fromdict[str, float]resource_restrictionsSerialize | None	exception	traceback exception_texttraceback_texttype | Nonerh   r   suspicious_countlist[StartStop]
startstops
start_time	stop_timedictmetadata
int | Nonenbytesdict | Noner   span_idbooldonez$ClassVar[weakref.WeakSet[TaskState]]
_instancesr   __weakref__re   Nonec                v    t           j                            |            t          | j                  | _        d S ra   )r   r   addr!   r\   r   ri   s    rY   __post_init__zTaskState.__post_init__$  s.      &&&))rX   c                    | j         dk    rd| j         d}n&| j         dk    rd| j         d| j         d}n| j         }d| j        d| d	S )
Nr8   z
cancelled()rE   zresumed(ro   z<TaskState  >)r   r   r   r\   rc   r   s     rY   rv   zTaskState.__repr__(  sp    :$$1111EEZ9$$<t}<<	<<<EEJE2TX22%2222rX   c                     t          |           S )a  Override dataclass __hash__, reverting to the default behaviour
        hash(o) == id(o).

        Note that we also defined @dataclass(eq=False), which reverts to the default
        behaviour (a == b) == (a is b).

        On first thought, it would make sense to use TaskState.key for equality and
        hashing. However, a task may be forgotten and a new TaskState object with the
        same key may be created in its place later on. In the Worker state, you should
        never have multiple TaskState objects with the same key; see
        WorkerState.validate_state for relevant checks. We can't assert the same thing
        in __eq__ though, as multiple objects with the same key may appear in
        TaskState._instances for a brief period of time.
        )idri   s    rY   __hash__zTaskState.__hash__1  s     $xxrX   c                4    | j         }||nt                      S ra   )r   r   )rc   r   s     rY   
get_nbyteszTaskState.get_nbytesB  s    +vv1C1E1EErX   rW   excluder   Container[str]c               b    t          | |d          }d |                                D             S )a~  Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict

        Notes
        -----
        This class uses ``_to_dict_no_nest`` instead of ``_to_dict``.
        When a task references another task, just print the task repr. All tasks
        should neatly appear under Worker.tasks. This also prevents a RecursionError
        during particularly heavy loads, which have been observed to happen whenever
        there's an acyclic dependency chain of ~200+ tasks.
        T)r   membersc                *    i | ]\  }}||d k    ||S )r   rW   .0kvs      rY   
<dictcomp>z.TaskState._to_dict_no_nest.<locals>.<dictcomp>Y  s(    DDDADa8mm1mmmrX   )r0   items)rc   r   outs      rY   _to_dict_no_nestzTaskState._to_dict_no_nestF  s4    "  gtDDDDDDDDDrX   re   r   r{   re   r   r   r   re   r   )0rS   rT   rU   r   rV   r   r   RUN_ID_SENTINELr   r   setr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rh   r   listr   r   r   r   r   r   r   r   weakrefWeakSetr   r   r   rv   r   r   r   rW   rX   rY   r   r      s\          HHH%U###F####!F!!!!
 "&H%%%% $)5#=#=#=L====!&s!;!;!;J;;;;',uS'A'A'AAAAA#eC888G8888 'E&&&& FJHIIII.2D2222 "H!!!!'+H++++c222G2222"K"""".3eD.I.I.IIIII"&I&&&&"&I&&&&NND"'%"="="=J====#J####"I"""" U4000H0000F#K#### G D7Fw7H7HJHHHH u%(((K((((* * * *3 3 3 3   "F F F F =? E E E E E E E ErX   r   c                  L     e Zd ZU dZdZded<   edd	            Zd fdZ xZ	S )InstructionzLCommand from the worker state machine to the Worker, in response to an eventstimulus_idrk   r   kwargsr   re   _InstructionMatchc                    t          | fi |S )a  Generate a partial match to compare against an Instruction instance.
        The typical usage is to compare a list of instructions returned by
        :meth:`WorkerState.handle_stimulus` or in :attr:`WorkerState.stimulus_log` vs.
        an expected list of matches.

        Examples
        --------

        .. code-block:: python

            instructions = ws.handle_stimulus(...)
            assert instructions == [
                TaskFinishedMsg.match(key="x"),
                ...
            ]
        )r   clsr   s     rY   matchzInstruction.matchc  s    $ !/////rX   otherobjectr   c                |    t          |t                    r|| k    S t                                          |          S ra   )
isinstancer   r   __eq__)rc   r   rs   s     rY   r   zInstruction.__eq__w  s8    e.// 	)D=  77>>%(((rX   )r   r   re   r   r   r   re   r   )
rS   rT   rU   r   	__slots__rV   classmethodr   r   r   r   s   @rY   r   r   \  st         VV I0 0 0 [0&) ) ) ) ) ) ) ) ) )rX   r   c                  @    e Zd ZU dZded<   ded<   ddZdd
ZddZdS )r   z_Utility class, to be used to test an instructions list.
    See :meth:`Instruction.match`.
    type[Instruction]r   dict[str, Any]r   r   c                "    || _         || _        d S ra   r   )rc   r   r   s      rY   rd   z_InstructionMatch.__init__  s    rX   re   rk   c                    | j         j        }d                    d | j                                        D                       }| d| dS )N, c              3  *   K   | ]\  }}| d | V  dS )=NrW   r   s      rY   	<genexpr>z-_InstructionMatch.__repr__.<locals>.<genexpr>  s0      JJda!zzazzJJJJJJrX   (z) (partial match))r   rS   rt   r   r   )rc   cls_str
kwargs_strs      rY   rv   z_InstructionMatch.__repr__  sQ    (#YYJJdk6G6G6I6IJJJJJ
99J9999rX   r   r   r   c                    t                    | j        urdS t          fd| j                                        D                       S )NFc              3  F   K   | ]\  }}t          |          |k    V  d S ra   getattr)r   r   r   r   s      rY   r	  z+_InstructionMatch.__eq__.<locals>.<genexpr>  s6      JJda75!$$)JJJJJJrX   )rh   r   allr   r   )rc   r   s    `rY   r   z_InstructionMatch.__eq__  sL    ;;dh&&5JJJJdk6G6G6I6IJJJJJJrX   N)r   r  r   r   r{   r   )rS   rT   rU   r   rV   rd   rv   r   rW   rX   rY   r   r     s|              : : : :
K K K K K KrX   r   c                  2    e Zd ZU dZded<   ded<   ded<   dS )		GatherDep)worker	to_gathertotal_nbytesrk   r  zset[Key]r  r   r  NrS   rT   rU   r   rV   rW   rX   rY   r  r    s8         7IKKKrX   r  c                      e Zd ZU dZded<   dS )Executer\   r    r\   Nr  rW   rX   rY   r  r             IHHHHHrX   r  c                      e Zd ZU dZded<   dS )RetryBusyWorkerLaterr  rk   r  Nr  rW   rX   rY   r  r             IKKKKKrX   r  c                  &    e Zd ZU ded<   dZddZdS )	SendMessageToSchedulerzClassVar[str]oprW   re   r  c                V      fd j         D             } j        |d<    j        |d<   |S )z@Convert object to dict so that it can be serialized with msgpackc                2    i | ]}|t          |          S rW   r  )r   r   rc   s     rY   r   z2SendMessageToScheduler.to_dict.<locals>.<dictcomp>  s%    ???QQa  ???rX   r"  r   )rV   r"  r   )rc   ds   ` rY   to_dictzSendMessageToScheduler.to_dict  s<    ????$*>???'$+-rX   Nre   r  )rS   rT   rU   rV   r   r&  rW   rX   rY   r!  r!    s;         I     rX   r!  c                       e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<    ee          Zd fdZ xZS )TaskFinishedMsgtask-finishedr    r\   r   r   r   r   bytesrh   rk   r#   r   r   threadr   r   re   r  c                R    t                                                      }d|d<   |S )NOKstatusr   r&  rc   r%  rs   s     rY   r&  zTaskFinishedMsg.to_dict  s$    GGOO(rX   r'  )	rS   rT   rU   r"  rV   tupler   r&  r   r   s   @rY   r)  r)    s         	BHHHKKKKKKMMMNNNo&&I         rX   r)  c                       e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   d
ed<   ded<   ded<    ee          Zd fdZe	 ddd            Z	 xZ
S )TaskErredMsgz
task-erredr    r\   r   r   r-   r   r   r   rk   r   r   r   r,  r   r   re   r  c                R    t                                                      }d|d<   |S )Nr:   r/  r0  r1  s     rY   r&  zTaskErredMsg.to_dict  s$    GGOO(rX   Ntsr   r   c                    | j         sJ t          | j        || j         | j        | j        | j        || j        |	  	        S )N)	r\   r   r   r   r   r   r,  r   r   )r   r4  r\   r   r   r   r   )r6  r   r   r,  s       rY   	from_taskzTaskErredMsg.from_task  sR     |ll,,}#

 

 

 
	
rX   r'  ra   )
r6  r   r   r   r   rk   r,  r   re   r4  )rS   rT   rU   r"  rV   r2  r   r&  staticmethodr8  r   r   s   @rY   r4  r4    s         	BHHHKKKo&&I     
 KO
 
 
 
 \
 
 
 
 
rX   r4  c                  "    e Zd ZU dZdZded<   dS )ReleaseWorkerDataMsgzrelease-worker-datar  r    r\   NrS   rT   rU   r"  r   rV   rW   rX   rY   r;  r;    s#         	BIHHHHHrX   r;  c                  "    e Zd ZU dZdZded<   dS )RescheduleMsg
rescheduler  r    r\   Nr<  rW   rX   rY   r>  r>    s#         	BIHHHHHrX   r>  c                  ,    e Zd ZU dZdZded<   ded<   dS )LongRunningMsgr?   r\   compute_durationr    r\   r   rC  Nr<  rW   rX   rY   rA  rA    s1         	B+IHHH""""""rX   rA  c                  "    e Zd ZU dZdZded<   dS )
AddKeysMsgadd-keyskeysCollection[Key]rH  Nr<  rW   rX   rY   rE  rE  
  s(         	BIrX   rE  c                  &    e Zd ZU dZdZdZded<   dS )RequestRefreshWhoHasMsga{  Worker -> Scheduler asynchronous request for updated who_has information.
    Not to be confused with the scheduler.who_has synchronous RPC call, which is used
    by the Client.

    See also
    --------
    RefreshWhoHasEvent
    distributed.scheduler.Scheduler.request_refresh_who_has
    distributed.client.Client.who_has
    distributed.scheduler.Scheduler.get_who_has
    zrequest-refresh-who-hasrG  rI  rH  NrS   rT   rU   r   r"  r   rV   rW   rX   rY   rK  rK    s4         
 
 
#BIrX   rK  c                  0    e Zd ZU dZdZdZded<   ded<   dS )	StealResponseMsgzkWorker->Scheduler response to ``{op: steal-request}``

    See also
    --------
    StealRequestEvent
    zsteal-response)r\   r   r    r\   zTaskStateState | Noner   NrL  rW   rX   rY   rN  rN  &  s=           
B IHHH      rX   rN  c                  v    e Zd ZU dZdZded<   i Zded<   ddZddZddZ	ddddZ
edd            ZddZdS ) StateMachineEventzDBase abstract class for all stimuli that can modify the worker state)r   handledrk   r   z,ClassVar[dict[str, type[StateMachineEvent]]]_classesargsr   r   re   c                H    t                               |           }d|_        |S )z<Hack to initialize the ``handled`` attribute in Python <3.10N)r   __new__rQ  )r   rS  r   rc   s       rY   rU  zStateMachineEvent.__new__C  s    ~~c""rX   r   c                .    | t           j        | j        <   d S ra   )rP  rR  rS   r   s    rY   __init_subclass__z#StateMachineEvent.__init_subclass__I  s    36"3<000rX   rQ  rO   c                   || _         | S )zProduce a variant version of self that is small enough to be stored in memory
        in the medium term and contains meaningful information for debugging
        rQ  )rc   rQ  s     rY   to_loggablezStateMachineEvent.to_loggableL  s     &-rX   rW   r   r   r   r   c               T   dt          |           j        i}t          |           D ]r}||v s|                    d          rt	          t          t          |           |d          t                    rNt          | |          }t          |          s|||<   st          ||          S )zDictionary representation for debugging purposes.

        See also
        --------
        distributed.utils.recursive_to_dict
        r   _Nr   )	rh   rS   dir
startswithr   r  propertycallabler0   )rc   r   infor   r   s        rY   _to_dictzStateMachineEvent._to_dictS  s     tDzz*+T 	 	AG||q||C00|'$t**a66AA a  AA;; Q w7777rX   r%  c                    |                                  }t          j        |                    d                   }|                    d          } |di |}||_        |                                 |S )zConvert the output of ``recursive_to_dict`` back into the original object.
        The output object is meaningful for the purpose of rebuilding the state machine,
        but not necessarily identical to the original.
        r   rQ  rW   )r   rP  rR  poprQ  _after_from_dict)r%  r   r   rQ  insts        rY   	from_dictzStateMachineEvent.from_dicte  sj     (E):):;**Y''s}}V}}rX   c                    dS )zFOptional post-processing after an instance is created by ``from_dict``NrW   ri   s    rY   rf  z"StateMachineEvent._after_from_dicts        rX   N)rS  r   r   r   re   rP  r   rQ  rO   re   rP  r   )r%  r   re   rP  )rS   rT   rU   r   r   rV   rR  rU  rX  r[  rc  r9  rh  rf  rW   rX   rY   rP  rP  6  s         NN*I
 >@H????   7 7 7 7    57 8 8 8 8 8 8$    \U U U U U UrX   rP  c                      e Zd ZdZdS )
PauseEventrW   NrS   rT   rU   r   rW   rX   rY   rm  rm  w          IIIrX   rm  c                      e Zd ZdZdS )UnpauseEventrW   Nrn  rW   rX   rY   rq  rq  |  ro  rX   rq  c                      e Zd ZU dZded<   dS )RetryBusyWorkerEventr  rk   r  Nr  rW   rX   rY   rs  rs    r  rX   rs  c                  ,    e Zd ZU dZdZded<   ded<   dS )GatherDepDoneEventz?:class:`GatherDep` instruction terminated (abstract base class))r  r  rk   r  r   r  NrS   rT   rU   r   r   rV   rW   rX   rY   ru  ru    s2         II*IKKKrX   ru  c                  2    e Zd ZU dZdZded<   dd	ZddZdS )GatherDepSuccessEventzV:class:`GatherDep` instruction terminated:
    remote worker fetched successfully
    datadict[Key, object]rz  rQ  rO   re   rP  c               ^    t          |           }||_        d | j        D             |_        |S )Nc                    i | ]}|d S ra   rW   r   r   s     rY   r   z5GatherDepSuccessEvent.to_loggable.<locals>.<dictcomp>  s    ///At///rX   )r   rQ  rz  rc   rQ  r   s      rY   r[  z!GatherDepSuccessEvent.to_loggable  s0    4jj//TY///
rX   r   c                2    d | j         D             | _         d S )Nc                    i | ]}|d S ra   rW   r~  s     rY   r   z:GatherDepSuccessEvent._after_from_dict.<locals>.<dictcomp>  s    000Q000rX   ry  ri   s    rY   rf  z&GatherDepSuccessEvent._after_from_dict  s    00di000			rX   Nrk  r   )rS   rT   rU   r   r   rV   r[  rf  rW   rX   rY   rx  rx    s[           I   1 1 1 1 1 1rX   rx  c                      e Zd ZdZdZdS )GatherDepBusyEventzI:class:`GatherDep` instruction terminated:
    remote worker is busy
    rW   NrS   rT   rU   r   r   rW   rX   rY   r  r              IIIrX   r  c                      e Zd ZdZdZdS )GatherDepNetworkFailureEventzr:class:`GatherDep` instruction terminated:
    network failure while trying to communicate with remote worker
    rW   Nr  rW   rX   rY   r  r    r  rX   r  c                  r    e Zd ZU dZded<   ded<   ded<   ded<    ee          ZddZedd            Z	dS )GatherDepFailureEventzclass:`GatherDep` instruction terminated:
    generic error raised (not a network failure); e.g. data failed to deserialize.
    r-   r   r   r   rk   r   r   re   r   c                T    t          t                                | _        d | _        d S ra   r-   	Exceptionr   r   ri   s    rY   rf  z&GatherDepFailureEvent._after_from_dict       "9;;//rX   errBaseExceptionr  r  r   r   c          	     t    t          |          } | |||d         |d         |d         |d         |          S )Nr   r   r   r   )r  r  r   r   r   r   r   )r(   )r   r  r  r  r   r   s         rY   from_exceptionz$GatherDepFailureEvent.from_exception  sU     C  s%+&+&/0/0#
 
 
 	
rX   Nr   )
r  r  r  rk   r  r   r   rk   re   r  )
rS   rT   rU   r   rV   r2  r   rf  r   r  rW   rX   rY   r  r    s           o&&I    
 
 
 [
 
 
rX   r  c                      e Zd ZU ded<   dZdS )RemoveWorkerEventrk   r  r  N)rS   rT   rU   rV   r   rW   rX   rY   r  r    s         KKKIIIrX   r  c            
         e Zd ZU ded<   ded<   ded<   ded<   d	ed
<   ded<   ded<   ded<   ded<   ded<   ded<    ee          Zd4dZddd5dZd6d Zd7d"Z	d4d#Z
ed4d$            Zed8d&            Zed'd(d(d)d*d(d+d(d,d9d3            Zd(S ):ComputeTaskEventr    r\   r   r   dict[Key, Collection[str]]r   dict[Key, int]r   tuple[int, ...]r   rO   r   r   r   r   r   r   actorr   r   r   r   re   r   c                    t          | j        t                    rt          | j                  | _        t          | j        t
                    r| j        j        | _        d S d S ra   )r   r   r   r2  r   r.   rz  ri   s    rY   r   zComputeTaskEvent.__post_init__  sW    dmT** 	1!$-00DMdmX.. 	/ !M.DMMM	/ 	/rX   rW   r   r   r   c               ^    t                               |                                 |          S )Nr   )rP  rc  _clean)rc   r   s     rY   rc  zComputeTaskEvent._to_dict  s#     ))$++--)IIIrX   rP  c                2    t          |           }d |_        |S ra   )r   r   )rc   r   s     rY   r  zComputeTaskEvent._clean  s    4jj
rX   rQ  c               <    |                                  }||_        |S ra   )r  rQ  r  s      rY   r[  zComputeTaskEvent.to_loggable  s    kkmm
rX   c                    d | _         d S ra   )r   ri   s    rY   rf  z!ComputeTaskEvent._after_from_dict  s    rX   c                    d S ra   rW   rW  s    rY   _fzComputeTaskEvent._f
  s    rX   tuple[Callable, tuple, dict]c                    | j         di fS )NrW   )r  rW  s    rY   dummy_runspeczComputeTaskEvent.dummy_runspec  s    BrX   r   Nr         ?F)r   r   r   r   r   r   r  r   !dict[Key, Collection[str]] | Nonedict[Key, int] | Nonedict[str, float] | Noner   r   rk   c       	            t          | ||pi |pd |pdD             ||t                                           |pi ||pi d|	          S )Build a dummy event, with most attributes set to a reasonable default.
        This is a convenience method to be used in unit testing only.
        c                    i | ]}|d S    rW   r~  s     rY   r   z*ComputeTaskEvent.dummy.<locals>.<dictcomp>'  s    :::qa:::rX   rW   N)r\   r   r   r   r   r   r   r   r  r   r   r   )r  r  )
r\   r   r   r   r   r   r   r  r   r   s
             rY   dummyzComputeTaskEvent.dummy  sq    "  Mr:::GMr:::%3355"7"=2#)r#
 
 
 	
rX   r   r   re   rP  rk  )re   r  )r\   r    r   r   r   r  r   r  r   r  r   rO   r   r  r  r   r   r   r   rk   re   r  )rS   rT   rU   rV   r2  r   r   rc  r  r[  rf  r   r  r  r9  r  rW   rX   rY   r  r    s        HHHKKK''''OOO++++KKKo&&I/ / / / 57 J J J J J J   
   
       [       [   59(,$(9=#'
 
 
 
 
 \
 
 
rX   r  c                  "    e Zd ZU dZded<   dZdS )ExecuteDoneEventz\Abstract base event for all the possible outcomes of a :class:`Compute`
    instruction
    r    r\   r  N)rS   rT   rU   r   rV   r   rW   rX   rY   r  r  3  s*           HHHIIIrX   r  c                       e Zd ZU ded<   ded<   ded<   ded<   ded<   d	ed
<    ee          ZddZddd  fdZd!dZe		 d"dddd#d            Z
 xZS )$ExecuteSuccessEventr   r   r   valuerO   rP   rQ   r   r   rh   rQ  re   rP  c               @    t          |           }||_        d |_        |S ra   )r   rQ  r  r  s      rY   r[  zExecuteSuccessEvent.to_loggableG  s     4jj	
rX   rW   r   r   r   r   c                   t                                          |          }d|vrt          | j                  |d<   |S )Nr   rh   )r   rc  rk   rh   )rc   r   r%  rs   s      rY   rc  zExecuteSuccessEvent._to_dictM  s>    GGW--  DIAfIrX   r   c                "    d | _         d | _        d S ra   )r  rh   ri   s    rY   rf  z$ExecuteSuccessEvent._after_from_dictT  s    
			rX   Nr  )r   r   r\   r    r   rk   c          
     0    t          | ||dd|d|          S )r  g        r  N)r\   r   r  rP   rQ   r   rh   r   )r  )r\   r  r   r   r   s        rY   r  zExecuteSuccessEvent.dummyX  s3     ##	
 	
 	
 		
rX   rk  r   r   ra   )r\   r    r  r   r   r   r   r   r   rk   re   r  )rS   rT   rU   rV   r2  r   r[  rc  rf  r9  r  r   r   s   @rY   r  r  =  s         KKKMMMLLLKKKKKKo&&I    57             
 
 
 
 
 
 \
 
 
 
 
rX   r  c                      e Zd ZU ded<   ded<   ded<   ded<   ded	<   d
ed<   d
ed<    ee          ZddZeddddd            Ze	dddd            Z
dS )ExecuteFailureEventr   r   r   rP   rQ   r-   r   r   r   rk   r   r   re   r   c                T    t          t                                | _        d | _        d S ra   r  ri   s    rY   rf  z$ExecuteFailureEvent._after_from_dict{  r  rX   N)rP   rQ   
err_or_msgBaseException | ErrorMessager\   r    r   c                   t          |t                    r|}nt          |          } | |||||d         |d         |d         |d         |	  	        S )Nr   r   r   r   	r\   r   rP   rQ   r   r   r   r   r   )r   r   r(   )r   r  r\   r   rP   rQ   r   r   s           rY   r  z"ExecuteFailureEvent.from_exception  st     j$'' 	,CC
++Cs+&+&/0/0#

 

 

 
	
rX   r  )r   c               L    t          | |ddt          d          ddd|	  	        S )r  Nr   r  )r  r-   )r\   r   r   s      rY   r  zExecuteFailureEvent.dummy  s<     #oo#

 

 

 
	
rX   r   )r  r  r\   r    r   r   rP   r   rQ   r   r   rk   re   r  )r\   r    r   r   r   rk   re   r  )rS   rT   rU   rV   r2  r   rf  r   r  r9  r  rW   rX   rY   r  r  p  s         KKKo&&I     #!
 
 
 
 
 [
6  
 
 
 
 
 \
 
 
rX   r  c                  *    e Zd ZdZed	d            ZdS )
RescheduleEventrW   r\   r    r   rk   re   c               $    t          | |          S )zqBuild an event. This method exists for compatibility with the other
        ExecuteDoneEvent subclasses.
        r\   r   )r  r  s     rY   r  zRescheduleEvent.dummy  s    
 3K@@@@rX   N)r\   r    r   rk   re   r  )rS   rT   rU   r   r9  r  rW   rX   rY   r  r    s=        IA A A \A A ArX   r  c                      e Zd ZU dZded<   dS )CancelComputeEventr  r    r\   Nr  rW   rX   rY   r  r    r  rX   r  c                      e Zd ZdZdS )FindMissingEventrW   Nrn  rW   rX   rY   r  r    ro  rX   r  c                  "    e Zd ZU dZdZded<   dS )RefreshWhoHasEventzScheduler -> Worker message containing updated who_has information.

    See also
    --------
    RequestRefreshWhoHasMsg
    )r   r  r   Nrv  rW   rX   rY   r  r    s/           I''''''rX   r  c                  (    e Zd ZU dZded<   ded<   dS )AcquireReplicasEvent)r   r   r  r   r  r   Nr  rW   rX   rY   r  r    s/         %I''''rX   r  c                      e Zd ZU dZded<   dS )RemoveReplicasEventrG  rI  rH  Nr  rW   rX   rY   r  r    #         IrX   r  c                      e Zd ZU dZded<   dS )FreeKeysEventrG  rI  rH  Nr  rW   rX   rY   r  r    r  rX   r  c                  "    e Zd ZU dZdZded<   dS )StealRequestEventzEvent that requests a worker to release a key because it's now being computed
    somewhere else.

    See also
    --------
    StealResponseMsg
    r  r    r\   Nrv  rW   rX   rY   r  r    s*           IHHHHHrX   r  c                  &    e Zd ZU dZded<   d
dZd	S )UpdateDataEventry  r{  rz  rQ  rO   re   rP  c               z    t          |           }||_        t                              | j                  |_        |S ra   )r   rQ  r   fromkeysrz  r  s      rY   r[  zUpdateDataEvent.to_loggable  s.    4jj==++
rX   Nrk  )rS   rT   rU   r   rV   r[  rW   rX   rY   r  r    s;         I     rX   r  c                  (    e Zd ZU dZded<   ded<   dS )SecedeEventrB  r    r\   rO   rC  Nr  rW   rX   rY   r  r    s,         +IHHHrX   r  RecsInstructions
RecsInstrsrS  c            
         i }g }| D ]\\  }}|                                 D ]=\  }}||v r/||         |k    r#t          d|j        d||          d|           |||<   >||z  }]||fS )z|Merge multiple (recommendations, instructions) tuples.
    Collisions in recommendations are only allowed if identical.
    zMismatched recommendations for rm   z vs. )r   r   r\   )rS  recsinstrrecs_iinstr_ir6  r]   s          rY   merge_recs_instructionsr    s     DE   ,,.. 	 	JBTzzd2h&00-YbfYY$r(YYQWYY   DHH;rX   c                  	   e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded <   ded!<   ded"<   ded#<   d$ed%<   ded&<   d'ed(<   d'ed)<   ded*<   ded+<   ded,<   ded-<   d.ed/<   d0ed1<   d2ed3<   ded4<   d5ed6<   ded7<   d8ed9<   ded:<   d;ed<<    ee          Zd=d>d>d>d>d>d?d@dAej        ej        dBd1dIZ	d2dNZ
ed3dO            Zed4dP            Zed3dQ            Zed3dR            Zd5dYZd6d]Zd7d_Zd8d`Zd9dbZd:ddZd;dgZd<diZd=djZd>dlZd?doZd@dpZd@dqZd@drZd@dsZd@dtZd@duZ d@dvZ!d@dwZ"d@dxZ#dAdzZ$d@d{Z%d@d|Z&d@d}Z'd@d~Z(d@dZ)d@dZ*dBdZ+dBdZ,d@dZ-d@dZ.d@dZ/d@dZ0d@dZ1d@dZ2dCdZ3d@dZ4d@dZ5d@dZ6d@dZ7d@dZ8dDdZ9dDdZ:dEdZ;dEdZ<dEdZ=dEdZ>dFdZ?d@dZ@i de3de1de:de3de3de3de3de2de,de.de:de>de0de-de5de"de"i de+de9de;de4de)de$de%de&de+de7de=dede8de+de;de)de4i de'de+dededede6de"de+de!de@de<de de#de(de*de"ZAded<   dGdڄZBdHd݄ZCdIdބZDd7d߄ZEd7dZFdJdZGeHdKd            ZIeIjJ        dLd            ZKeIjJ        dMd            ZLeIjJ        dNd            ZMeIjJ        dOd            ZNeIjJ        dPd            ZOdQdZPeIjJ        dRd            ZQeIjJ        dSd            ZReIjJ        dTd            ZSeIjJ        dUd            ZTeIjJ        dVd            ZUeIjJ        dWd            ZVeIjJ        dXd             ZWeIjJ        dYd            ZXeIjJ        dZd            ZYeIjJ        d[d            ZZeIjJ        d\d            Z[d]dZ\eIjJ        d^d            Z]eIjJ        d_d            Z^eIjJ        d`d            Z_eIjJ        dad            Z`eIjJ        dbd            ZadcdZbdddZcddded#Zdd7d$Zed7d%Zfd7d&Zgd7d'Zhd7d(Zid7d)Zjd7d*Zkd7d+Zld7d,Zmd7d-Znd7d.Zodfd/Zpdfd0Zqd>S (g  WorkerStateay  State machine encapsulating the lifetime of all tasks on a worker.

    Not to be confused with :class:`distributed.scheduler.WorkerState`.

    .. note::
       The data attributes of this class are implementation details and may be
       changed without a deprecation cycle.

    .. warning::
       The attributes of this class are all heavily correlated with each other.
       *Do not* modify them directly, *ever*, as it is extremely easy to obtain a broken
       state this way, which in turn will likely result in cluster-wide deadlocks.

       The state should be exclusively mutated through :meth:`handle_stimulus`.
    rk   addresszdict[Key, TaskState]tasksr  threadszMutableMapping[Key, object]rz  zdict[str, WorkerPlugin]pluginsHeapSet[TaskState]rB   r9   r   nthreadsr   runningr   rF   zdefaultdict[str, set[Key]]has_whatz$defaultdict[str, HeapSet[TaskState]]data_neededfetch_countrO   transfer_message_bytes_limitmissing_dep_flightin_flight_taskszdict[str, set[Key]]in_flight_workerstransfer_incoming_bytestransfer_incoming_count_limittransfer_incoming_count_total*transfer_incoming_bytes_throttle_thresholdr   busy_workers
generationr   total_resourcesavailable_resourcesr;   long_runningexecuted_countr   r{  actorszdeque[tuple]logzdeque[StateMachineEvent]stimulus_logvalidateTaskCountertask_countertransition_counterint | Literal[False]transition_counter_maxtransfer_incoming_bytes_limitzrandom.Randomrngr  Ni'  TF)r  r  rz  r  r  	resourcesr  r  r  r  r  r   "MutableMapping[Key, object] | Noner  dict[str, WorkerPlugin] | Noner	  Mapping[str, float] | Nonec                  || _         |r|| _        ||ni | _        ||ni | _        ||ni | _        |t          |          ni | _        | j                                        | _        || _	        i | _
        d| _        t                      | _        t          t                    | _        t          t!          t"          t$                   t'          j        d                              | _        d| _        i | _        t                      | _        || _        d| _        t7          d          | _        d| _        t                      | _        d| _        t#          t'          j        d                    | _         t#          t'          j        d                    | _!        t                      | _"        t                      | _#        d| _$        d| _%        t                      | _&        || _'        tP          j)        *                    d          }tW          |          | _,        tW          |          | _-        t]                      | _/        d| _0        |	| _1        |
| _2        i | _3        ti          j5        d          | _6        d S )NTr   r  r   g    cAz&distributed.admin.low-level-log-length)maxlen)7r  r  rz  r  r  r   r  r   r  r  r  r  r   rF   r   r  r   r%   r   operator
attrgetterr  r  r  r  r  r  r   r  r  r  r  rB   r9   r;   r  r   r  r  r  r   r   r   r   r  r   r  r  r  r  r  r  randomRandomr  )rc   r  r  rz  r  r  r	  r  r  r  r  r  r  s                rY   rd   zWorkerState.__init__  s"    !  	#"DL !,DD"	")"5ww2")"5ww22;2GtIR#'#7#<#<#>#>  
uu#C((&GI&H,?
,K,KLLL
 
 !#EE-J*-.*:=d))7'($"%%%!4Z!@!@AAA
"x'::'F'FGGG"uuEE,H)!IJJ'''!000'MM"#&<#-J*=##rX   stimsrP  re   r  c                .   g }t                      }|D ]}t          |t                    s.| j                            |                    |                     |                     |          \  }}||z  }||                     ||j                  z  }|S )zProcess one or more external events, transition relevant tasks to new states,
        and return a list of instructions to be executed as a consequence.

        See also
        --------
        BaseWorker.handle_stimulus
        rZ  r   )	r+   r   r  r   appendr[  _handle_event_transitionsr   )rc   r  instructionsrQ  stimr  r  s          rY   handle_stimuluszWorkerState.handle_stimulus2  s     && 	R 	RDd$455 L!(()9)9')9)J)JKKK,,T22KD%E!LD--d@P-QQQLLrX   c                *    t          | j                  S )a  Count of tasks currently executing on this worker and counting towards the
        maximum number of threads.

        It includes cancelled tasks, but does not include long running (a.k.a. seceded)
        tasks.

        See also
        --------
        WorkerState.executing
        WorkerState.executed_count
        WorkerState.nthreads
        WorkerState.all_running_tasks
        )lenr;   ri   s    rY   executing_countzWorkerState.executing_countH  s     4>"""rX   c                     | j         | j        z  S )ah  All tasks that are currently occupying a thread. They may or may not count
        towards the maximum number of threads.

        These are:

        - ts.status in (executing, long-running)
        - ts.status in (cancelled, resumed) and ts.previous in (executing, long-running)

        See also
        --------
        WorkerState.executing_count
        )r;   r  ri   s    rY   all_running_taskszWorkerState.all_running_tasksY  s     ~ 111rX   c                *    t          | j                  S )zNumber of tasks currently being replicated from other workers to this one.

        See also
        --------
        WorkerState.in_flight_tasks
        )r  r  ri   s    rY   in_flight_tasks_countz!WorkerState.in_flight_tasks_countj  s     4'(((rX   c                *    t          | j                  S )zCurrent number of open data transfers from other workers.

        See also
        --------
        WorkerState.in_flight_workers
        )r  r  ri   s    rY   transfer_incoming_countz#WorkerState.transfer_incoming_countt  s     4)***rX   r\   r    r   r  r   r   c               f   	 | j         |         }t                              d||           nC# t          $ r6 t	          |          x| j         |<   }| j                            |           Y nw xY w|j        s|sJ ||_        | j        	                    |d|j
        |t                      f           |S )Nz+Data task %s already known (stimulus_id=%s)zensure-task-exists)r  loggerdebugKeyErrorr   r  new_taskr   r  r  r   r+   )rc   r\   r   r   r6  s        rY   _ensure_task_existszWorkerState._ensure_task_exists  s    	+CBLLFKXXXX 	+ 	+ 	+#,S>>1DJsOb&&r*****	+ { 	#OOO"BK2BHk466RSSS	s   ), =A,+A,r   Mapping[Key, Collection[str]]r   c                   |                                 D ]D\  }}| j                            |          }|s#t          |          }| j        |v rF|                    | j                   |j        dk    r!t                              d| j        |           |j	        |k    r|j	        |z
  D ]M}| j
        |                             |           |j        dk    r | j        |                             |           N||j	        z
  D ]M}| j
        |                             |           |j        dk    r | j        |                             |           N||_	        Fd S )Nr@   zEScheduler claims worker %s holds data for task %s, which is not true.r<   )r   r  r   r   r  remover   r%  r&  r   r  r  r   )rc   r   r\   workersr6  r  s         rY   _update_who_haszWorkerState._update_who_has  sw   #MMOO &	! &	!LC$$B  'llG|w&&t|,,, 8x''LL-	   zW$$*w. 8 8f%,,S1118w&&$V,33B777!BJ. 5 5f%))#...8w&&$V,00444 BJJM&	! &	!rX   r6  c                Z   t                               d|           |j        | j        v r| j        |j        = | j                            |j        d           | j                            |j        d           |j        D ]G}| j        |         	                    |j                   | j
        |         	                    |           H|j                                         |j        D ]6}|j        	                    |           |j        	                    |           7|j                                         d|_        d|_        d|_        d|_        d|_        d|_        d|_        d|_        d|_        | j        	                    |           | j        	                    |           | j        	                    |           | j        	                    |           | j        	                    |           | j        	                    |           | j        	                    |           dS )zEnsure that TaskState attributes are reset to a neutral default and
        Worker-level state associated to the provided key is cleared (e.g.
        who_has)
        This is idempotent
        zPurge task: %sNFr   )r%  r&  r\   rz  r  re  r  r   r  discardr  clearr   r   r   r   r   r   r   r   r   r   r   r  rB   r9   r;   r  r  rF   )rc   r6  r  r%  s       rY   _purge_statezWorkerState._purge_state  s    	%r*** 6TY	"&!%%%&&&j 	1 	1FM&!))"&111V$,,R0000

 	" 	"A''***Ib!!!!
!!###	''+++
2  $$$r"""!!"%%%$$R(((R     rX   c                n    | j         | j        k    }| j        | j        k    }| j        | j        k    }|r|p|S )a  Decides whether the WorkerState should throttle data transfers from other workers.

        Returns
        -------
        * True if the number of incoming data transfers reached its limit
        and the size of incoming data transfers reached the minimum threshold for throttling
        * True if the size of incoming data transfers reached its limit
        * False otherwise
        )r#  r  r  r  r  )rc   reached_count_limitreached_throttle_thresholdreached_bytes_limits       rY   #_should_throttle_incoming_transfersz/WorkerState._should_throttle_incoming_transfers  sX     (D,NN 	 (>? 	#
 (D,NN 	 #A'AXEXXrX   r  c               v   | j         r| j        si g fS |                                 ri g fS i }g }|                                 D ]r\  }}|| j        k    sJ |                     |          \  }}|s	| j        sJ |s n9d |D             }t                              dt          |          |t          |          t          | j                  | j
        | j        t          | j                             | j                            d|||t                      f           |D ].}	| j        r|	j        dk    sJ ||	j        v sJ |	|vsJ d|f||	<   /|                    t'          ||||                     || j        |<   | xj        dz  c_        | xj        |z  c_        |                                 r nt||fS )zTransition tasks from fetch to flight, until there are no more tasks in fetch
        state or a threshold has been reached.
        c                    h | ]	}|j         
S rW   r  r   r6  s     rY   	<setcomp>z4WorkerState._ensure_communicating.<locals>.<setcomp>  s    ???bf???rX   z]Gathering %d tasks from %s; %d more remain. Pending workers: %d; connections: %d/%d; busy: %dzgather-dependenciesr<   r=   )r  r  r  r   r  )r  r  r7  _select_workers_for_gatherr  _select_keys_for_gatherr  r%  r&  r  r#  r  r  r  r  r+   r  r   r   r  r  r  )
rc   r   recommendationsr  r  available_tasksto_gather_tasksmessage_nbytesto_gather_keysr6  s
             rY   _ensure_communicatingz!WorkerState._ensure_communicating  sU    | 	4#3 	r6M3355 	r6M "%''+'F'F'H'H 5	 5	#FOT\)))).2.J.J/ /+O^ #Bd&BBBB" ?????NLLDO$$O$$D$%%,2D%&&
 
 
 HOO&TVVT   & 9 9= 58w....!RZ////_4444'/&8## !,!/ +	     .<D"6*..!3..((N:((7799  ,,rX   (Iterator[tuple[str, HeapSet[TaskState]]]c           	   #    K   t          | j                  }g }t          | j                                                  D ]\  }}|s	| j        |= || j        v s	|| j        v r#|                    |                                j	        t          |          |k    t          |           | j                                        ||f           t          j        |           |rt          j        |          \  }}}}}}|s	| j        |= ne| t          |          k    rAt          j        ||                                j	        |t          |           |||f           n||fV  |s| j        |= |dS dS )a}  Helper of _ensure_communicating.

        Yield the peer workers and tasks in data_needed, sorted by:

        1. By highest-priority task available across all workers
        2. If tied, first by local peer workers, then remote. Note that, if a task is
           replicated across multiple host, it may go in a tie with itself.
        3. If still tied, by number of tasks available to be fetched from the host
           (see note below)
        4. If still tied, by a random element. This is statically seeded to guarantee
           reproducibility.

           FIXME https://github.com/dask/distributed/issues/6620
                 You won't get determinism when a single task is replicated on multiple
                 workers, because TaskState.who_has changes order at every interpreter
                 restart.

        Omit workers that are either busy or in flight.
        Remove peer workers with no tasks from data_needed.

        Note
        ----
        Instead of number of tasks, we could've measured total nbytes and/or number of
        tasks that only exist on the worker. Raw number of tasks is cruder but simpler.
        N)r&   r  r   r  r   r  r  r  peekr   r  r  r  heapqheapifyheappopheappush)	rc   hostheapr  r  r]  	is_remote
ntasks_negrnds	            rY   r<  z&WorkerState._select_workers_for_gather@  s     4  --!$"2"8"8":":;; 	 	MFE $V,///6T=N3N3NKKJJLL)$V,,4ZZKHOO%%	 	 	 	 	d 	1;@=;N;N8Ay*c65  
1$V,,E

**ZZ\\*IE

{CQVW   
 em### 1(0  	1 	1 	1 	1 	1rX   	availabletuple[list[TaskState], int]c                   g }d}|r|                                 }|                     ||          rnX|j        D ]"}| j        |                             |           #|                    |           ||                                z  }|||fS )zHelper of _ensure_communicating.

        Fetch all tasks that are replicated on the target worker within a single
        message, up to transfer_message_bytes_limit or until we reach the limit
        for the size of incoming data transfers.
        r   )rF  _task_exceeds_transfer_limitsr   r  r,  r  r   )rc   rP  r  rA  r6  r  s         rY   r=  z#WorkerState._select_keys_for_gather  s     &(	 	.!!B11"nEE * 4 4 (//3333R   bmmoo-N  	. .((rX   rA  c                    | j         dk    r|dk    rdS | j        | j         z
  }|dk    rt          || j                  |z
  }|                                |k    S )a  Would asking to gather this task exceed transfer limits?

        Parameters
        ----------
        ts
            Candidate task for gathering
        message_nbytes
            Total number of bytes already scheduled for gathering in this message
        Returns
        -------
        exceeds_limit
            True if gathering the task would exceed limits, False otherwise
            (in which case the task can be gathered).
        r   F)r  r  minr  r   )rc   r6  rA  incoming_bytes_allowances       rY   rS  z)WorkerState._task_exceeds_transfer_limits  s     '1,,11D1D 5 .1MM 	! Q,5  !	! % }}!999rX   c                h   | j         si g fS i }t          | j                  | j        k     r|                                 }|snn| j        r|j        t          v sJ ||vsJ d||<   |                     |           | j        	                    |           t          | j                  | j        k     |g fS )Nr;   )
r  r  r;   r  _next_ready_taskr  r   rI   _acquire_resourcesr   )rc   r  r6  s      rY   _ensure_computingzWorkerState._ensure_computing  s    | 	r6M$.!!DM11&&((B } &x5((((~~~~"DH##B'''Nr""" $.!!DM11 RxrX   TaskState | Nonec                4   | j         r| j        r| j                                         }| j                                        }|j        sJ |j        sJ |j        |j        k     r.|                     |          r| j                                        S | j                                         S | j         r| j                                         S | j        rG| j                                        }|                     |          r| j                                        S dS )z=Pop the top-priority task from self.ready or self.constrainedN)rB   r9   rF  r    _resource_restrictions_satisfiedre  )rc   tsrtscs      rY   rX  zWorkerState._next_ready_task  s   : 	.$* 	.*//##C"''))C<<|cl**t/T/T0 0* '++---z~~'''Z 	.:>>### 	."''))C44S99 .'++---trX   r   r)  c                   | j         r=|j        dk    sJ |j        | j        v s|j        | j        v sJ |j        J |j        J 	 t          j        |j                  }n6# t          $ r) t          j        t          |j                            }Y nw xY wt          |j        ||j        |t          |j                  |j        | j                            |j                  |j        |	  	        S )Nr@   )	r\   r   r   rh   r#   r   r,  r   r   )r  r   r\   rz  r  rh   r   r,   dumpsr  r#   r)  r   r  r   r   )rc   r6  r   r   type_serializeds        rY   _get_task_finished_msgz"WorkerState._get_task_finished_msg  s
    = 	)8x''''6TY&&"&DK*?*?*?*?7&&&9(((	>$l2733OO 	> 	> 	> %l8BG+<+<==OOO	>
 9 bg&&[<##BF++}#

 

 

 
	
s   A   0BBc                    |j         s|dig fS d|_        d|_        | xj        dz  c_        |j        sJ |j         D ]"}| j        |                             |           #i g fS )NrA   r<   Fr  )r   r   r   r  r   r  r   )rc   r6  r   ws       rY   _transition_generic_fetchz%WorkerState._transition_generic_fetch  s    z 	'	?B&&A{ 	( 	(AQ##B''''2vrX   c                   | j                             |           |                     |           |                     ||          S Nr   )r  r0  r2  _transition_released_waitingrc   r6  r   s      rY   _transition_missing_waitingz'WorkerState._transition_missing_waiting  sH     	''+++"000MMMrX   c                   | j         r|j        dk    sJ |j        si g fS | j                            |           |                     ||          S )NrA   r   )r  r   r   r  r0  rf  rj  s      rY   _transition_missing_fetchz%WorkerState._transition_missing_fetch   sd     = 	)8y((((z 	r6M''+++--bk-JJJrX   c                   | j                             |           |                     ||          \  }}|j        | j        v sJ ||fS rh  )r  r0  _transition_generic_releasedr\   r  rc   r6  r   r  r  s        rY   _transition_missing_releasedz(WorkerState._transition_missing_released,  sa     	''+++!>>K ? 
 
l v####\!!rX   c               B    |j         sJ |                     ||          S rh  )r   _transition_generic_missingrj  s      rY   _transition_flight_missingz&WorkerState._transition_flight_missing6  s(     w///LLLrX   c               z    | j         r	|j        rJ d|_        | j                            |           d|_        i g fS )NrA   F)r  r   r   r  r   r   rj  s      rY   rs  z'WorkerState._transition_generic_missing<  sJ     = 	"z!!!##B'''2vrX   c               X    | j         r|j        dk    sJ |                     ||          S )NrC   r   )r  r   rf  rj  s      rY   _transition_released_fetchz&WorkerState._transition_released_fetchG  s:     = 	*8z))))--bk-JJJrX   c                   |                      |           i }|j        D ](}|j        s|j        t          t
          z  dhz  vrd||<   )d|_        |j        sd||<   |g fS )Nr@   rC   r>   )r2  r   r   r   rI   rH   r   )rc   r6  r   r  
dependencys        rY   ro  z(WorkerState._transition_generic_releasedN  s     	"/ 	. 	.J&.$EJ,>(,KKK#-Z } 	#"DHRxrX   c                    j         r"t           fd|j        D                       sJ i }|j                                         |j        D ]F}|j        dk    r9|j                            |           |j                            |           d||<   G|j        sd||<   d|_         j                            |           |g fS )Nc              3  4   K   | ]}|j         j        v V  d S ra   )r\   r  )r   r%  rc   s     rY   r	  z;WorkerState._transition_released_waiting.<locals>.<genexpr>d  s,      DDqqu
*DDDDDDrX   r@   r<   rB   rF   )	r  r  r   r   r1  r   r   r   rF   )rc   r6  r   r>  dep_tss   `    rY   ri  z(WorkerState._transition_released_waiting`  s     = 	EDDDDBODDDDDDDD "
!!###o 	2 	2F|x''#''///""2&&&*1'" 	*")OB""rX   r  c                   | j         r1|j        dk    sJ |j        sJ |j        D ]}|| j        |         vsJ d|_        d|_        ||_        | j                            |           | xj        dz  c_        i g fS )Nr<   Fr=   r  )	r  r   r   r  r   r   r  r   r  )rc   r6  r  r   re  s        rY   _transition_fetch_flightz$WorkerState._transition_fetch_flightu  s     = 	58w&&&&:Z 5 5!1!!444444  $$$A2vrX   c               P    | xj         dz  c_         |                     ||          S Nr  r   )r  rs  rj  s      rY   _transition_fetch_missingz%WorkerState._transition_fetch_missing  s1     	A///LLLrX   c               P    | xj         dz  c_         |                     ||          S r  )r  ro  rj  s      rY   _transition_fetch_releasedz&WorkerState._transition_fetch_released  s1     	A000MMMrX   c                   |j         J | xj         |j         z  c_         |                     ||          \  }}|                    t          |j        |                     ||fS )Nr   r  )r   ro  r  r;  r\   rp  s        rY   _transition_memory_releasedz'WorkerState._transition_memory_released  sy     y$$$ry !>>K ? 
 
l 	0RVUUUVVV\!!rX   c                    j         rn|j        dk    sJ |j        rJ t           fd|j        D                       sJ t          d |j        D                       sJ | j        vsJ | j        vsJ d|_         j                            |            j        	                    |            
                                S )NrF   c              3  P   K   | ] }|j         j        v p|j         j        v V  !d S ra   r\   rz  r  r   deprc   s     rY   r	  z>WorkerState._transition_waiting_constrained.<locals>.<genexpr>  N         49$>4;(>     rX   c              3  ,   K   | ]}|j         d k    V  dS )r@   Nr   )r   r  s     rY   r	  z>WorkerState._transition_waiting_constrained.<locals>.<genexpr>  s)      HHsyH,HHHHHHrX   r9   )r  r   r   r  r   rB   r9   rF   r,  r   rZ  rj  s   `  rY   _transition_waiting_constrainedz+WorkerState._transition_waiting_constrained  s	    = 		.8y((((****    ?        HHHHHHHHHHTZ''''T----- BR   %%'''rX   c                   |j         sJ t          i t          |j        |          gf|                     ||                    S )zNote: this transition is triggered exclusively by a task raising the
        Reschedule() Exception; it is not involved in work stealing.
        r  r   )r   r  r>  r\   ro  rj  s      rY   !_transition_executing_rescheduledz-WorkerState._transition_executing_rescheduled  sS     w&-BFDDDEF --bk-JJ
 
 	
rX   c                  | j         ra|j        dk    sJ || j        vsJ || j        vsJ |j        rJ |j        D ]-}|j        | j        v s|j        | j        v sJ |j        dk    sJ .|j	        r|dig fS d|_        |j
        J | j                            |           | j                            |           |                                 S )NrF   r@   r9   rB   )r  r   rB   r9   r   r   r\   rz  r  r   r   rF   r,  r   rZ  )rc   r6  r   r  s       rY   _transition_waiting_readyz%WorkerState._transition_waiting_ready  s    = 	-8y((((TZ''''T-----**** - -w$)++sw$+/E/E/E/EyH,,,,,# 	+&**{&&&B
r%%'''rX   r   r-   r   r   r   r   c                   ||_         ||_        ||_        ||_        d|_        t
                              |||| j                            |j	                            }i |gfS )Nr:   )r   r   r,  )
r   r   r   r   r   r4  r8  r  r   r\   )	rc   r6  r   r   r   r   r   r   smsgs	            rY   _transition_generic_errorz%WorkerState._transition_generic_error  so     ! **%%#<##BF++	 & 
 
 D6zrX   c                   |j         sJ |j        dv r|j        dk    sJ |di}n|j        dk    sJ |j        dk    sJ |di}d|_        d|_         d|_        d|_        |g fS )zIn case of failure of the previous state, discard the error and kick off the
        next state without informing the scheduler
        r;   r?   r<   r=   rF   rC   FNr   r   r   r   )	rc   r6  r   r   r   r   r   r   r  s	            rY   _transition_resumed_errorz%WorkerState._transition_resumed_error  s     w;7777g%%%%gDD;(****7i''''	?DRxrX   c                   |j         sJ |j        dv sJ |j        dk    sJ d|_        d|_         d|_        d|_        |dig fS )a  If the task raises the Reschedule() exception, but the scheduler already told
        the worker to fetch it somewhere else, silently transition to fetch.

        Note that this transition effectively duplicates the logic of
        _transition_resumed_error.
        r  r<   rC   FNr  rj  s      rY   _transition_resumed_rescheduledz+WorkerState._transition_resumed_rescheduled
  sa     w{;;;;;w'!!!!G}b  rX   c                  |j         dk    rS| j        r|j        dk    sJ |j        r"d|_        d|_        d|_         d|_        |dig fS d|_        d|_         d|_        n(| j        r!|j         dv sJ |j        dk    sJ |j        rJ i g fS )z
        See also
        --------
        _transition_cancelled_fetch
        _transition_cancelled_waiting
        _transition_flight_fetch
        r=   rF   rC   FNr  r<   )r   r  r   r   r   rj  s      rY   _transition_resumed_fetchz%WorkerState._transition_resumed_fetch  s     ;(""} ,w)++++w  &"I** $"] 	;"?????7g%%%%w2vrX   c                   |dig fS )Nr<   rW   rj  s      rY   _transition_resumed_missingz'WorkerState._transition_resumed_missingC  s     G}b  rX   c               8    |j         rJ d|_        d |_        i g fS )Nr8   )r   r   r   rj  s      rY   _transition_resumed_releasedz(WorkerState._transition_resumed_releasedH  s)    
 72vrX   c                   |j         dk    r|j        r|dig fS d|_        d|_         i g fS |j         dv sJ |j        rJ d|_        d|_        i g fS )zs
        See also
        --------
        _transition_cancelled_waiting
        _transition_resumed_fetch
        r=   rC   Nr  rE   r<   )r   r   r   r   rj  s      rY   _transition_cancelled_fetchz'WorkerState._transition_cancelled_fetchR  s     ;(""w  J'++ $"2v;"?????w BHBGr6MrX   c                   |j         rJ |j        dk    rd|_        d|_        i g fS |j        dk    r*d|_        d|_        t          |j        d|          }i |gfS |j        dk    sJ d|_        d|_        i g fS )z
        See also
        --------
        _transition_cancelled_fetch
        _transition_cancelled_or_resumed_long_running
        _transition_resumed_fetch
        r;   Nr?   r\   rC  r   r=   rE   rF   )r   r   r   rA  r\   r   )rc   r6  r   r  s       rY   _transition_cancelled_waitingz)WorkerState._transition_cancelled_waitingm  s     7;+%%"BHBKr6M[N**
 &BHBK!FT{  D v:;(**** BHBGr6MrX   rS  r   c               b    |j         si g fS d |_        d|_         |                     ||          S )NFr   )r   r   ro  )rc   r6  r   rS  s       rY   _transition_cancelled_releasedz*WorkerState._transition_cancelled_released  s>     w 	r6M000MMMrX   c                   | j         r|j        dv sJ |j        rJ |j        rJ t	          t
          d         |j                  |_        d|_        i g fS )zWe can't stop executing a task just because the scheduler asked us to,
        so we're entering cancelled state and waiting until it completes.
        r  r8   )r  r   r   r   r   r   r   rj  s      rY   _transition_executing_releasedz*WorkerState._transition_executing_released  sg     = 	8<<<<<ww7#>?JJ2vrX   c                  | j         rd|j        dk    sJ |j        rJ |j        | j        vsJ || j        vsJ || j        vsJ |j        D ] }|j        | j        v s|j        | j        v sJ !d|_        t          |j        |          }i |gfS )Nr9   r;   r  )
r  r   r   r\   rz  rB   r9   r   r  r  )rc   r6  r   r  r  s        rY   !_transition_constrained_executingz-WorkerState._transition_constrained_executing  s     = 	F8},,,,****6****TZ''''T----- F Fw$)++sw$+/E/E/E/EBF<<<E7{rX   c                    j         r^|j        dk    sJ |j        rJ |j         j        vsJ | j        vsJ | j        vsJ t           fd|j        D                       sJ d|_        t          |j        |          }i |gfS )NrB   c              3  P   K   | ] }|j         j        v p|j         j        v V  !d S ra   r  r  s     rY   r	  z:WorkerState._transition_ready_executing.<locals>.<genexpr>  r  rX   r;   r  )
r  r   r   r\   rz  rB   r9   r  r   r  )rc   r6  r   r  s   `   rY   _transition_ready_executingz'WorkerState._transition_ready_executing  s     = 		8w&&&&****6****TZ''''T-----    ?       
 BF<<<E7{rX   c               F    |j         si g fS |                     ||          S rh  )r   rf  rj  s      rY   _transition_flight_fetchz$WorkerState._transition_flight_fetch  s0     w 	r6M--bk-JJJrX   c               F    |j         rJ d|_        d |_        d|_        i g fS )Nr=   r8   r  rj  s      rY   _transition_flight_releasedz'WorkerState._transition_flight_released  s0     72vrX   rC  c                   d|_         | j                            |           | j                            |           t          |j        ||          }t          i |gf|                                           S )za
        See also
        --------
        _transition_cancelled_or_resumed_long_running
        r?   r  )	r   r;   r0  r  r   rA  r\   r  rZ  )rc   r6  rC  r   r  s        rY   "_transition_executing_long_runningz.WorkerState._transition_executing_long_running  s     "r"""b!!!)9{
 
 
 '$L""$$
 
 	
rX   c                   |j         dv sJ d|_         | j                            |           | j                            |           |                                 S )a  Handles transitions:

        - cancelled(executing) -> long-running
        - cancelled(long-running) -> long-running (user called secede() twice)
        - resumed(executing->fetch) -> long-running
        - resumed(long-running->fetch) -> long-running (user called secede() twice)

        Unlike in the executing->long_running transition, do not send LongRunningMsg.
        From the scheduler's perspective, this task no longer exists (cancelled) or is
        in memory on another worker (resumed). So it shouldn't hear about it.
        Instead, we're going to send the LongRunningMsg when and if the task
        transitions back to waiting.

        See also
        --------
        _transition_executing_long_running
        _transition_cancelled_waiting
        r  r?   )r   r;   r0  r  r   rZ  )rc   r6  rC  r   s       rY   -_transition_cancelled_or_resumed_long_runningz9WorkerState._transition_cancelled_or_resumed_long_running  s^    * {;;;;;$r"""b!!!%%'''rX   r  r   c               6    |                      ||d||          S )zThis transition is *normally* triggered by ExecuteSuccessEvent.
        However, beware that it can also be triggered by scatter().
        r*  r   r   )_transition_to_memoryrc   r6  r  r   r   s        rY   _transition_executing_memoryz(WorkerState._transition_executing_memory	  s,     ))v; * 
 
 	
rX   c               @    |                      ||dt          |          S )zThis transition is triggered by scatter().
        This transition does not send any message back to the scheduler, because the
        scheduler doesn't know this key exists yet.
        Fr  r  r   r  s        rY   _transition_released_memoryz'WorkerState._transition_released_memory	  s,     ))u_+ * 
 
 	
rX   c               @    |                      ||dt          |          S )zThis transition is *normally* triggered by GatherDepSuccessEvent.
        However, beware that it can also be triggered by scatter().
        rF  r  r  r  s        rY   _transition_flight_memoryz%WorkerState._transition_flight_memory'	  s,     ))z/{ * 
 
 	
rX   c                   |j         dv r|j        dk    sJ d}n|j         dk    sJ |j        dk    sJ d}d|_         d|_        |                     |||||          S )	a(  Normally, we send to the scheduler a 'task-finished' message for a completed
        execution and 'add-data' for a completed replication from another worker. The
        scheduler's reaction to the two messages is fundamentally different; namely,
        add-data is only admissible for tasks that are already in memory on another
        worker, and won't trigger transitions.

        In the case of resumed tasks, the scheduler's expectation is set by ts.next -
        which means, the opposite of what the worker actually just completed.
        r  r<   rF  r=   rF   r*  Nr  )r   r   r  )rc   r6  r  r   r   msg_types         rY   _transition_resumed_memoryz&WorkerState._transition_resumed_memory1	  s     ;7777g%%%%!HH;(****7i''''&H))xK * 
 
 	
rX   r  +Literal[False, 'add-keys', 'task-finished']c               2   i }g }| j         r|j        | j        vsJ |j        dk    sJ |j        | j        v r|| j        |j        <   nt                      }	 || j        |j        <   nO# t          $ rB}	t          |	          }
|t          |
	                                          |fz   ig fcY d}	~	S d}	~	ww xY wt                      }||z
  dk    r|j
                            d||d           d|_        |j        t          |          |_        | xj        |j        z  c_        t          |          |_        |j        D ]3}|j                            |           |j        s|j        dk    rd||<   4| j                            |j        d|t                      f           |d	k    r+|                    t'          |j        g|
                     nE|dk    r9|t(          k    sJ |                    |                     |||                     n|du sJ ||fS )a4  Insert a task's output in self.data and set the state to memory.
        This method is the one and only place where keys are inserted in self.data.

        There are three ways to get here:
        1. task execution just terminated successfully. Initial state is one of
           - executing
           - long-running
           - resumed(prev=executing next=fetch)
           - resumed(prev=long-running next=fetch)
        2. transfer from another worker terminated successfully. Initial state is
           - flight
           - resumed(prev=flight next=waiting)
        3. scatter. In this case *normally* the task is in released state, but nothing
           stops a client to scatter a key while is in any other state; these race
           conditions are not well tested and are expected to misbehave.
        r@   Ng{Gzt?z
disk-writerN   rP   rQ   rF   rB   zput-in-memoryrF  rH  r   r*  r  F)r  r\   rz  r   r  r+   r  r(   r2  valuesr   r  r   sizeofrh   r   r   r0  r  rE  r   rc  )rc   r6  r  r  r   r   r>  r  rP   er   rQ   r  s                rY   r  z!WorkerState._transition_to_memoryL	  s   2 !#%'= 	(6****8x''''6T[  "'DKFFE
A$)	"&!! A A A $A&&E#**,,//6);<b@@@@@@@A 66De|e## $$+eTJJ   9uBIry u++= 	/ 	/C ((,,,' /CI,B,B'.$+tvvFGGG
 z!!
k R R RSSSS((_,,,,++! + ,      u$$$$,,s   A& &
B207B-'B2-B2c               J   i }| j         r t          d |j        D                       rJ |j        D ]3}|j                            |           |j        dk    r|j        sd||<   4|                     |           d|_        | j                            |j	        d            |g fS )Nc              3  ,   K   | ]}|j         d k    V  dS )r>   Nr  )r   r%  s     rY   r	  z=WorkerState._transition_released_forgotten.<locals>.<genexpr>	  s)      IIa17k1IIIIIIrX   rC   r>   )
r  anyr   r   r0  r   r2  r  re  r\   )rc   r6  r   r>  r  s        rY   _transition_released_forgottenz*WorkerState._transition_released_forgotten	  s     !#= 	JII2=IIIIIIII? 	3 	3CN""2&&&yJ&&s~&'2$"
rvt$$$""rX   )r8   r:   )r8   r<   )r8   r?   )r8   r@   )r8   rA   )r8   rC   )r8   rD   )r8   rF   )rE   r:   )rE   r<   )rE   r?   )rE   r@   )rE   rC   )rE   rD   )r9   r;   )r9   rC   )r:   rC   )r;   r:   r  )r;   r@   )r;   rC   )r;   rD   )r<   r=   )r<   rA   )r<   rC   )r=   r:   )r=   r<   )r=   r@   )r=   rA   )r=   rC   )r?   r:   )r?   r@   )r?   rD   )r?   rC   )r@   rC   )rA   r:   )rA   r<   )rA   rC   )rA   rF   )rB   r;   )rB   rC   )rC   r:   )rC   r<   )rC   r>   )rC   r@   )rC   rA   )rC   rF   )rF   r9   )rF   rB   )rF   rC   zSClassVar[Mapping[tuple[TaskStateState, TaskStateState], Callable[..., RecsInstrs]]]_TRANSITIONS_TABLEmethod_namer   c                    | j                                         D ]Z\  }}t          ||          rE	  t          ||          |i | -# t          $ r  t
                              d|d           Y Vw xY w[d S )Nz!Plugin '%s' failed with exceptionT)exc_info)r  r   hasattrr  r  r%  rb  )rc   r  rS  r   nameplugins         rY   _notify_pluginszWorkerState._notify_plugins	  s     L..00 	 	LD&v{++ 0GFK00$A&AAAA    KK;TD       	 	s   A'A10A1r]   TaskStateState | tuplec          
        t          |t                    r)|rJ |dd         }t          t          |d                   }|j        |k    ri g fS |j        }| j                            ||f          }| xj        dz  c_        | j        r:| j        | j        k    r*t          |j
        |||                     |                    |/ || |g|R d|i\  }}|                     d|j
        ||           n1d||fvr 	 |                     |d|          \  }}|                    |d          x}	rbt          |	t                    r|	^}
}n|	d}}
|
d	k    r=t          ||f | j        ||
g|R d|i          \  }}|                    |d          x}	bt          ||f | j        ||g|R d|i          \  }}nn# t           t"          f$ r0}t!          |j
        |||                     |                    |d}~ww xY wt!          |j
        |||                     |                    | j                            |j
        |||j        d
 |                                D             |t+                      f           ||fS )zTransition a key from its current state to the finish state

        See Also
        --------
        Worker.transitions: wrapper around this method
        r  Nr   r   
transitionrC   r   rW   r>   c                ^    i | ]*\  }}|j         t          |t                    r|d          n|+S r  )r\   r   r2  )r   r6  news      rY   r   z+WorkerState._transition.<locals>.<dictcomp>T
  sG       C Fje&<&<ECFF#  rX   )r   r2  r   rG   r   r  r   r  r  r   r\   r^   r  _transitionre  r  r[   r   r  r  r   r+   )rc   r6  r]   r   rS  rP   funcr  r  r   v_statev_argsr  s                rY   r  zWorkerState._transition
  s    fe$$ 	5OOO!"":D.&)44F8vr6M&**E6?;; 	1$'	V'4+FFF.rvufdjjQSnnUUU!%dB!O!O!O!O;!O!OD,  rvufEEEEv..V%)%5%5
 &6 & &"l
  88B---a !!U++ 0+,(&&*+R+-- !)@|,((WWvWWW;WW* *&D,  88B---a  &=<($D$RP$PPPKPP& &"ll &'>? V V V'vtzz"~~NNTUUV $BFE64::b>>JJJ  #'::<<   	
 	
 	
& \!!s   ;B:F6 6G7+G22G7c                h     t           fd|j                                        D                       S )Nc              3  @   K   | ]\  }}j         |         |k    V  d S ra   )r  )r   resourceneededrc   s      rY   r	  z?WorkerState._resource_restrictions_satisfied.<locals>.<genexpr>_
  sH       
 
 & $X.&8
 
 
 
 
 
rX   )r  r   r   rc   r6  s   ` rY   r]  z,WorkerState._resource_restrictions_satisfied^
  sJ     
 
 
 
$&$<$B$B$D$D
 
 
 
 
 	
rX   c                n    |j                                         D ]\  }}| j        |xx         |z  cc<   d S ra   r   r   r  rc   r6  r  r  s       rY   rY  zWorkerState._acquire_resourcesd
  Q     " 8 > > @ @ 	9 	9Hf$X...&8....	9 	9rX   c                n    |j                                         D ]\  }}| j        |xx         |z  cc<   d S ra   r  r  s       rY   _release_resourceszWorkerState._release_resourcesh
  r  rX   r>  r  c               J   	 g t                      	i d 	fd} ||                                                                          \  }}|z   ||            j                                        j        r	D ]}                     |           S )zProcess transitions until none are left

        This includes feedback from previous transitions and continues until we
        reach a steady state
        r  r  re   r   c                ,   | r|                                  \  }}	                    |                               ||j                                       ||          \  }}|                     |                               |           | d S d S rh  )popitemr   
setdefaultr   r  updateextend)
r  r6  r]   a_recsa_instructionsinitial_statesr  rc   r   r  s
        rY   process_recsz.WorkerState._transitions.<locals>.process_recsv
  s     4!\\^^
F		"))"bh777)-)9)9K *: * *& F#####N333  4 4 4 4 4rX   r   )r  r  re   r   )r   r   rC  r  transitionsr  validate_task)
rc   r>  r   r  r  r  r6  r  r  r  s
   ` `    @@@rY   r  zWorkerState._transitionsl
  s     :<		4 		4 		4 		4 		4 		4 		4 		4 		4 		4 	_))++,,, "&!;!;!;!T!T&V%%n555= 	' ' '""2&&&&rX   evc                     t          |          ra   )	TypeErrorrc   r  s     rY   r  zWorkerState._handle_event
  s    mmrX   r  c           	     t   i }|j                                         D ]\  }}	 | j        |         }nC# t          $ r6 t	          |          x| j        |<   }| j                            |           Y nw xY wd|t          f||<   | j        	                    |d|j
        |j        t                      f           |g fS )Nr@   zreceive-from-scatter)rz  r   r  r'  r   r  r(  r   r  r  r   r   r+   )rc   r  r>  r\   r  r6  s         rY   _handle_update_datazWorkerState._handle_update_data
  s     "'--// 
	 
	JC/Z_ / / /'0~~5
3"!**2...../ $,UO"DOBHOO,bhO    ""s   0=A0/A0r  c                    | j                             d|j        |j        t	                      f           i }|j        D ]#}| j                            |          }|rd||<   $|g fS )a  Handler to be called by the scheduler.

        The given keys are no longer referred to and required by the scheduler.
        The worker is now allowed to release the key, if applicable.

        This does not guarantee that the memory is released since the worker may
        still decide to hold on to the data and task since it is required by an
        upstream dependency.
        z	free-keysrC   )r  r  rH  r   r+   r  r   )rc   r  r>  r\   r6  s        rY   _handle_free_keyszWorkerState._handle_free_keys
  sp     	bgr~tvvFGGG "7 	1 	1C$$B 1&0#""rX   r  c                ^   i }g }|j         D ]}| j                            |          }||j        dk    r*|j        D ]3}|j        dv r(t          d|j        d|j        d|j        d          4| j                            |j        d|j	        t                      f           d	||<   ||fS )
a  Stream handler notifying the worker that it might be holding unreferenced,
        superfluous data.

        This should not actually happen during ordinary operations and is only intended
        to correct any erroneous state. An example where this is necessary is if a
        worker fetches data for a downstream task but that task is released before the
        data arrives. In this case, the scheduler will notify the worker that it may be
        holding this unnecessary data, if the worker hasn't released the data itself,
        already.

        This handler only releases tasks that are indeed in state memory.

        For stronger guarantees, see handler free_keys
        Nr@   r  zCannot remove replica of z while z
 in state .zremove-replicarC   )rH  r  r   r   r   RuntimeErrorr\   r  r  r   r+   )rc   r  r>  r  r\   r6  r  s          rY   _handle_remove_replicasz#WorkerState._handle_remove_replicas
  s      !#%'7 	- 	-C$$BzRX11 }  9 ===&hBFhhSWhhZ]Zchhh   > HOORV%5r~tvvNOOO",OB,,rX   r  c                   | j         r^|j                                        |j                                        k    sJ t	          |j                                                  sJ i }|j                                        D ]9\  }}|                     |d|j                  }|j	        dk    r||_        d||<   :| 
                    |j                   |g fS )Nr  r\   r   r   r@   r<   )r  r   rH  r   r  r  r   r)  r   r   r.  )rc   r  r>  r\   r   r6  s         rY   _handle_acquire_replicasz$WorkerState._handle_acquire_replicas
  s    = 	,:??$$	(8(88888rz((**+++++ "9??,, 	. 	.KC))
 N *  B x8##"	&-#RZ(((""rX   r  c           
     r   	 | j         |j                 }t                              d||j        d           nM# t
          $ r@ t          |j                  x| j         |j        <   }| j                            |           Y nw xY w| j	        
                    |j        d|j        |j        t                      f           |j        |_        i }g }|j        t          h dz  v rnQ|j        dk    r7|
                    |                     ||j        |j                             n|j        dv rd||<   |j        |_        |j        | j        fz   }| xj        d	z  c_        |j        rd | j        |j        <   d |_        d |_        d
|_        d
|_        ||_        |j        |_        |j        |_        |j        |_        |j        dv r	|j        dv s|j        |_        | j        r|j                                         |j!                                         k    sJ |j        "                                D ]5}|sJ tG          |          tG          tI          |                    k    sJ 6|j!        %                                D ]h\  }}| &                    |||j                  }	|	j        dk    r||	_!        |j'        (                    |	           |	j)        (                    |           i| *                    |j                   n0tW          d| d|j         d| ,                    |                     ||fS )Nz)Asked to compute an already known task %s)taskr   zcompute-task>   rF   r;   r?   r@   r  >   r:   r<   r=   rA   rE   rC   r8   rF   r  r   r8   rE   r  r  z&Unexpected task state encountered for z; stimulus_id=z; story=)-r  r\   r%  r&  r   r'  r   r  r(  r  r  r   r+   r   rI   rc  r   r   r  r  r  r   r   r   r   r   r   r   r   r   r  r   rH  r   r  r  r   r   r)  r   r   r   r.  r  r^   )
rc   r  r6  r>  r  r   dep_workersdep_keyr   r|  s
             rY   _handle_compute_taskz WorkerState._handle_compute_task  s   	+BF#BLL;BN;;     	+ 	+ 	+&/&7&77DJrv&&r*****	+ 	2>466RSSSI	 "%'8u  
  
  
 
 
 

 X!!++rybn ,     
 X 
 
 
 #,OB+BK{do%77HOOq OOx +&*BF#BLBL "B "B"BK+BK^BNBJ 444K#@@@+-+C(} Ez((BINN,<,<<<<<#%:#4#4#6#6 E EK&&&&{++s3{3C3C/D/DDDDDD#%9??#4#4 * *11% " 2  
 <8++$*FM ##F+++!%%b))))  ,,,,H H H!~H H7;zz"~~H H  
 ,,s   58 ABBru  Iterator[TaskState]c              #     K   | xj         |j        z  c_         | j                            |j                  }|D ];}| j        |         }d|_        d|_        | j        	                    |           |V  <dS )aZ  Common code for the handlers of all subclasses of GatherDepDoneEvent.

        Yields the tasks that need to transition out of flight.
        The task states can be flight, cancelled, or resumed, but in case of scatter()
        they can also be in memory or error states.

        See also
        --------
        _execute_done_common
        TN)
r  r  r  re  r  r  r   r   r  r,  )rc   r  rH  r\   r6  s        rY   _gather_dep_done_commonz#WorkerState._gather_dep_done_commona  s       	$$7$$%))")44 	 	CCBBG!BN ''+++HHHH	 	rX   rx  c                
   i }|                      |          D ]}|j        |j        v rd|j        |j                 |j        f||<   .| j                            |j        d|j        t                      f           | j        r#|j	        dk    sJ || j
        |j                 vsJ |j                            |j                   |j        | j        v r*| j        |j                                     |j                   d||<   |g fS )zjgather_dep terminated successfully.
        The response may contain fewer keys than the request.
        r@   missing-depr<   )r  r\   rz  r   r  r  r   r+   r  r   r  r  r   r0  r  )rc   r  r>  r6  s       rY   _handle_gather_dep_successz&WorkerState._handle_gather_dep_successu  s   
 !#..r22 	. 	.Bv  '/")&L## OPPP= A8w....T%5bi%@@@@@
""29---9--M"),44RV<<<&-##""rX   r  c                j   | j                             |j                   i }g }|                     |          D ]0}d||<   |j        | j         z
  s|                    |j                   1t          |j        |j                  g}|r)|                    t          ||j                             ||fS )z,gather_dep terminated: remote worker is busyr<   )r  r   r  )
r  r   r  r  r   r  r\   r  r   rK  )rc   r  r>  refresh_who_hasr6  r  s         rY   _handle_gather_dep_busyz#WorkerState._handle_gather_dep_busy  s    
 	bi((( "..r22 	/ 	/B")OB: 11 /&&rv... !	r~NNN&
  	 '(bn     ,,rX   r  c                   i }|                      |          D ];}| j                            |j        d|j        t                      f           d||<   <| j                            |j        d          D ]<}| j	        r|j
        dk    sJ |j        |j        v sJ |j        |j        hk    rd||<   =| j                            |j        d          D ].}| j        |         }|j                            |j                   /|g fS )a  gather_dep terminated: network failure while trying to
        communicate with remote worker

        Though the network failure could be transient, we assume it is not, and
        preemptively act as though the other worker has died (including removing all
        keys from it, even ones we did not fetch).

        This optimization leads to faster completion of the fetch, since we immediately
        either retry a different worker, or ask the scheduler to inform us of a new
        worker if no other worker is available.
        r  r<   rW   rA   )r  r  r  r\   r   r+   r  re  r  r  r   r   r  r  r,  rc   r  r>  r6  r\   s        rY   "_handle_gather_dep_network_failurez.WorkerState._handle_gather_dep_network_failure  s    !#..r22 	* 	*BHOORV]BNDFFKLLL")OB"&&ry"55 	0 	0B} /x7****yBJ....zbi[((&/#=$$RY33 	) 	)CCBJbi((((""rX   r  c                N    fd|                                D             }|g fS )zvgather_dep terminated: generic error raised (not a network failure);
        e.g. data failed to deserialize.
        c           	     T    i | ]$}|d j         j        j        j        |j        f%S )r:   )r   r   r   r   r   )r   r6  r  s     rY   r   z:WorkerState._handle_gather_dep_failure.<locals>.<dictcomp>  sN     
!
 
!
 
!
  !!	
!
 
!
 
!
rX   )r  )rc   r  r>  s    ` rY   _handle_gather_dep_failurez&WorkerState._handle_gather_dep_failure  sH    

!
 
!
 
!
 
!
 22266
!
 
!
 
!
 ""rX   r  c                f   i }| j                             |j        d          D ]<}| j        r|j        dk    sJ |j        |j        v sJ |j        |j        hk    rd||<   =| j                            |j        d          D ].}| j        |         }|j                            |j                   /|g fS )NrW   r<   rA   )	r  re  r  r  r   r   r  r  r,  r  s        rY   _handle_remove_workerz!WorkerState._handle_remove_worker  s     ""&&ry"55 	0 	0B} /x7****yBJ....zbi[((&/#=$$RY33 	) 	)CCBJbi((((""rX   r  c                f    | j                             |j                  }|si g fS |d|j        fig fS )Nr?   )r  r   r\   rC  rc   r  r6  s      rY   _handle_secedezWorkerState._handle_secede  s@    Z^^BF## 	r6M^R%89:B>>rX   r  c                    | j                             |j                  }||j        nd }t	          |j        ||j                  }|t          dhz  v r|sJ |di|gfS i |gfS )N)r\   r   r   rF   rC   )r  r   r\   r   rN  r   rI   )rc   r  r6  r   r  s        rY   _handle_steal_requestz!WorkerState._handle_steal_request  s{    
 Z^^BF##NBF%R^TTTEYK''' III
#dV++v:rX   rm  c                    d| _         i g fS )zPrevent any further tasks to be executed or gathered. Tasks that are
        currently executing or in flight will continue to progress.
        F)r  r  s     rY   _handle_pausezWorkerState._handle_pause  s    
 2vrX   rq  c                8    d| _         |                                 S )zEmerge from paused statusT)r  rZ  r  s     rY   _handle_unpausezWorkerState._handle_unpause  s     %%'''rX   rs  c                H    | j                             |j                   i g fS ra   )r  r0  r  r  s     rY   _handle_retry_busy_workerz%WorkerState._handle_retry_busy_worker  s$    !!"),,,2vrX   r  c                    | j                             |j                  }|r|j        t          dhz  vri g fS | j                            |j        d|j        t                      f           |j	        rJ |dig fS )zCancel a task on a best-effort basis. This is only possible while a task
        is in state `waiting` or `ready`; nothing will happen otherwise.
        rF   zcancel-computerC   )
r  r   r\   r   rI   r  r  r   r+   r   r  s      rY   _handle_cancel_computez"WorkerState._handle_cancel_compute  s    
 Z^^BF## 	RXUi[%888r6M!12>466JKKK =   J##rX   r  $tuple[TaskState, Recs, Instructions]c                   | j                             |j                  }|s"J |                     |j                              | j        r|| j        v || j        v k    sJ d|_        | xj        dz  c_        | 	                    |           | j        
                    |           | j        
                    |           |                                 \  }}||vsJ |||fS )a  Common code for the handlers of all subclasses of ExecuteDoneEvent.

        The task state can be executing, cancelled, or resumed, but in case of scatter()
        it can also be in memory or error state.

        See also
        --------
        _gather_dep_done_common
        Tr  )r  r   r\   r^   r  r;   r  r   r  r  r0  rZ  rc   r  r6  r  r  s        rY   _execute_done_commonz WorkerState._execute_done_common)  s     Z^^BF##%%4::bf%%%%%= 	G$.(bD4E.EFFFFq ###r"""!!"%%%,,..e~~~~4rX   r  c                    |                      |          \  }}}|j                            d|j        |j        d           |j        |_        |j        |_        d|j        |j        f||<   ||fS )zTask completed successfullycomputer  r@   )	r.  r   r  rP   rQ   r   rh   r  r   r-  s        rY   _handle_execute_successz#WorkerState._handle_execute_successE  ss     33B77D% 		BHbgVVWWWI	'bh	2RU{rX   r  c                    |                      |          \  }}}|j        /|j        (|j                            d|j        |j        d           d|j        |j        |j        |j        |j	        f||<   ||fS )zTask execution failedNr0  r  r:   )
r.  rP   rQ   r   r  r   r   r   r   r   r-  s        rY   _handle_execute_failurez#WorkerState._handle_execute_failureR  s     33B77D%8BG$7M  $rxII   LLI
R U{rX   r  c                F    |                      |          \  }}}d||<   ||fS )zTask raised Reschedule() exception while it was running.

        Note: this has nothing to do with work stealing, which instead causes a
        FreeKeysEvent.
        rD   )r.  r-  s        rY   _handle_reschedulezWorkerState._handle_rescheduled  s0     33B77D% RU{rX   r  c                    | j         si g fS | j        r.| j         D ]&}|j        rJ |                     |                      't	          d | j         D             |j                  }i |gfS )Nc                    g | ]	}|j         
S rW   r  r:  s     rY   
<listcomp>z4WorkerState._handle_find_missing.<locals>.<listcomp>y  s    ;;;R"&;;;rX   r  )r  r  r   r^   rK  r   )rc   r  r6  r  s       rY   _handle_find_missingz WorkerState._handle_find_missingo  s    & 	r6M= 	6- 6 6:55tzz"~~5555&;;4#:;;;
 
 
 D6zrX   r  c                    |                      |j                   i }g }|j        D ]N}| j                            |          }|s|j        r|j        dk    rd||<   7|j        s|j        dk    rd||<   O||fS )NrA   r<   )r.  r   r  r   r   )rc   r  r>  r  r\   r6  s         rY   _handle_refresh_who_hasz#WorkerState._handle_refresh_who_has~  s    RZ((( "%': 	0 	0C$$B z 0bh)33&-##Z 0BH$7$7&/# ,,rX   keys_or_tasks_or_stimulistr | Key | TaskStater_   c                D    d |D             }t          || j                  S )ziReturn all records from the transitions log involving one or more tasks or
        stimulus_id's
        c                J    h | ] }t          |t                    r|j        n|!S rW   r   r   r\   r   r  s     rY   r;  z$WorkerState.story.<locals>.<setcomp>  s:     
 
 
9:Z9--4AEE1
 
 
rX   )r$   r  )rc   r<  keys_or_stimulis      rY   r^   zWorkerState.story  s4    
 
>V
 
 
 OTX666rX   keys_or_tasksKey | TaskStatelist[StateMachineEvent]c                B    d |D             fd| j         D             S )z;Return all state machine events involving one or more tasksc                J    h | ] }t          |t                    r|j        n|!S rW   r@  rA  s     rY   r;  z-WorkerState.stimulus_story.<locals>.<setcomp>  s-    PPPQAy118qPPPrX   c                :    g | ]}t          |d d          v |S )r\   Nr  )r   r  rH  s     rY   r8  z.WorkerState.stimulus_story.<locals>.<listcomp>  s/    SSSr'"eT2J2Jd2R2R2R2R2RrX   )r   )rc   rC  rH  s     @rY   stimulus_storyzWorkerState.stimulus_story  s6     QP-PPPSSSST.SSSSrX   rW   r   r   r   r   c               b   i d| j         d| j        d| j        dd | j                                        D             dd | j                                        D             dt                              | j                  d	d
 | j	        
                                D             dd | j        D             dt          | j                  dd | j        D             dd | j        D             d| j        dd | j        D             d| j        d| j        d| j        d| j        | j        t          | j                                                  t          | j                                                  d}fd|
                                D             }t3          |          S )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        r  r  r  rB   c                    g | ]	}|j         
S rW   r  r:  s     rY   r8  z(WorkerState._to_dict.<locals>.<listcomp>  s    ;;;bf;;;rX   r9   c                    g | ]	}|j         
S rW   r  r:  s     rY   r8  z(WorkerState._to_dict.<locals>.<listcomp>  s    GGGrBFGGGrX   rz  r  c                R    i | ]$\  }}|d  |                                 D             %S )c                    g | ]	}|j         
S rW   r  r:  s     rY   r8  z3WorkerState._to_dict.<locals>.<dictcomp>.<listcomp>  s    222rBF222rX   )sorted)r   re  tsss      rY   r   z(WorkerState._to_dict.<locals>.<dictcomp>  sE       As 22SZZ\\222  rX   r;   c                    h | ]	}|j         
S rW   r  r:  s     rY   r;  z'WorkerState._to_dict.<locals>.<setcomp>  s    :::R"&:::rX   r  r  c                    h | ]	}|j         
S rW   r  r:  s     rY   r;  z'WorkerState._to_dict.<locals>.<setcomp>  s    @@@RV@@@rX   r  c                    h | ]	}|j         
S rW   r  r:  s     rY   r;  z'WorkerState._to_dict.<locals>.<setcomp>  s    FFF2FFFrX   r  r  c                    g | ]	}|j         
S rW   r  r:  s     rY   r8  z(WorkerState._to_dict.<locals>.<listcomp>  s    "L"L"Lb26"L"L"LrX   r  r  r   r  )r  task_countstask_cumulative_elapsedc                $    i | ]\  }}|v	||S rW   rW   )r   r   r   r   s      rY   r   z(WorkerState._to_dict.<locals>.<dictcomp>  s)    BBBA'1A1A11A1A1ArX   r   )r  r  r  rB   rO  r9   r   r  rz  r  r   r;   r  r  r  r  r  r  r  r   r  r  r  current_countcumulative_elapsedr0   )rc   r   rb  s    ` rY   rc  zWorkerState._to_dict  s*   
t|

 t|
 ;;tz'8'8':':;;;	

 GGT-=-D-D-F-FGGG
 DMM$),,
   ".4466  
 ::4>:::
 T]++
 @@d.?@@@
 FF1EFFF
  !7
  !"L"LD4K"L"L"L!
" D-#
$ 48%
& D-'
( !$"9)
* Z 1 ? ? A ABB'+D,=,P,P,R,R'S'S/
 
 
2 CBBBBBB w7777rX   c                    |j         | j        v s|j         | j        v sJ t          |j        t
                    sJ |j        rJ d S ra   )r\   rz  r  r   r   r   r   r  s     rY   _validate_task_memoryz!WorkerState._validate_task_memory  sR    v""bf&;&;&;&;")S)))))&&&&&&rX   c                @   |j         dk    s|j        dk    r|| j        v sJ || j        vsJ n.|j         dk    s|j        dk    sJ || j        vsJ || j        v sJ |j        J |j        | j        vsJ |j        rJ |j        D ]}|| j	        vsJ || j
        vsJ dS )aj  Validate tasks:

        - ts.state == executing
        - ts.state == long-running
        - ts.state == cancelled, ts.previous == executing
        - ts.state == cancelled, ts.previous == long-running
        - ts.state == resumed, ts.previous == executing, ts.next == fetch
        - ts.state == resumed, ts.previous == long-running, ts.next == fetch
        r;   r?   N)r   r   r;   r  r   r\   rz  r   r   rB   r9   rc   r6  r  s      rY   _validate_task_executingz$WorkerState._validate_task_executing  s    8{""bk[&@&@''''T......8~--1N1N1N1NT^++++*****{&&&vTY&&&&&&&&= 	/ 	/Cdj((((d......	/ 	/rX   c                >    |j         dk    r |j        rJ | j        v sJ | j        vsJ n,|j        sJ |j         dk    sJ | j        vsJ | j        v sJ |j         j        vsJ |j        rJ |j        rJ t           fd|j	        D                       sJ dS )zWValidate tasks:

        - ts.state == ready
        - ts.state == constrained
        rB   r9   c              3  P   K   | ] }|j         j        v p|j         j        v V  !d S ra   r  r  s     rY   r	  z3WorkerState._validate_task_ready.<locals>.<genexpr>  sK       
 
?BCGty :CGt{$:
 
 
 
 
 
rX   N)
r   r   rB   r9   r\   rz  r   r   r  r   r  s   ` rY   _validate_task_readyz WorkerState._validate_task_ready  s    8w////####T------++++8},,,,TZ'''')))))vTY&&&&7&&&& 
 
 
 
FHo
 
 
 
 
 	
 	
 	
 	
 	
rX   c                     |j          j        vsJ |j        rJ | j        v sJ |j        sJ |j         fd|j        D             k    sJ |j        D ]}| j        vsJ | j        vsJ d S )Nc                L    h | ] }|j         j        v|j         j        v|!S rW   r  r  s     rY   r;  z5WorkerState._validate_task_waiting.<locals>.<setcomp>  s@     '
 '
 '
wdi''CG4;,F,F ,F,F,FrX   )	r\   rz  r   rF   r   r   r   rB   r9   r]  s   `  rY   _validate_task_waitingz"WorkerState._validate_task_waiting  s    vTY&&&&7T\!!!!""""" '
 '
 '
 '
'
 '
 '
 
 
 
 

 = 	/ 	/Cdj((((d......	/ 	/rX   c                   |j         | j        vsJ |j         | j        vsJ || j        v sJ |j        D ]}|| j        vsJ || j        vsJ |j        sJ |j        | j        v sJ |j         | j        |j                 v sJ dS )zValidate tasks:

        - ts.state == flight
        - ts.state == cancelled, ts.previous == flight
        - ts.state == resumed, ts.previous == flight, ts.next == waiting
        N)	r\   rz  r  r  r   rB   r9   r   r  r]  s      rY   _validate_task_flightz!WorkerState._validate_task_flight  s     vTY&&&&vT[((((T)))))= 	/ 	/Cdj((((d......~~!77777v/???????rX   c                ,   |j         | j        vsJ |j         | j        vsJ | j        |j        vsJ |j        rJ |j        sJ |j        D ])}|j         | j        |         v sJ || j        |         v sJ *|j        D ]}|| j	        vsJ || j
        vsJ d S ra   )r\   rz  r  r  r   r   r  r  r   rB   r9   )rc   r6  re  r  s       rY   _validate_task_fetchz WorkerState._validate_task_fetch/  s    vTY&&&&vT[((((|2:----7z 	- 	-A6T]1-----)!,,,,,,= 	/ 	/Cdj((((d......	/ 	/rX   c                *   j         | j        vsJ j         | j        vsJ j        rJ j        rJ t          fd| j                                        D                       rJ | j        v sJ j	        D ]}|| j
        vsJ || j        vsJ d S )Nc              3  *   K   | ]}j         |v V  d S ra   r  )r   r  r6  s     rY   r	  z5WorkerState._validate_task_missing.<locals>.<genexpr>A  s*      QQhrv)QQQQQQrX   )r\   rz  r  r   r   r  r  r  r  r   rB   r9   r]  s    ` rY   _validate_task_missingz"WorkerState._validate_task_missing<  s    vTY&&&&vT[((((:7QQQQ$-:N:N:P:PQQQQQQQQT,,,,,= 	/ 	/Cdj((((d......	/ 	/rX   c                    |j         J |j        dv r|                     |           d S |j        dk    sJ |                     |           d S )Nr  r=   )r   r   r^  rf  r  s     rY   _validate_task_cancelledz$WorkerState._validate_task_cancelledG  sb    w;777))"-----;(****&&r*****rX   c                    |j         dv r#|j        dk    sJ |                     |           n/|j         dk    sJ |j        dk    sJ |                     |           |j        D ]}|| j        vsJ || j        vsJ d S )Nr  r<   r=   rF   )r   r   r^  rf  r   rB   r9   r]  s      rY   _validate_task_resumedz"WorkerState._validate_task_resumedO  s    ;7777g%%%%))"----;(****7i''''&&r***= 	/ 	/Cdj((((d......	/ 	/rX   c                8   |j         | j        vsJ |j         | j        vsJ |j        rJ |j        rJ | j                                        D ]}||vsJ 	|| j        vsJ || j        vsJ || j	        vsJ |j
        rJ |j        rJ |j        rJ |j        rJ d S ra   )r\   rz  r  r   r   r  r  r;   r  r  r   r   r   r   )rc   r6  rP  s      rY   _validate_task_releasedz#WorkerState._validate_task_released[  s    vTY&&&&vT[((((7;#**,, 	! 	!CS=====''''-----00000 &&&&7<<rX   c                   	 |j         | j        v r| j        |j                  |u sJ |j        dk    r|                     |           d S |j        dk    r|                     |           d S |j        dk    r|                     |           d S |j        dk    r|                     |           d S |j        dk    r|                     |           d S |j        dv r|                     |           d S |j        dv r| 	                    |           d S |j        dk    r| 
                    |           d S |j        d	k    r|                     |           d S |j        d
k    r|                     |           d S d S # t          $ rO}t                              |           t!          |j         |j        |                     |                    |d }~ww xY w)Nr@   rF   rA   r8   rE   )rB   r9   r  r=   r<   rC   r   )r\   r  r   r[  rd  rk  rm  ro  ra  r^  rf  rh  rq  r  r%  r   r   r^   )rc   r6  r  s      rY   r  zWorkerState.validate_taskr  s"   	v##z"&)R////x8##**2.....Y&&++B/////Y&&++B/////[((--b11111Y&&++B/////555))"-----:::--b11111X%%**2.....W$$))"-----Z'',,R00000 (' 	 	 	Q"F"($**R..  	sO   AE8  E8 * E8  E8 . E8 E8 0E8  E8 2 E8  E8 8
GA
GGc                ,     j                                         D ]}|j        D ]%}| j        k    sJ |j         j        |         v sJ &|j        D ]>} j         |j                 |u sJ ||j        v sJ                      |                      ?|j	        D ]C} j         |j                 |u sJ |j
        t          v sJ                      |                      D j                                        D ]S\  }}| j        k    sJ |D ]>}| j         v sJ                      |                      | j         |         j        v sJ ?Tt                      } j                                        D ]d\  }}|D ]\}|                    |           |j
        dk    sJ                      |                      ||j        v sJ | d|j                    ]et!          |           j        k    sJ  j        D ]*}|j
        dk    sJ                      |                      + j        D ]*}|j
        dk    sJ                      |                      + j        D ]*}|j
        dk    sJ                      |                      + j        D ]>}|j
        dk    s1|j
        dv r|j        dk    sJ                      |                      ? j        D ]>}|j
        dk    s1|j
        dv r|j        dk    sJ                      |                      ? j        D ]>}|j
        d	k    s1|j
        dv r|j        d	k    sJ                      |                      ? j        D ]*}|j
        d
k    sJ                      |                      +t5          g  j                                         j         j         j         j         j         j         j        R  D ]5} j         |j                 |u sJ  j         |j                  d|             6t7           fdt5           j         j                  D                       }	 j        |	k    sJ d j        d|	              j        D ](}
|
 j         v sJ                      |
                      ) j        D ](}
|
 j         v sJ                      |
                      ) j                                         D ]}                     |           tA          d  j                                         D                       } j!        "                                |k    s#J  j!        "                                |f             j#        r j$         j#        k     sJ  %                                 d S )Nr<   z; ts.who_has=rA   rB   r9   r;   r	  r?   r=   rF   z is not c              3  @   K   | ]}j         |         j        pd V  dS )r   N)r  r   )r   r\   rc   s     rY   r	  z-WorkerState.validate_state.<locals>.<genexpr>  sB       
 
,/DJsO"'a
 
 
 
 
 
rX   zself.nbytes=z; expected c              3  2   K   | ]}|j         |j        fV  d S ra   )r   r   r:  s     rY   r	  z-WorkerState.validate_state.<locals>.<genexpr>  s<       %
 %
&(RY!%
 %
 %
 %
 %
 %
rX   )&r  r  r   r  r\   r  r   r   r^   r   r   rJ   r   r   r  r   r  r  r  rB   r9   r;   r   r  r  rF   r   sumrz  r  r   r  r   r  rX  r  r  _validate_resources)rc   r6  r  r  ts_waitrH  r   	fetch_tssrP  expect_nbytesr\   expect_state_counts   `           rY   validate_statezWorkerState.validate_state  s   *##%% 	N 	NB* 7 7----vv!666666  < < z#'*c1111S^+++TZZ^^++++. N Nz'+.'9999}(8888$**W:M:M8888N !M//11 	7 	7LFDT\)))) 7 7DJ

1A!6666667
 EE	+1133 	D 	DKFC D Db!!!x7***DJJrNN***+++-C-Cbj-C-C++++D 9~~!11111) 	9 	9B8y((($**R..((((* 	7 	7B8w&&&

2&&&&" 	= 	=B8},,,djjnn,,,,. 	 	B8{**4449S9S9Szz"~~ :T9S9S# 	 	B8~--4449V9V9Vzz"~~ :W9V9V& 	 	B8x''4449P9P9Pzz"~~ :Q9P9P, 	9 	9B8y((($**R..((((  	
$$&&	
#	
 J	
 		

  	
 N	
 	
 L	
 	
 	
 
	Q 
	QB :bf%+++
260B-P-PB-P-P++++ 
 
 
 
38DK3P3P
 
 
 
 
 {m+++-X-X-X-X-X+++9 	6 	6C$*$$$djjoo$$$$; 	6 	6C$*$$$djjoo$$$$*##%% 	# 	#Br""""$ %
 %
,0J,=,=,?,?%
 %
 %
 
 
  ..004FFFF++--I
FFF
 & 	I*T-HHHHH  """""rX   c                6   | j                                         | j                                        k    sJ | j                                         }| j                                        D ]*\  }}|dk    sJ | j                    ||xx         |z  cc<   +| j        D ]H}|j                                        D ],\  }}|dk    sJ ||j        f            ||xx         |z  cc<   -It          d |                                D                       s
J |            dS )zKAssert that available_resources + resources held by tasks = total_resourcesg&.r   c              3  <   K   | ]}t          |          d k     V  dS )g&.>N)abs)r   r   s     rY   r	  z2WorkerState._validate_resources.<locals>.<genexpr>  s,      ;;qCFFTM;;;;;;rX   N)	r  rH  r  r   r   r  r   r  r  )rc   totalr   r   r6  s        rY   rw  zWorkerState._validate_resources  s:   #((**d.F.K.K.M.MMMMM$))++,2244 	 	DAqu999d6999!HHHMHHHH( 	 	B06688  1AvvvB$<=vvvaA ;;ELLNN;;;;;BBUBBBBBrX   )r  r   r  r   rz  r
  r  r  r  r  r	  r  r  r   r  r   r  r  r  rO   r  rO   )r  rP  re   r  r   )re   r   )r\   r    r   r  r   rk   re   r   )r   r*  re   r   r6  r   re   r   )re   r   )r   rk   re   r  )re   rD  )rP  r  re   rQ  )r6  r   rA  r   re   r   )re   r  )re   r[  )r6  r   r   r   r   rk   re   r)  )r6  r   r   rk   re   r  )r6  r   r  rk   r   rk   re   r  )r6  r   r   r-   r   r   r   rk   r   rk   r   r   r   rk   re   r  )r6  r   rS  r   r   rk   re   r  )r6  r   rC  rO   r   rk   re   r  )
r6  r   r  r   r   r   r   rk   re   r  )r6  r   r  r   r  r  r   r   r   rk   re   r  )r  rk   rS  r   r   r   re   r   )
r6  r   r]   r  rS  r   r   rk   re   r  )r6  r   re   r   )r>  r  r   rk   re   r  )r  rP  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  ru  re   r  )r  rx  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  rm  re   r  )r  rq  re   r  )r  rs  re   r  )r  r  re   r  )r  r  re   r+  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r  r  re   r  )r<  r=  re   r_   )rC  rD  re   rE  r   r   )rrS   rT   rU   r   rV   r2  r   mathinfrd   r  r`  r  r  r!  r#  r)  r.  r2  r7  rC  r<  r=  rS  rZ  rX  rc  rf  rk  rm  rq  rt  rs  rw  ro  ri  r~  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r]  rY  r  r  r   r  registerr  r  r  r  r  r  r  r  r  r  r  r   r"  r$  r&  r(  r*  r.  r1  r3  r5  r9  r;  r^   rI  rc  r[  r^  ra  rd  rf  rh  rk  rm  ro  rq  r  r|  rw  rW   rX   rY   r  r  $  s         ( LLL   
 &%%% %$$$ 
 $### MMM
 MMM  )((( 6555  (''' '&&& $### +*** !    '&&& '&&& 4333  OOO &%%%
 *)))  !     KKK 
 
 +*** NNN  
 1000 )((( o&&I
 "37)-2604-17</3x.2h?$ ?$ ?$ ?$ ?$ ?$B   , # # # X#  2 2 2 X2  ) ) ) X) + + + X+    '! '! '! '!R(! (! (! (!TY Y Y Y,C- C- C- C-J>1 >1 >1 >1@) ) ) )0$: $: $: $:L   (   0
 
 
 
F
 
 
 
N N N N
K 
K 
K 
K" " " "M M M M	 	 	 	K K K K   $# # # #*   "M M M MN N N N	" 	" 	" 	"( ( ( ($
 
 
 
( ( ( (,   2   :! ! ! !$% % % %N! ! ! !
      6! ! ! !FN N N N          $	K 	K 	K 	K	 	 	 	
 
 
 
(( ( ( (6
 
 
 
	
 	
 	
 	

 
 
 

 
 
 
6\- \- \- \-|# # # #23	 >3	 ;3	 	&'T3	 	 !?	3	
 	!"@3	 	"#A3	 	%&D3	 	!"?3	 	73	 	73	 	$%R3	 	93	 	 !=3	 	#$C3	 	%&G3	  	$%A!3	" 	;#3	 3	$ 	 9%3	& 	&'I'3	( 	 !=)3	* 	"#A+3	, 	%&G-3	. 	5/3	0 	713	2 	933	4 	653	6 	573	8 	793	: 	9;3	< 	 ;=3	> 	"#<?3	@ 	#$@A3	B 	()JC3	D 	%&DE3	 3	 3	F 	 ;G3	H 	7I3	J 	7K3	L 	 !=M3	N 	 ;O3	P 	 ;Q3	R 	;S3	T 	8U3	V 	9W3	X 	"#AY3	Z 	 ;[3	\ 	 !<]3	^ 	 !=_3	` 	#$Ca3	b 	7c3	d 	 !=e3	 3	  5 5 5 5n   Z" Z" Z" Z"x
 
 
 
9 9 9 99 9 9 9) ) ) )^     # # # # # # # #$ %- %- %- %-N # # # #. \- \- \- \-|   ( # # # #( - - - -8  #  #  #  #D # # # #$ # # # #  ? ? ? ?    "     ( ( ( (
     $ $ $ $   8 
 
 
 
    "         - - - -47 7 7 7T T T T 57 #8 #8 #8 #8 #8 #8R' ' ' '
/ / / /D
 
 
 
./ / / /@ @ @ @"/ / / /	/ 	/ 	/ 	/+ + + +
/ 
/ 
/ 
/       .   <_# _# _# _#BC C C C C CrX   r  c                     e Zd ZU dZded<   ded<   d6dZddd7dZd8dZ	 d9d:dZd;dZ	d<d=d#Z
ej        d>d&            Zej        d?d-            Zej        d@d0            Zej        dAd1            Zej        dBd5            ZdS )C
BaseWorkerzWrapper around the :class:`WorkerState` that implements instructions handling.
    This is an abstract class with several ``@abc.abstractmethod`` methods, to be
    subclassed by :class:`~distributed.worker.Worker` and by unit test mock-ups.
    r  r   zset[asyncio.Task]_async_instructionsc                    || _         t                      | _        t          j                            d          | _        d S )Nzdistributed.worker.validate)r   r   r  r   r   r   r&  r   s     rY   rd   zBaseWorker.__init__	  s/    
#&55 [__%BCC


rX   N)r   	task_namerk   r  )Callable[P, Awaitable[StateMachineEvent]]rS  P.argsr   r   r   P.kwargsre   r   c              *   t                      t                    dfd            }t          j         |            |          }| j                            |           |                    t          | j        |                     dS )z:Execute an asynchronous instruction inside an asyncio taskre   rP  c                    K                        d          5    i  d {V cd d d            S # 1 swxY w Y   d S )Nasync-instructionr  )record)rS  r  r   ledgers   rY   wrapperz4BaseWorker._start_async_instruction.<locals>.wrapper  s      #677 3 3!T426222222223 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3s   599r  )r  r   Nr  )	r)   r   asynciocreate_taskr  r   add_done_callbackr   _finish_async_instruction)	rc   r  r  r   rS  r   r  r  r  s	     ` ``  @rY   _start_async_instructionz#BaseWorker._start_async_instruction  s     &''	t	3 	3 	3 	3 	3 	3 	3 	3 
	3 "77999=== $$T***D267SSS	
 	
 	
 	
 	
rX   r  asyncio.Task[StateMachineEvent]r  r)   c                   | j                             |           	 |                                }n[# t          j        $ r" t
                              d| d           Y dS t          $ r t
                              d|             w xY w|	                    d          5  | 
                    |           ddd           n# 1 swxY w Y   |                     |||           dS )z\The asynchronous instruction just completed; process the returned
        stimulus.
        zAsync instruction for z ended with CancelledErrorNz-Unhandled exception in async instruction for r  r  )r  r,  resultr  CancelledErrorr%  warningr  r   r  r  _finalize_metrics)rc   r  r  r   r  s        rY   r  z$BaseWorker._finish_async_instruction%  sD    	 ''---
	;;==DD% 	 	 	NNTDTTTUUUFF 	 	 	STSSTTT	 ]]2]33 	' 	'  &&&	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	tVW55555s!   1 -B	!(B	"CCCr  rP  c                   t          |t                    s|J t          |t                    rId}|j        D ]2}| j        j                            |          }|r|j        dk    rd} n3|j        rdnd}nt          |t          t          f          rd}d}nt          |t                    rd}d}nt          |t                    rMd|t          |j                  f}| j        j                            |j                  }|r|j        dk    rdnd}nt          |t                    rd|t          |j                  f}d}nPt          |t                    rd|t          |j                  f}d}n!t          |t                    s
J |            d S |                    |	          D ]#\  }}	}
|                     g |||
R |	           $d S )
N)z
gather-depr@   Fr8   rA   failedbusyexecute)coarse_time)r   r  rx  rz  r   r  r   r  r  r  r  r!   r\   r  r  rs  finalizedigest_metric)rc   r  r  r   activityr\   r6  r  labelr  units              rY   r  zBaseWorker._finalize_metricsD  s    $ 011 	#???d122 "	&Hy F FZ%))#.. "(h.."'KE-1YEkkI46RSTT 	&H"KK011 	&H KK122 	!7Idh,?,?@H!%%dh//B#%O"(h*>*>%%KKK122 
	!7Idh,?,?@H"KKo.. 	!7Idh,?,?@H%KK d$899??4???F"(//k/"J"J 	@ 	@E5$7757$77????	@ 	@rX   r  c           
         | j         j        | }|D ]}t          |t                    r(|                     |                                           @t          |t                    r|j        sJ | j        rbd	                    t          t          t          d|j                  d                             }t          |          dk    r|dd         dz   }nd}|                     d|j         d	| d
| j        |j        |j        |j        |j                   t          |t&                    rP| j         j        |j                 }|                     d|j        d| j        |j        |j        |j                   pt          |t0                    r,|                     d|j         d| j        |j                   t5          |          dS )az  Forward one or more external stimuli to :meth:`WorkerState.handle_stimulus`
        and process the returned instructions, invoking the relevant Worker callbacks
        (``@abc.abstractmethod`` methods below).

        Spawn asyncio tasks for all asynchronous instructions and start tracking them.

        See also
        --------
        WorkerState.handle_stimulus
        r     r   P   NM   z...zgather_dep(z, {z}))r  r   zexecute(r   )r   r   zretry_busy_worker_later()r   r  r   r!  batched_sendr&  r  r  r&  rt   ru   rk   r   r  r  r  
gather_depr  r   r  r  r\   r  r   r  retry_busy_worker_laterr  )rc   r  r  rg  keys_strr6  s         rY   r  zBaseWorker.handle_stimulusw  s    2tz159  (	& (	&D$ 677 '&!!$,,..1111D),, $&~%%%: %#yyS%DN2K2KA2N)O)OPPH8}}r))#+CRC=5#8$H--@$+@@8@@@OKN!%!2 $ 0 .     D'** &Z%dh/--,tx,,,LHJ $ 0 .     D"677 &--=t{===0K     oo%Q(	& (	&rX      timeoutrO   c                   K   | j         sdS | j         D ]}|                                 t          j        | j         |           d{V \  }}|D ]"}t                              d| d|            #dS )z$Cancel all asynchronous instructionsN)r  z$Failed to cancel asyncio task after z
 seconds: )r  cancelr  waitr%  r:   )rc   r  r  r]  pendings        rY   closezBaseWorker.close  s      ' 	F, 	 	DKKMMMM #<(@'RRRRRRRRR
7 	 	DLLPwPP$PP   	 	rX   r   r  c                    dS )a  Send a fire-and-forget message to the scheduler through bulk comms.

        Parameters
        ----------
        msg: dict
            msgpack-serializable message to send to the scheduler.
            Must have a 'op' key which is registered in Scheduler.stream_handlers.
        NrW   )rc   r   s     rY   r  zBaseWorker.batched_send  rj  rX   r  r  rI  r  r   r   c               
   K   dS )a  Gather dependencies for a task from a worker who has them

        Parameters
        ----------
        worker : str
            Address of worker to gather dependencies from
        to_gather : list
            Keys of dependencies to gather from worker -- this is not
            necessarily equivalent to the full list of dependencies of ``dep``
            as some dependencies may already be present on this worker.
        total_nbytes : int
            Total number of bytes for all the dependencies in to_gather combined
        NrW   )rc   r  r  r  r   s        rY   r  zBaseWorker.gather_dep  
        rX   r\   r    c               
   K   dS )zExecute a taskNrW   )rc   r\   r   s      rY   r  zBaseWorker.execute  r  rX   c                
   K   dS )z9Wait some time, then take a peer worker out of busy stateNrW   )rc   r  s     rY   r  z"BaseWorker.retry_busy_worker_later  r  rX   r  r   r  c                    dS )z!Log an arbitrary numerical metricNrW   )rc   r  r  s      rY   r  zBaseWorker.digest_metric  rj  rX   )r   r  )r  rk   r  r  rS  r  r   r   r   r  re   r   )r  r  r  r)   r   r   re   r   ra   )r  rP  r  r)   r   r   re   r   )r  rP  re   r   )r  )r  rO   re   r   )r   r  re   r   )
r  rk   r  rI  r  r   r   rk   re   rP  )r\   r    r   rk   re   rP  )r  rk   re   rP  )r  r   r  rO   re   r   )rS   rT   rU   r   rV   rd   r  r  r  r  r  abcabstractmethodr  r  r  r  r  rW   rX   rY   r  r     s         
 ****D D D D #
 
 
 
 
 
.6 6 6 6F #	1@ 1@ 1@ 1@ 1@f5& 5& 5& 5&n     	    	   , 	    	H H H H 	0 0 0 0 0 0rX   r  c                  z    e Zd ZU ded<   ded<   ded<   ded<    ee          ZddZdddZdddZddZ	ddZ
dS ) r  r   _previous_tsz#Counter[tuple[str, TaskStateState]]_current_countr   
_new_tasksz.defaultdict[tuple[str, TaskStateState], float]_cumulative_elapsedre   r   c                    d | _         t                      | _        t                      | _        t          t                    | _        d S ra   )r  r   r  r   r  r   rO   r  ri   s    rY   rd   zTaskCounter.__init__  s7     %ii%%#.u#5#5   rX   T	by_prefixr   r   c                    |r| j         S t                      }| j                                         D ]\  \  }}}||xx         |z  cc<   |S )a  Return current count of tasks.

        Parameters
        ----------
        by_prefix: bool, optional
            True (default)
                Return counter of (task prefix, task state) -> count
            False
                Return counter of task state -> count
        )r  r   r   )rc   r  r   r]  r   ns         rY   rX  zTaskCounter.current_count   s`      	'&&'.yy!06688 	 	MJQJJJ!OJJJJ
rX   Mapping[Any, float]c                p   | j         r_| j        J t                      }|| j        z
  }|| _        | j                                         D ]\  }}| j        |xx         ||z  z  cc<   |r| j        S t          t                    }| j                                        D ]\  \  }}}	||xx         |	z  cc<   |S )a  Ever-increasing cumulative task runtimes, including tasks that have left a
        state or even that don't exist anymore, updated as of the moment when this
        method is called.

        Parameters
        ----------
        by_prefix: bool, optional
            True (default)
                Return mapping of (task prefix, task state) -> seconds
            False
                Return mapping of task state -> seconds
        )r  r  r*   r   r  r   rO   )
rc   r  nowelapsedr   n_tasksr   r]  r   r  s
             rY   rY  zTaskCounter.cumulative_elapsed  s      	A$000++CD--G #D"17799 A A
7(+++w/@@++++ 	,++2=e2D2D!5;;== 	 	MJQJJJ!OJJJJ
rX   r6  r   c                :    | j                             |           dS )zvA new task has just been created and will be immediately fed into the
        recommendations for transitions
        N)r  r   r  s     rY   r(  zTaskCounter.new_task0  s      	BrX   prev_statesdict[TaskState, TaskStateState]c                   |s	| j         sdS t                      }| j        rJ| j        J || j        z
  }| j                                        D ]\  }}| j        |xx         ||z  z  cc<   || _        |                                D ]\  }}|j        dk    r!| j        |j        |j        fxx         dz  cc<   || j         v r| j                             |           U| j        |j        |f         dz
  }|dk    r|| j        |j        |f<   |dk    sJ | j        |j        |f= | j         D ]#}| j        |j        |j        fxx         dz  cc<   $| j         	                                 dS )z+Tasks have just transitioned to a new stateNr>   r  r   )
r  r*   r  r  r   r  r   r   r0  r1  )	rc   r  r  r  r   r  r6  
prev_state	dec_counts	            rY   r  zTaskCounter.transitions6  s    	4? 	Fkk 	A$000D--G"17799 A A
7(+++w/@@++++)//11 	C 	CNB
x;&&#BIrx$7888A=888T_$$''++++ /	:0EFJ	q==AJD'	:(=>>$>>>>+BIz,ABB/ 	: 	:B 	28 344494444rX   Nr   )T)r  r   re   r   )r  r   re   r  r  )r  r  re   r   )rS   rT   rU   rV   r2  r   rd   rX  rY  r(  r  rW   rX   rY   r  r    s         7777  HGGGo&&I6 6 6 6    &    :                        rX   r  c                  N    e Zd ZU ded<   ded<   dddZddZddZddZddZdS )DeprecatedWorkerStateAttributerk   r  r   targetNc                    || _         d S ra   )r  )rc   r  s     rY   rd   z'DeprecatedWorkerStateAttribute.__init__]  s    rX   ownerrh   re   r   c                    || _         d S ra   r  )rc   r  r  s      rY   __set_name__z+DeprecatedWorkerStateAttribute.__set_name__`  s    			rX   c                j    t          j        d| j         d| j        p| j         dt                     d S )NzThe `Worker.z,` attribute has been moved to `Worker.state.`)warningswarnr  r  FutureWarningri   s    rY   _warn_deprecatedz/DeprecatedWorkerStateAttribute._warn_deprecatedc  sQ    949 9 9![5DI9 9 9	
 	
 	
 	
 	
rX   instanceWorker | Nonetype[Worker]r   c                t    |d S |                                   t          |j        | j        p| j                  S ra   )r  r  r   r  r  )rc   r  r  s      rY   __get__z&DeprecatedWorkerStateAttribute.__get__j  s:    4x~t{'?di@@@rX   r7   r  c                r    |                                   t          |j        | j        p| j        |           d S ra   )r  setattrr   r  r  )rc   r  r  s      rY   __set__z&DeprecatedWorkerStateAttribute.__set__q  s7     8ty%@@@@@rX   ra   )r  r   )r  rh   r  rk   re   r   r   )r  r  r  r  re   r   )r  r7   r  r   re   r   )	rS   rT   rU   rV   rd   r  r  r  r  rW   rX   rY   r  r  Y  s         III       
 
 
 
A A A AA A A A A ArX   r  r   rW   )rS  r  re   r  )
__future__r   r  r  rG  loggingr  r  r  sysr  r   collectionsr   r   r   collections.abcr   r   r	   r
   r   r   r   r   r   r   dataclassesr   r   	functoolsr   r   r   r   	itertoolsr   typingr   r   r   r   r   r   r   tlzr   r   dask.typingr    
dask.utilsr!   r"   r#   distributed._storiesr$   distributed.collectionsr%   distributed.commr&   distributed.corer'   r(   distributed.metricsr)   r*   r+   distributed.protocolr,   distributed.protocol.serializer-   r.   distributed.sizeofr/   r  distributed.utilsr0   	getLoggerr%  typing_extensionsr1   r2   r3   r4   distributed.diagnostics.pluginr5   distributed.schedulerr6   distributed.workerr7   rG   rV   rH   rI   rJ   NO_VALUEr   rM   r  r[   r   r   r   r   version_infoDC_SLOTSr   r   r   r  r  r  r!  r)  r4  r;  r>  rA  rE  rK  rN  rP  rm  rq  rs  ru  rx  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r   r2  r  r   r  r  r  r  ABCr  r  r  rW   rX   rY   <module>r     s   " " " " " " " 



       



   3 3 3 3 3 3 3 3 3 3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
       ( ( ( ( ( ( ( ( E E E E E E E E E E E E       P P P P P P P P P P P P P P P P P P              7 7 7 7 7 7 7 7 7 7 - - - - - - + + + + + + - - - - - - 8 8 8 8 8 8 8 8 E E E E E E E E E E ' ' ' ' ' ' > > > > > > > > 4 4 4 4 4 4 / / / / / /		=	>	> 
* DCCCCCCCCC	#A <;;;;;//////)))))) $    &# # #
     &}5 5 5 5 5
) 
) 
)  
 
 
 
 #    	   #
 #
 #
 #
 #
	 #
 #
 #
L6 6 6 6 6#4 6 6 6 
  
  
  
  
y  
  
  
FZ Z Z Z Zi Z Z Z S S S S ".'99GT??r ,%,,8,,IE IE IE IE IE IE IE -,IEX ) ) ) ) ) ) ) )DK K K K K K K K.             k   
     ;   

 
 
 
 
[ 
 
 
     ,   & !
 !
 !
 !
 !
) !
 !
 !
H     1        *    # # # # #+ # # #     '        4   & ! ! ! ! !- ! ! ! =U =U =U =U =U =U =U =U@     "        $        ,   
     *    1 1 1 1 1. 1 1 1&     +        #5    !
 !
 !
 !
 !
. !
 !
 !
H     )   
 N
 N
 N
 N
 N
( N
 N
 N
b     (    /
 /
 /
 /
 /
* /
 /
 /
d >
 >
 >
 >
 >
* >
 >
 >
D A A A A A& A A A     *   
     (    
( 
( 
( 
( 
(* 
( 
( 
(     ,        +   
     %   
 
 
 
 
 
) 
 
 
     '        #    y%(=">>? ? ? ? ?{+ + + + +dL01
 1 1 1 1   "Y'C Y'C Y'C Y'C Y'C Y'C Y'C Y'CxNh0 h0 h0 h0 h0 h0 h0 h0Vk  k  k  k  k  k  k  k \A A A A A A A A A ArX   