
    0FieP8                    0   d dl mZ d dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZmZmZ d dlmZ d dlmZ d dlmZm Z  e	r
d dl!Z"d dl#m$Z$  ej%        e&          Z' G d d          Z( G d de          Z)dS )    )annotationsN)Sequence)ThreadPoolExecutor)TYPE_CHECKINGAnyoverload)thread_state)parse_bytes)WorkerPlugin)ToPickle)NDIndex	ShuffleId
ShuffleRunShuffleRunSpecShuffleSpec)ShuffleClosedError)ResourceLimiter)
log_errorssync)Workerc                      e Zd ZU ded<   ded<   ded<   ded<   d	ed
<   ded<   d.dZd/dZd0dZd1dZd2dZd3dZ	d4d"Z
d5d%Z	 	 d6d7d*Zed8d+            Zed9d,            Z	 	 d6d:d-Zd&S );_ShuffleRunManagerboolclosedzdict[ShuffleId, ShuffleRun]_active_runszset[ShuffleRun]_runszdict[ShuffleId, int]_stale_run_idszasyncio.Condition_runs_cleanup_conditionShuffleWorkerPlugin_pluginpluginreturnNonec                    d| _         i | _        t                      | _        i | _        t          j                    | _        || _        d S )NF)	r   r   setr   r   asyncio	Conditionr   r    )selfr!   s     Blib/python3.11/site-packages/distributed/shuffle/_worker_plugin.py__init__z_ShuffleRunManager.__init__/   s@    UU
 '.'8':':$    dict[ShuffleId, Any]c                H    d | j                                         D             S )Nc                >    i | ]\  }}||                                 S  )	heartbeat).0idshuffle_runs      r)   
<dictcomp>z0_ShuffleRunManager.heartbeat.<locals>.<dictcomp>8   s7     
 
 
,;BB%%''
 
 
r+   )r   itemsr(   s    r)   r0   z_ShuffleRunManager.heartbeat7   s2    
 
?C?P?V?V?X?X
 
 
 	
r+   
shuffle_idr   run_idintmessagestrc                   | j                             ||          }||k     r
|| j         |<   | j                            |d           }||j        |k    rd S | j                            |           t          |          }|                    |           | j        j	        j
                            | j        |           d S N)r   
setdefaultr   getr8   popRuntimeErrorfailr    worker_ongoing_background_tasks	call_soonclose)r(   r7   r8   r:   stale_run_idr3   	exceptions          r)   rB   z_ShuffleRunManager.fail<   s    *55j&II&  .4D
+'++J==+"4">">Fj))) ))	###5??
KXXXXXr+   r3   r   c           	     *  K   t                      5  	 |                                 d {V  | j        4 d {V  | j                            |           | j                                         d d d           d {V  n# 1 d {V swxY w Y   ni# | j        4 d {V  | j                            |           | j                                         d d d           d {V  w # 1 d {V swxY w Y   w xY wd d d            d S # 1 swxY w Y   d S r=   )r   rF   r   r   remove
notify_all)r(   r3   s     r)   rF   z_ShuffleRunManager.closeJ   sl     \\ 	> 	>>!'')))))))))7 > > > > > > > >J%%k2220;;===> > > > > > > > > > > > > > > > > > > > > > > > > > >47 > > > > > > > >J%%k2220;;===> > > > > > > > > > > > > > > > > > > > > > > > > > > >		> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	> 	>sn   DBD4B .D 
B
	
DB
	DC8 4C&	C8&
C00C83C04C88DDDc                ~   K    j         rJ d _          j        rM j                                        \  }} j        j        j                             j        |            j        M j        4 d {V   j        	                     fd           d {V  d d d           d {V  d S # 1 d {V swxY w Y   d S )NTc                      j          S r=   )r   r6   s   r)   <lambda>z-_ShuffleRunManager.teardown.<locals>.<lambda>^   s    DJ r+   )
r   r   popitemr    rC   rD   rE   rF   r   wait_for)r(   _r3   s   `  r)   teardownz_ShuffleRunManager.teardownS   s     ; 	!.6688NA{L9CC
K    	 / 	P 	P 	P 	P 	P 	P 	P 	P.778N8N8N8NOOOOOOOOO	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	P 	Ps   5$B,,
B69B6c                `  K   | j                             |d          }||j        |k     r|                     |           d{V }|j        |k    rt	          d|d|           |j        |k     rt	          d|d|           | j        rt          |  d          |j        r|j        |S )a  Get the shuffle matching the ID and run ID.

        If necessary, this method fetches the shuffle run from the scheduler plugin.

        Parameters
        ----------
        shuffle_id
            Unique identifier of the shuffle
        run_id
            Unique identifier of the shuffle run

        Raises
        ------
        KeyError
            If the shuffle does not exist
        RuntimeError
            If the run_id is stale
        N)r7   zrun_id=z stale, got z invalid, got  has already been closed)r   r?   r8   _refreshrA   r   r   
