
    0Fies5                       d dl mZ d dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZmZmZ d dlZd dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z  d dl!m"Z"m#Z#  ej$        e%          Z&e&'                     e"d                     e&'                     e"d                      G d de          Z( G d de          Z) G d dej*        ee+f                   Z,d'dZ- G d de.          Z/ G d d e.          Z0 G d! d"e.          Z1 G d# d$ej2                  Z3 G d% d&ej4        ee+e5f                   Z6dS )(    )annotationsN)defaultdict)CallableHashableIteratorMappingMutableMappingSized)contextmanager)partial)Literal
NamedTupleProtocolcast)Key)context_meter)deserialize_bytesserialize_bytelist)get_compression_settingssafe_sizeof)RateLimiterFilternbytesz#Spill file on disk reached capacityzSpill to disk failedc                  8    e Zd ZU dZded<   ded<   d
dZd
dZd	S )SpilledSizez7Size of a key/value pair when spilled to disk, in bytesintmemorydiskotherreturnc                V    t          | j        |j        z   | j        |j        z             S Nr   r   r   selfr   s     1lib/python3.11/site-packages/distributed/spill.py__add__zSpilledSize.__add__    $    4;5ty5:7MNNN    c                V    t          | j        |j        z
  | j        |j        z
            S r"   r#   r$   s     r&   __sub__zSpilledSize.__sub__#   r(   r)   N)r   r   r    r   )__name__
__module____qualname____doc____annotations__r'   r+    r)   r&   r   r      sb         AA KKKIIIO O O OO O O O O Or)   r   c                  2    e Zd ZdZedd            Zd	dZdS )
ManualEvictProtoaS  Duck-type API that a third-party alternative to SpillBuffer must respect (in
    addition to MutableMapping) if it wishes to support spilling when the
    ``distributed.worker.memory.spill`` threshold is surpassed.

    This is public API. At the moment of writing, Dask-CUDA implements this protocol in
    the ProxifyHostFile class.
    r    Sized | boolc                    dS )zAccess to fast memory. This is normally a MutableMapping, but for the purpose
        of the manual eviction API it is just tested for emptiness to know if there is
        anything to evict.
        Nr1   r%   s    r&   fastzManualEvictProto.fast0   s	     	r)   r   c                    dS )a  Manually evict a key/value pair from fast to slow memory.
        Return size of the evicted value in fast memory.

        If the eviction failed for whatever reason, return -1. This method must
        guarantee that the key/value pair that caused the issue has been retained in
        fast memory and that the problem has been logged internally.

        This method never raises.
        Nr1   r6   s    r&   evictzManualEvictProto.evict8   s	     	r)   N)r    r4   r    r   )r,   r-   r.   r/   propertyr7   r9   r1   r)   r&   r3   r3   '   sR             X
 
 
 
 
 
r)   r3   c                      e Zd ZU dZded<   ded<   	 d&d' fdZed(d            Zed)d            Zd* fdZ	d+dZ
d, fdZd- fdZd.d/dZed0d             Zed0d!            Zed1d#            Zed2d%            Z xZS )3SpillBuffera  MutableMapping that automatically spills out dask key/value pairs to disk when
    the total size of the stored data exceeds the target. If max_spill is provided the
    key/value pairs won't be spilled once this threshold has been reached.

    Parameters
    ----------
    spill_directory: str
        Location on disk to write the spill files to
    target: int
        Managed memory, in bytes, to start spilling at
    max_spill: int | False, optional
        Limit of number of bytes to be spilled on disk. Set to False to disable.
    zset[Key]logged_pickle_errorsz#defaultdict[tuple[str, str], float]cumulative_metricsFspill_directorystrtargetr   	max_spillint | Literal[False]c                    t          ||          }t          j        |t          j                              }t	                                          i ||t                     t                      | _        t          t                    | _        d S )N)r7   slownweight)SlowzictCacheWeakValueMappingsuper__init___in_memory_weightsetr>   r   floatr?   )r%   r@   rB   rC   rF   slow_cached	__class__s         r&   rN   zSpillBuffer.__init__X   sr     OY//jt'<'>'>??b{fEVWWW$'EE!"-e"4"4r)   r    Iterator[None]c              #  z    K   d fd	}t          j        |          5  d
