
    0Fie                      U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlZd dlZd dlmZmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2 d dl3Z3d dl4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZB d dlCmDZDmEZEmFZF d dlGmHZH d dlImJZJ d dlKmLZLmMZMmNZNmOZO d dlKmPZQ d dlRmSZS d dlTmUZU d dlVmWZWmXZXmYZYmZZZm[Z[m\Z\m]Z]m^Z^m_Z_ d dlVm`Za d dlVmbZb d dlcmdZdmeZe d dlfmgZgmhZh d dlimjZj d dlkmlZl d dlmmnZn d d lompZpmqZqmrZr d d!lsmtZt d d"lumvZv d d#lwmxZxmyZy d d$lzm{Z{ d d%l|m}Z} d d&l~mZ d d'lmZ d d(lmZ d d)lmZ d d*lmZ d d+lmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d d,lmZmZmZ d d-lmZmZ d d.lmZ d d/lmZmZmZmZ d d0lmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ e&r.d d1lmZ d d2lmZ d d3lmZ d d4lmZ  ed5          Z e,d6          Z ej        eϦ          Ze3j                            d7          Ze}ed8Zd9ed:<   i Zd;ed<<   i Zd;ed=<   e[j        e[j        e[j        hZ G d> d?e+          Z G d@ dAe+          ZddEZddHZ G dI dJeet          Z ej        dK          ZdLedK<   ddMZdddPZdQ ZddddRdd^ZddbZdc Z eJdde          Zdfedg<    ej                    ZddiZdj Zdk Zdl Zdm Zdn ZdddqZdddtZddvZej        Zdw Z	 d dleZdx Zeedy<   [n# e$ r Y nw xY wdzd{dd|d}ddZeddfddZ	 	 	 dddZ	 	 dddZ	 	 dddZdS )    )annotationsN)defaultdictdeque)Callable
Collection	ContainerHashableIterableMappingMutableMapping)Executor)suppress)	timedelta)wrapsisawaitable)TYPE_CHECKINGAnyClassVarLiteralTextIO	TypedDictTypeVarcast)keymappluck)IOLoop)istask)	CPU_COUNT)Key)applyformat_bytesfuncname	key_splitparse_bytesparse_timedeltatmpdirtypename)
preloadingprofileutils)BatchedSend)LRU)Commconnectget_address_hostparse_address)resolve_address)address_from_user_args)PeriodicCallback)	ConnectionPoolErrorMessage	OKMessagePooledRPCCallStatuscoerce_to_addresscontext_meter_to_server_digesterror_messagepingpong)rpc)	send_recv)nvmlrmm)WorkerPlugin_get_plugin_name)	WorkSpace)
Reschedule)get_handlers)context_meterthread_timetime)
ServerNode)setproctitle)pickleto_serialize)_is_dumpable)PubSubWorkerExtension)Security)safe_sizeof)SpansWorkerExtension)ThreadPoolExecutor)secede)TimeoutError_maybe_complexget_iphas_argin_async_callis_python_shutting_downiscoroutinefunctionjson_load_robust
log_errorsoffloadparse_portsrecursive_to_dictrun_in_executor_with_contextset_thread_statesilence_logging_cmgrthread_statewait_for)gather_from_workers	pack_dataretry_operation)disable_gc_diagnosisenable_gc_diagnosis)get_versions) DeprecatedMemoryManagerAttributeDeprecatedMemoryMonitorWorkerDataParameterWorkerMemoryManager)AcquireReplicasEvent
BaseWorkerCancelComputeEventComputeTaskEventDeprecatedWorkerStateAttributeExecuteFailureEventExecuteSuccessEventFindMissingEventFreeKeysEventGatherDepBusyEventGatherDepFailureEventGatherDepNetworkFailureEventGatherDepSuccessEvent
PauseEventRefreshWhoHasEventRemoveReplicasEventRemoveWorkerEventRescheduleEventRetryBusyWorkerEventSecedeEventStateMachineEventStealRequestEvent	TaskStateUnpauseEventUpdateDataEventWorkerState)	ParamSpecClient)Nanny)	T_runspecPTzdistributed.admin.pdb-on-err)pubsubspanszdict[str, type]DEFAULT_EXTENSIONS"dict[str, Callable[[Worker], Any]]DEFAULT_METRICSDEFAULT_STARTUP_INFORMATIONc                      e Zd ZU ded<   dS )GetDataBusyzLiteral['busy']statusN__name__
__module____qualname____annotations__     2lib/python3.11/site-packages/distributed/worker.pyr   r      s         r   r   c                  $    e Zd ZU ded<   ded<   dS )GetDataSuccesszLiteral['OK']r   dict[Key, object]dataNr   r   r   r   r   r      s*         r   r   methodCallable[P, T]returnc                     d j          dt                     rt                     d fd	            }nt                     d fd            }|S )zO
    Decorator to close the worker if this method encounters an exception.
    zworker-z
-fail-hardargsP.argskwargsP.kwargsr   r   c                8  K   	  | g|R i | d {V S # t           $ rx}| j        t          j        t          j        fvr=|                     dt          |                     t                              |           t          |            d {V   d }~ww xY wNzworker-fail-hard)
	Exceptionr   r9   closedclosing	log_eventr<   logger	exception_force_closeselfr   r   er   reasons       r   wrapperzfail_hard.<locals>.wrapper   s      #VD:4:::6:::::::::   ;v}fn&EEENN#5}Q7G7GHHH$$Q'''"4000000000s    
BA3BBr   c                >   	  | g|R i |S # t           $ r}| j        t          j        t          j        fvr=|                     dt          |                     t                              |           | j	        
                    t          |             d }~ww xY wr   )r   r   r9   r   r   r   r<   r   r   loopadd_callbackr   r   s       r   r   zfail_hard.<locals>.wrapper   s    vd4T444V444   ;v}fn&EEENN#5}Q7G7GHHH$$Q'''	&&|T6BBBs    
BA>BB)r   r   r   r   r   r   )r   r   r   r   r   r   )r   r[   r   )r   r   r   s   ` @r   	fail_hardr      s     3v222F6"" 	v	 	 	 	 	 	 
	 	 
v	 	 	 	 	 	 
	 Nr   r   strc                $  K   	 t          |                     dd|          d           d{V  dS # t          t          f$ r  t          $ rB ddlm} |j        r t          	                    dd	           t          j        d
           Y dS w xY w)z
    Used with the fail_hard decorator defined above

    1.  Wait for a worker to close
    2.  If it doesn't, log and kill the process
    F)nannyexecutor_waitr      Nr   )	Schedulerz[Error trying close worker in response to broken internal state. Forcibly exiting worker NOWTexc_info   )re   closeKeyboardInterrupt
SystemExitBaseExceptiondistributedr   
_instancesr   criticalos_exit)r   r   r   s      r   r   r      s      JJU%JGG
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
 z*       	*))))) 	* 	 	
 	
 	
 	#s   ,2 ABBc            +      `
    e Zd ZU dZ ej                    Zded<    ej                    Zded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   d
ed<   d
ed<   d
ed <   d
ed!<   d"ed#<   d"ed$<   d"ed%<   d&ed'<   d(ed)<   d*ed+<   d,ed-<   d.ed/<   d0ed1<   d2Z	d3ed4<   d0ed5<   d0ed6<   d7ed8<   d0ed9<   d:ed;<   d7ed<<   d=ed><   d=ed?<   d=ed@<   d:edA<   dBedC<   dDedE<   dedF<   dGedH<   dIedJ<   dKedL<   dMedN<   d"edO<   dPedQ<   d"edR<   i Z
dedS<   dedT<   dUedV<   dUedW<   d:edX<   dYedZ<   ded[<   d\ed]<   d^ed_<   	 	 d#d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d`d2eed2d2d2d2d2dadbd2dcd2d2d2d2d2d2daddd2d2d2d2d2de)d$dZded<   ed%d            Z e            Z e            Z e            Z e            Z e            Z e            Z e            Z e            Z ed          Z ed          Z e            Z ed          Z e            Z e            Z  e            Z! e            Z" ed          Z# ed          Z$ e            Z% e            Z& e            Z' e            Z( e            Z) e            Z* e            Z+ e            Z, e            Z- ed          Z. ed          Z/ e            Z0 e            Z1 e            Z2 e            Z3 e            Z4ed&d            Z5ed'd            Z6d(dZ7ed             Z8d)dZ9ed             Z:ed             Z;e<j=        j>        d*d            Z=d+dZ?d,dZ@d ZAd ZBdcdd- fdZCd.dZDd/dZEd0dZFd/dZGeHd1d            ZId2dZJd3dZKd4d5dƄZL fdǄZMeN	 	 	 	 d6d7d΄            ZO	 d8d9dЄZPdф ZQd҄ ZR eSdӦ          	 	 d#d:dل            ZT	 d;d<dڄZUd=dۄZVeN	 	 d>d?d            ZWeNd@d            ZXdAdZYdBdZZeHdC fd            Z[dDdZ\dEdZ]dFdZ^dGdZ_eHdHd            Z`dIdZadJdZbeNd/d            ZcdKd ZddLdZe	 	 	 	 dMdNdZfd#dOdZgeHdPd            ZhdQdZid/dZjd/dZk	 	 	 	 dRdSdZl	 dTdUdZmd;dVdZndWdZodWdZpdXdZqedYd            Zrd;dZdZsd(dZtd[dZud/dZved             Zwed             Zxed              Zyed!             Zzed"             Z{ xZ|S (\  Workera  Worker node in a Dask distributed cluster

    Workers perform two functions:

    1.  **Serve data** from a local dictionary
    2.  **Perform computation** on that data and on data from peers

    Workers keep the scheduler informed of their data and use that scheduler to
    gather data from other workers when necessary to perform a computation.

    You can start a worker with the ``dask worker`` command line application::

        $ dask worker scheduler-ip:port

    Use the ``--help`` flag to see more options::

        $ dask worker --help

    The rest of this docstring is about the internal state that the worker uses
    to manage and track internal computations.

    **State**

    **Informational State**

    These attributes don't change significantly during execution.

    * **nthreads:** ``int``:
        Number of nthreads used by this worker process
    * **executors:** ``dict[str, concurrent.futures.Executor]``:
        Executors used to perform computation. Always contains the default
        executor.
    * **local_directory:** ``path``:
        Path on local machine to store temporary files
    * **scheduler:** ``PooledRPCCall``:
        Location of scheduler.  See ``.ip/.port`` attributes.
    * **name:** ``string``:
        Alias
    * **services:** ``{str: Server}``:
        Auxiliary web servers running on this worker
    * **service_ports:** ``{str: port}``:
    * **transfer_outgoing_count_limit**: ``int``
        The maximum number of concurrent outgoing data transfers.
        See also
        :attr:`distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit`.
    * **batched_stream**: ``BatchedSend``
        A batched stream along which we communicate to the scheduler
    * **log**: ``[(message)]``
        A structured and queryable log.  See ``Worker.story``

    **Volatile State**

    These attributes track the progress of tasks that this worker is trying to
    complete. In the descriptions below a ``key`` is the name of a task that
    we want to compute and ``dep`` is the name of a piece of dependent data
    that we want to collect from others.

    * **threads**: ``{key: int}``
        The ID of the thread on which the task ran
    * **active_threads**: ``{int: key}``
        The keys currently running on active threads
    * **state**: ``WorkerState``
        Encapsulated state machine. See
        :class:`~distributed.worker_state_machine.BaseWorker` and
        :class:`~distributed.worker_state_machine.WorkerState`

    Parameters
    ----------
    scheduler_ip: str, optional
    scheduler_port: int, optional
    scheduler_file: str, optional
    host: str, optional
    data: MutableMapping, type, None
        The object to use for storage, builds a disk-backed LRU dict by default.

        If a callable to construct the storage object is provided, it
        will receive the worker's attr:``local_directory`` as an
        argument if the calling signature has an argument named
        ``worker_local_directory``.
    nthreads: int, optional
    local_directory: str, optional
        Directory where we place local resources
    name: str, optional
    memory_limit: int, float, string
        Number of bytes of memory that this worker should use.
        Set to zero for no limit.  Set to 'auto' to calculate
        as system.MEMORY_LIMIT * min(1, nthreads / total_cores)
        Use strings or numbers like 5GB or 5e9
    memory_target_fraction: float or False
        Fraction of memory to try to stay beneath
        (default: read from config key distributed.worker.memory.target)
    memory_spill_fraction: float or False
        Fraction of memory at which we start spilling to disk
        (default: read from config key distributed.worker.memory.spill)
    memory_pause_fraction: float or False
        Fraction of memory at which we stop running new tasks
        (default: read from config key distributed.worker.memory.pause)
    max_spill: int, string or False
        Limit of number of bytes to be spilled on disk.
        (default: read from config key distributed.worker.memory.max-spill)
    executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], "offload"
        The executor(s) to use. Depending on the type, it has the following meanings:
            - Executor instance: The default executor.
            - Dict[str, Executor]: mapping names to Executor instances. If the
              "default" key isn't in the dict, a "default" executor will be created
              using ``ThreadPoolExecutor(nthreads)``.
            - Str: The string "offload", which refer to the same thread pool used for
              offloading communications. This results in the same thread being used
              for deserialization and computation.
    resources: dict
        Resources that this worker has like ``{'GPU': 2}``
    nanny: str
        Address on which to contact nanny, if it exists
    lifetime: str
        Amount of time like "1 hour" after which we gracefully shut down the worker.
        This defaults to None, meaning no explicit shutdown time.
    lifetime_stagger: str
        Amount of time like "5 minutes" to stagger the lifetime value
        The actual lifetime will be selected uniformly at random between
        lifetime +/- lifetime_stagger
    lifetime_restart: bool
        Whether or not to restart a worker after it has reached its lifetime
        Default False
    kwargs: optional
        Additional parameters to ServerNode constructor

    Examples
    --------

    Use the command line to start a worker::

        $ dask scheduler
        Start scheduler at 127.0.0.1:8786

        $ dask worker 127.0.0.1:8786
        Start worker at:               127.0.0.1:1234
        Registered with scheduler at:  127.0.0.1:8786

    See Also
    --------
    distributed.scheduler.Scheduler
    distributed.nanny.Nanny
    z!ClassVar[weakref.WeakSet[Worker]]r   z!ClassVar[weakref.WeakSet[Client]]_initialized_clientsNanny | Noner   zthreading.Lock_lockinttransfer_outgoing_count_limitzdict[Key, int]threadsactive_threads_lockzdict[int, str]active_threadszset[Key]active_keysz defaultdict[str, dict[str, Any]]profile_keysz.deque[tuple[float, dict[str, dict[str, Any]]]]profile_keys_historydict[str, Any]profile_recentz#deque[tuple[float, dict[str, Any]]]profile_historyzdeque[dict[str, Any]]transfer_incoming_logtransfer_outgoing_logtransfer_outgoing_count_totaltransfer_outgoing_bytes_totaltransfer_outgoing_bytestransfer_outgoing_countfloat	bandwidthlatencyprofile_cycle_intervalrD   	workspacezClient | None_clientz#defaultdict[str, tuple[float, int]]bandwidth_workersz$defaultdict[type, tuple[float, int]]bandwidth_typeszpreloading.PreloadManagerpreloads
