
    0FieN                       d dl mZ d dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d dlZd d
lmZ d dlmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# erd dl$m%Z%m&Z&m'Z'm(Z(m)Z) dZ* ej+        e,          Z-ej.        /                    d          Z0h dZ1h dZ2ddhZ3 G d de          Z4 G d de           Z5d$d"Z6d#hZ7dS )%    )annotationsN)defaultdictdeque)	Container)partial)log2)time)TYPE_CHECKINGAnyClassVar	TypedDictcast)topk)Key)parse_timedelta)PeriodicCallback)CommClosedError)SchedulerPlugin)
log_errorsrecursive_to_dict)	SchedulerSchedulerState	TaskStateTaskStateStateWorkerStateg?zdistributed.admin.pdb-on-err>   readywaitingconstrained>   long-runningmemoryresumed	cancelled	executingreleasedc                  B    e Zd ZU ded<   ded<   ded<   ded<   ded<   d	S )
InFlightInfor   victimthieffloatvictim_durationthief_durationstrstimulus_idN)__name__
__module____qualname____annotations__     4lib/python3.11/site-packages/distributed/stealing.pyr&   r&   ?   sN         r3   r&   c                     e Zd ZU ded<   ded<   ded<   d ed  ed	d
          D                       z   Zded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   dVdZdWdXd!ZdYd"Z	d#d$dZd(Z
