
    0Fie9                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlZd d	lmZ d d
lmZ d dlmZm Z  d dl!m"Z"m#Z# d dl$m%Z%  ej&        e'          Z(dddd@dZ) G d d          Z*d gZ+e#fdZ,e-e.e/e0fZ1dAd!Z2dBd&Z3dCdDd)Z4e5fd*Z6d+ Z7 ed,          Z8d-e9e:fdfdEd9Z;dd:dFd?Z<dS )G    )annotationsN)defaultdict)Callable
Collection	CoroutineMapping)partial)cycle)AnyTypeVar)concatdropgroupbymerge)SubgraphCallable)Key)is_namedtuple_instanceparse_timedelta)ConnectionPoolrpc)All)serializerswhowho_hasMapping[Key, Collection[str]]r   r   r   list[str] | Noner   
str | Nonereturn9tuple[dict[Key, object], list[Key], list[Key], list[str]]c               `  K   ddl m d |                                 D             }i }g }t                      }t                      }|rOt	          t
                    }	|                                D ]K\  }
}||z  }||z
  }|r:|	t          j        t          |                                                 |
           L|	sQ|r/t          j
        d           d{V  |                                 |t          |          |t          |          fS fd|	                                D             }t          j        |ddi d{V }t          |	|          D ]9\  }}t          |t                    r|                    |           1t          |t"                    rHt$                              d	|	|         ||
           |	|         D ]}
|                    |
           ||
= t          |t(                    r|t          |t*                    s
J |            |d         dk    r|                    |           |d         dk    sJ |	|         D ]<}
|
|d         v r|d         |
         ||
<   ||
= !||
                             |           =;|O|g |t          |          fS )a  Gather data directly from peers

    Parameters
    ----------
    who_has:
        mapping from keys to worker addresses
    rpc:
        RPC channel to use

    Returns
    -------
    Tuple:

    - Successfully retrieved: ``{key: value, ...}``
    - Keys that were not available on any worker: ``[key, ...]``
    - Keys that raised exception; e.g. failed to deserialize: ``[key, ...]``
    - Workers that failed to respond: ``[address, ...]``

    See Also
    --------
    gather
    _gather
    Scheduler.get_who_has
    r   )get_data_from_workerc                4    i | ]\  }}|t          |          S  )set.0kvs      6lib/python3.11/site-packages/distributed/utils_comm.py
<dictcomp>z'gather_from_workers.<locals>.<dictcomp>:   s$    777tq!CFF777    g333333?Nc                    g | ]@\  }}t          j        t          t          ||           d          d|           AS ))r   r   r!   	operationzget-data-from-)name)asynciocreate_taskretry_operationr	   )r&   addresskeysr!   r   r   r   s      r)   
<listcomp>z'gather_from_workers.<locals>.<listcomp>P   s     
 
 
  ,$/   5
 
 
 0g//  
 
 
r+   return_exceptionsTz2Unexpected error while collecting tasks %s from %s)exc_infostatusbusyOKdata)distributed.workerr!   itemsr$   r   listrandomchoiceappendr0   sleepcleargatherzip
isinstanceOSErroradd	ExceptionloggererrorBaseExceptiondictremove)r   r   r   r   	to_gatherr;   failed_keysmissing_workersbusy_workersdkey	addressesready_addressestasksresultsr3   rr!   s    ```             @r)   gather_from_workersrZ      sM     > 87777777w}}777I DK #O UUL
 @7'oo// 	D 	DNC(I',6O D&-_ 5 5667>>sCCC 	M mD)))))))))""$$$i+tO7L7LLL
 
 
 
 
 
 
 "#
 
 