_exceptionr(   r7   r8   r3   s       r)   get_with_run_idz"_ShuffleRunManager.get_with_run_id`   s     & '++J==+"4v"="= $% !. ! !      K &&D&DD{DDEEE&((F&FFFFGGG; 	H$%F%F%FGGG! 	)((r+   specr   keyc                   K   | j                             |j        d          }|#|                     |j        ||           d{V }| j        rt          |  d          |j        r|j        |S )a  Get or create a shuffle matching the ID and data spec.

        Parameters
        ----------
        shuffle_id
            Unique identifier of the shuffle
        type:
            Type of the shuffle operation
        key:
            Task key triggering the function
        Nr7   rY   rZ   rT   )r   r?   r2   rU   r   r   rV   )r(   rY   rZ   r3   s       r)   get_or_createz _ShuffleRunManager.get_or_create   s       '++DGT:: $7 !. ! !      K ; 	H$%F%F%FGGG! 	)((r+   run_idsSequence[int]c                Z   K   |                      |t          |                     d{V S )a  Get the shuffle matching the ID and most recent run ID.

        If necessary, this method fetches the shuffle run from the scheduler plugin.

        Parameters
        ----------
        shuffle_id
            Unique identifier of the shuffle
        run_ids
            Sequence of possibly different run IDs

        Raises
        ------
        KeyError
            If the shuffle does not exist
        RuntimeError
            If the most recent run_id is stale
        r7   r8   N)rX   max)r(   r7   r^   s      r)   get_most_recentz"_ShuffleRunManager.get_most_recent   s9      * ))ZG)UUUUUUUUUr+   NShuffleSpec | None
str | Noner   c                P  K   |<| j         j        j                            || j         j        j                   d {V }nI| j         j        j                            t          |          || j         j        j                   d {V }t          |t                    r|j        }|S )N)r2   rC   )rY   rZ   rC   )	r    rC   	schedulershuffle_getaddressshuffle_get_or_creater   
isinstancedata)r(   r7   rY   rZ   results        r)   _fetchz_ShuffleRunManager._fetch   s       <<.8DD|*2 E        FF
  <.8NNd^^|*2 O        F
 fh'' 	![Fr+   c                
   K   d S r=   r/   )r(   r7   s     r)   rU   z_ShuffleRunManager._refresh   s      
 	r+   c                
   K   d S r=   r/   )r(   r7   rY   rZ   s       r)   rU   z_ShuffleRunManager._refresh   s       	r+   c                D  K   |                      |||           d {V }| j        rt          |  d          | j                            |d           x}r8|j        |j        k    r|S |                     ||j        |d|j                    | j                            |d           }|%||j        k    rt          d|j         d|           |j	        
                    |j        |j        | j                  }|| j        |<   | j                            |           |S )Nr\   rT   z stale, expected run_id==z'Received stale shuffle run with run_id=z; expected run_id > )rn   r   r   r   r?   r8   rB   r   rA   rY   create_run_on_worker