str | Nonecontact_addressN"int | str | Collection[int] | None_start_port_start_host
_interfacer   	_protocol_dashboard_addressbool
_dashboard_http_prefixfloat | Nonedeath_timeoutlifetimelifetime_staggerlifetime_restartdict
extensionsrP   securityconnection_argsr   r   zdict[str, Executor]	executorsr,   batched_streamr   namescheduler_delayzdict[str, BatchedSend]stream_commsheartbeat_intervalservicesservice_specsr   metricsstartup_informationlow_level_profilerr8   	schedulerexecution_statezdict[str, WorkerPlugin]pluginstuple[WorkerPlugin, ...]_pending_plugins1sF/r   auto))scheduler_filenthreadsr   local_directoryr  r  	reconnectexecutor	resourcessilence_logsr   preloadpreload_argvr  r   r
  r  r  r  	interfacehostportprotocoldashboard_address	dashboardhttp_prefixr   r  r  validater   r   r   r   transition_counter_maxmemory_limitr   memory_target_fractionmemory_spill_fractionmemory_pause_fractionscheduler_snischeduler_ipscheduler_port
int | Noner  r  IOLoop | Noner  dict | None
Any | Noner  bool | Noner  :Executor | dict[str, Executor] | Literal['offload'] | Noner  dict[str, float] | Noner  r  list[str] | Noner   "list[str] | list[list[str]] | None Security | dict[str, Any] | Nonedict[str, type] | None%Mapping[str, Callable[[Worker], Any]]r!  r"  r#  r$  r%  r&  r'  r(  r)  int | Literal[False]r*  str | floatr   rn   r+  float | Literal[False] | Noner,  r-  r.  c       )            |	-|	rt          d          t          j        dt          d           |t          j        dt          d           t	          j                    x _        }-| _        t          j	                     _
        t          j                            d          }.t          j                            d           _        t          t          j                            d                    }/i  _        t          j	                     _        i  _        t'                       _        t+          t,          j                   _        t          j                            d	          }0t3          |0
           _        t3          |0
           _        t-          j                     _        | t          j                            d          } t3          |0
           _        t3          |0
           _        d _        d _         d _!        d _"        t          t          j                            d                     _#        t+          d            _$        t+          d            _%        d _&        d  _'        |!t          j                            d          }!tQ          |!d          }!|!sJ  )                    tT                     tQ          |           _+        | _,        | _-        | _.        |rRt_          |          \  }1}2|20                    d          dk    r'|21                    d          st          d|2           | _2        |ptf          }|6t          j                            d          }ti          |tj                    sJ i  _6        |r#|-7                    tq          |                     ti          |tj                    rts          dWi |}|pts                       _:        ti           j:        tr                    sJ  j:        ;                    d           _<        t{          j>                    x _?         _@        |+r
|+ j<        d<   t          jB        t          dd          d _D        t          jF                    dk    rt          dd           jD        d <   |
d!k    r jD        d!          jD        d"<   n<ti          |
tj                    r jD        G                    |
           n|

|
 jD        d"<   d" jD        vrt          |d#           jD        d"<   t          d$ j?        %           _I        | _J        d _K        i  _L        i  _M        | _N        i  _O        |pi  _P        | _Q        | _R        | _S        |rtk          |          ni  _T        |rtk          |          ni  _U        |t          j                            d&          }| _V        i d' jW        d( jX        d) jY        d* jZ        d+ j[        d, \                    t                    d- j^        d.t          d/ j`        d0 ja        d1 jb        d2 jc        d3 jd        d4 je        d5 jf        d6 jg        d7 jh         ji         jj         jk         jl         jm         jn         jo        d8}3 j^         \                    t                     \                    t                     \                    t                     \                    t                     \                    t                     \                    t                     \                    t                     jv         jw        d9
}4t          jy         f|3|4 j<        |d:|, |st          j                            d;          }|st          j                            d<          }|J |J t          j{         || j|        =           _}        |rt          | j+        >          }5|5d?         }6ne|@t          j                            d@d           r t          j                            d@          }6n#|t          |          }6nt          ||f          }6|5|6                    dA          }7t          |7          dk    r|7d         }|sJ | _        t           |'||&|(|)|*B           _        t
          j        }8t          j                            dC          }9 j        j        !|9dDurt           j        j        |9z            }8t          | j        j         j         jM        ||.| |%|8|/E
  
        }:t          jy         |:                                |6           _         j        j         j?         dF _        tQ          |d           _        t#           j         j        dGz            };|; j        dH<   t#           fdIdJ          };|; j        dK<   t#           j        dG          };|; j        dL<   | _        |t,          } fdM|                                D              _6        t1          dN           t          j                            dO          rttQ          t          j                            dP          d          }<t#           j        |<dGz            };|; j        d1<   t#           j        |!dGz            };|; j        dQ<   |"t          j                            dR          }"tQ          |"          }"|#t          j                            dS          }#tQ          |#          }#|$t          j                            dT          }$|$ _        |"rB|"t9          j                    dz  dz
  |#z  z  }" j@                            |" j        dUV           |" _        t@          j                                        d S )XNzThe `reconnect=True` option for `Worker` has been removed. To improve cluster stability, workers now always shut down in the face of network disconnects. For details, or if this is an issue for you, see https://github.com/dask/distributed/issues/6350.zThe `reconnect` argument to `Worker` is deprecated, and will be removed in a future release. Worker reconnection is now always disabled, so passing `reconnect=False` is unnecessary. See https://github.com/dask/distributed/issues/6350 for details.   
stacklevelzThe `loop` argument to `Worker` is ignored, and will be removed in a future release. The Worker always binds to the current loopz'distributed.worker.connections.outgoingz'distributed.worker.connections.incomingz/distributed.worker.transfer.message-bytes-limitz&distributed.admin.low-level-log-length)maxlenzdistributed.worker.validater   zdistributed.scheduler.bandwidthc                     dS Nr   r   r   r   r   r   <lambda>z!Worker.__init__.<locals>.<lambda>K  s    F r   c                     dS rF  r   r   r   r   rH  z!Worker.__init__.<locals>.<lambda>M  s    6 r   MbP?z distributed.worker.profile.cyclems)default:r   [z;Host address with IPv6 must be bracketed like '[::1]'; got zdistributed.worker.resources)levelworkerserver_hostnamezDask-Actor-Threads)thread_name_prefix)r^   actorzDask-GPU-Threadsgpur^   rL  zDask-Default-Threads2msintervalr   z$distributed.worker.profile.low-levelgatherrunrun_coroutineget_dataupdate_data	free_keys	terminatepingupload_file
call_stackr*   profile_metadataget_logskeysversionsactor_executeactor_attribute)z
plugin-addzplugin-removeget_monitor_infobenchmark_diskbenchmark_memorybenchmark_network	get_story)
r   zcancel-computezacquire-replicaszcompute-taskz	free-keyszremove-replicaszsteal-requestzrefresh-who-hasworker-status-changezremove-worker)handlersstream_handlersr  r  zdistributed.worker.preloadzdistributed.worker.preload-argv)file_dirtimeoutaddresszscheduler-addressz://)r   r  r*  r+  r,  r-  z"distributed.worker.memory.transferF)
r  r   r   r  r  transfer_incoming_count_limitr(  r)  transfer_incoming_bytes_limittransfer_message_bytes_limit)r  iolooprP    	heartbeatc                 2                          ddi          S )Nop
keep-alive)batched_sendr   s   r   rH  z!Worker.__init__.<locals>.<lambda>#  s    d&7&7|8L&M&M r   i`  r|  find-missingc                .    i | ]\  }}| |          S r   r   ).0r  	extensionr   s      r   
<dictcomp>z#Worker.__init__.<locals>.<dictcomp>-  s4     
 
 
&5dID))D//
 
 
r   zdask worker [not started]z"distributed.worker.profile.enabledz#distributed.worker.profile.intervalzprofile-cyclez$distributed.worker.lifetime.durationz#distributed.worker.lifetime.staggerz#distributed.worker.lifetime.restartzworker-lifetime-reachedr   r   )
ValueErrorwarningswarnDeprecationWarning
contextlib	ExitStack_Worker__exit_stackr   	threadingLockr   daskconfiggetr   r%   r   r   r   setr   r   r*   creater   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r&   _setup_loggingr   r   r   r   r   r1   count
startswithr   r   
isinstancer  r  enter_contextrc   rP   r  get_connection_argsr  r   currentr   io_loopr+   _offload_executorrS   r  r@   device_get_countupdater,   r  r  r  r	  r  r  r  r  r   r   r   r  r  r  rX  rY  rZ  r[  r\  _handle_remote_stimulusrx   r   r=   r`  get_call_stackget_profileget_profile_metadatarc  rd  re  rf  rg  
plugin_addplugin_removerh  ri  rj  rk  rl  rr   rp   rs   r   r   r~   handle_worker_status_change_handle_remove_workerrJ   __init__r)   process_preloadsr  r   r\   r:   splitlenr   ro   memory_managermathinfr*  r   r   r   rq   r>   r  rs  r  r
  r4   ry  periodic_callbacksfind_missing_addressr   itemsrK   trigger_profilecycle_profiler   random
call_laterclose_gracefullyr   r   r   add)=r   r/  r0  r  r  r   r  r  r  r  r  r  r  r   r  r   r  r   r
  r  r  r  r!  r"  r#  r$  r%  r&  r'  r   r  r  r(  r   r   r   r   r)  r*  r   r+  r,  r-  r.  r   stackrt  rv  rD  _host_addressrn  ro  cfgscheduler_addrprotocol_addressru   transfer_incoming_bytes_fractionstatepcprofile_trigger_intervals=   `                                                            r   r  zWorker.__init__  s   r    x   W '     M>"	    %/$8$:$::E
^%%
(,5)
 )