$  FFFFFFFFFa// 	7 	7JGQ!W%% 7##G,,,,Ay)) 7HgJ	     W: ' 'C&&s+++!#' A}-- 7!!T**--A---X;&(( $$W---{d****W: 7 7Cai''$%fIcNS	%cNN!#--g66667w  @7D [$"7"777r+   c                      e Zd ZdZd Zd ZdS )
WrappedKeya  Interface for a key in a dask graph.

    Subclasses must have .key attribute that refers to a key in a dask graph.

    Sometimes we want to associate metadata to keys in a dask graph.  For
    example we might know that that key lives on a particular machine or can
    only be accessed in a certain way.  Schedulers may have particular needs
    that can only be addressed by additional metadata.
    c                    || _         d S NrT   )selfrT   s     r)   __init__zWrappedKey.__init__   s    r+   c                @    t          |           j         d| j         dS )Nz('z'))type__name__rT   )r`   s    r)   __repr__zWrappedKey.__repr__   s$    t**%555555r+   N)rd   
__module____qualname____doc__ra   re   r#   r+   r)   r\   r\      s<           6 6 6 6 6r+   r\   c                `  K   t          | t                    sJ t          |t                    sJ t          t          d |                                 D                                 }t          t          |                                           \  }}t          t          d         t          |          z  t          |                    }t          dxx         t          |          z  cc<   t          t          |||                    }t          d|          }d |                                D             }fd|D             	 t          fd|                                D                        d{V }                                D ]}	|	                                 d{V  n6#                                 D ]}	|	                                 d{V  w xY wt          d |D                       }
d t          d	|                                          D             }|||
fS )
a  Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  nthreads should be a dictionary mapping worker
    identities to numbers of cores.

    See scatter for parameter docstring
    c              3  (   K   | ]\  }}|g|z  V  d S r^   r#   )r&   wncs      r)   	<genexpr>z%scatter_to_workers.<locals>.<genexpr>   s,      AAuq"1#(AAAAAAr+   r   c                .    i | ]\  }}|d  |D             S )c                    i | ]	\  }}}||
S r#   r#   )r&   _rT   values       r)   r*   z1scatter_to_workers.<locals>.<dictcomp>.<dictcomp>   s     444C#u444r+   r#   )r&   workerr(   s      r)   r*   z&scatter_to_workers.<locals>.<dictcomp>   s-    PPP44!444PPPr+   c                (    i | ]}| |          S r#   r#   )r&   addrr   s     r)   r*   z&scatter_to_workers.<locals>.<dictcomp>   s#    ***D##d))***r+   c                N    g | ]!\  }}|                              |           "S ))r;   )update_data)r&   r3   r(   rpcss      r)   r5   z&scatter_to_workers.<locals>.<listcomp>   s2    WWWzwg222::WWWr+   Nc              3  &   K   | ]}|d          V  dS )nbytesNr#   )r&   os     r)   rm   z%scatter_to_workers.<locals>.<genexpr>   s&      ,,11X;,,,,,,r+   c                .    i | ]\  }}|d  |D             S )c                    g | ]\  }}}|	S r#   r#   )r&   rk   rp   s      r)   r5   z1scatter_to_workers.<locals>.<dictcomp>.<listcomp>   s    &&&Aq1&&&r+   r#   r%   s      r)   r*   z&scatter_to_workers.<locals>.<dictcomp>   s-    III41aq&&A&&&IIIr+      )rF   rM   r>   r   r=   rE   r   _round_robin_counterlenr
   r   r   values	close_rpcr   )nthreadsr;   r   workersnamesworker_iterLrS   outrY   ry   r   rw   s     `         @r)   scatter_to_workersr      s8      h%%%%%dD!!!!!6AA0@0@AAAAABBGsDJJLL)**KE4+A.W=uW~~NNKs4yy(SeT**++A1APPaggiiPPPA*******D WWWWQWWYYWWWXXXXXXXX 	  	 A++--	  	  	 A++--	  ,,,,,,,FII71a==3F3F3H3HIIIG7F##s   3F0 03G#rz   r   handlerCallable[..., Any]c                p   t          |           sJ t          |           }t          | d          rM|                                 \  }}fd|D             }fd|                                D             } ||i |S t          |d          r|                                 n| }fd|D             } || S )zWSpecial pack/unpack dispatcher for namedtuples respecting their potential constructors.__getnewargs_ex__c                &    g | ]} |          S r#   r#   r&   itemr   s     r)   r5   z'_namedtuple_packing.<locals>.<listcomp>   s!    777$777r+   c                .    i | ]\  }}| |          S r#   r#   )r&   r'   r(   r   s      r)   r*   z'_namedtuple_packing.<locals>.<dictcomp>   s'    CCCDAq!WWQZZCCCr+   __getnewargs__c                &    g | ]} |          S r#   r#   r   s     r)   r5   z'_namedtuple_packing.<locals>.<listcomp>   s!    333dGGDMM333r+   )r   rc   hasattrr   r=   r   )rz   r   typargskwargshandled_argshandled_kwargss    `     r)   _namedtuple_packingr      s    !!$$$$$
