
    0FieF                        d dl mZ d dlZd dlZd dlZd dlZd dlmZmZm	Z	 d dlm
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ  G d d          Z G d de          ZdS )    )annotationsN)Callable	GeneratorIterable)contextmanager)Any)concat)ShardsBuffer)ResourceLimiterpickle_bytelist)Deadline
log_errorsc                      e Zd ZU ded<   ded<   ded<   dd	ZdddZddZdddZddZe	dd            Z
e	dd            ZdS )ReadWriteLockzthreading.Condition
_conditionint_n_readsbool_write_pendingreturnNonec                    t          j        t          j                              | _        d| _        d| _        d| _        d S )Nr   F)	threading	ConditionLockr   r   r   _write_activeselfs    9lib/python3.11/site-packages/distributed/shuffle/_disk.py__init__zReadWriteLock.__init__   s9    #-in.>.>??#"    timeoutfloatc                    t          j        |dk    r|nd           } j        5   j                             fd|j                  }|du r	 d d d            dS d _         j                             fd|j                  }|du r.d _         j                                         	 d d d            dS d _        	 d d d            dS # 1 swxY w Y   d S )Nr   c                      j          S Nr   r   s   r    <lambda>z-ReadWriteLock.acquire_write.<locals>.<lambda>"       D// r"   r$   FTc                      j         dk    S )Nr   )r   r   s   r    r*   z-ReadWriteLock.acquire_write.<locals>.<lambda>)   s    * r"   )r   afterr   wait_for	remainingr   
notify_allr   r   r$   deadlineresults   `   r    acquire_writezReadWriteLock.acquire_write   s   >W\\''tDD_ 	 	_--////9K .  F 	 	 	 	 	 	 	 	 #'D_--****H4F .  F &+#**,,,	 	 	 	 	 	 	 	  "&D#	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   *CAC9CCCc                    | j         5  | j        du rt          d          d| _        d| _        | j                                          d d d            d S # 1 swxY w Y   d S )NFz#Tried releasing unlocked write lock)r   r   RuntimeErrorr   r1   r   s    r    release_writezReadWriteLock.release_write3   s    _ 	) 	)!U**"#HIII"'D!&DO&&(((	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	)s   A AAAc                    t          j        |dk    r|nd           } j        5   j                             fd|j                  }|du r	 d d d            dS  xj        dz  c_        	 d d d            dS # 1 swxY w Y   d S )Nr   c                      j          S r(   r)   r   s   r    r*   z,ReadWriteLock.acquire_read.<locals>.<lambda>?   r+   r"   r,   F   T)r   r.   r   r/   r0   r   r2   s   `   r    acquire_readzReadWriteLock.acquire_read;   s   >W\\''tDD_ 	 	_--////9K .  F 	 	 	 	 	 	 	 	 MMQMM	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   *A:A::A>A>c                    | j         5  | j        dk    rt          d          | xj        dz  c_        | j        dk    r| j                                          d d d            d S # 1 swxY w Y   d S )Nr   z"Tired releasing unlocked read lockr;   )r   r   r7   r1   r   s    r    release_readzReadWriteLock.release_readF   s    _ 	- 	-}!!"#GHHHMMQMM}!!**,,,	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	-s   AA$$A(+A(Generator[None, None, None]c              #     K   |                                   	 d V  |                                  d S # |                                  w xY wr(   )r5   r8   r   s    r    writezReadWriteLock.writeN   sV      	!EEE     D    	   2 Ac              #     K   |                                   	 d V  |                                  d S # |                                  w xY wr(   )r<   r>   r   s    r    readzReadWriteLock.readV   sV      	 EEEDrB   Nr   r   )r#   )r$   r%   r   r   )r   r?   )__name__
__module____qualname____annotations__r!   r5   r8   r<   r>   r   rA   rD    r"   r    r   r      s         ####MMM# # # #    *) ) ) )	 	 	 	 	- - - - ! ! ! ^!       ^     r"   r   c                  @     e Zd ZdZd fdZddZddZd fdZ xZS )DiskShardsBuffera  Accept, buffer, and write many small objects to many files

    This takes in lots of small objects, writes them to a local directory, and
    then reads them back when all writes are complete.  It buffers these
    objects in memory so that it can optimize disk access for larger writes.

    **State**

    -   shards: dict[str, list[bytes]]

        This is our in-memory buffer of data waiting to be written to files.

    -   sizes: dict[str, int]

        The size of each list of shards.  We find the largest and write data from that buffer

    Parameters
    ----------
    directory : str or pathlib.Path
        Where to write and read data.  Ideally points to fast disk.
    memory_limiter : ResourceLimiter
        Limiter for in-memory buffering (at most this much data)
        before writes to disk occur. If the incoming data that has yet
        to be processed exceeds this limit, then the buffer will block
        until below the threshold. See :meth:`.write` for the
        implementation of this scheme.
    	directorystr | pathlib.PathrD   )Callable[[pathlib.Path], tuple[Any, int]]memory_limiterr   c                    t                                          |d           t          j        |          | _        | j                            d           d| _        || _        t                      | _	        d S )Nr;   )rP   concurrency_limitT)exist_okF)