V  d
d
d
           d
S # 1 swxY w Y   d
S )as  Capture metrics re. disk read/write, serialize/deserialize, and
        compress/decompress.

        Note that this duplicates capturing from gather_dep, get_data, and execute. It
        is repeated here to make it possible to split serialize/deserialize and
        compress/decompress triggered by spill/unspill from those triggered by network
        comms.
        labelr   valuerQ   unitrA   r    Nonec                d    t          | t                    sJ j        | |fxx         |z  cc<   d S r"   )
isinstancerA   r?   )rV   rW   rX   r%   s      r&   metrics_callbackz6SpillBuffer._capture_metrics.<locals>.metrics_callbackr   sB    eS)))))#E4K000E900000r)   N)rV   r   rW   rQ   rX   rA   r    rY   )r   add_callback)r%   r\   s   ` r&   _capture_metricszSpillBuffer._capture_metricsg   s      	: 	: 	: 	: 	: 	: '(899 	 	EEE	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   044key
Key | Nonec              #  @  K   	 d V  d S # t           $ rL}|j        \  }|| j        v sJ || j        vsJ t                              d           t                      d }~wt          $ r+ t                              dd           t                      t          $ r}|j        \  }}|| j        v sJ || j        vsJ ||k    r	|J | |= ||| j
        vr7t                              d|d           | j
                            |           t                      d }~ww xY w)Nz;Spill file on disk reached capacity; keeping data in memoryz,Spill to disk failed; keeping data in memoryT)exc_infozFailed to pickle %r)MaxSpillExceededargsr7   rF   loggerwarningHandledErrorOSErrorerrorPickleErrorr>   add)r%   r_   ekey_eorig_es        r&   _handle_errorszSpillBuffer._handle_errorsy   sp     #	%EEEEE 	! 	! 	!vHUDI%%%%	))))NNM   ..  	! 	! 	!LLGRVLWWW..  	% 	% 	%FME6DI%%%%	))))||
 I  999LL!6LMMM-11%888"nn$)	%s#   
 
DAAA DA=DDr   rW   objectrY   c                   	 |                                  5  |                     |          5  t                                          ||           | j                            |           ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS # t          $ r || j        v sJ || j        vsJ Y dS w xY w)a  If sizeof(value) < target, write key/value pair to self.fast; this may in
        turn cause older keys to be spilled from fast to slow.
        If sizeof(value) >= target, write key/value pair directly to self.slow instead.

        Raises
        ------
        Exception
            sizeof(value) >= target, and value failed to pickle.
            The key/value pair has been forgotten.

        In all other cases:

        - an older value was evicted and failed to pickle,
        - this value or an older one caused the disk to fill and raise OSError,
        - this value or an older one caused the max_spill threshold to be exceeded,

        this method does not raise and guarantees that the key/value that caused the
        issue remained in fast.
        N)	r^   ro   rM   __setitem__r>   discardrg   r7   rF   )r%   r_   rW   rS   s      r&   rr   zSpillBuffer.__setitem__   sc   (	(&&(( 7 7$*=*=c*B*B 7 7##C///)11#6667 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7  	( 	( 	($)####di'''''''	(sW   B B=A6*B6A:	:B=A:	>BB BB BB  B?>B?c                Z   	 |                                  5  |                     d          5  | j                                        \  }}}t	          t
          |          cddd           cddd           S # 1 swxY w Y   ddd           dS # 1 swxY w Y   dS # t          $ r Y dS w xY w)a  Implementation of :meth:`ManualEvictProto.evict`.

        Manually evict the oldest key/value pair, even if target has not been
        reached. Returns sizeof(value).
        If the eviction failed (value failed to pickle, disk full, or max_spill
        exceeded), return -1; the key/value pair that caused the issue will remain in
        fast. The exception has been logged internally.
        This method never raises.
        N)r^   ro   r7   r9   r   r   rg   )r%   _rH   s      r&   r9   zSpillBuffer.evict   sU   	&&(( ) )$*=*=d*C*C ) )#y001fC(() ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) )  	 	 	22	s]   B B2A7B*B 7A;	;B>A;	?BB BB BB 