q''Cq%&& 4**,,f7777$777CCCCFLLNNCCCsL3N333!(.>!?!?F1QD3333d333L3r+   	byte_keysboolfound_futuresset[WrappedKey]c                H   t          |           }|t          u rI| s| S t          | d                   t          u rt                      t          fd| dd         D                       }                               | d         }t                      fd|j                                        D             }d}r|                               rt          d D                       nt          d D                       }t          |j                  |z   }t          ||j        ||j	                  }|f|z   |z   S t          fd	| D                       S t          |           r%t          | t          t          
                    S |t          v r| s| S fd| D             }	 ||	          S |t          u r%| r!fd|                                 D             S | S t!          |t"                    r| j        }
                    |            |
S | S )z[Inner implementation of `unpack_remotedata` that adds found wrapped keys to `found_futures`r   c              3  :   K   | ]}t          |          V  d S r^   _unpack_remotedata_inner)r&   ir   futuress     r)   rm   z+_unpack_remotedata_inner.<locals>.<genexpr>   s0      XXQ1!YHHXXXXXXr+   r}   Nc                :    i | ]\  }}|t          |          S r#   r   )r&   r'   r(   r   r   s      r)   r*   z,_unpack_remotedata_inner.<locals>.<dictcomp>   s<       Aq +Ay'BB  r+   r#   c              3  $   K   | ]}|j         V  d S r^   r_   r&   fs     r)   rm   z+_unpack_remotedata_inner.<locals>.<genexpr>   s$      11A!%111111r+   c              3  $   K   | ]}|j         V  d S r^   r_   r   s     r)   rm   z+_unpack_remotedata_inner.<locals>.<genexpr>   s$      66qu666666r+   c              3  :   K   | ]}t          |          V  d S r^   r   r&   r   r   r   s     r)   rm   z+_unpack_remotedata_inner.<locals>.<genexpr>   sA        MQ(y-HH     r+   )r   r   c                2    g | ]}t          |          S r#   r   r   s     r)   r5   z,_unpack_remotedata_inner.<locals>.<listcomp>  s&    WWWT(y-HHWWWr+   c                :    i | ]\  }}|t          |          S r#   r   )r&   r'   r(   r   r   s      r)   r*   z,_unpack_remotedata_inner.<locals>.<dictcomp>  s<       Aq +Ay-HH  r+   )rc   tupler   r$   updatedskr=   inkeysoutkeyr/   r   r   r	   r   collection_typesrM   
issubclassr\   rT   rH   )rz   r   r   r   r   scr   future_keysr   outsr'   r   s    ``        @r)   r   r      s   
 q''C
e|| 	H!::)))'*uuGXXXXXRSTUTVTVRWXXXXXD  ))) $%Q4BeeG    FLLNN  C "$K G$$W--- !7E1111111166g66666 
 ry))K7%c29fbgFF54<+--     UV      
 	"	" 