superr!   pathlibPathrM   mkdir_closed_readr   _directory_lock)r   rM   rD   rP   	__class__s       r    r!   zDiskShardsBuffer.__init__|   sx     	) 	 	
 	
 	

 !i00d+++
,r"   idstrshards	list[Any]r   r   c           	     T  K   t                      5  |                     d          5  | j                                        5  | j        rt          d          t          |d         t                    r|}nt          d |D                       }t          | j
        t          |          z  d          5 }|                    |           ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS )a'  Write one buffer to file

        This function was built to offload the disk IO, but since then we've
        decided to keep this within the event loop (disk bandwidth should be
        prioritized, and writes are typically small enough to not be a big
        deal).

        Most of the logic here is about possibly going back to a separate
        thread, or about diagnostics.  If things don't change much in the
        future then we should consider simplifying this considerably and
        dropping the write into communicate above.
        rA   Already closedr   c              3  4   K   | ]}t          |          V  d S r(   r   ).0shards     r    	<genexpr>z,DiskShardsBuffer._process.<locals>.<genexpr>   s*      'S'S5(>(>'S'S'S'S'S'Sr"   ab)modeN)r   timerZ   rD   rX   r7   
isinstancebytesr	   openrM   r]   
writelines)r   r\   r^   framesfs        r    _processzDiskShardsBuffer._process   s$      \\ 	- 	-7## - - )..00 - -| =*+;<<< "&)U33 T!' "('S'SF'S'S'S!S!Sdns2ww6TBBB -aV,,,- - - - - - - - - - - - - - -- - - - - - - - - - - - - - -- - - - - - - - - - - - - - -	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	- 	-s}   DDA4C.5C	C.CC.CC."D.C22D5C26D9DD			DD		DD!$D!r   c                @   |                                   | j        st          d          	 |                     d          5  | j                                        5  | j        rt          d          |                     | j        t          |          z  
                                          \  }}ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   n# t          $ r t          |          w xY w|r| xj        |z  c_        |S t          |          )z%Read a complete file back into memoryz$Tried to read from file before done.rD   ra   N)raise_on_exception_inputs_doner7   rh   rZ   rD   rX   rY   rM   r]   resolveFileNotFoundErrorKeyError
bytes_read)r   r\   datasizes       r    rD   zDiskShardsBuffer.read   s   !!!  	GEFFF	6"" R R)..00 R R| =*+;<<<!%T^c"gg-E,N,N,P,P!Q!QJD$R R R R R R R R R R R R R R RR R R R R R R R R R R R R R R
 ! 	 	 	2,,	  	OOt#OOK2,,sT   C  CAB=1C=C	CC	CC  CC  CC   C:c                ^  K   t                                                       d {V  | j                                        5  d| _        t          j        t                    5  t          j	        | j
                   d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S )NT)rT   closerZ   rA   rX   
contextlibsuppressrt   shutilrmtreerM   )r   r[   s    r    rz   zDiskShardsBuffer.close   s$     ggmmoo!'')) 	. 	.DL$%677 . .dn---. . . . . . . . . . . . . . .	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s6   !B"$B
>B"
B	B"B	B""B&)B&)rM   rN   rD   rO   rP   r   )r\   r]   r^   r_   r   r   )r\   r]   r   r   rE   )	rF   rG   rH   __doc__r!   ro   rD   rz   __classcell__)r[   s   @r    rL   rL   _   s         8/ / / / / /"!- !- !- !-F   *. . . . . . . . . .r"   rL   )
__future__r   r{   rU   r}   r   collections.abcr   r   r   r   typingr   toolzr	   distributed.shuffle._bufferr
   distributed.shuffle._limiterr   distributed.shuffle._pickler   distributed.utilsr   r   r   rL   rJ   r"   r    <module>r      sR   " " " " " "           9 9 9 9 9 9 9 9 9 9 % % % % % %             4 4 4 4 4 4 8 8 8 8 8 8 7 7 7 7 7 7 2 2 2 2 2 2 2 2I  I  I  I  I  I  I  I Xk. k. k. k. k.| k. k. k. k. k.r"   