B*)B*c                V   |                                  5  || j        v rQt          t          | j        j        |                   }t          j        ddd           t          j        d|d           t                                          |          cd d d            S # 1 swxY w Y   d S )Nzmemory-read   countbytes)	r^   r7   r   r   weightsr   digest_metricrM   __getitem__)r%   r_   memory_sizerS   s      r&   r}   zSpillBuffer.__getitem__   s    ""$$ 	, 	,di #3	(9#(>?? +M1gFFF+M;PPP77&&s++	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	,s   A;BB"%B"c                ~    t                                          |           | j                            |           d S r"   )rM   __delitem__r>   rs   r%   r_   rS   s     r&   r   zSpillBuffer.__delitem__   s8    C   !))#.....r)   Ndefaultc                     t          d          )NzAre you calling .pop(key, None) as a way to discard a key if it exists?It may cause data to be read back from disk! Please use `del` instead.)NotImplementedError)r%   r_   r   s      r&   popzSpillBuffer.pop   s    !U
 
 	
r)   Mapping[Key, object]c                    | j         S )zxKey/value pairs stored in RAM. Alias of zict.Buffer.fast.
        For inspection only - do not modify directly!
        )r7   r6   s    r&   r   zSpillBuffer.memory       
 yr)   c                    | j         S )z~Key/value pairs spilled out to disk. Alias of zict.Buffer.slow.
        For inspection only - do not modify directly!
        )rF   r6   s    r&   r   zSpillBuffer.disk   r   r)   rI   c                t    t          t          j        | j                  }t          t          |j                  S r"   )r   rJ   rK   rF   rI   data)r%   caches     r&   _slow_uncachedzSpillBuffer._slow_uncached   s'    TZ++D%*%%%r)   r   c                    | j         j        S )zNumber of bytes spilled to disk. Tuple of

        - output of sizeof()
        - pickled size

        The two may differ substantially, e.g. if sizeof() is inaccurate or in case of
        compression.
        )r   total_weightr6   s    r&   spilled_totalzSpillBuffer.spilled_total   s     "//r)   F)r@   rA   rB   r   rC   rD   )r    rT   )r_   r`   r    rT   r_   r   rW   rp   r    rY   r:   r_   r   r    rp   r_   r   r    rY   r"   )r_   r   r   rp   r    rp   )r    r   )r    rI   )r    r   )r,   r-   r.   r/   r0   rN   r   r^   ro   rr   r9   r}   r   r   r;   r   r   r   r   __classcell__rS   s   @r&   r=   r=   E   s          #""";;;; +0	5 5 5 5 5 5 5    ^" $% $% $% ^$%L( ( ( ( ( (8   ", , , , , ,/ / / / / /
 
 
 
 
    X    X & & & X& 	0 	0 	0 X	0 	0 	0 	0 	0r)   r=   r_   r   rW   rp   r    r   c                     t          |          S r"   r   )r_   rW   s     r&   rO   rO     s    ur)   c                      e Zd ZdS )rc   Nr,   r-   r.   r1   r)   r&   rc   rc   
          Dr)   rc   c                      e Zd ZdS )rj   Nr   r1   r)   r&   rj   rj     r   r)   rj   c                      e Zd ZdS )rg   Nr   r1   r)   r&   rg   rg     r   r)   rg   c                        e Zd Zd fdZ xZS )