"(#+  
 
 	
  	HWWWWWUVWWWs4yy	 	    GGII   
 H	C	$	$ E!r+   Ftuple[Any, set]c                D    t                      }t          | ||          |fS )a  Unpack WrappedKey objects from collection

    Returns original collection and set of all found WrappedKey objects

    Examples
    --------
    >>> rd = WrappedKey('mykey')
    >>> unpack_remotedata(1)
    (1, set())
    >>> unpack_remotedata(())
    ((), set())
    >>> unpack_remotedata(rd)
    ('mykey', {WrappedKey('mykey')})
    >>> unpack_remotedata([1, rd])
    ([1, 'mykey'], {WrappedKey('mykey')})
    >>> unpack_remotedata({1: rd})
    ({1: 'mykey'}, {WrappedKey('mykey')})
    >>> unpack_remotedata({1: [rd]})
    ({1: ['mykey']}, {WrappedKey('mykey')})

    Use the ``byte_keys=True`` keyword to force string keys

    >>> rd = WrappedKey(('x', 1))
    >>> unpack_remotedata(rd, byte_keys=True)
    ("('x', 1)", {WrappedKey('('x', 1)')})
    )r$   r   )rz   r   r   s      r)   unpack_remotedatar     s$    6 &)UUM#Ay-@@-OOr+   c                   t          |           }	 t          |           r| v r|          S n# t          $ r Y nw xY w|t          v r |fd| D                       S |t          u r!fd|                                 D             S t          |           r%t          | t          t                              S | S )a  Merge known data into tuple or dict

    Parameters
    ----------
    o
        core data structures containing literals and keys
    d : dict
        mapping of keys to data

    Examples
    --------
    >>> data = {'x': 1}
    >>> pack_data(('x', 'y'), data)
    (1, 'y')
    >>> pack_data({'a': 'x', 'b': 'y'}, data)  # doctest: +SKIP
    {'a': 1, 'b': 'y'}
    >>> pack_data({'a': ['x'], 'b': 'y'}, data)  # doctest: +SKIP
    {'a': [1], 'b': 'y'}
    c                4    g | ]}t          |           S )	key_types	pack_data)r&   xrS   r   s     r)   r5   zpack_data.<locals>.<listcomp>N  s(    DDDQIai888DDDr+   c                <    i | ]\  }}|t          |           S r   r   )r&   r'   r(   rS   r   s      r)   r*   zpack_data.<locals>.<dictcomp>P  s.    NNNDAq9QY777NNNr+   )rS   r   )
rc   rF   	TypeErrorr   rM   r=   r   r   r	   r   )rz   rS   r   r   s    `` r)   r   r   2  s    ( q''Ca## 	QQ4K    sDDDDD!DDDEEE	NNNNNAGGIINNNN		"	" "1gi1	&R&R&RSSSs   0 
==c                   t          |           }|t          u rD| rBt          | d                   r-| d         ft          fd| dd         D                       z   S |t          u rfd| D             S |t          u r fd|                                 D             S 	                     | |           S # t          $ r | cY S w xY w)aX  Perform substitutions on a tasks

    Parameters
    ----------
    o
        Core data structures containing literals and keys
    d : dict
        Mapping of keys to values

    Examples
    --------
    >>> dsk = {"a": (sum, ["x", 2])}
    >>> data = {"x": 1}
    >>> subs_multiple(dsk, data)  # doctest: +SKIP
    {'a': (sum, [1, 2])}

    r   c              3  8   K   | ]}t          |          V  d S r^   subs_multipler&   r   rS   s     r)   rm   z subs_multiple.<locals>.<genexpr>k  s-      BBq}Q22BBBBBBr+   r}   Nc                0    g | ]}t          |          S r#   r   r   s     r)   r5   z!subs_multiple.<locals>.<listcomp>m  s#    ///a##///r+   c                8    i | ]\  }}|t          |          S r#   r   )r&   r'   r(   rS   s      r)   r*   z!subs_multiple.<locals>.<dictcomp>o  s)    ???6Aq=A&&???r+   )rc   r   callabler>   rM   r=   getr   )rz   rS   r   s    ` r)   r   r   W  s    $ q''C