% .2[__5.
 .
* (3KOOMNN(
 (
$ #,>#3#3  55'77!IJJ$)$8$8$8!$F333%n..{'DEEH%*&%9%9%9"%*&%9%9%9"-.*-.*'($'($$T[__5V%W%WXX!,N"
 "
  +>>::!)%)[__5W%X%X"!01GQU!V!V!V%%%%F###,];;. 	+D11OA|!!#&&**<3J3J33O3O* *'* *   $(y(FGGIi..... 	J 4< H H HIIIh%% 	,++(++H .HJJ$-22222#}@@JJ#)>#3#33	DL 	D6CD !23 .'>RSSS
 
  ""Q&&$6&8% % %DN5!
 y  (,y(ADN9%%$'' 	1N!!(++++!(0DN9%DN**(:-C) ) )DN9% *5tyIII	  '%^"3#'(/7tG}}}R)<DD$%%%" 	  %!%1W!X!X"4
dk
48
 T/
 	

 4+
 55mDD
 
 H
 4+
 $-
 t'
  9
 
 DI
 
  T/!
" t3#
$ /!/ $ 5"1 $ 5!%!71
 
 
8 Z"::;MNN $ < <=Q R R 889IJJ55mDD#;;<OPP!99:KLL#;;<NOO$($D!7
 
 		
+ 0+	
 	
 	
 	
 	
  	Dkoo&BCCG 	N;??+LMML"""'''"3'<$2F
 
 
  	O">4;MNNNC ^NN!dkoo6I4&P&P!![__-@AANN#.|<<NN.n/MNNN-33E::#$$))+A.OOO!1%#9"7"7
 
 
 )-%+/;??0,
 ,
( ,80==,/#03SS- -) $)LL*G#9*G)E
 
 
 	D%(((.11/i 
  
 #22Dd"S"S"Sdnd.E.LMM/1,MMMMuUU02-d/6624/'+J
 
 
 
9C9I9I9K9K
 
 
 	0111;???@@ 	:'6 EFFPT( ( ($ "$"68PSW8WXXB13D#I.!$"46Lt6STTB79D#O4{'MNNH"8,,##{/TUU*+;<<##{/TUU 0 	1,q04DDDHL##$/8Q $    !d#####r   ro   r  r   MutableMapping[Key, object]c                    | j         j        S )a  {task key: task payload} of all completed tasks, whether they were computed
        on this Worker or computed somewhere else and then transferred here over the
        network.

        When using the default configuration, this is a zict buffer that automatically
        spills to disk whenever the target threshold is exceeded.
        If spilling is disabled, it is a plain dict instead.
        It could also be a user-defined arbitrary dict-like passed when initialising
        the Worker or the Nanny.
        Worker logic should treat this opaquely and stick to the MutableMapping API.

        .. note::
           This same collection is also available at ``self.state.data`` and
           ``self.memory_manager.data``.
        )r  r   r~  s    r   r   zWorker.dataW  s    " "''r   transfer_incoming_bytes)target*transfer_incoming_bytes_throttle_thresholddata_neededtransfer_incoming_count_totalin_flight_tasks_countrv  rt  set[TaskState]c                    t          j        dt                     d | j        j                                        D             S )Nz\The `Worker.data_needed` attribute has been removed; use `Worker.state.data_needed[address]`c                    h | ]	}|D ]}|
S r   r   )r  tsstss      r   	<setcomp>z%Worker.data_needed.<locals>.<setcomp>  s%    LLLsLL"LLLLr   )r  r  FutureWarningr  r  valuesr~  s    r   r  zWorker.data_needed  sF    6	
 	
 	

 MLdj4;;==LLLLr   c                h    t          j        dt                     t          | j        j                  S )Nz_The `Worker.waiting_for_data_count` attribute has been removed; use `len(Worker.state.waiting)`)r  r  r  r  r  waitingr~  s    r   waiting_for_data_countzWorker.waiting_for_data_count  s3    .	
 	
 	

 4:%&&&r   c                h   | j         | j        k    r
d| j          nd}d| j        j         d| j        | d| j        j          dt          | j                   d| j        j         d| j        j	         d	t          | j        j
                   d
| j        j         dt          | j        j                   dS )Nz, name:  < z
, status: z
, stored: z, running: r  z	, ready: z, comm: z, waiting: >)r  address_safe	__class__r   r   r  r   r  executing_countr  readyr  r  )r   r  s     r   __repr__zWorker.__repr__  s    )-d6G)G)G%$)%%%R3' 3 3$*; 3t 3 3{'3 349~~3 3 
23 3 6:Z5H3 3 $**++	3 3
 Z53 3 DJ.//3 3 3	
r   c                    | j         j        S N)_deque_handlerr   r~  s    r   logszWorker.logs  s    "((r   topicstr | Collection[str]msgNonec                   t          |          s t          dt          |          d          d||d}| j        t	          j                    k    r|                     |           dS | j                            | j        |           dS )a  Log an event under a given topic

        Parameters
        ----------
        topic : str, list[str]
            Name of the topic under which to log an event. To log the same
            event under multiple topics, pass a list of topic names.
        msg
            Event message to log. Note this must be msgpack serializable.

        See also
        --------
        Client.log_event
        z4Message must be msgpack serializable. Got type(msg)=z	 instead.z	log-event)r{  r  r  N)	rN   	TypeErrortype	thread_idr  	get_identr}  r   r   )r   r  r  full_msgs       r   r   zWorker.log_event  s     C   	RT#YYRRR   
 

 >Y02222h'''''I""4#4h?????r   c                    | j         S )z For API compatibility with Nanny)rs  r~  s    r   worker_addresszWorker.worker_address  s     |r   c                    | j         d         S )NrL  )r  r~  s    r   r  zWorker.executor  s    ~i((r   valuer9   c                   | j         }t          j                             | |           dt                       }|                     |           |t
          j        k    r5|t
          j        k    r%|                     t          |                     dS |t
          j        k    r?|t
          j	        t
          j
        fv r'|                     t          |                     dS dS dS )zrOverride Server.status to notify the Scheduler of status changes.
        Also handles pausing/unpausing.
        zworker-status-change-stimulus_idN)r   rJ   __set__rI   _send_worker_status_changer9   runninghandle_stimulusr}   pausedclosing_gracefullyr   )r   r  prev_statusr  s       r   r   zWorker.status  s    
 k!!$...6dff66''444&.((Ufn-D-D  !D!D!DEEEEEfn$$M%9
 *
 *
   +!F!F!FGGGGG	 %$ *
 *
r   r  c                L    |                      d| j        j        |d           d S )Nrm  )r{  r   r  )r}  _statusr  )r   r  s     r   r  z!Worker._send_worker_status_change  s<    ,,+* 	
 	
 	
 	
 	
r   c                  K   	 | j         j        \  }}n# t          $ r d\  }}Y nw xY w| j                            d          }|r|                                 t          t                    }| j        	                                D ]K\  }}t          |t                    r!|d         dk    r|d d         |dd          z   }||xx         |z  cc<   L| j                                         t          | j        j                            d          | j        t          | j                  t'          t(          | j                  d	t          |          | j        j        ||d
| j        j        | j        j        | j        j        | j        | j        | j        d| j                  }| j                                        }|	                                D ]A\  }}d|v r3|                     d          \  }	}
}||!                    |	i           |<   <|||<   B| j"        	                                D ]N\  }}	  ||           }tG          |          r| d {V }|!                    ||           ?# tH          $ r Y Kw xY w|S )NrG  r   r   executer   rA  F)	by_prefix)totalworkerstypes)memorydisk)incoming_bytesincoming_countincoming_count_totaloutgoing_bytesoutgoing_countoutgoing_count_total)task_countsr   digests_total_since_heartbeatmanaged_bytesspilled_bytestransferevent_loop_interval.)%r   spilled_totalAttributeErrorr  r  collect_digestsr   r   r  r  r  tupleclearr  r  task_countercurrent_countr   r   r   r(   r   nbytesr  transfer_incoming_countr  r   r   r   _tick_interval_observedmonitorrecent	partition
setdefaultr  r   r   )r   spilled_memoryspilled_disk	spans_extdigestskvoutmonitor_recentk0r  k1metricresults                 r   get_metricszWorker.get_metrics  s     	0+/9+B(NLL 	0 	0 	0+/(NLLL	0 261D1DW1M1M	 	(%%''' 1<E0B0B6<<>> 	 	DAq!U## "!	(9(9bqbEAabbEMAJJJ!OJJJJ*00222
/===NN 677$*>?? 
 +/w--*+($ 
 #'*"D"&*"D(,
(P"&">"&">(,(J  !% <+
 
 
0 ,,.."((** 	 	DAqaxxKK,,	Ar-.r2&&r**A++-- 	 	IAvv&& *#)\\\\\\Fq&))))    
s    &&38I,,
I98I9c                   K   i }| j                                         D ]=\  }}	  ||           }t          |          r| d {V }|||<   .# t          $ r Y :w xY w|S r  )r  r  r   r   )r   r*  r#  fr$  s        r   get_startup_informationzWorker.get_startup_informationJ  s      ,2244 	 	DAqAdGGq>>  Aq		    s   'A
AAc                |    t          |           j        | j        | j        j        | j        j        | j        j        dS )N)r  idr  r  r*  )	r  r   r0  r  rs  r  r  r  r*  r~  s    r   identityzWorker.identityW  s;    JJ''/
+ /<
 
 	
r   excluder3  Container[str]c                  t                                                    }| j        |                                 t          j        j        | j        | j        d}fd|                                D             }|	                    |           |	                    | j
                                                 |	                    | j                                                 t          |          S )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Worker.identity
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        r2  )r   r  r  r   r   c                $    i | ]\  }}|v	||S r   r   )r  r#  r$  r3  s      r   r  z#Worker._to_dict.<locals>.<dictcomp>r  s)    DDD$!Q1G3C3CA3C3C3Cr   )super_to_dictr   rc  r  r  r   r   r  r  r  r  r`   )r   r3  infoextrar  s    `  r   r8  zWorker._to_dict`  s     ww00kMMOOk(%)%?%)%?
 
 EDDD%++--DDDEDJ'''88999D'000AABBB w7777r   c                    | j         rF| j         j        r<| j         j                                        s | j                             |           dS dS dS dS )aJ  Implements BaseWorker abstract method.

        Send a fire-and-forget message to the scheduler through bulk comms.

        If we're not currently connected to the scheduler, the message will be silently
        dropped!

        See also
        --------
        distributed.worker_state_machine.BaseWorker.batched_send
        N)r  commr   send)r   r  s     r   r}  zWorker.batched_send|  sv     	*#(	* ',3355	*
 $$S)))))	* 	* 	* 	* 	* 	*r   c           
        K    j         d                                           j         d                                          t                      } j         j         _        t
                              d            j        rJ  j        j	        rJ 	 	 t                      }t           j        j        fi  j         d {V }d|_        t          j                   |_        |                    t%          d+i dddd	d
 j        d j        j        d j        j        d j        dt                      d j        j        d j        j        d j        d j        d j        dt7          j                    dt;                      d                                  d {V d                                  d {V ddt                       d j         dg           d {V  |!                    dg          }| d {V }|"                    d          r t
          #                    |d                    t                      }||z   dz  } $                    ||z
             |d          |z
   _%        nv# tL          $ rB t
                              d! j        j                   tO          j(        d"           d {V  Y n)tR          $ r t
                              d#           Y nw xY wd|d         d$k    rd|*                                 d {V  d%|v r|d%         ntW          |          }t
          ,                    d&|            t[          d'|           j.        /                    |           t`          j1         _        tO          j2         fd(|d)         3                                D               d {V  t
                              d* j        j                   t
                              d            j         d         /                                  j         d         /                                  j4        5                     j6        |           d S ),Nr|  ry  1-------------------------------------------------TzWorker->Schedulerr{  zregister-workerreplyFrs  r   r  r  nowr  r*  r  r  r   pidre  r  r:  r  zworker-connect-	server_idmsgpackserializersdeserializerswarningrA  rI   Waiting to connect to: %26sg?z&Timed out when connecting to schedulerOKmessagez Unable to connect to scheduler: z#Unexpected response from register: c              3  L   K   | ]\  }}                     ||           V  dS ))r  pluginNr  )r  r  rN  r   s      r   	<genexpr>z2Worker._register_with_scheduler.<locals>.<genexpr>  sK         D& T&99     r   zworker-pluginsz        Registered to: %26sr   )7r  stoprI   r   rs  r   r9  r   r  tasksr/   r  r  r  weakrefref_serverwriter  r   r  total_resourcesr  r*  r  service_portsr   r   getpidrk   r+  r.  r0  readr  rI  _update_latencyr  OSErrorasynciosleeprU   r   reprerrorr  r  startr9   r  rX  r  r   r   handle_scheduler)	r   ra  _startr<  futureresponse_endmiddler  s	   `        r   _register_with_schedulerzWorker._register_with_scheduler  sG     -22444,11333'#'<D H 9:####,	F+F$T^%;TTt?STTTTTTTT/	&{400jj   ,,#e !% 4 4  ${//	
 "&!4!4 "YY !FFF #'*"<"< &*%8%E%E )-(<(< "&!3!3 #jj IKKK ". '+&6&6&8&8 8 8 8 8 8 8 8  %)$@$@$B$BBBBBBBB!" %?dff$>$>$>#$ #'''%( "++ !         . )==!'<<<<<<<<	** 8NN8I#6777vv 4-1,$$TE\222'/'7&'@$ ) ) )94>;QRRRmC((((((((((( F F FDEEEEEFW,	FZ H%%**,,)2h)>)>(9%%DNNCLLACAABBBO8OOPPP!!$'''nn   $,-=$>$D$D$F$F  
 	
 	
 	
 	
 	
 	
 	
 	14>3IJJJH-33555,22444	t4d;;;;;s   G,J A	K4#K43K4c                \    |dz  | j         dz  z   | _         |                     d|           d S )N皙?ffffff?r   )r   digest_metric)r   r   s     r   r[  zWorker._update_latency  s5    ~t(;;9g.....r   c           
        K   t                               d j                   	 t                      t	           j        j         j                                          d {V  fd j	        D             d  j
                                        D                        d {V }t                      }|z   dz  }                     |z
             |d         dk    rAt                               d j        d	                                d
           d {V  d S |d         |z
   _        |d         dz   j        d         _         j                                          j                                         d S # t*          $ r t                               d           Y d S t.          $ r6 t                               d                                             d {V   w xY w)NzHeartbeat: %sc           	         i | ]>}|j         j        v |t          t          j         j        |         j                  z
  ?S r   )r  rR  r   r   
start_time)r  keyr   ra  s     r   r  z$Worker.heartbeat.<locals>.<dictcomp>  sQ       dj... eTZ-=c-B-M!N!NN...r   c                ^    i | ]*\  }}t          |d           ||                                +S )ry  )hasattrry  )r  r  r  s      r   r  z$Worker.heartbeat.<locals>.<dictcomp>  sJ       'iy+66)--//  r   )rs  rA  r  	executingr  rA  r   missingz%Scheduler was unaware of this worker z. Shutting down.F)r   rI   zheartbeat-intervalrx  ry  z6Failed to communicate with scheduler during heartbeat.z6Unexpected exception during heartbeat. Closing worker.)r   debugrs  rI   rh   r  heartbeat_workerr   r+  r   r  r  r[  r`  r   r  r  callback_timer   r  r   r\  r   r   )r   re  endrg  ra  s   `   @r   ry  zWorker.heartbeat  sd     _dl333,	FFE,/,"..00000000    #/  
 +/?+@+@+B+B          H" &&CckQ&F  u---!Y..\DL\\\   jjuj---------#+F#3f#<D -.5 #K0> "((*** &&((((( 	W 	W 	WUVVVVVV 	 	 	UVVV**,,	s   C<F $AF $G)*?G)r<  r.   c                   K   	 |                      |           d {V  |                     d           d {V  d S # |                     d           d {V  w xY w)Nz)worker-handle-scheduler-connection-brokenr  )handle_streamr   )r   r<  s     r   rb  zWorker.handle_scheduler  s      	Q$$T***********$O*PPPPPPPPPPP$**$O*PPPPPPPPPPs	   = A	list[Key]c                *    t          | j                  S r  )listr   r~  s    r   rd  zWorker.keys  s    DIr   who_hasdict[Key, list[str]]r   c                J   K    fd|D             }g }t                      }dt                       }|ri }|D ]7}t          ||                   |z
  }|r|||<   "|                    |           8|sn~t          | j         j                   d{V \  }	}}
}                     |	|           ~	||
z  }|                    |           |r!t           j	        j
        |           d{V }||r-t                              d|           dt          |          d	S d
diS )z@Endpoint used by Scheduler.rebalance() and Scheduler.replicate()c                &    g | ]}|j         v|S r   r   r  r#  r   s     r   
<listcomp>z!Worker.gather.<locals>.<listcomp>  s%    AAAaaty.@.@.@.@.@r   zgather-)r~  r>   whoNr  )rd  zCould not find data: %szpartial-fail)r   rd  r   rK  )r  rI   appendrf   r>   rs  r\  r  rh   r  r~  r   r`  r}  )r   r~  missing_keysfailed_keysmissing_workersr  	to_gatherr#  r   r   new_failed_keysnew_missing_workerss   `           r   rX  zWorker.gather  s     AAAA7AAA$'EE((( 	I! * *gaj//O; *#*IaLL&&q))))  *!txT\        # T{;;;?*K""#6777  /N*! ! !      3  	:  	$LL2K@@@,d;6G6GHHHd##r   r   r  ra  c                $   t          |r| j                                        n| j                            |          | j        j        | j        j                  }t          j                    dk    r| j        j        |d<   | j        j	        |d<   |S )N)ra  )range_queryr  	last_timer   gpu_namegpu_memory_total)
r  r  r  r  r  r  r@   r  r  r  )r   r  ra  r*  s       r   rh  zWorker.get_monitor_infoG  s     ;##%%%\--E-::,$l,
 
 
  ""Q&&!%!6F:)-)FF%&r   c           
     
   K   t                                                       d {V  t                       t           j                  }|D ]}t           j        | j         j         j	                  } j	        
                    d          } j        dv r=|                                }t          t           j        j                            |d<   	   j        |fi | d {V  | _         n_# t$          $ r3}t'          |          dk    r|j        t(          j        k    rY d }~߂ d }~ww xY wt-          d j         d j                   t/           t0          j                            d           j        	          }                     | j                    j        rb	 d
d l}|j         j!        "                     j#         j$          j                   n*# tJ          $ r tL          '                    d           Y nw xY wt           j                   _(         j)         j         _)         j*        +                                 d {V   ,                     j(                   	 d j-        j.         j(         j/        fz  }n%# t`          $ r  j-        j.          j(         }Y nw xY wtL          1                    d j                   tL          1                    d|            j)         j2        k    r tL          1                    d j)                    j3        4                                D ]K\  }	}
tL          1                    d5                    |	 j(        dz   tm          |
          z                        LtL          1                    d j        j                   tL          1                    d           tL          1                    d j7        j8                    j9        j:        r2tL          1                    dtw           j9        j:                             tL          1                    d j<                   t{          d j        z             t}          j?         fd j@        D             ddi d {V }d |D             }t'          |          dk    rat'          |          dk    rFtL          A                    d           |D ])}tL          A                    t          |                     *|d
         d _@         j         j7        _         C                                 d {V   D                                  S )N)r"  r#  r!  r$  r  rP  )tcptlsdefault_hostr   zCould not start Worker on host z with port zdistributed.worker.http.routes)servermodulesprefixr   )r  z4To start diagnostics web server please install Bokehz%s%s:%dz      Start worker at: %26sz         Listening to: %26sz          Worker name: %26sz  {:>16} at: {:>26}rM  rJ  r?  z              Threads: %26dz               Memory: %26sz      Local Directory: %26szdask worker [%s]c              3  F   K   | ]}                     |d           V  dS )F)rN  catch_errorsNrO  r  rN  r   s     r   rP  z&Worker.start_unsafe.<locals>.<genexpr>  sG         vEBB     r   return_exceptionsTc                <    g | ]}t          |t                    |S r   )r  r   )r  r  s     r   r  z'Worker.start_unsafe.<locals>.<listcomp>  s'    XXXcZY=W=WXcXXXr   zVMultiple plugin exceptions raised. All exceptions will be logged, the first is raised.r   )Er7  start_unsaferj   r_   r   r3   r   r   r   r  get_listen_argscopyrW   r0   r  rs  listen_start_addressr\  r  errno
EADDRINUSEr  rF   r  r  r  r   start_http_serverr   r   distributed.dashboard.workerr&  rP  r/   http_applicationhttp_serverImportErrorr   ru  ipr  r   ra  start_serviceslistenerr  r#  r   r9  r  rX  r  formatr   r  r  r  r*  r"   r  rK   r]  rX  r  r`  r_  rh  start_periodic_callbacks)r   portsr#  start_addressr   r   routesr   listening_addressr#  r$  plugins_msgsplugins_exceptionsexcr  s   `             r   r  zWorker.start_unsafeZ  s     gg""$$$$$$$$$D,-- 	 	D2%/  M ]228<<F~//)/$T^%;<<* *~&	!dk-::6::::::::: '4#    u::>>ag1A&A&AHHHH	 0$2B 0 0!-0 0   KOO$DEE$
 
 

 	vt'>???? 	
3333 %,44)$,	 5      U U USTTTTTU #4<009DIm!!#########
 	DG$$$	C )T]-A47DI,V V 	C 	C 	C#'=#7 B B B	C 	14<@@@13DEEE9)))KK5tyAAA&,,.. 	Q 	QDAqKK-44Q#A8NOOPPPP14>3IJJJH14:3FGGG+ 	KK-T0=>>   	143GHHH'$,6777$^   "3  

 #
 
 
 
 
 
 
 
 YX\XXX!""a''%&&**l   . , ,CLLc++++$Q'' "!\
++---------%%'''sB   C77
D4(D/.D//D4-G$ $$H
H4J J32J3r   Tworker-closerr  r   r   c                
   K    j         t          j        t          j        t          j        fv r=t
                              d j         |                                             d{V  dS  j         t          j        k    rd}t                       	  
                     j        d|d           n*# t          $ r t
                              d           Y nw xY w	 t
                              d j        |           n+# t          $ r t
                              d|           Y nw xY w j         t           vr t
                              d	 j                    |st
                              d
           t          j         _         t#          d           |rU j        rN                      j                  5 }|                    |           d{V  ddd           n# 1 swxY w Y   t+          j                    d{V   fd j                                        D             }t3          j        d |D               d{V   j                                        D ]=}t9          |d          r+|                                }t;          |          r| d{V  >                                   j                                          d{V   j!                                        D ]}	|	"                                  j#        rqtI           fdtJ          j&        D                       sLtJ          j'        D ]?}
d|
_(        |
j)        r|
                                 d{V  +|
                                 @ *                                 d{V   j                                         d{V   j+        dk    rt3          j,        d           d{V   -                    ddi            j.        rZt_          t`                    5   j.                            tc                               d{V  ddd           n# 1 swxY w Y    j2                                        D ]}|tf          j4        u rfd}tk                      r ||d           2	 t3          j6        |||           d{V  Q# tn          $ r- t
          8                    d|d            |||           Y w xY w "                                 t          j         _         t#          d           ts          j                    d{V   j:        ;                    ddd           dS )a|  Close the worker

        Close asynchronous operations running on the worker, stop all executors and
        comms. If requested, this also closes the nanny.

        Parameters
        ----------
        timeout
            Timeout in seconds for shutting down individual instructions
        executor_wait
            If True, shut down executors synchronously, otherwise asynchronously
        nanny
            If True, close the nanny
        reason
            Reason for closing the worker

        Returns
        -------
        str | None
            None if worker already in closing state or failed, "OK" otherwise
        z8Attempted to close worker that is already %s. Reason: %sNFzclosing-worker)actionr   zFailed to log closing eventz!Stopping worker at %s. Reason: %szStopping worker. Reason: %sz%Closed worker has not yet started: %sz Not waiting on executor to closezdask worker [closing]r  rq  c                Z    g | ]'}t          |d           |                              (S )teardown)rr  r  r  s     r   r  z Worker.close.<locals>.<listcomp>  sE     
 
 
vz**
OOD!!
 
 
r   c              3  8   K   | ]}t          |          |V  d S r  r   )r  tds     r   rP  zWorker.close.<locals>.<genexpr>"  s-      IIbRIrIIIIIIr   r   c              3  D   K   | ]}|k    |j         t          v |V  d S r  )r   WORKER_ANY_RUNNING)r  wr   s     r   rP  zWorker.close.<locals>.<genexpr>4  sE        99-?!?!? !?!?!?!? r   Tucxg?r{  zclose-stream)secondsc                    t          | t                    r7| j        j                                         |                     |           d S |                     |           d S )N)waitrr  )r  )r  rS   _work_queuequeuer  shutdown)r  r  rr  s     r   _closezWorker.close.<locals>._close[  sj    h(:;; 1(.44666%%4%AAAAA%%4%00000r   )r  r  zKCould not close executor %r by dispatching to thread. Trying synchronously.r   zdask worker [closed]rK  )<r   r9   r   r   failedr   ru  finishedinitri   r   rs  r   r   r9  r  r  rK   r   r>   r  rq   r   r  r  r]  rX  r  rr  r   stop_servicesr   r  r  rQ  r   anyr   r   r   _asynchronousasynchronous_stop_listenersr   r^  r}  r  r   rU   r   r  r+   r  rZ   	to_threadRuntimeErrorr`  rJ   r  __exit__)r   rr  r   r   r   r	teardownsr  r*  r  cr  r  s   ``           r   r   zWorker.close  s     B ;6=&.&-HHHLLJ  
 --//!!!!!!!4;&+%% E	<NN4<4DPV)W)WXXXX 	< 	< 	<:;;;;;	<	?KK;T\6RRRR 	? 	? 	?KK5v>>>>>	?;000KK?MMM 	<KK:;;; n,--- 	8TZ 	8$*%% 8(((7777777778 8 8 8 8 8 8 8 8 8 8 8 8 8 8 tW5555555555
 
 
 
,--//
 
 
	
 nIIIIIIJJJJJJJJ//11 	! 	!Iy'** !"**v&& ! LLLLLLLm$$&&&&&&&&&)0022 	 	BGGIIII< 	"     *     "
  4 " "A
 '+AO~ "ggii
 				""$$$$$$$$$hnn >U""-$$$$$$$$$40111 	L,'' L L)//	'0J0J0JKKKKKKKKKL L L L L L L L L L L L L L L --//  	  	H52221 1 1 1 1 '((  u55555!+           $   LLe !% !   
 F!)      			m+,,,t$$$$$$$$$""4t444tsZ   B1 1$CC!C> >%D&%D&6GG#&G#/QQ
Q
R;;4S21S2worker-close-gracefullyc                  K   | j         t          j        t          j        fv r|                                  d{V  | j         t          j        k    rdS t                              d| j        |           | j	        
                    | j        gdddt                                  d{V  || j        }|                     | |           d{V  dS )zGracefully shut down a worker

        This first informs the scheduler that we're shutting down, and asks it
        to move our data elsewhere. Afterwards, we close as normal
        Nz)Closing worker gracefully: %s. Reason: %sFTzworker-close-gracefully-)r   close_workersremover  )r   r   )r   r9   r   r  r  r   r   r9  rs  r  retire_workersrI   r   r   )r   restartr   s      r   r  zWorker.close_gracefully  s      ;6>6+DEEE--//!!!!!!!;&-''F?vVVV n++\N;466;;	 , 
 
 	
 	
 	
 	
 	
 	
 	
 ?+Gjj7{6j:::::::::::r   c                   K   t          j        d           |                                  d {V  | j        t          j        k    sJ d S )Nz)wait_until_closed has moved to finished())r  r  r  r   r9   r   r~  s    r   wait_until_closedzWorker.wait_until_closed  sR      ABBBmmoo{fm++++++r   c                      j         vrAt          d j                   j         <    fd} j                            |            j                                      |           d S )N1msrV  c                    K   t          fi j         d {V } d| _        |                     ddi           d {V                      |            d S )NzWorker->Workerr{  connection_stream)r/   r  r  rV  ra  )r<  rs  bcommr   s    r   batched_send_connectz3Worker.send_to_worker.<locals>.batched_send_connect  s      $ #3        -	jj$(;!<=========D!!!!!r   )r	  r,   r   _ongoing_background_tasks	call_soonr=  )r   rs  r  r  r  s   ``  @r   send_to_workerzWorker.send_to_worker  s    $+++TY???E).Dg&" " " " " " " *445IJJJ'"'',,,,,r   zget-datard  Collection[str]r  rF  (GetDataBusy | Literal[Status.dont_reply]c                   K    j         }t          |j                  t           j                  k    r|dz  } j        t
          j        k    rd}d}nd}|dur8 j        |k    r-t          	                    d j        | j        ||           ddiS  xj        dz  c_         xj
        dz  c_
         fd	|D             }t          |          t          |          k     rot          |          |                                z
  D ]J}| j        j        v r:d
dlm}	  |	t%           j        j        |                    j        |           ||<   Kdd |                                D             d}
 fd|D             }t)          |                                          } xj        |z  c_         xj        |z  c_        	 t1          j        dt4                    5 }|                    |
|           d {V }|                    |           d {V }d d d            n# 1 swxY w Y   |dk    s
J |            nD# t:          $ r7 t                              d j        |           |                                  w xY w	  xj        |z  c_         xj        dz  c_        n%#  xj        |z  c_         xj        dz  c_        w xY wtA          d|j!        |j"        z
            } j#        $                    |j"         j%        z   |j!         j%        z   |j"        |j!        z   dz  |||||||z  d	           t
          j&        S )NrA  r   z= Throttling outgoing data transfers because worker is paused.r  FzUWorker %s has too many open connections to respond to data request from %s (%d/%d).%sr   busyc                >    i | ]}|j         v |j         |         S r   r  r  s     r   r  z#Worker.get_data.<locals>.<dictcomp>  s(    @@@Adi49Q<r   r   ActorrP  rK  c                4    i | ]\  }}|t          |          S r   )rM   r  r#  r$  s      r   r  z#Worker.get_data.<locals>.<dictcomp>  s$    'T'T'Ttq!<??'T'T'Tr   )r   r   c                D    i | ]}|j         j        |         j        pd S )r   r  rR  r  r  s     r   r  z#Worker.get_data.<locals>.<dictcomp>  s-    KKK!TZ-a07<1KKKr   networkfuncrE  rG  z$failed during get data with %s -> %srJ  )	ra  rQ  rg  durationr  rd  r  
compressedr   )'r   r0   peer_addressrs  r   r9   r  r   r   ru  r   r  r  rd  r  actorsdistributed.actorr  r  r  sumr  r   r   rG   meterrI   rV  rZ  r\  r   abortmaxrQ  ra  r   r  r  
dont_reply)r   r<  rd  r  rF  max_connectionsthrottle_msgr   r#  r  r  bytes_per_tasktotal_bytesmr  re  r  s   `                r   r[  zWorker.get_data  s>      <D-..2B4<2P2PPP-1O;&-''OO L L 5((,??LL%,   f%%$$)$$**a/** A@@@@@@t99s4yy  YY,  
)))777777#eTZ.q122DL!D  DG 'T'Ttzz||'T'T'TUU LKKKdKKK.//1122$$3$$**k9**	.$YT::: Fa#'::c{:#K#KKKKKKK
!%!E!EEEEEEEF F F F F F F F F F F F F F F t###X#### 	 	 	6  
 JJLLL	 $ ((K7((((A-((( ((K7((((A-(((((( uafqw.//"))4#77!557QV+q0$&$((83
 
	
 	
 	
   sC    H= :H!H= !H%%H= (H%)H= <J# =AI>>J# #"Kc                    |dt                       }|                     t          ||                     d |                                D             ddS )Nzupdate-data-)r   r  c                4    i | ]\  }}|t          |          S r   )sizeofr  s      r   r  z&Worker.update_data.<locals>.<dictcomp>$  s$    AAADAq1fQiiAAAr   rK  )r  r   )rI   r  r   r  )r   r   r  s      r   r\  zWorker.update_data  s`    
 111K_$KPPPQQQAADJJLLAAATRRRr   c                T  K   |                                 D ]_\  }}|| j        j        v r.| j        j        |xx         || j        j        |         z
  z  cc<   n|| j        j        |<   || j        j        |<   `t	          | j        j        | j        j        | j                   d {V  d S )N)r  rP  )r  r  rW  available_resourcesrh   r  set_resourcesr   )r   r  r  quantitys       r   r
  zWorker.set_resources&  s      $??,, 	5 	5KAxDJ...
.q111tz9!<<1111 5=
.q1,4DJ&q))N(j0'
 
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r   rN  WorkerPlugin | bytesr  ErrorMessage | OKMessagec                ~  K   t          |t                    rt          j        |          }t          |t                    st          j        dt          d           t          t          |          }|t          |          }|sJ || j
        v r|                     |           d {V  || j
        |<   t                              d|z             t          |d          rX	 |                    |           }t!          |          r| d {V }n)# t"          $ r}|s t%          |          cY d }~S d }~ww xY wdd	iS )
NzkRegistering duck-typed plugins has been deprecated. Please make sure your plugin subclasses `WorkerPlugin`.rA  rB  )r  zStarting Worker plugin %ssetupr  r   rK  )r  bytesrL   loadsrB   r  r  r  r   rC   r  r  r   r9  rr  r  r   r   r<   )r   rN  r  r  r*  r   s         r   r  zWorker.plugin_add6  s      fe$$ 	*\&))F&,// 	MJ"	    lF++<#F++D4<$$$$/////////#T/$677767## 	((T22v&& *#)\\\\\\F ( ( (# $Q''''''''(
 $s   $-D 
D8D3-D83D8c                F  K   t                               d|            	 | j                            |          }t	          |d          r-|                    |           }t          |          r| d {V }n&# t          $ r}t          |          cY d }~S d }~ww xY wddiS )NzRemoving Worker plugin r  r  r   rK  )	r   r9  r  poprr  r  r   r   r<   )r   r  rN  r*  r   s        r   r  zWorker.plugin_remove^  s      4d44555	$\%%d++Fvz** *55v&& *#)\\\\\\F 	$ 	$ 	$ ########	$ $s   AA9 9
BBBBr   c                    t           j        |         }|t           j        k    rF| j        t          vr8t
                              d| j        |           |                     |           d S || _        d S )Nz*Invalid Worker.status transition: %s -> %s)	r9   lookupr  r  r  r   r`  r  r   )r   r   r  
new_statuss       r   r  z"Worker.handle_worker_status_changel  st    ]6*
 &333$666LL<dlJ   ++K88888 %DKKKr   clstype[StateMachineEvent]Callable[..., None]c                6      fd}dj          d|_         |S )Nc                 B     di | }                     |           d S )Nr   )r  )r   eventr  r   s     r   r  z)Worker._handle_remote_stimulus.<locals>._  s.    CMM&MME  '''''r   z_handle_remote_stimulus())r   )r   r  r  s   `` r   r  zWorker._handle_remote_stimulus  s>    	( 	( 	( 	( 	( 	( @???
r   stimsr   c                    	  t                      j        |  dS # t          $ rC}t          |d          r-|                                \  }}|                     ||            d}~ww xY w)zOverride BaseWorker method for added validation

        See also
        --------
        distributed.worker_state_machine.BaseWorker.handle_stimulus
        distributed.worker_state_machine.WorkerState.handle_stimulus
        to_eventN)r7  r  r   rr  r   r   )r   r  r   r  r  r  s        r   r  zWorker.handle_stimulus  s{    	#EGG#U++++ 	 	 	q*%% +ZZ\\
suc***		s    
A(>A##A(rp  c                    | j         j        |         }|j         dk    t          |j                  || j         j        v p|| j         j        v || j        v dS )Nrs  )rs  waiting_for_dataheapr   )r  rR  r   r"  r  constrainedr   )r   rp  r  s      r   stateofzWorker.stateof  s^    Zc"[0 $R%8 9 9$***JbDJ4J.J49$	
 
 	
r   keys_or_stimuliIterable[str]list[tuple]c                $   K    | j         j        | S r  )r  story)r   r&  s     r   rl  zWorker.get_story  s      tz11r   Iterable[Key]r   c                    d}|D ]@}| j         j        |         }|j        r#t          t	          |j                            c S |}A|sJ |S )a  For diagnostics, we want to attach a transfer to a single task. This task is
        typically the next to be executed but since we're fetching tasks for potentially
        many dependents, an exact match is not possible. Additionally, if a key was
        fetched through acquire-replicas, dependents may not be known at all.

        Returns
        -------
        The task to attach startstops of this transfer to
        N)r  rR  
dependentsnextiter)r   rd  causerp  r  s        r   
_get_causezWorker._get_cause  sd      	 	C!#&B} 1D//00000EEr   rQ  r0  rP  c                r    t           fd|D                       }|j                            d| j        z   | j        z   |d           t	          d||z
            }||z  } j                            | j        z   | j        z   ||z   dz   j        z   | fd|D             |||d           |dk    r j        d	z  |d
z  z    _         j        |         \  }	}
|	|z   |
dz   f j        |<   t          t          t          |                                                    }t          |          dk    r&|\  } j        |         \  }	}
|	|z   |
dz   f j        |<                        d||z                                  d|            j        d                             t          |                     d S )Nc              3  `   K   | ](}j         j        |                                         V  )d S r  )r  rR  
get_nbytesr  rp  r   s     r   rP  z7Worker._update_metrics_received_data.<locals>.<genexpr>  s9      MM$**3/::<<MMMMMMr   r  )r  ra  rQ  sourcerJ  g       @c                @    i | ]}|j         j        |         j        S r   r  r5  s     r   r  z8Worker._update_metrics_received_data.<locals>.<dictcomp>  s(    KKKsdj.s3:KKKr   )ra  rQ  rg  r  rd  r  r   r  i@B rk  rj  r   ztransfer-bandwidthztransfer-durationztransfer-count)r  
startstopsr  r  r  r   r   r   r  mapr  r  r  r   rl  countersr  )r   ra  rQ  r   r0  rP  r  r  r   bwcntr  typs   `            r   _update_metrics_received_dataz$Worker._update_metrics_received_data  s    MMMMMMMMM$!55t33 	 	
 	
 	
 udUl++(*	"))!55t33 4<3.1EE$KKKKdKKK$&	 		
 	
 	
 ""!^d2Y5EEDN,V4GB.09ncAg-FD"6*D$++--0011E5zzQ.s3C-/)^S1W,E$S)/x1GHHH.999&'++CII66666r   r  Collection[Key]total_nbytesc          	     ^  K   | j         t          vr4t          j        t	          d          ||dt                                 S | j        j                            d|||t                      f           t          
                    dt          |          |           	 t          j        dt
                    5 }t          | j        ||| j                   d	{V }d	d	d	           n# 1 swxY w Y   |d
         dk    rQ| j        j                            d|||t                      f           t#          ||dt                                 S |d
         dk    sJ |                     |          }|                     |j        |j        |d         ||           | j        j                            d|t-          |d                   |t                      f           t/          |||d         dt                                 S # t0          $ ro t                              d|           | j        j                            d|||t                      f           t5          ||dt                                 cY S t6          $ r}t                              |           | j        j                            d|||t                      f           | j        rt:          rdd	l}	|	                                 t          j        |||dt                                 cY d	}~S d	}~ww xY w)zImplements BaseWorker abstract method

        See also
        --------
        distributed.worker_state_machine.BaseWorker.gather_dep
        zWorker is shutting downworker-closing-)rP  r@  r  zrequest-depzRequest %d keys from %sr  r  )r>   rd  rP  r  Nr   r  zgather-dep-busyzgather-dep-busy-rK  r   )ra  rQ  r   r0  rP  zreceive-depzgather-dep-success-)rP  r@  r   r  z+Worker stream died during communication: %szgather-dep-failedzgather-dep-network-failure-r   zgather-dep-failed-) r   r  rz   from_exceptionr  rI   r  logr  r   ru  r  rG   r  get_data_from_workerr>   rs  ry   r1  r>  ra  rQ  r  r|   r\  r   r{   r   r  LOG_PDBpdb	set_trace)
r   rP  r  r@  r  r  re  r0  r   rG  s
             r   
gather_depzWorker.gather_dep  s      ;000 )7677)6dff66	    	
}fidffUVVV.IGGG@	$YT::: a!5yT\" " "                    
 !V++
%%&	;O   *!!- ;466 ; ;    H%----OOI..E..gVf% /    JN!!HV,<(=(={DFFS   ))f%:$&&::	     		 		 		JFSSSJN!!$fidffM   0)B$&&BB       	 	 	 QJN!!$fidffM   "  w  


(7)999	        	sR   H
 :$C*H
 *C..H
 1C.2AH
 B7H
 
A6L,	L,BL'!L,'L,c                z   K   t          j        d           d{V  t          |dt                                 S )zWait some time, then take a peer worker out of busy state.
        Implements BaseWorker abstract method.

        See Also
        --------
        distributed.worker_state_machine.BaseWorker.retry_busy_worker_later
        g333333?Nzretry-busy-worker-rP  r  )r]  r^  r   rI   )r   rP  s     r   retry_busy_worker_laterzWorker.retry_busy_worker_laterK  sS       mD!!!!!!!!!#'DDFF'D'D
 
 
 	
r   r	   c                2    t          j        | ||           dS )zBImplement BaseWorker.digest_metric by calling Server.digest_metricN)rJ   rl  )r   r  r  s      r   rl  zWorker.digest_metricX  s     tU33333r   c                    |                      t          dt                                            | j        d         j        | j        d         _        d S )Nzfind-missing-r  ry  r  )r  rw   rI   r  rw  r~  s    r   r  zWorker.find_missing\  s[    -:R$&&:R:RSSSTTT AE@WA

 	/===r   c                ,    t          | |||||          S N)functionr   r   r  rY  )r   r<  rQ  r   r  r   s         r   rY  z
Worker.runi      4tFQUVVVVr   c                ,    t          | |||||          S rP  rR  )r   r<  rQ  r   r   r  s         r   rZ  zWorker.run_coroutinel  rS  r   r   c                V  K   |pi }|                     dd          }|}| j        j        |         }t          ||          }t	          |          dz   |z   }	 t          |          rct                              |           }		  ||i | d {V }
t                              |	           n# t                              |	           w xY w|rH| j	        
                    | j        d         t          |||| j        || j        | j        	  	         d {V }
n]t                              |           }		  ||i |}
t                              |	           n# t                              |	           w xY wdt!          |
          dS # t"          $ r}dt!          |          dcY d }~S d }~ww xY w)	Nseparate_threadTr  rS  rK  r   r*  r`  r   r   )r  r  r  getattrr$   r[   _worker_cvarr  resetr   run_in_executorr  apply_function_actorr  r   r   rM   r   )r   rS  rQ  r   r   rV  rp  r  r  tokenr*  exs               r   rf  zWorker.actor_executeo  s	      2 **%6==
!#&uh''~~#h.	F"4(( .$((...#'4#8#8#8888888F &&u----L&&u----  .#y88N7+((',
  
  
 
 
 
 
 
 %((...!T42622F &&u----L&&u----"l6.B.BCCC 	F 	F 	F%L4D4DEEEEEEEE	FsO   )F B* F *CA'F .E 6F E--F 
F(F#F(#F(c                    	 t          | j        j        |         |          }dt          |          dS # t          $ r}dt          |          dcY d }~S d }~ww xY w)NrK  rW  r`  rX  )rY  r  r  rM   r   )r   rS  	attributer  r_  s        r   rg  zWorker.actor_attribute  s    	FDJ-e4i@@E"l5.A.ABBB 	F 	F 	F%L4D4DEEEEEEEE	Fs   14 
AAAAr    c               p  K   | j         t          vr t          |dt                                 S | j        j        |         }|j        }	 | j        j        r|j        rJ |j        dv s
J |            |j	        J |j	        \  }}}| 
                    |||          \  }}	|j        J |j                            dd          }
	 | j        |
         }n5# t          $ r( t          d|
dt!          | j                             w xY w| j                            |           d	|j        v r t'          j        |j        d	         
          nt+          j                    }|                                 	 t                      |_        t3          |          rrt4                              |           }	 t9          |||	| j                   d{V }t4                              |           n# t4                              |           w xY wdt?          tA          |                    v rdtC          j"        d          5  tG          |tH          |||	| j%        || j&        | j'        | j        
  
         d{V }ddd           n# 1 swxY w Y   n[tC          j"        d          5  | j(        )                    |tT          |||	| j                   d{V }ddd           n# 1 swxY w Y   | j        +                    |           |,                    ddd           n6# | j        +                    |           |,                    ddd           w xY w|d         | j-        |<   |d         dk    r| j.        ?t_          d|d         |d         z
            }| j.        d                             |           ta          |||d         |d         |d         |d         |d         dt                                 S |d         }tc          |td                    r%t          |j3        dt                                 S | j         th          j5        k    rltc          |tl          j7                  rRt3          |          rCtp          9                    d|d           t          |j3        dt                                 S tp          :                    d|t?          tw          |                    dd         ty          |d          t{          |	d          |d                     t}          j?        ||||d         |d         d!t                       "          S # t          $ r[}|j        d#k    rtp          A                    d$|d%&           t}          j?        |||d't                       (          cY d}~S d}~ww xY w))zExecute a task. Implements BaseWorker abstract method.

        See also
        --------
        distributed.worker_state_machine.BaseWorker.execute
        rB  )rp  r  )rs  	cancelledresumedNr  rL  zInvalid executor z; expected one of: span)re  rS   threadr{  task-finishedr   rQ  ra  ztask-durationr*  r  r  ztask-finished-)rp  run_idr  ra  rQ  r  r  r  actual-exceptionzreschedule-zAsync task z- cancelled during worker close; rescheduling.zcancelled-by-worker-close-zUCompute Failed
Key:       %s
Function:  %s
args:      %s
kwargs:    %s
Exception: %r
rx  max_lenexception_textztask-erred-)rp  rh  ra  rQ  r  rs  z#Exception during execution of task Tr   zexecute-unknown-error-)rp  rh  r  )Br   r  r   rI   r  rR  rh  r(  r"  run_spec_prepare_args_for_executionr   r  r  KeyErrorr  sortedr   r  r  annotater  nullcontext	__enter__ro  r[   rZ  r  apply_function_asyncr  r[  r   r  rG   r  ra   apply_functionr  r   r   r   r\  apply_function_simplediscardr  r   r"  r  rv   r  rE   rp  r9   r   r]  CancelledErrorr   r9  rI  r#   convert_args_to_strconvert_kwargs_to_strru   rC  r   r`  )r   rp  r  r  rh  rQ  r   r   args2kwargs2r  r   span_ctxr^  r*  r  task_excr  s                     r   r  zWorker.execute  s%      ;000 #s8R$&&8R8RSSSS Zc"\	z" M....x#HHHH"HHH;***%'["HdF!==b$OONE7>---~))*i@@HN8,    A A A(.t~(>(>A A     %%%
 R^++ 2>&#9::::+-- 
    04 $&x00 +(,,T22E2';$!# 0	( ( " " " " " " %**51111**51111)Sa\\99 ',Z88  'C*$!# 0 / 4 0( ( " " " " " "              " ',Z88  '+y'@'@1$!# 0( ( " " " " " "                ((---!!$d3333  ((---!!$d3333 &x 0DLd|..<+"1fVnvg&FGGHL155h???*! * /!(+ 9 9 9	 	 	 	 01H(J// W&26?UTVV?U?UVVVVv~--x)?@@ .'11 . V#VVV   ',Q,Q,Q    NN" HX&&''.#E4888%gt<<<'(   '5WoF^2$&&22     	 	 	 x;&&A#AA!     '5=TVV==	        	s   A:U C U 2DA-U 4<L 1G) L )H5L :8I>2L >JL JL 0KL KL !K"L %2U 3M

B(U 3AU 5B U 6BU 
V5AV0*V50V5r  r   r  ,tuple[tuple[object, ...], dict[str, object]]c                   t                      }i }|j        D ]c}|j        }	 | j        |         ||<   # t          $ r< ddlm}  |t          | j        j	        |                   | j
        ||           ||<   Y `w xY wt          ||t          t          t          f          }	t          ||t          t          t          f          }
t                      }||z
  dk    r|j                            d||d           |	|
fS )Nr   r  )	key_typesg{Gzt?z	disk-read)r  ra  rQ  )rI   dependenciesrp  r   ro  r  r  r  r  r  rs  rg   r  r   r  r8  r  )r   r  r   r   ra  r   depr#  r  r{  r|  rQ  s               r   rn  z"Worker._prepare_args_for_executionQ	  s     ? 	S 	SCAS)A,Q S S S333333%TZ%6q%9 : :DL!TRRQS $U0CDDDFDUC4GHHHvv%<%M  K%QU!V!VWWWg~s   3AA98A9c                8   t                      | j        z   }| j        t          j                    c}| _        | j                            ||f           | j                            |t          | j	                  f           | j	        
                                 d S r  )rI   r  r   r*   r  r   r  r   r  r   r  )r   rA  profs      r   r  zWorker.cycle_profilei	  s    fft++$($79I9I!d!##S$K000!((#tD4E/F/F)GHHH!!!!!r   c                  
 | j         sdS t                      }| j        5  | j                                         }ddd           n# 1 swxY w Y   t	          j                    

fd|D             
i }| j        rd |D             }
                                D ]\  }}|t          ||                   }|	                    |          }t          j        |d| j        d          }t          j        |d|           t          j        |d| j        |         d           t                      }	|                     d|	|z
             dS )zz
        Get a frame from all actively computing threads

        Merge these frames into existing profile counts
        Nc                "    i | ]}||         S r   r   )r  identframess     r   r  z*Worker.trigger_profile.<locals>.<dictcomp>}	  s    CCC5%CCCr   c                8    i | ]}|t          j        |          S r   )r*   ll_get_stack)r  r  s     r   r  z*Worker.trigger_profile.<locals>.<dictcomp>	  s%    WWWuw3E::WWWr   Tzdistributed/worker.py)rQ  zprofile-duration)r   rI   r   r  sys_current_framesr  r  r$   r  r*   processr   	llprocessr   rl  )r   ra  r   llframesr  framerp  llframer  rQ  r  s             @r   r  zWorker.trigger_profileq	  s    " 	F% 	8 	8!05577N	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8$&&CCCCNCCC" 	XWWWWWH"LLNN 	 	LE5 u 566",,u--4!4;R   !'47774!23!7>U    vv-te|<<<<<s   AA
A
r  c                  K   t                      | j        z   }|r| j        j        n| j        nfd| j        D             |d}nt          j        |f          }|d }n.t          j        |f          dz   }|t                    k    rd }|dk    r|t                    n/|t                    n|}fdt          ||          D             t          j        t          d           }	st          j                    S |4|||k     r,| j        }
n| j                 }
t          j        |	|
          }	|	S )Nc                2    g | ]\  }}|v 	||         fS r   r   )r  tdrp  s      r   r  z&Worker.get_profile.<locals>.<listcomp>	  s+    UUUtq!CSTHH1S6{HHHr   r   r   c                     g | ]
}|         S r   r   )r  ihistorys     r   r  z&Worker.get_profile.<locals>.<listcomp>	  s    AAAawqzAAAr   )rI   r  r  r*   r   r   bisectbisect_leftbisect_rightr  r}  rangemerger   r  r   r   )r   ra  rQ  rp  r  rA  istartistopiistopr  r  r  s      `       @r   r  zWorker.get_profile	  su      fft++ 	Vl*GG[*GGUUUUt/HUUUG=FF'%::F<EE'$99A=EG$$Q;;5=7mmGG%*]S\\\FAAAA5+@+@AAAG}eAw//0 	$>###=emus{{{,*3/=v..Dr   c                l  K   d u }t                      | j        z   }p|fd| j        D             fd| j        D             d}|rf|d                             || j        d         f           |d                             |d | j                                        D             f           |S )Nc                J    g | ]\  }}|cxk     rk     n n
||d          f S r  r   )r  r  r  ra  rQ  s      r   r  z/Worker.get_profile_metadata.<locals>.<listcomp>	  sN       $(AqEADTDTDTDTPTDTDTDTDTDTAgJDTDTDTr   c                v    g | ]5\  }}|cxk     rk     n n |d  |                                 D             f6S )c                &    i | ]\  }}||d          S r  r   )r  r#  r  s      r   r  z:Worker.get_profile_metadata.<locals>.<listcomp>.<dictcomp>	  s"    999tq!Q'
999r   )r  )r  r  r$  ra  rQ  s      r   r  z/Worker.get_profile_metadata.<locals>.<listcomp>	  se       Aq1####t##### 99qwwyy999:###r   )countsrd  r  r  rd  c                &    i | ]\  }}||d          S r  r   r  s      r   r  z/Worker.get_profile_metadata.<locals>.<dictcomp>	  s"    KKKAq!G*KKKr   )rI   r  r   r   r  r   r   r  )r   ra  rQ  
add_recentrA  r*  s    ``   r   r  zWorker.get_profile_metadata	  s      T\
fft++{s    ,0,@       5  		
 	
  	8##S$*=g*F$GHHH6N!!KK1B1H1H1J1JKKKL   r   Collection[str] | Nonec                   | j         5  t          j                    fd| j                                        D             d d d            n# 1 swxY w Y   |fd|D             d                                 D             S )Nc                (    i | ]\  }}||         S r   r   )r  tidrp  
sys_framess      r   r  z)Worker.get_call_stack.<locals>.<dictcomp>	  s#    WWWxsCc:c?WWWr   c                *    i | ]}|v ||         S r   r   )r  rp  r  s     r   r  z)Worker.get_call_stack.<locals>.<dictcomp>	  s$    HHH3#--c6#;---r   c                >    i | ]\  }}|t          j        |          S r   )r*   ra  )r  rp  r  s      r   r  z)Worker.get_call_stack.<locals>.<dictcomp>	  s)    PPP:3W'..PPPr   )r   r  r  r   r  )r   rd  r  r  s     @@r   r  zWorker.get_call_stack	  s    % 	X 	X,..JWWWW4;N;T;T;V;VWWWF	X 	X 	X 	X 	X 	X 	X 	X 	X 	X 	X 	X 	X 	X 	X HHHH$HHHFPPPPPPs   9AAAdict[str, float]c                h   K   | j                             | j        t          | j                   d {V S r  )r   r\  r  ri  r  r~  s    r   ri  zWorker.benchmark_disk	  sJ      Y..M>4+?
 
 
 
 
 
 
 
 	
r   c                \   K   | j                             | j        t                     d {V S r  )r   r\  r  rj  r~  s    r   rj  zWorker.benchmark_memory	  s3      Y..t}>NOOOOOOOOOr   rs  c                >   K   t          | j        |           d {V S )N)r>   rs  )rk  r>   )r   rs  s     r   rk  zWorker.benchmark_network	  s-      &48WEEEEEEEEEEr   r   c                    | j         5  | j        r| j        cd d d            S |                                 cd d d            S # 1 swxY w Y   d S r  )r   r   _get_clientr~  s    r   clientzWorker.client	  s    Z 	* 	*| *|	* 	* 	* 	* 	* 	* 	* 	* ''))		* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*s   AAA
Ac           
        |t           j                            d          }t          |d          }	 ddlm}  |            }ddlm} |j        r|j        j	        | j        j	        k    s^t          |j        t                    r|j        | j        j	        k    s/t          |j        |          r!|j        j        | j        j	        k    r|| _        n# t          $ r Y nw xY w| j        syddlm} t#          | j                  } || j        | j        | j        d|dd	|
          | _        t(          j                            | j                   |s| j        j        dk    sJ | j        S )zGet local client attached to this worker

        If no such client exists, create one

        See Also
        --------
        get_client
        N!distributed.comm.timeouts.connectsr   )default_client)Clusterr   TrP  )r   r  set_as_defaultr  direct_to_workersr  rr  r  )r  r  r  r&   distributed.clientr  distributed.deploy.clusterr  r  rs  r  
_start_argr   scheduler_addressr   r  r   rY   r   r  r   r   r  r   )r   rr  r  r  r  r   r  s          r   r  zWorker._get_client	  s    ?koo&IJJG!'3//	&999999#^%%F
 ;:::::  &$,0FFF
 v0#66 G )T^-CCC!&"3W== D);t~?UUU  &'  	 	 	D	* | 	8111111(33L!6Y#)"&	 	 	DL '++DL999 8|*i7777|s   C 
CCc                >    | j         t          j                             S )a  Get the key of the task we are currently running

        This only makes sense to run within a task

        Examples
        --------
        >>> from dask.distributed import get_worker
        >>> def f():
        ...     return get_worker().get_current_task()

        >>> future = client.submit(f)  # doctest: +SKIP
        >>> future.result()  # doctest: +SKIP
        'f-1234'

        See Also
        --------
        get_worker
        )r   r  r  r~  s    r   get_current_taskzWorker.get_current_task3
  s    & "9#6#8#899r   c                    | j                             |           |                     t          ||                     d S )NrK  )r>   r  r  r   )r   rP  r  s      r   r  zWorker._handle_remove_workerH
  s>    .f+VVVWWWWWr   c                   	 | j                                          d S # t          $ r}t                              d|           t                              |           t          rdd l}|                                 t          |d          r-|
                                \  }}|                     ||            d }~ww xY w)NzValidate state failedr   r   r   )r  validate_stater   r   r`  r   rF  rG  rH  rr  r   r   )r   r   rG  r  r  s        r   r  zWorker.validate_stateL
  s    	J%%''''' 	 	 	LL01L===Q  


q*%% +ZZ\\
suc***	s    
B?BB::B?c                H    t          j        dt          d           | j        S )Nz_The `Worker.incoming_transfer_log` attribute has been renamed to `Worker.transfer_incoming_log`rA  rB  )r  r  r  r   r~  s    r   incoming_transfer_logzWorker.incoming_transfer_log]
  1    -		
 	
 	
 	
 ))r   c                H    t          j        dt          d           | j        S )Nz`The `Worker.outgoing_count` attribute has been renamed to `Worker.transfer_outgoing_count_total`rA  rB  )r  r  r  r   r~  s    r   r  zWorker.outgoing_countg
  1    5		
 	
 	
 	
 11r   c                H    t          j        dt          d           | j        S )NzbThe `Worker.outgoing_current_count` attribute has been renamed to `Worker.transfer_outgoing_count`rA  rB  )r  r  r  r   r~  s    r   outgoing_current_countzWorker.outgoing_current_countq
  s1    /		
 	
 	
 	
 ++r   c                H    t          j        dt          d           | j        S )Nz_The `Worker.outgoing_transfer_log` attribute has been renamed to `Worker.transfer_outgoing_log`rA  rB  )r  r  r  r   r~  s    r   outgoing_transfer_logzWorker.outgoing_transfer_log{
  r  r   c                H    t          j        dt          d           | j        S )NzfThe `Worker.total_in_connections` attribute has been renamed to `Worker.transfer_outgoing_count_limit`rA  rB  )r  r  r  r   r~  s    r   total_in_connectionszWorker.total_in_connections
  r  r   )NN)Tr/  r   r0  r1  r  r   r  r1  r   r2  r  r   r  r3  r  r4  r  r5  r  r6  r  r7  r  r1  r   r4  r  r8  r   r9  r  r:  r   r   r
  r   r  r;  r  r<  r  r<  r!  r   r"  r   r#  r   r$  r   r%  r   r&  r   r'  r   r   r   r  r  r  r5  r(  r5  r   r4  r   r4  r   r5  r)  r=  r*  r>  r   rn   r+  r?  r,  r?  r-  r?  r.  r   )r   r  )r   r  )r   r   )r   r   )r  r  r  r   r   r  )r  r9   r   r  )r  r   r   r  )r   r  )r3  r4  r   r  )r  r   r   r  )r   r  )r   r   r   r  )r<  r.   r   r  )r   r{  )r~  r  r   r   )Fr   )r  r   ra  r   r   r   )r   TTr  )
rr  r   r   r   r   r   r   r   r   r   )Nr  r   r   )
r<  r.   rd  r  r  r   rF  r8  r   r  r  )r   r   r  r   r   r   )r  r   r   r  )NT)rN  r  r  r   r  r   r   r  )r  r   r   r  )r   r   r  r   r   r  )r  r  r   r  )r  r   r   r  )rp  r   r   r   )r&  r'  r   r(  )rd  r+  r   r   )ra  r   rQ  r   r   r   r0  r   rP  r   r   r  )
rP  r   r  r?  r@  r   r  r   r   r   )rP  r   r   r   )r  r	   r  r   r   r  )r   TNr   NT)NNr   N)r   r3  r   r   )r   r   )rp  r    r  r   r   r   )r  r   r   r  r   r   r   r  )NNNF)r  r   )r   N)ra  r   rQ  r   r   r   )rd  r  r   r   )r   r  )rs  r   r   r  r   r   )rr  r   r   r   )rP  r   r  r   r   r  )}r   r   r   __doc__rS  WeakSetr   r   r   r   r  r   r   r  propertyr   rl   r*  r+  r,  r-  rm   memory_monitorrt   r  r	  busy_workerscomm_nbytescomm_threshold_bytesr$  data_needed_per_workerexecuted_countr  
generationhas_whatr  in_flight_tasksin_flight_workersrD  long_runningr  stimulus_logstimulus_storyr*  r  rR  target_message_sizetotal_out_connectionsrW  transition_counterr)  r(  validate_taskr  r  r  r  r   r  r  rJ   r   setterr  r+  r.  r1  r8  r}  rh  r[  ry  r   rb  rd  rX  rh  r  r]   r   r  r  r  r;   r[  r\  r
  r  r  r  r  r  r%  rl  r1  r>  rI  rL  rl  r  rY  rZ  rf  rg  r  rn  r  r  r  r  r  ri  rj  rk  r  r  r  r  r  r  r  r  r  r  __classcell__)r  s   @r   r   r     s,        N N` 5DGO4E4EJEEEE>Mgo>O>OOOOO&&&&''''""""2222HHHH""""888800000000&&&&&&&&        NNN!!!!::::9999''''6:K::::NNN""""""""####LLL""""III((((!H!!!!!!!!////;;;;####$$$$.... $(%)w$
 &*#"&* $!%OS-1#'$($(;?59&*"&-19H ( $37#(,",.*. $##'+(,7< %+ %)@D?C?C %)mw$ w$ w$ w$ w$ w$x ('''( ( ( X(& 4355L==??<<>><<>>,,..N ,+--F88::1133L008QRRRK99;   1022K;;=QQQ3355N4466O//11J--//H33.  N 54<STTTO6688
(
(
*
*C1133L--//H1133L3355N**,,E**,,E**,,E88-   ;:.   5466O7799;;==--//H2244MM M M XM ' ' ' X'

 

 

 

 ) ) X)@ @ @ @:   X ) ) X) H H H H$
 
 
 