d[d*Zd\d]d,Zd^d/ZdYd0Zd_d7Zd`d<Zdad>Zdbd?Zdbd@ZdbdAZdcdCZdddGZddHdedLZdYdMZdfdOZdgdPZdXdQZdhdUZdS )iWorkStealingr   	schedulerz%dict[str, tuple[set[TaskState], ...]]	stealablez dict[TaskState, tuple[str, int]]key_stealable)g      ?c              #  ,   K   | ]}d d|dz
  z  z   V  dS )         Nr2   ).0is     r4   	<genexpr>zWorkStealing.<genexpr>N   sH       C CA!a%LC C C C C Cr3   r;      zClassVar[tuple[float, ...]]cost_multipliersr)   _callback_timeintcountzdict[TaskState, InFlightInfo]	in_flightzdefaultdict[WorkerState, float]in_flight_occupancyzdefaultdict[WorkerState, int]in_flight_taskszdict[str, dict[int, float]]metricszasyncio.Event_in_flight_event_request_counterc                   || _         i | _        i | _        |j        D ]}|                     |           t          t          t          t          j	        
                    d          d                    | _        | j                             |            t          j	        
                    d          }t          |          | j         j        d<   d| _        i | _        t#          t$                    | _        t#          t$                    | _        t+          j                    | _        t#          t$                    t#          t$                    d	| _        d| _        | j        | j         j        d
<   d S )Nworkerz,distributed.scheduler.work-stealing-intervalms)defaultz&distributed.admin.low-level-log-length)maxlenstealingr   )request_count_totalrequest_cost_totalzsteal-response)r7   r8   r9   workers
add_workerr   r)   r   daskconfiggetrC   
add_pluginr   eventsrE   rF   r   rD   rG   rH   asyncioEventrJ   rI   rK   move_task_confirmstream_handlers)selfr7   rN   rQ   s       r4   __init__zWorkStealing.__init__\   sF   "' 	+ 	+FOO6O****" NOO  
 
 	!!$'''!IJJ,1,@,@,@j)
#.s#3#3 *3// '#.s#3#3"-c"2"2
 
 !";?;Q&'7888r3   Nr   returnNonec                   K   d| j         j        v rdS t          | j        | j        dz            }|                                 || j         j        d<   | j                                         dS )aF  Start the background coroutine to balance the tasks on the cluster.
        Idempotent.
        The scheduler argument is ignored. It is merely required to satisfy the
        plugin interface. Since this class is simultaneously an extension, the
        scheduler instance is already registered during initialization
        rR   Ni  )callbackcallback_time)r7   periodic_callbacksr   balancerC   startrJ   set)r`   r7   pcs      r4   ri   zWorkStealing.start{   sy       :::F\1Dt1K
 
 
 	


8:)*5!!#####r3   c                   K   | j         j                            dd          }|r|                                 | j                                         d{V  dS )zStop the background task balancing tasks on the cluster.
        This will block until all currently running stealing requests are
        finished. Idempotent
        rR   N)r7   rg   popstoprJ   wait)r`   rk   s     r4   rn   zWorkStealing.stop   sd      
 ^.22:tDD 	GGIII#((***********r3   r2   )excluderp   Container[str]dictc               &    t          | |d          S )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        T)rp   members)r   )r`   rp   s     r4   _to_dict_no_nestzWorkStealing._to_dict_no_nest   s     !wEEEEr3   msgc                8    | j                             d|          S NrR   )r7   	log_event)r`   rv   s     r4   logzWorkStealing.log   s    ~''
C888r3   rN   c                b    t          d t          d          D                       | j        |<   d S )Nc              3  2   K   | ]}t                      V  d S N)rj   )r>   _s     r4   r@   z*WorkStealing.add_worker.<locals>.<genexpr>   s&      &@&@suu&@&@&@&@&@&@r3   rA   )tupleranger8   )r`   r7   rN   s      r4   rV   zWorkStealing.add_worker   s1    !&&@&@eBii&@&@&@!@!@vr3   r,   kwargsc                    | j         |= d S r}   )r8   )r`   r7   rN   r   s       r4   remove_workerzWorkStealing.remove_worker   s    N6"""r3   c                d    | j         j        }d|v r|d                                          |d= d S d S rx   )r7   rg   rn   )r`   pcss     r4   teardownzWorkStealing.teardown   s@    n/
O  """J r3   keyr   ri   r   finishargsc                    |dk    r)| j         j        |         }|                     |           d S |dk    r>| j         j        |         }|                     |           |                     |           d S d S )N
processing)r7   tasksput_key_in_stealableremove_key_from_stealable_remove_from_in_flight)r`   r   ri   r   r   r   tss          r4   
transitionzWorkStealing.transition   s     \!!%c*B%%b)))))l""%c*B**2...''+++++ #"r3   r   r   infor&   c                ,   || j         |<   | j                                         |d         }|d         }| j        |xx         |d         z  cc<   | j        |xx         |d         z  cc<   | j        |xx         dz  cc<   | j        |xx         dz  cc<   d S )Nr(   r'   r*   r+   r;   )rF   rJ   clearrG   rH   r`   r   r   r(   r'   s        r4   _add_to_in_flightzWorkStealing._add_to_in_flight   s    !r##%%%Wh (((D1B,CC((( '''40@+AA'''V$$$)$$$U###q(#####r3   InFlightInfo | Nonec                   | j                             |d           }|r|d         }|d         }| j        |xx         |d         z  cc<   | j        |xx         |d         z  cc<   | j        |xx         dz  cc<   | j        |xx         dz  cc<   | j         s2| j                                         | j                                         |S )Nr(   r'   r+   r*   r;   )rF   rm   rG   rH   r   rJ   rj   r   s        r4   r   z#WorkStealing._remove_from_in_flight   s    ~!!"d++ 		,ME(^F$U+++t4D/EE+++$V,,,5F0GG,,, (((A-((( '''1,'''> ,(..000%))+++r3   c                p    || j         vr,|                     |           |                     |           d S d S r}   )rF   r   r   )r`   r   s     r4   recalculate_costzWorkStealing.recalculate_cost   sE    T^##**2...%%b))))) $#r3   c                    |                      |          \  }}|O|J |j        sJ |j        }|j        }| j        |         |                             |           ||f| j        |<   d S d S r}   )steal_time_ratioprocessing_onaddressr8   addr9   )r`   r   cost_multiplierlevelwsrN   s         r4   r   z!WorkStealing.put_key_in_stealable   s    !%!6!6r!:!:&$$$####!BZFN6"5)--b111&,e_Dr""" '&r3   c                    | j                             |d           }|d S |\  }}	 | j        |         |                             |           d S # t          $ r Y d S w xY wr}   )r9   rm   r8   removeKeyError)r`   r   resultrN   r   s        r4   r   z&WorkStealing.remove_key_from_stealable   sz    #''D11>F	N6"5)0044444 	 	 	DD	s   &A 
AA%tuple[float, int] | tuple[None, None]c                   |j         j        }|t          v rdS |j        sdS | j                            |          }|s|j        sJ ||j        j        v sJ dS |                                }|| j        j	        z  t          z   }||z  }t          t          t          |          dz                       }|dk     rd}n|t          | j                  k    rdS ||fS )a;  The compute to communication time ratio of a key

        Returns
        -------
        cost_multiplier: The increased cost from moving this task as a factor.
        For example a result of zero implies a task without dependencies.
        level: The location within a stealable list to place this value
        NN)r   r   r=   r;   )prefixname