e|||hqtnn|!wBBBBAabbEBBBBBBB	////Q////	????QWWYY????	55A;; 	 	 	HHH	s   B5 5CCTg?coro$Callable[[], Coroutine[Any, Any, T]]countint	delay_minfloat	delay_maxjitter_fractionretry_on_exceptions5type[BaseException] | tuple[type[BaseException], ...]r.   c                  K   t          |          D ]}	  |              d{V c S # |$ r}|pt          |           }t                              d| d| d| d|            t	          |d|z  dz
  z  |          }	|dk    r|	dt          j                    |z  z   z  }	t          j        |	           d{V  Y d}~d}~ww xY w |              d{V S )	a  
    Return the result of ``await coro()``, re-trying in case of exceptions

    The delay between attempts is ``delay_min * (2 ** i - 1)`` where ``i`` enumerates the attempt that just failed
    (starting at 0), but never larger than ``delay_max``.
    This yields no delay between the first and second attempt, then ``delay_min``, ``3 * delay_min``, etc.
    (The reason to re-try with no delay is that in most cases this is sufficient and will thus recover faster
    from a communication failure).

    Parameters
    ----------
    coro
        The coroutine function to call and await
    count
        The maximum number of re-tries before giving up. 0 means no re-try; must be >= 0.
    delay_min
        The base factor for the delay (in seconds); this is the first non-zero delay between re-tries.
    delay_max
        The maximum delay (in seconds) between consecutive re-tries (without jitter)
    jitter_fraction
        The maximum jitter to add to the delay, as fraction of the total delay. No jitter is added if this
        value is <= 0.
        Using a non-zero value here avoids "herd effects" of many operations re-tried at the same time
    retry_on_exceptions
        A tuple of exception classes to retry. Other exceptions are not caught and re-tried, but propagate immediately.
    operation
        A human-readable description of the operation attempted; used only for logging failures

    Returns
    -------
    Any
        Whatever `await coro()` returned
    Nz	Retrying z after exception in attempt /z:    r}   r   )rangestrrJ   infominr?   r0   rB   )
r   r   r   r   r   r   r.   i_tryexdelays
             r)   retryr   z  s8     X u ' '
	'<<<<<<" 	' 	' 	'!.SYYIKKXIXX5XX5XXTVXX   	QX\2I>>E""V]__>>>-&&&&&&&&&&&&&&	' <<<<<<s   'CBB==Cr-   %Callable[..., Coroutine[Any, Any, T]]r   objectr   c               L  K   t           j                            d          }t          t           j                            d          d          }t          t           j                            d          d          }t	          t          | g|R i |||||           d{V S )zT
    Retry an operation using the configuration values for the retry parameters
    zdistributed.comm.retry.countz distributed.comm.retry.delay.mins)defaultz distributed.comm.retry.delay.max)r   r   r   r.   N)daskconfigr   r   r   r	   )r   r.   r   r   retry_countretry_delay_minretry_delay_maxs          r)   r2   r2     s       +//"@AAK%:;;S  O &:;;S  O &t&&&v&&!!         r+   )
r   r   r   r   r   r   r   r   r   r   )rz   r   r   r   r   r   )rz   r   r   r   r   r   r   r   )F)rz   r   r   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r.   r   r   r   )
r   r   r   r   r.   r   r   r   r   r   )=
__future__r   r0   loggingr?   collectionsr   collections.abcr   r   r   r   	functoolsr	   	itertoolsr
   typingr   r   tlzr   r   r   r   dask.configr   dask.optimizationr   dask.typingr   
dask.utilsr   r   distributed.corer   r   distributed.utilsr   	getLoggerrd   rJ   rZ   r\   r~   r   r   r>   r$   	frozensetr   r   r   r   r   r   r   r   EnvironmentErrorIOErrorr   r2   r#   r+   r)   <module>r      s   " " " " " "    # # # # # # D D D D D D D D D D D D                     , , , , , , , , , , , ,     . . . . . .       > > > > > > > > 0 0 0 0 0 0 0 0 ! ! ! ! ! !		8	$	$ %)i8 i8 i8 i8 i8 i8X6 6 6 6 6 6 6 6$ s  25 !$ !$ !$ !$H 4i0    A A A AHP P P P P> % " " " "J  @ GCLL !)97(C 8 8 8 8 8~ !       r+   