D D D DL  
 
 
 57 8 8 8 8 8 8 8 88* * * *&N< N< N< N<`/ / / /. . . .` Q Q Q YQ   ($ ($ ($ ($T    &p p p p pd  "$s s s s Zsl +D; ; ; ; ;8, , ,- - -$ $#J//
 (,\! \! \! \! 0/\!J #'S S S S S
 
 
 
    !	%  %  %  %  Z% N       Z % % % %(         Y 
 
 
 
2 2 2 2   &-7 -7 -7 -7^ [ [ [ Y[z
 
 
 
4 4 4 4    ZW W W WW W W W
 ")F )F )F )F )FVF F F F F m m m Ym^   0" " " "= = = =D - - - - -` 6:    .Q Q Q Q Q
 
 
 

P P P PF F F F * * * X*: : : : :x: : : :*X X X X   " * * X* 2 2 X2 , , X, * * X* 2 2 X2 2 2 2 2r   r   rZ  zcontextvars.ContextVar[Worker]c                 r    	 t                                           S # t          $ r t          d          dw xY w)a  Get the worker currently running this task

    Examples
    --------
    >>> def f():
    ...     worker = get_worker()  # The worker on which this task is running
    ...     return worker.address

    >>> future = client.submit(f)  # doctest: +SKIP
    >>> future.result()  # doctest: +SKIP
    'tcp://127.0.0.1:47373'

    See Also
    --------
    get_client
    worker_client
    zNo worker foundN)rZ  r  LookupErrorr  r   r   r   
get_workerr  
  sE    $6!!! 6 6 6*++56s    6Tr   c                   |t           j                            d          }t          |d          }| r|rt	          |           } 	 t                      }| r|j        j        | k    r|                    |          S n# t          $ r Y nw xY wddl
m} 	  |j                    }n# t          $ r d}Y nw xY w|r| r|j        j        | k    r|S | r || |          S t          d          )a  Get a client while within a task.

    This client connects to the same scheduler to which the worker is connected

    Parameters
    ----------
    address : str, optional
        The address of the scheduler to connect to. Defaults to the scheduler
        the worker is connected to.
    timeout : int or str
        Timeout (in seconds) for getting the Client. Defaults to the
        ``distributed.comm.timeouts.connect`` configuration value.
    resolve_address : bool, default True
        Whether to resolve `address` to its canonical form.

    Returns
    -------
    Client

    Examples
    --------
    >>> def f():
    ...     client = get_client(timeout="10s")
    ...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
    ...     results = client.gather(futures)
    ...     return sum(results)

    >>> future = client.submit(f)  # doctest: +SKIP
    >>> future.result()  # doctest: +SKIP
    55

    See Also
    --------
    get_worker
    worker_client
    secede
    Nr  r  rq  r   r   z.No global client found and no address provided)r  r  r  r&   comm_resolve_addressr  r  rs  r  r  r  r   r  )rs  rr  r2   rP  r   r  s         r   
get_clientr  
  sP   N +//"EFFgs++G 0? 0&w//7  	7&*2g==%%g%666 >     *)))))!!    Kw K&"2":g"E"E	 Kvgw////IJJJs$   A= =
B
	B
B$ $B32B3c            
        t                      } t                       t                      t          j        z
  }| j                            | j        t          t          j	        |dt                                            dS )a  
    Have this task secede from the worker's thread pool

    This opens up a new scheduling slot and a new thread for a new task. This
    enables the client to schedule tasks on this node, which is
    especially useful while waiting for other jobs to finish (e.g., with
    ``client.gather``).

    Examples
    --------
    >>> def mytask(x):
    ...     # do some work
    ...     client = get_client()
    ...     futures = client.map(...)  # do some remote work
    ...     secede()  # while that work happens, remove ourself from the pool
    ...     return client.gather(futures)  # return gathered results

    See Also
    --------
    get_client
    get_worker
    zsecede-)rp  compute_durationr  N)