fast_tasksdependenciesr7   get_task_durationr   long_runningget_nbytes_deps	bandwidthLATENCYrD   roundr   lenrB   )r`   r   splitcompute_timenbytestransfer_timer   r   s           r4   r   zWorkStealing.steal_time_ratio   s     	J: 	4~77;; 	 ####)66666:##%%!99GC',6E$//!34455199EEc$/0000:%%r3   r'   r   r(   c                4   	 || j         v rdS d| j         }| xj        dz  c_        |j        }|                     |           t                              d|||j        ||j                   | j                            |          | j        	                    ||          z   }| j                            |          | j        	                    ||          z   }| j        j
        |j                                     d||d           |||||d}|                     ||           |S # t          $ r  t                              d||           Y d	S t           $ r?}	t                              |	           t$          rd
d l}
|
                                  d }	~	ww xY w)Nz	in-flightzsteal-r;   z#Request move %s, %s: %2f -> %s: %2fzsteal-request)opr   r-   )r'   r(   r*   r+   r-   z(Worker comm %r closed while stealing: %rzcomm-closedr   )rF   rK   r   r   loggerdebug	occupancyr7   r   get_comm_coststream_commsr   sendr   r   r   	Exception	exceptionLOG_PDBpdb	set_trace)r`   r   r'   r(   r-   r   r*   r+   r   er   s              r4   move_task_requestzWorkStealing.move_task_request  s   1	T^##"{ ;4#8::K!!Q&!!&C**2...LL5    #n>> ,,R889O "^== ,,R778N N'7<<&s;OO   !#2"0*" "D ""2t,,, 	! 	! 	!KKBFBOOO == 	 	 	Q  


	s#   	D& DD& &&F	F:FFrM   stater-   
str | Nonec               L  K   	 | j         j        |         }n,# t          $ r t                              d|           Y d S w xY w	 | j        |         d         |k    r|                     d||||f           d S n+# t          $ r |                     d||||f           Y d S w xY w|                     |          }|sJ |d         }|d         }t                              d||||           | j         j        r|j	        |k    sJ 	 |||j
        |j
        |g}	|t          v s1|t          v rp|| j         j                            |j
                  k    rH|                     d|j
        | j         j        vg|	R            | j                             ||	           n|t           v r|                     d
g|	R            n|t          v r|                     |           ||_	        |                    |           |                    |           |                     |           | j                             |j
        |           |                     dg|	R            nt-          d|           nL# t.          $ r?}
t                              |
           t2          rdd l}|                                  d }