worker_forr    r   add)r(   r7   rY   rZ   rm   existingrG   r3   s           r)   rU   z_ShuffleRunManager._refresh   sf      {{jt{MMMMMMMM; 	H$%F%F%FGGG(,,Z>>>8 	&-//		OKKFMKK  
 *..z4@@#(E(E5&- 5 5&25 5   k66M6,dl
 
 )4*%
{###r+   )r!   r   r"   r#   r"   r,   r7   r   r8   r9   r:   r;   r"   r#   )r3   r   r"   r#   )r"   r#   r7   r   r8   r9   r"   r   rY   r   rZ   r;   r"   r   )r7   r   r^   r_   r"   r   )NN)r7   r   rY   rd   rZ   re   r"   r   )r7   r   r"   r   )r7   r   rY   r   rZ   r;   r"   r   )r7   r   rY   rd   rZ   re   r"   r   )__name__
__module____qualname____annotations__r*   r0   rB   rF   rR   rX   r]   rc   rn   r   rU   r/   r+   r)   r   r   #   s        LLL----
 )(((....       
 
 
 

Y Y Y Y> > > >P P P P" " " "H   4V V V V4 $(	    .    X    X $(	      r+   r   c                      e Zd ZU dZded<   ded<   ded<   ded<   d	ed
<   d3dZd4dZd4dZd5dZd6dZ	d7dZ
d8dZd9d#Zd:d&Zd;d(Zd<d*Zd3d+Zd:d,Zd;d-Zd=d.Z	 d>d?d2Zd/S )@r   a  Interface between a Worker and a Shuffle.

    This extension is responsible for

    - Lifecycle of Shuffle instances
    - ensuring connectivity between remote shuffle instances
    - ensuring connectivity and integration with the scheduler
    - routing concurrent calls to the appropriate `Shuffle` based on its `ShuffleID`
    - collecting instrumentation of ongoing shuffles and route to scheduler/worker
    r   rC   r   shuffle_runsr   memory_limiter_commsmemory_limiter_diskr   r   r"   r#   c                   | j         |j        d<   | j        |j        d<   | j        |j        d<   | |j        d<   || _        t          |           | _        t          t          d                    | _        t          t          d                    | _        d| _        t          | j        j        j                  | _        d S )Nshuffle_receiveshuffle_inputs_donezshuffle-failshufflez100 MiBz1 GiBF)r   handlersr   shuffle_failstream_handlers
extensionsrC   r   r   r   r
   r   r   r   r   statenthreads	_executorr(   rC   s     r)   setupzShuffleWorkerPlugin.setup  s    -1-A)*151I-.151B~.'+)$ .t44$3K	4J4J$K$K!#2;w3G3G#H#H +DK,=,FGGr+   r;   c                     d| j         j         S )NzShuffleWorkerPlugin on )rC   ri   r6   s    r)   __str__zShuffleWorkerPlugin.__str__  s    >)<>>>r+   c                2    d| j         j        d| j         dS )Nz<ShuffleWorkerPlugin, worker=z	, closed=>)rC   address_safer   r6   s    r)   __repr__zShuffleWorkerPlugin.__repr__   s$    bt{/GbbTXT_bbbbr+   r,   c                4    | j                                         S r=   )r   r0   r6   s    r)   r0   zShuffleWorkerPlugin.heartbeat'  s     **,,,r+   r7   r   r8   r9   rl   list[tuple[int, Any]] | bytesc                x   K   |                      ||           d{V }|                    |           d{V  dS )z
        Handler: Receive an incoming shard of data from a peer worker.
        Using an unknown ``shuffle_id`` is an error.
        N)_get_shuffle_runreceive)r(   r7   r8   rl   r3   s        r)   r   z#ShuffleWorkerPlugin.shuffle_receive*  s[       !11*fEEEEEEEE!!$'''''''''''r+   c                   K   t                      5  |                     ||           d{V }|                                 d{V  ddd           dS # 1 swxY w Y   dS )z
        Handler: Inform the extension that all input partitions have been handed off to extensions.
        Using an unknown ``shuffle_id`` is an error.
        N)r   r   inputs_donerW   s       r)   r   z'ShuffleWorkerPlugin.shuffle_inputs_done7  s      
 \\ 	, 	, $ 5 5j& I IIIIIIIK))+++++++++	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	,s   7AAAr:   c                @    | j                             |||           dS )aC  Fails the shuffle run with the message as exception and triggers cleanup.

        .. warning::
            To guarantee the correct order of operations, shuffle_fail must be
            synchronous. See
            https://github.com/dask/distributed/pull/7486#discussion_r1088857185
            for more details.
        )r7   r8   r:   N)r   rB   )r(   r7   r8   r:   s       r)   r   z ShuffleWorkerPlugin.shuffle_fail@  s(     	*VWUUUUUr+   r   partition_idint | NDIndexrY   r   kwargsc                L    |                      |          } |j        d||d|S )N)rl   r   r/   )get_or_create_shuffleadd_partition)r(   rl   r   rY   r   r3   s         r)   r   z!ShuffleWorkerPlugin.add_partitionK  sH     0066({( 
%
 
 
 
 	
r+   r^   r_   c                ~   K   | j                             ||           d{V }|                    |           d{V S )z
        Task: Note that the barrier task has been reached (`add_partition` called for all input partitions)

        Using an unknown ``shuffle_id`` is an error. Calling this before all partitions have been
        added is undefined.
        N)r   rc   barrier)r(   r7   r^   r3   s       r)   _barrierzShuffleWorkerPlugin._barrierY  sZ       !-==j'RRRRRRRR !((111111111r+   r   c                J   K   | j                             ||           d {V S )Nra   )r   rX   r(   r7   r8   s      r)   r   z$ShuffleWorkerPlugin._get_shuffle_rune  sK      
 &66!& 7 
 
 
 
 
 
 
 
 	
r+   rZ   c                J   K   | j                             ||           d {V S )N)rY   rZ   )r   r]   r(   rY   rZ   s      r)   _get_or_create_shufflez*ShuffleWorkerPlugin._get_or_create_shufflen  s6      
 &44$C4HHHHHHHHHr+   c                   K   | j         rJ d| _         | j                                         d {V  	 | j                            d           d S # t
          $ r | j                                         Y d S w xY w)NT)cancel_futures)r   r   rR   r   shutdown	Exceptionr   s     r)   rR   zShuffleWorkerPlugin.teardownu  s      ;((*********	&N##4#88888 	& 	& 	&N##%%%%%%	&s   A #A76A7c                H    t          | j        j        | j        ||          }|S r=   )r   rC   loopr   )r(   r7   r^   rm   s       r)   r   zShuffleWorkerPlugin.barrier  s!    dk&z7KKr+   c                N    t          | j        j        | j        j        ||          S r=   )r   rC   r   r   rX   r   s      r)   get_shuffle_runz#ShuffleWorkerPlugin.get_shuffle_run  s-    
 K-	
 
 	
r+   c                f    t           j        }t          | j        j        | j        j        ||          S r=   )r	   rZ   r   rC   r   r   r]   r   s      r)   r   z)ShuffleWorkerPlugin.get_or_create_shuffle  s5     K+	
 
 	
r+   Nmetapd.DataFrame | Nonec                v    |                      ||          }t          j        }|                    |||          S )z
        Task: Retrieve a shuffled output partition from the ShuffleWorkerPlugin.

        Calling this for a ``shuffle_id`` which is unknown or incomplete is an error.
        )r   rZ   r   )r   r	   rZ   get_output_partition)r(   r7   r8   r   r   r3   rZ   s          r)   r   z(ShuffleWorkerPlugin.get_output_partition  sG     **:v>>//% 0 
 
 	
r+   )rC   r   r"   r#   )r"   r;   rv   )r7   r   r8   r9   rl   r   r"   r#   )r7   r   r8   r9   r"   r#   rw   )
rl   r   r   r   rY   r   r   r   r"   r9   )r7   r   r^   r_   r"   r9   rx   ry   )rY   r   r"   r   r=   )
r7   r   r8   r9   r   r   r   r   r"   r   )rz   r{   r|   __doc__r}   r   r   r   r0   r   r   r   r   r   r   r   rR   r   r   r   r   r/   r+   r)   r   r      s        	 	 NNN$$$$))))((((LLLH H H H? ? ? ?c c c c- - - -( ( ( (, , , ,	V 	V 	V 	V
 
 
 

2 
2 
2 
2
 
 
 
I I I I& & & &   

 

 

 



 

 

 

" %)
 
 
 
 
 
 
r+   r   )*
__future__r   r&   loggingcollections.abcr   concurrent.futuresr   typingr   r   r   dask.contextr	   
dask.utilsr
   distributed.diagnostics.pluginr   distributed.protocol.serializer   distributed.shuffle._corer   r   r   r   r   distributed.shuffle._exceptionsr   distributed.shuffle._limiterr   distributed.utilsr   r   pandaspddistributed.workerr   	getLoggerrz   loggerr   r   r/   r+   r)   <module>r      s   " " " " " "   $ $ $ $ $ $ 1 1 1 1 1 1 / / / / / / / / / / % % % % % % " " " " " " 7 7 7 7 7 7 3 3 3 3 3 3              ? > > > > > 8 8 8 8 8 8 . . . . . . . . *)))))) 
	8	$	$V V V V V V V Vru
 u
 u
 u
 u
, u
 u
 u
 u
 u
r+   