
    0Fie
2                       d dl mZ d dlZd dlZd dlmZ d dlmZmZm	Z	 d dl
mZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZmZmZmZmZ d dlmZ erd dl m!Z!m"Z"m#Z#m$Z$m%Z%  ej&        e'          Z( G d de          Z)dS )    )annotationsN)defaultdict)CallableIterableSequence)TYPE_CHECKINGAny)Key)SchedulerPlugin)time)dumps)ToPickle)SchedulerShuffleState	ShuffleIdShuffleRunSpecShuffleSpecbarrier_keyid_from_key)ShuffleWorkerPlugin)Recs	Scheduler	TaskStateTaskStateStateWorkerStatec                      e Zd ZU dZded<   ded<   ded<   ded	<   d
ed<   dHdZdIdZdJdZdKdZdLdZ	dMd"Z
dNd$ZdOd'ZdPd(ZdQd)ZdRd/ZdSd2ZdTd3ZdUd5ZdVd7ZdWd:ZdXd?ZdYdCZdZdEZdIdFZdGS )[ShuffleSchedulerPluginz
    Shuffle plugin for the scheduler
    This coordinates the individual worker plugins to ensure correctness
    and collects heartbeat messages for the dashboard.
    See Also
    --------
    ShuffleWorkerPlugin
    r   	schedulerz&dict[ShuffleId, SchedulerShuffleState]active_shuffleszdefaultdict[ShuffleId, dict]
heartbeatsz2defaultdict[ShuffleId, set[SchedulerShuffleState]]	_shufflesz,defaultdict[str, set[SchedulerShuffleState]]_archived_by_stimulusc                X   || _         | j         j                            | j        | j        | j        | j        d           t          d           | _        i | _	        | j         
                    | d           t          t                    | _        t          t                    | _        d S )N)shuffle_barriershuffle_getshuffle_get_or_createshuffle_restrict_taskc                 *    t          t                    S N)r   dict     Elib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py<lambda>z1ShuffleSchedulerPlugin.__init__.<locals>.<lambda>?   s    k$.?.? r+   shufflename)r   handlersupdatebarriergetget_or_createrestrict_taskr   r   r   