r  
tpe_secederI   rd   ro  r   r   r  r   rp  )rP  r  s     r   rT   rT   
  s{    . \\FLLLvv//H
K %*$&&**	
 	
 	
    r   )r  rF  rH  r>   r5   rd  r?  rP  r  r   rF  r8  rH  GetDataBusy | GetDataSuccessc                 K   || j         }|| j        }|                     |           d{V }d|_        	 t	          |||d||           d{V }	 |d         }|dk    r|                    d           d{V  n# t          $ r t          d|          w xY w||                     ||           S # |                     ||           w xY w)a  Get keys from worker

    The worker has a two step handshake to acknowledge when data has been fully
    delivered.  This function implements that handshake.

    See Also
    --------
    Worker.get_data
    Worker.gather_dep
    utils_comm.gather_data_from_workers
    Nz#Ephemeral Worker->Worker for gatherr[  )rF  rH  r{  rd  r  r   rK  zUnexpected response)	rF  rH  r/   r  r?   rV  ro  r  reuse)	r>   rd  rP  r  rF  rH  r<  re  r   s	            r   rE  rE    s9     ( o)V$$$$$$$$D5DI "#'
 
 
 
 
 
 
 
 
	'h'F ~~jj&&&&&&&&&	  	> 	> 	>2H===	>
 		&$		&$s#   B4 A> "B4 >BB4 4Ctaskr   r   c                z   t          |           r| d         t          u rVt          t          t          | dd                              s,| d         | d         t          |           dk    r| d         ni fS t          t          t          | dd                              s| d         | dd          i fS t          | fi fS )Nr   rA  r         )r   r!   r  r9  rV   r  execute_task)r  s    r   _normalize_taskr  C  s    d|| )7eCNDH(E(E$F$F7DGD		QT!WWBFFSabb2233 	)7DHb(($"$$r   c                    t          |           r*| d         | dd         }} |t          t          |           S t          | t                    r"t	          t          t          |                     S | S )zEvaluate a nested task

    >>> inc = lambda x: x + 1
    >>> execute_task((inc, 1))
    2
    >>> execute_task((sum, [1, 2, (inc, 3)]))
    7
    r   r   N)r   r9  r  r  r}  )r  r  r   s      r   r  r  M  sm     d|| !Wd122hdtSt,,--	D$		 Cd++,,,r   d   )maxsizezLRU[Callable[..., Any], bytes]cache_dumpsr  c                `   	 t           5  t          |          }ddd           n# 1 swxY w Y   n# t          $ rS t          j        |           }t          |          dk     r)t           5  |t          | <   ddd           n# 1 swxY w Y   Y n#t          $ r t          j        |           }Y nw xY w|S )z)Dump a function to bytes, cache functionsNi )_cache_lockr  ro  rL   dumpsr  r  )r  r*  s     r   dumps_functionr  d  s9   	$ 	' 	' &F	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' + + +d##v;; + +$*D!+ + + + + + + + + + + + + + + $ $ $d##$MsQ   / #/ '/ '/ 8B+'A>2B+>B	B+B	B+B+*B+c                   t          j                    }|5  |||<   ddd           n# 1 swxY w Y   t          t                      ||          5  t                              |d                   }		 t          | |||          }
t                              |	           n# t                              |	           w xY w	 ddd           n# 1 swxY w Y   |5  ||= ddd           n# 1 swxY w Y   |
S )Run a function, collect information

    Returns
    -------
    msg: dictionary with status, result/error, timings, etc..
    N)ro  r  rp  rP  )r  r  rb   rI   rZ  r  rv  r[  )rQ  r   r   r  rp  r   r   
time_delayr  r^  r  s              r   ru  ru  s  s     !!E	 $ $ #u$ $ $ $ $ $ $ $ $ $ $ $ $ $ $	66'
 
 
 	& 	&
   !:;;	&'$
KKCu%%%%Lu%%%%%	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 
 " "5!" " " " " " " " " " " " " " "JsH   (,,!C3B C B<<CCCC))C-0C-c                   t          j                    }	 t          j        dt                    5 }t          j        dt
                    5   | |i |}ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   dd|t          |          |t          |          ndd}nA# t          t          f$ r  t          $ r#}t          |          }d|d	<   ||d
<   Y d}~nd}~ww xY w|j        |z   |d<   |j        |z   |d<   ||d<   |S )r  thread-noncpur  z
thread-cpuNrg  rK  r{  r   r*  r  r  
task-erredr{  ri  ra  rQ  rf  )r  r  rG   r  rI   rH   r  r  r   r   r   r<   ra  rQ  	rQ  r   r   r  r  r  r*  r  r   s	            r   rv  rv    s    !!E%
  t<<< 	3$\DDD 3 3!426223 3 3 3 3 3 3 3 3 3 3 3 3 3 3	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3( "Vnn$*$6DLLLD
 
! )*    	 $ $ $ A D	"#$  7Z'CL&:%CKCMJs^   B* A8	A!A8!A%	%A8(A%	)A8,B* 8A<<B* ?A< B* *C(C##C(c                  K   t          j                    }	 t          j        dt                    5 } | |i | d{V }ddd           n# 1 swxY w Y   dd|t          |          |t          |          ndd}nA# t          t          f$ r  t          $ r#}t          |          }d|d<   ||d	<   Y d}~nd}~ww xY w|j        |z   |d
<   |j        |z   |d<   ||d<   |S )r  r
  r  Nrg  rK  r  r  r{  ri  ra  rQ  rf  )r  r  rG   r  rI   r  r  r   r   r   r<   ra  rQ  r  s	            r   rt  rt    s}      !!E
 t<<< 	5#8T4V44444444F	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5* "Vnn$*$6DLLLD
 
% )*    	 $ $ $ A D	"#$$ 7Z'CL&:%CKCMJs:   A? AA? AA? AA? ?B=B88B=c                   t          j                    }|5  |||<   ddd           n# 1 swxY w Y   t          t                      ||d          5  t                              |d                   }	  | |i |}	t                              |           n# t                              |           w xY w|5  ||= ddd           n# 1 swxY w Y   |	cddd           S # 1 swxY w Y   dS )r  NT)ro  r  rp  rS  rP  )r  r  rb   rI   rZ  r  r[  )
rQ  r   r   r  rp  r   r   r  r^  r*  s
             r   r]  r]     s    !!E	 $ $ #u$ $ $ $ $ $ $ $ $ $ $ $ $ $ $ 
66'	
 
 
     !:;;	&Xt.v..Fu%%%%Lu%%%%  	& 	&u%	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	&                  sZ   (,,!C!4B<C!B33C!8C<C!C	C!C	C!!C%(C%c                     G d d          }|                                  } d| v r |t          | d                   | d<   d| v r |t          | d                   | d<   | S )zMake a worker msg, which contains args and kwargs, safe to cast to str:
    allowing for some arguments to raise exceptions during conversion and
    ignoring them.
    c                      e Zd Zd Zd ZdS )get_msg_safe_str.<locals>.Reprc                "    || _         || _        d S r  _f_val)r   r-  vals      r   r  z'get_msg_safe_str.<locals>.Repr.__init__'  s    DGDIIIr   c                6    |                      | j                  S r  r  r~  s    r   r  z'get_msg_safe_str.<locals>.Repr.__repr__+  s    7749%%%r   N)r   r   r   r  r  r   r   r   Reprr  &  s2        	 	 		& 	& 	& 	& 	&r   r  r   r   )r  ry  rz  )r  r  s     r   get_msg_safe_strr     s    & & & & & & & & ((**C}}d.F<<F32CMBBHJr   rk  r1  c           	        d}d t          t          |                     D             }t          |           D ]\  }}	 t          |          }n# t          $ r d}Y nw xY w|||<   |t          |          dz   z  }|C||k    r=d                    d                    |d|dz                                d|         c S d	                    d                    |                    S )
zwConvert args to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    r   c                    g | ]}d S r  r   r  r  s     r   r  z'convert_args_to_str.<locals>.<listcomp>;  s    )))1B)))r    < could not convert arg to str >rA  Nz({}, r   z({}))r  r  	enumerater_  r   r  join)r   rk  lengthstrsr  argsargs          r   ry  ry  6  s     F))c$ii(()))DD// 
. 
.3	699DD 	6 	6 	65DDD	6Q#d))a-6G#3#3<<		$wQw- 8 899(7(CCCC}}TYYt__---s   AAAr   r  c           	     $   d}d t          t          |                     D             }t          |                                           D ]\  }\  }}	 t	          |          }n# t
          $ r d}Y nw xY wt	          |          dz   |z   }|||<   |t          |          dz   z  }|C||k    r=d                    d                    |d|d	z                                d|         c S d
                    d                    |                    S )zyConvert kwargs to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    r   c                    g | ]}d S r  r   r  s     r   r  z)convert_kwargs_to_str.<locals>.<listcomp>N  s    +++1B+++r   r  z: rA  Nz{{{}r   r   z{{{}}})r  r  r!  r  r_  r   r  r"  )	r   rk  r#  r$  r  argnamer%  r&  skwargs	            r   rz  rz  I  s     F++c&kk**+++D&v||~~66 0 0>GS	699DD 	6 	6 	65DDD	6g%,Q#f++/!6G#3#3==4!a%=!9!9::8G8DDDDtyy///s   A##A21A2r   c           
     
  K   |pi }t          j        |          }t          |          }|s|s
J d            |rt          j        |          }|rt          j        |          }t          |d          r| |d<   t          |d          r| |d<   t                              dt          |                     	 |s	 ||i |}n)|r ||i | d {V }n | j        j        |g|R i | d }dt          |          d}n# t          $ rx}	t                              dt          t          |                    d d         t          |d	          t          |d	          d
           t          |	          }Y d }	~	nd }	~	ww xY w|S )NzCombination not supporteddask_workerdask_schedulerzRun out-of-band function %rrK  rW  z2Run Failed
Function: %s
args:     %s
kwargs:   %s
rx  rj  Tr   )rL   r  r[   rX   r   r9  r#   r  r  rM   r   rI  r   ry  rz  r<   )
r  r<  rQ  r   r   r  is_coror*  re  r   s
             r   rY  rY  ]  s      \rF|H%%H!(++G77777777 "|D!! &f%%x'' ' &}x)** *#) 
KK-x/A/ABBBD 	Xt.v..FF 'x888888888:0:8UdUUUfUUU #l6.B.BCC  	$ 	$ 	$D""##ETE*d333!&$777 	 	
 	
 	
 !##	$ Os   74C> >
F A.E;;F c                 :    d } | t           d<   d }|t          d<   d S )Nc                H   K   t          t          j                   d {V }|S r  )r^   r@   	real_timerP  r*  s     r   
gpu_metricz#add_gpu_metrics.<locals>.gpu_metric  s,      t~........r   rT  c                (    t          j                    S r  )r@   one_timer  s    r   gpu_startupz$add_gpu_metrics.<locals>.gpu_startup  s    }r   )r   r   )r3  r6  s     r   add_gpu_metricsr7    s?       (OE   *5&&&r   c                H   K   t          t          j                   d {V }|S r  )r^   rA   r1  r2  s     r   
rmm_metricr9    s,      s}--------r   rA   r  
Fseprx  fileflushr<  rx  r=  TextIO | Noner>  r   r  c                h   	 t                      }t          t          t          |                    | ||d}|t          j        k    rd|d<   n*|t          j        k    rd|d<   n|t          d|          |                    d|           n# t          $ r Y nw xY wt          j        || |||d dS )	a  
    A drop-in replacement of the built-in ``print`` function for remote printing
    from workers to clients. If called from outside a dask worker, its arguments
    are passed directly to ``builtins.print()``. If called by code running on a
    worker, then in addition to printing locally, any clients connected
    (possibly remotely) to the scheduler managing this worker will receive an
    event instructing them to print the same output to their own standard output
    or standard error streams. For example, the user can perform simple
    debugging of remote computations by including calls to this ``print``
    function in the submitted code and inspecting the output in a local Jupyter
    notebook or interpreter session.

    All arguments behave the same as those of ``builtins.print()``, with the
    exception that the ``file`` keyword argument, if specified, must either be
    ``sys.stdout`` or ``sys.stderr``; arbitrary file-like objects are not
    allowed.

    All non-keyword arguments are converted to strings using ``str()`` and
    written to the stream, separated by ``sep`` and followed by ``end``. Both
    ``sep`` and ``end`` must be strings; they can also be ``None``, which means
    to use the default values. If no objects are given, ``print()`` will just
    write ``end``.

    Parameters
    ----------
    sep : str, optional
        String inserted between values, default a space.
    end : str, optional
        String appended after the last value, default a newline.
    file : ``sys.stdout`` or ``sys.stderr``, optional
        Defaults to the current sys.stdout.
    flush : bool, default False
        Whether to forcibly flush the stream.

    Examples
    --------
    >>> from dask.distributed import Client, print
    >>> client = distributed.Client(...)
    >>> def worker_function():
    ...     print("Hello from worker!")
    >>> client.submit(worker_function)
    <Future: finished, type: NoneType, key: worker_function-...>
    Hello from worker!
    )r   r<  rx  r>  r   r=  rA  Nz|Remote printing to arbitrary file objects is not supported. file kwarg must be one of None, sys.stdout, or sys.stderr; got: printr;  )r  r  r9  r   r  stdoutstderrr  r   r  builtinsrA  )r<  rx  r=  r>  r   rP  r  s          r   rA  rA    s    f' #c4..))
 
 3:CKKSZCKKWNRW W   	#&&&&-    0 NDcsUCCCCCCs   B 
BBr   rL  str | Warningcategorytype[Warning] | NonerC  r   r6  c                    	 t                      }|                    dt          j        |           t          j        |          d           n# t          $ r Y nw xY wt          j        | ||dz   |           dS )a  
    A drop-in replacement of the built-in ``warnings.warn()`` function for
    issuing warnings remotely from workers to clients.

    If called from outside a dask worker, its arguments are passed directly to
    ``warnings.warn()``. If called by code running on a worker, then in addition
    to emitting a warning locally, any clients connected (possibly remotely) to
    the scheduler managing this worker will receive an event instructing them to
    emit the same warning (subject to their own local filters, etc.). When
    implementing computations that may run on a worker, the user can call this
    ``warn`` function to ensure that any remote client sessions will see their
    warnings, for example in a Jupyter output cell.

    While all of the arguments are respected by the locally emitted warning
    (with same meanings as in ``warnings.warn()``), ``stacklevel`` and
    ``source`` are ignored by clients because they would not be meaningful in
    the client's thread.

    Examples
    --------
    >>> from dask.distributed import Client, warn
    >>> client = Client()
    >>> def do_warn():
    ...    warn("A warning from a worker.")
    >>> client.submit(do_warn).result()
    /path/to/distributed/client.py:678: UserWarning: A warning from a worker.
    r  )rL  rF  r   N)r  r   rL   r  r  r  r  )rL  rF  rC  r6  rP  s        r   r  r    s    B
 	!<00"L22 
	
 
	
 
	
 
	
    * M'8Z!^V<<<<<s   A 
AA1 kiB100 kiB1 MiB10 MiBz100 MiB1 srootdirsizesr'  r  c           
        t          |          }i }|D ]t}t          |           5 }t          j        |          }t	          t          t          t          d                              }t          |          }t          j
        |          }t                      }	d}
t                      |	|z   k     rt          |t          j        |          z  d          5 }|                    |           |                                 t!          j        |                                           ddd           n# 1 swxY w Y   |
|z  }
t                      |	|z   k     |
t                      |	z
  z  ||<   ddd           n# 1 swxY w Y   v|S )z~
    Benchmark disk bandwidth

    Returns
    -------
    out: dict
        Maps sizes of outputs to measured bandwidths
    )dirr  r   ab)modeN)r&   r'   pathlibPathr}  r9  r   r  r%   r  	randbytesrI   openchoicerV  r>  r   fsyncfileno)rO  rP  r  r%  size_strrR  namessizer   ra  r  r-  s               r   ri  ri  .  s    x((H
C 5 5    	5C,s##CS%**--..Ex((D#D))DFFEE&&58+++#e 4 444@@@ )AGGDMMMGGIIIHQXXZZ((() ) ) ) ) ) ) ) ) ) ) ) ) ) )  &&58+++ "TVVe^4CM!	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5" Js7   B2E=AD5)E=5D99E=<D9=4E==F	F	z2 kiB10 kiBrK  rL  rM  200 msc                :   t          |          }i }| D ]}t          |          }t          j        |          }t	                      }d}t	                      ||z   k     r%|dd         }~||z  }t	                      ||z   k     %|t	                      |z
  z  ||<   |S )z
    Benchmark memory bandwidth

    Returns
    -------
    out: dict
        Maps sizes of outputs to measured bandwidths
    r   N)r&   r%   r  rW  rI   )	rP  r  r%  r\  r^  r   ra  r  r  s	            r   rj  rj  S  s     x((H
C 1 18$$%%ffux'''SbS	ATME ffux'''
 %0HJr   rJ  r`  rK  rL  rM  z50 MiBrs  )ConnectionPool | Callable[[str], RPCType]c                  K   t          |          }i } ||           4 d{V }|D ]}t          |          }t          t          j        |                    }t                      }	d}