AnyKeyFiler_   r   r    rA   c                `    t                                          t          |                    S r"   )rM   	_safe_keyrA   r   s     r&   r   zAnyKeyFile._safe_key  s!    ww  S***r)   )r_   r   r    rA   )r,   r-   r.   r   r   r   s   @r&   r   r     s=        + + + + + + + + + +r)   r   c                  ^     e Zd ZU ded<   ded<   ded<   dd fd
ZddZddZd fdZ xZS )rI   rD   
max_weightzdict[Key, SpilledSize]weight_by_keyr   r   Fr@   rA   c           
        t          d          }t          t          t          gt          f         t          t          |d                    }t                                          |t          t          t          t          t          f         t          |                               || _        i | _        t          dd          | _        d S )Nz+distributed.worker.memory.spill-compressionraise)compressionon_errorr   )r   r   r   rp   rz   r   r   rM   rN   r   r	   r   r   r   r   r   r   )r%   r@   r   r   dumprS   s        r&   rN   zSlow.__init__!  s    .9
 
 fXu_%&K'RRR
 
 	U
+Z-H-HII	
 	
 	

 %'1--r)   r_   r   r    rp   c                   t          j        dd          5  | j        |         }d d d            n# 1 swxY w Y   t          j        ddd           t          j        dt	          |          d           |                     |          }|S )Nz	disk-readsecondsrx   ry   rz   )r   meterdr|   lenload)r%   r_   pickledouts       r&   r}   zSlow.__getitem__8  s     i88 	" 	"fSkG	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	"#KG<<<#KWwGGGii  
s   044rW   rY   c                |   	 |                      |          }n"# t          $ r}t          ||          d }~ww xY w|| j        vsJ || j        vsJ t          t          t          |                    }| j        dur'| j	        j
        |z   | j        k    rt          |          t          j        dd          5  || j        |<   d d d            n# 1 swxY w Y   t          j        ddd           t          j        d|d           t          t!          |          |          }|| j        |<   | xj	        |z  c_	        d S )NFz
disk-writer   rx   ry   rz   )r   	Exceptionrj   r   r   summapr   r   r   r   rc   r   r   r|   r   r   )r%   r_   rW   r   rl   pickled_sizerH   s          r&   rr   zSlow.__setitem__@  s   	&ii&&GG 	& 	& 	& c1%%%		& $&    $,,,,,3vw//00 O5((!&5GG #3'''  y99 	" 	"!DF3K	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	" 	"#L!W===#L,HHH[//>>"(3V#s"    
7277CCCc                    t                                          |           | xj        | j                            |          z  c_        d S r"   )rM   r   r   r   r   r   s     r&   r   zSlow.__delitem__c  sF    C   T/33C888r)   r   )r@   rA   r   rD   r   r   r   )	r,   r-   r.   r0   rN   r}   rr   r   r   r   s   @r&   rI   rI     s         $$$$)))). . . . . . ..   !$ !$ !$ !$F9 9 9 9 9 9 9 9 9 9r)   rI   )r_   r   rW   rp   r    r   )7
__future__r   loggingcollectionsr   collections.abcr   r   r   r   r	   r
   
contextlibr   	functoolsr   typingr   r   r   r   rJ   dask.typingr   distributed.metricsr   distributed.protocolr   r    distributed.protocol.compressionr   distributed.sizeofr   distributed.utilsr   r   	getLoggerr,   re   	addFilterr   r3   Bufferrp   r=   rO   r   rc   rj   rg   Filer   Funcrz   rI   r1   r)   r&   <module>r      s#   " " " " " "  # # # # # # X X X X X X X X X X X X X X X X % % % % % %       6 6 6 6 6 6 6 6 6 6 6 6        - - - - - - F F F F F F F F E E E E E E * * * * * * 7 7 7 7 7 7 7 7		8	$	$   ""#HII J J J   ""#9:: ; ; ;O O O O O* O O O    x   <}0 }0 }0 }0 }0$+c6k* }0 }0 }0@   
	 	 	 	 	y 	 	 		 	 	 	 	) 	 	 		 	 	 	 	9 	 	 	+ + + + + + + +I9 I9 I9 I9 I949S&%'( I9 I9 I9 I9 I9r)   