add_pluginsetr    r!   selfr   s     r,   __init__zShuffleSchedulerPlugin.__init__5   s    "&&#'<#x)-);)-);	 	
 	
 	
 &&?&?@@!!!$Y!777$S))%0%5%5"""r+   returnNonec                   K   t                      }| j                            d t          |          d           d {V  d S )Nr.   r/   )r   r   register_worker_pluginr   )r:   r   worker_plugins      r,   startzShuffleSchedulerPlugin.startE   sg      +--n33%&&Y 4 
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r+   set[ShuffleId]c                *    t          | j                  S r(   )r8   r   )r:   s    r,   shuffle_idsz"ShuffleSchedulerPlugin.shuffle_idsK   s    4'(((r+   idr   run_idint
consistentboolc                >  K   | j         |         }|j        |k    rt          d|d|           |s1|                     |j        | j        dt                                 S d||d}| j                            |t          |j	                             d {V  d S )Nzrun_id=z does not match zp2p-barrier-inconsistent-stimulus_idshuffle_inputs_done)op
shuffle_idrF   )msgworkers)
r   rF   
ValueError_restart_shufflerE   r   r   	broadcastlistparticipating_workers)r:   rE   rF   rH   r.   rP   s         r,   r3   zShuffleSchedulerPlugin.barrierN   s      &r*>V##BBBBBCCC 	((
@@@ )   
 +"OOn&&677 ' 
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r+   keyr
   workerstrr)   c                    | j         |         }|j        |k    rdd|d| dS |j        |k     rdd|d| dS | j        j        |         }|                     ||           ddiS )NerrorzRequest stale, expected run_id=z for )statusmessagez!Request invalid, expected run_id=r\   OK)r   rF   r   tasks_set_restriction)r:   rE   rF   rW   rX   r.   tss          r,   r6   z$ShuffleSchedulerPlugin.restrict_task^   s    &r*>F""!MfMMGMM   ^f$$!OOOgOO   ^!#&b&)))$r+   wsr   datac                    |                                 D ]F\  }}||                                 v r+| j        |         |j                                     |           Gd S r(   )itemsrD   r   addressr2   )r:   rb   rc   rO   ds        r,   	heartbeatz ShuffleSchedulerPlugin.heartbeatn   se    !ZZ\\ 	B 	BMJT--////
+BJ7>>qAAA	B 	Br+   ToPickle[ShuffleRunSpec]c                    || j         j        vrt          d|          | j        |         }|j                            |           t          |j                  S )Nz$Scheduler is unaware of this worker )r   rQ   RuntimeErrorr   rV   addr   run_spec)r:   rE   rX   states       r,   r4   zShuffleSchedulerPlugin.gets   se    ///AvAA   $R(#''///'''r+   spec#ShuffleSpec | ToPickle[ShuffleSpec]c                   t          |t                    r|j        }	 |                     |j        |          S # t
          $ r |                     |j                   |                     |           |                    |           }|| j	        |j        <   | j
        |j                                     |           |j                            |           t          |j                  cY S w xY wr(   )
isinstancer   rc   r4   rE   KeyError_raise_if_barrier_unknown_raise_if_task_not_processingcreate_new_runr   r    rl   rV   rm   )r:   ro   rW   rX   rn   s        r,   r5   z$ShuffleSchedulerPlugin.get_or_create}   s     dH%% 	9D	,88DGV,,, 
	, 
	, 
	, **47333..s333''--E,1D )N47#''...'++F333EN+++++
	,s   9 B0C,+C,c                    t          |          }	 | j        j        |          d S # t          $ r t	          d|d          w xY w)NzBarrier task with key z does not exist. This may be caused by task fusion during graph generation. Please let us know that you ran into this by leaving a comment at distributed#7816.)r   r   r_   rs   rk   )r:   rE   rW   s      r,   rt   z0ShuffleSchedulerPlugin._raise_if_barrier_unknown   so    "oo	N %%%% 	 	 	F F F F  	s	   % Ac                v    | j         j        |         }|j        dk    rt          d| d|j         d          d S )N
processingz	Expected z to be processing, is .)r   r_   rn   rk   )r:   rW   tasks      r,   ru   z4ShuffleSchedulerPlugin._raise_if_task_not_processing   sJ    ~#C(:%%T4TTtzTTTUUU &%r+   output_partitionsIterable[Any]pick#Callable[[Any, Sequence[str]], str]dict[Any, str]c                    i }| j         j        t          |                   }|j        rt	          |j                  }nt	          | j         j                  }|D ]} |||          }|||<   |S )a  Pin the outputs of a P2P shuffle to specific workers.

        Parameters
        ----------
        id: ID of the shuffle to pin
        output_partitions: Output partition IDs to pin
        pick: Function that picks a worker given a partition ID and sequence of worker

        .. note:
            This function assumes that the barrier task and the output tasks share
            the same worker restrictions.
        )r   r_   r   worker_restrictionsrU   rQ   )	r:   rE   r|   r~   mappingr3   rQ   	partitionrX   s	            r,   _pin_output_workersz*ShuffleSchedulerPlugin._pin_output_workers   s    $ .&{27& 	37677GG4>122G* 	( 	(IT)W--F!'GIr+   ra   r   c                    d|j         v rd S |j                                        |j         d<   | j                            |j        |hi           d S Nshuffle_original_restrictions)r   r   copyr   set_restrictionsrW   )r:   ra   rX   s      r,   r`   z'ShuffleSchedulerPlugin._set_restriction   sW    *bn<< F:<:P:U:U:W:W67''&(:;;;;;r+   c                    d|j         vrd S |j                             d          }| j                            |j        |i           d S r   )r   popr   r   rW   )r:   ra   original_restrictionss      r,   _unset_restrictionz)ShuffleSchedulerPlugin._unset_restriction   sQ     +".@@F " 2 23R S S''1F(GHHHHHr+   r   c                   | j         j        t          |                   }i }|j        D ]-}|j        dk    ri c S |                    |j        di           .|j        dk    rt          d|d          |                    |j        di           |j        D ]<}|j        dk    rt          d| d          |                    |j        di           =|S )Nerredreleasedz$Expected dependents of barrier_task=z! to be 'erred' if the barrier is.zNExpected barrier and its dependents to be 'erred' if the barrier's dependency z is.)	r   r_   r   
dependentsrn   r2   rW   rk   dependencies)r:   rE   barrier_taskrecsdts        r,   _restart_recommendationsz/ShuffleSchedulerPlugin._restart_recommendations   s%   ~+KOO<) 	. 	.Bx7""			KK,----(( ", " " "   	\%z2333+ 	. 	.Bx7"" #D;=D D D   KK,----r+   rL   c                   |                      |          }| j                            ||           | j                            |           d S )NrK   )r   r   transitions!stimulus_queue_slots_maybe_opened)r:   rE   r   rL   r   s        r,   rS   z'ShuffleSchedulerPlugin._restart_shuffle   sQ     ,,R00""4["AAA88[8QQQQQr+   kwargsr	   c                  | j                                                                         D ]]\  }}||j        vrt	          d| d|           }|                     |t          |                     |                     ||           ^| j        	                    |t                                D ]}|                     |j        ||            dS )a  Restart all active shuffles when a participating worker leaves the cluster.

        .. note::
            Due to the order of operations in :meth:`~Scheduler.remove_worker`, the
            shuffle may have already been archived by
            :meth:`~ShuffleSchedulerPlugin.transition`. In this case, the
            ``stimulus_id`` is used as a transaction identifier and all archived shuffles
            with a matching `stimulus_id` are restarted.
        zWorker z left during active rK   N)r   r   re   rV   rk   _fail_on_workersrY   _clean_on_schedulerr!   r4   r8   rS   rE   )r:   r   rX   rL   r   rO   r.   	exceptions           r,   remove_workerz$ShuffleSchedulerPlugin.remove_worker   s      $(#7#<#<#>#>#D#D#F#F 	> 	>JW:::$%Tv%T%T7%T%TUUI!!'3y>>:::$$Z====155k355II 	R 	RG!!'*i[!QQQQ	R 	Rr+   rA   r   finishargsc                  |dvrdS t          |          }|sdS | j                            |          x}r1|                     || d           |                     ||           |dk    ri| j                            |t                                }	|	D ]A}|j        r6| j	        |j                 }
|

                    |           |
s| j	        |j        = @dS dS )zDClean up scheduler and worker state once a shuffle becomes inactive.)r   	forgottenNz
 forgotten)r]   rK   r   )r   r   r4   r   r   r    r   r8   _archived_byr!   remove)r:   rW   rA   r   rL   r   r   rO   r.   shufflesarchiveds              r,   
transitionz!ShuffleSchedulerPlugin.transition  s    222F %%
 	F*..z:::7 	J!!'g3I3I3I!JJJ$$Z[$III[  ~))*cee<<H# M M' M#9':NOHOOG,,,# M 6w7KL ! M Mr+   r.   r   r]   c                h    fdj         D             }| j                            i |           d S )Nc                6    i | ]}|d j         j        dgS )zshuffle-fail)rN   rO   rF   r]   )rE   rF   ).0rX   r]   r.   s     r,   
<dictcomp>z;ShuffleSchedulerPlugin._fail_on_workers.<locals>.<dictcomp>0  sJ     

 

 

  (")*%n&	 

 

 

r+   )rV   r   send_all)r:   r.   r]   worker_msgss    `` r,   r   z'ShuffleSchedulerPlugin._fail_on_workers/  sV    

 

 

 

 

 "7

 

 

 	K00000r+   
str | Nonec                   | j                             |          }|j        s)|r'||_        | j        |                             |           t          j        t                    5  | j        |= d d d            n# 1 swxY w Y   | j	        j
        t          |                   }|j        D ]}|                     |           d S r(   )r   r   r   r!   rl   
contextlibsuppressrs   r   r   r_   r   r   r   )r:   rE   rL   r.   r   r   s         r,   r   z*ShuffleSchedulerPlugin._clean_on_scheduler=  s
   &**2..# 	A 	A#.G &{377@@@ ** 	$ 	$#	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ ~+KOO<) 	( 	(B##B''''	( 	(s   $	A99A= A=c                    | j                                          | j                                         | j                                         | j                                         d S r(   )r   clearr   r    r!   r9   s     r,   restartzShuffleSchedulerPlugin.restartJ  sZ    ""$$$"((*****r+   N)r   r   )r   r   r<   r=   )r<   rB   )rE   r   rF   rG   rH   rI   r<   r=   )
rE   r   rF   rG   rW   r
   rX   rY   r<   r)   )rb   r   rc   r)   r<   r=   )rE   r   rX   rY   r<   ri   )ro   rp   rW   r
   rX   rY   r<   ri   )rE   r   r<   r=   )rW   r
   r<   r=   )rE   r   r|   r}   r~   r   r<   r   )ra   r   rX   rY   r<   r=   )ra   r   r<   r=   )rE   r   r<   r   )rE   r   r   r   rL   rY   r<   r=   )
r   r   rX   rY   rL   rY   r   r	   r<   r=   )rW   r
   rA   r   r   r   r   r	   rL   rY   r   r	   r<   r=   )r.   r   r]   rY   r<   r=   )rE   r   rL   r   r<   r=   )__name__
__module____qualname____doc____annotations__r;   rA   rD   r3   r6   rh   r4   r5   rt   ru   r   r`   r   r   rS   r   r   r   r   r   r*   r+   r,   r   r   %   s          ;;;;,,,,AAAAGGGG6 6 6 6 
 
 
 
) ) ) )
 
 
 
         B B B B
( ( ( (, , , ,0	 	 	 	V V V V
   >< < < <I I I I   :R R R RR R R R4M M M M:1 1 1 1( ( ( (+ + + + + +r+   r   )*
__future__r   r   loggingcollectionsr   collections.abcr   r   r   typingr   r	   dask.typingr
   distributed.diagnostics.pluginr   distributed.metricsr   distributed.protocol.pickler   distributed.protocol.serializer   distributed.shuffle._corer   r   r   r   r   r   "distributed.shuffle._worker_pluginr   distributed.schedulerr   r   r   r   r   	getLoggerr   loggerr   r*   r+   r,   <module>r      s   " " " " " "      # # # # # # 8 8 8 8 8 8 8 8 8 8 % % % % % % % %       : : : : : : $ $ $ $ $ $ - - - - - - 3 3 3 3 3 3                C B B B B B               
	8	$	$i+ i+ i+ i+ i+_ i+ i+ i+ i+ i+r+   