t                      |	|z   k     r9|                    |           d{V  |
|dz  z  }
t                      |	|z   k     9|
t                      |	z
  z  ||<   	 ddd          d{V  n# 1 d{V swxY w Y   |S )z
    Benchmark network communications to another worker

    Returns
    -------
    out: dict
        Maps sizes of outputs to measured bandwidths
    Nr   r  rA  )r&   r%   rM   r  rW  rI   echo)rs  r>   rP  r  r%  r  r\  r^  r   ra  r  s              r   rk  rk  p  s      x((H
Cs7|| 5 5 5 5 5 5 5q 
	5 
	5Hx((D 0 6 677DFFEE&&58+++ff$f'''''''''! &&58+++ "TVVe^4CMM
	55 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 Js   B+C##
C-0C-)r   r   r   r   r  )r   r   )NNTr  )r>   r5   rd  r?  rP  r   r  r   rF  r8  rH  r8  r   r  )r  r   r   r   )r   r  r  )rk  r1  r   r   )r   r  rk  r1  r   r   r  )
r<  r   rx  r   r=  r?  r>  r   r   r  )
rL  rE  rF  rG  rC  r   r6  r   r   r  )NrI  rN  )rO  r   rP  r'  r   r  )r_  ra  )rP  r'  r   r  )rd  rN  )rs  r   r>   re  rP  r'  r   r  (   
__future__r   r]  r  rD  r  contextvarsr  loggingr  r   rU  r  r  r  r  rS  collectionsr   r   collections.abcr   r   r   r	   r
   r   r   concurrent.futuresr   r   datetimer   	functoolsr   inspectr   typingr   r   r   r   r   r   r   r   tlzr   r   tornado.ioloopr   r  	dask.corer   dask.systemr   dask.typingr    
dask.utilsr!   r"   r#   r$   r%   r&   r'   r(   r   r)   r*   r+   distributed.batchedr,   distributed.collectionsr-   distributed.commr.   r/   r0   r1   r2   r  distributed.comm.addressingr3   distributed.compatibilityr4   distributed.corer5   r6   r7   r8   r9   r:   r;   r<   r=   r>   RPCTyper?   distributed.diagnosticsr@   rA   distributed.diagnostics.pluginrB   rC   distributed.diskutilsrD   distributed.exceptionsrE   distributed.httprF   distributed.metricsrG   rH   rI   distributed.noderJ   distributed.proctitlerK   distributed.protocolrL   rM   distributed.protocol.serializerN   distributed.pubsubrO   distributed.securityrP   distributed.sizeofrQ   r  distributed.spansrR   distributed.threadpoolexecutorrS   rT   r  distributed.utilsrU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   rd   re   distributed.utils_commrf   rg   rh   distributed.utils_perfri   rj   distributed.versionsrk   distributed.worker_memoryrl   rm   rn   ro    distributed.worker_state_machinerp   rq   rr   rs   rt   ru   rv   rw   rx   ry   rz   r{   r|   r}   r~   r   r   r   r   r   r   r   r   r   r   r   typing_extensionsr   r  r   distributed.nannyr   distributed.schedulerr   r   r   	getLoggerr   r   r  r  rF  r   r   r   r   r  r  r  r  r   r   r   r   r   
ContextVarrZ  r  r  rE  r  r  r  r  r  r  ru  rv  rt  r]  r  ry  rz  rY  r   _global_workersr7  _rmmr9  r   rA  UserWarningr  ri  rj  rk  r   r   r   <module>r     s   " " " " " " "               				   



       * * * * * * * *                  ( ' ' ' ' '                        	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	         ! ! ! ! ! !        ! ! ! ! ! !      	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 3 2 2 2 2 2 2 2 2 2 + + + + + + ' ' ' ' ' ' K K K K K K K K K K K K D D D D D D > > > > > > 6 6 6 6 6 6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 , + + + + + & & & & & & - - - - - - - - I I I I I I I I + + + + + + - - - - - - ) ) ) ) ) ) @ @ @ @ @ @ @ @ @ @ ' ' ' ' ' ' . . . . . . 5 5 5 5 5 5 5 5 7 7 7 7 7 7 4 4 4 4 4 4 ) ) ) ) ) ) 4 4 4 4 4 4 2 2 2 2 2 2 = = = = = = ? ? ? ? ? ?                                     & S R R R R R R R R R L L L L L L L L - - - - - -                                                                  :  
++++++ *)))))''''''//////	#AA		8	$	$
+//8
9
9 $!' '     
 79 8 8 8 8BD  D D D D N
M
     )       Y   
   D   DE&2 E&2 E&2 E&2 E&2Z E&2 E&2 E&2PL 0F{/En/U/U U U U U6 6 6 60AK AK AK AK AKH! ! !R $(&*-  -  -  -  -  - `% % % %  $ /2c#.>.>.> > > > >in        F7 7 7t- - -`  @  ,. . . . .&0 0 0 0 0(% % % %P #
5 
5 
5
   (OE  	 	 	D	 MD MD MD MD MD MDd &1	8= 8= 8= 8= 8=x M" " " " "L M    @ W	      s    M MM