~
ww xY w| j                             |           | j                             |           d S # | j                             |           | j                             |           w xY w)Nz,Key released between request and confirm: %sr-   zstale-responsezalready-abortedr(   r'   z%Confirm move %s, %s -> %s.  State: %s
reschedule)r-   zalready-computingconfirmzUnexpected task state: r   )r7   r   r   r   r   rF   rz   r   validater   r   _WORKER_STATE_UNDEFINED_WORKER_STATE_CONFIRMrU   rY   _reschedule_WORKER_STATE_REJECTr   remove_from_processingadd_to_processingr   send_task_to_worker
ValueErrorr   r   r   r   r   check_idle_saturated)r`   r   r   r-   rN   r   r   r(   r'   _log_msgr   r   s               r4   r^   zWorkStealing.move_task_confirmM  s     	%c*BB 	 	 	LLGMMMFF		~b!-0K??*CLMMM @  	 	 	HH'eV[IJJJFF	 **2..Wh<c65RWXXX>" 	.#v----)	8UFNEM;OH 000111T^377FFFF$T^-CC "    **3K*HHHH...-999::::///..r222#( --b111''+++))"---225="EEE)/h//0000 !B5!B!BCCC 	 	 	Q  


	 N//666N//77777 N//666N//7777sK    %A A 1A8 8$B B E&I+ *K- +
J45:J//J44K- -6L#c                     j         }g }t                      }t                      5  d}t          |j                                                  r%t                    t          |j                  k    r	 d d d            d S |j        }|sMt          d|j                                         j
                  } fd|D             }|s	 d d d            d S t          |          dk     rt          | j
        d          }|sJ sJ t           j                  D ]K\  }}s nBt          |          D ]/} j        |j                 |         }	|	rs t          |	          D ]}
s n|
 j        vs|
j        |us	|
|j        vr|	                    |
           9|dz  }t+          ||
          x}sR 
                    |          } 
                    |          } j                             |
|          } j                             |
|          } j                             |
          }||z   |z   |||z   d	z  z
  k    r                     |
||           ||z   }|                    |||
j        ||j        ||j        |f            j        d
         |xx         dz  cc<    j        d         |xx         |z  cc<    
                    |          }                     |          } j                             |||          s                    |           |	                    |
           щ j                             | 
                    |                     1M|r'                     d|f            xj         dz  c_         t                      }|j!        r#|j!        d         "                    ||z
             d d d            d S # 1 swxY w Y   d S )Nr   
   r   c                    g | ]?}                     |          d k    r$                    |          |j        k    r|v=|@S )g?)_combined_occupancy_combined_nprocessingnthreads)r>   r   potential_thievesr`   s     r4   
<listcomp>z(WorkStealing.balance.<locals>.<listcomp>  se     % % %//33c9922266DD"333	  433r3      T)r   reverser;   r<   rS   rT   )occrequestzsteal-duration)#r7   r	   r   rj   idlevaluesr   rU   	saturatedr   r   sorted	enumeraterB   listr8   r   r9   r   r   discard
_get_thiefr   r   r   appendr   rI   r   is_unoccupiedr   rz   rE   digestsr   )r`   srz   ri   r?   potential_victimsr   r~   r'   r8   r   r(   	occ_thief
occ_victimcomm_cost_thiefcomm_cost_victimcomputecostnproc_thiefrn   r   s   `                   @r4   rh   zWorkStealing.balance  sT   N\\ a	> a	>A #AFMMOO 4 4$ ,=(>(>#ai..(P(Pa	> a	> a	> a	> a	> a	> a	> a	> GHk$ $(	((**0H% % %!% % % % %/% % %! ) )a	> a	> a	> a	> a	> a	> a	> a	>* $%%**$*%4+CT% % %! %$$$$$$$%d&;<< ? ?q( E"#455 < <F $v~ >u EI$ !,= ! "9oo 42 420 "!Ed&888!/v==!)::: &--b111$Q)3Ar;L)M)M M %$$($<$<U$C$C	%)%=%=f%E%E
*..*F*Fr5*Q*Q+/>+G+GF+S+S("&."B"B2"F"F &7'A)-=-G1,LLM M !222vuEEE#*-=#=DJJ$)$)$&F$($*N$.$)M$-	!"   !L)>?FFF!KFFF L)=>uEEEMEEE(,(@(@(G(GI*.*D*DU*K*KK#'>#?#? %y+$ $ A !2 9 9% @ @ @ &--b111N77D$<$<V$D$D 8    u<|   )S)***

a

66Dy >	*+//u===Ca	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	> a	>s    AOA	OK4OO!$O!r   c                ,    |j         | j        |         z   S r}   )r   rG   r`   r   s     r4   r   z WorkStealing._combined_occupancy  s    |d6r:::r3   c                F    t          |j                  | j        |         z   S r}   )r   r   rH   r   s     r4   r   z"WorkStealing._combined_nprocessing  s    2=!!D$8$<<<r3   c                    | j                                         D ]}|D ]}|                                 | j                                         d S r}   )r8   r   r   r9   )r`   r7   r8   r   s       r4   restartzWorkStealing.restart  s_    ..00 	 	I  				 	  """""r3   
keys_or_tsstr | TaskStater   c                    d |D             g }| j                             d          D ]R\  }}|d         dk    r	|d         }n|g}|D ]2}t          fd|D                       r|                    |           3S|S )Nc                J    h | ] }t          |t                    s|j        n|!S r2   )
isinstancer,   r   )r>   r   s     r4   	<setcomp>z%WorkStealing.story.<locals>.<setcomp>  s-    SSSz#s33<SSSr3   rR   )topicr   r   r;   c              3      K   | ]}|v V  	d S r}   r2   )r>   xkeyss     r4   r@   z%WorkStealing.story.<locals>.<genexpr>  s'      ,,QqDy,,,,,,r3   )r7   
get_eventsanyr   )r`   r   outr~   Ltr  s         @r4   storyzWorkStealing.story  s    SS
SSSN--J-?? 	" 	"DAqty  aDC " ",,,,!,,,,, "JJqMMM" 
r3   )r7   r   r}   )r7   r   rb   rc   )rb   rc   )rp   rq   rb   rr   )rv   r   rb   rc   r   )r7   r   rN   r   rb   rc   )r7   r   rN   r,   r   r   rb   rc   )r   r   ri   r   r   r   r   r   r   r   rb   rc   )r   r   r   r&   rb   rc   )r   r   rb   r   )r   r   rb   rc   )r   r   rb   r   )r   r   r'   r   r(   r   rb   r,   )
r   r,   r   r,   r-   r,   rN   r   rb   rc   )r   r   rb   r)   )r   r   rb   rD   )r   r   rb   r   )r.   r/   r0   r1   r   r   rB   ra   ri   rn   ru   rz   rV   r   r   r   r   r   r   r   r   r   r   r^   rh   r   r   r   r  r2   r3   r4   r6   r6   G   s        444433334:UU C C"'%2,,C C C > > 5     JJJ,,,,88882222((((####R R R R>$ $ $ $ $ + + + + =? 	F 	F 	F 	F 	F 	F9 9 9 9A A A A A# # # #       , , , , ) ) ) )   * * * *
5 5 5 5	 	 	 	$& $& $& $&L4 4 4 4n OSB8 B8 B8 B8 B8 B8Hf> f> f> f>P; ; ; ;= = = =# # # #     r3   r6   r7   r   r   r   r   set[WorkerState]rb   WorkerState | Nonec                    |                      |          }|||z  }|r|}n	|j        sd S t          |t          | j        |                    S )Nr   )valid_workersloose_restrictionsminr   worker_objective)r7   r   r   r  valid_thievess        r4   r   r     si     ++B//M )M9 	 -& 	4 gi.H"&M&MNNNNr3   zsplit-shuffle)r7   r   r   r   r   r  rb   r  )8
__future__r   r\   loggingcollectionsr   r   collections.abcr   	functoolsr   mathr   r	   typingr
   r   r   r   r   tlzr   rW   dask.typingr   
dask.utilsr   distributed.compatibilityr   distributed.corer   distributed.diagnostics.pluginr   distributed.utilsr   r   distributed.schedulerr   r   r   r   r   r   	getLoggerr.   r   rX   rY   r   r   r   r   r&   r6   r   r   r2   r3   r4   <module>r&     st   " " " " " "   * * * * * * * * % % % % % %                   @ @ @ @ @ @ @ @ @ @ @ @ @ @              & & & & & & 6 6 6 6 6 6 , , , , , , : : : : : : ; ; ; ; ; ; ; ;               		8	$	$ +//8
9
9            9   J J J J J? J J JZ
O 
O 
O 
O 


r3   