
    0Fien                        d dl mZ d dlmZ d dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ e	j        d             Ze	j        d	             Z G d
 dej                  ZdS )    )annotationsN)merge)gen)parse_timedelta)time)TimeoutErrorsyncc              #  p  K   |                      d          V }| j        }|dk    r|                    |           dS |dk    r*|                                 |                                 dS 	 |\  }}}|                    |          # t          $ r }|                    |           Y d}~dS d}~ww xY w)z[
    Coroutine that waits on Dask future, then transmits its outcome to
    cf_future.
    F)raiseitfinished	cancelledN)_resultstatus
set_resultcancelset_running_or_notify_cancelwith_tracebackBaseExceptionset_exception)future	cf_futureresultr   typexctbs          6lib/python3.11/site-packages/distributed/cfexecutor.py_cascade_futurer      s       >>%>0000F]FV$$$$$	;		..00000	)!LCb$$R((( 	) 	) 	)##C(((((((((	)s   0B 
B5B00B5c              #  >   K   | D ]}	 |V  # t           $ r Y w xY wd S N)	Exception)futuresfuts     r   _wait_on_futuresr#   %   sO        	IIII 	 	 	D	 s   
c                  L    e Zd ZdZ eg d          Zd Zd Zd Zd Z	d
dZ
d	S )ClientExecutorzY
    A concurrent.futures Executor that executes tasks on a dask.distributed Client.
    )pureworkers	resourcesallow_other_workersretriesc                    t          |          }|| j        k    s't          dt          || j        z
            z            || _        t          j                    | _        d| _        || _	        d S )Nz+unsupported arguments to ClientExecutor: %sF)
set_allowed_kwargs	TypeErrorsorted_clientweakrefWeakSet_futures	_shutdown_kwargs)selfclientkwargssks       r   __init__zClientExecutor.__init__7   sr    [[T)))=d22334   ))    c                    t          j                    }fd}|                    |           | j        j                            t          |           |S )zK
        Wrap a distributed Future in a concurrent.futures Future.
        c                v    |                                  r!j        dk    r                                 d S d S d S )Nr   )r   r   r   )r   r   s    r   cf_callbackz0ClientExecutor._wrap_future.<locals>.cf_callbackJ   sD    ""$$  +)E)E   )E)Er;   )cfFutureadd_done_callbackr0   loopadd_callbackr   )r6   r   r   r>   s    `  r   _wrap_futurezClientExecutor._wrap_futureC   sb     IKK		  	  	  	  	  	##K000&&	JJJr;   c                    | j         rt          d           | j        j        |g|R i t	          | j        |          }| j                            |           |                     |          S )a/  Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)``
        and returns a Future instance representing the execution of the callable.

        Returns
        -------
        A Future representing the given call.
        z*cannot schedule new futures after shutdown)	r4   RuntimeErrorr0   submitr   r5   r3   addrD   )r6   fnargsr8   r   s        r   rG   zClientExecutor.submitS   sx     > 	MKLLL$$RN$NNN%f2M2MNN&!!!  (((r;   c                B    |                     dd           t                    t                      z   d|v r|d= |rt          dt	          |          z              j        j        |g|R i  j        t                     fd} |            S )a&  Returns an iterator equivalent to ``map(fn, *iterables)``.

        Parameters
        ----------
        fn : A callable that will take as many arguments as there are
            passed iterables.
        iterables : One iterable for each parameter to *fn*.
        timeout : The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize : ignored.

        Returns
        -------
        An iterator equivalent to: ``map(fn, *iterables)`` but the calls may
        be evaluated out-of-order.

        Raises
        ------
        concurrent.futures.TimeoutError:
            If the entire result iterator could not be generated before the given
            timeout.
        Exception:
            If ``fn(*args)`` raises for any values.
        timeoutN	chunksizez!unexpected arguments to map(): %sc               3    K   	 D ]v} j                             |            B	 |                     t                      z
            V  F# t          $ r t
          j        w xY w|                                 V  w	 t                    }j                             |           j        	                    |           d S # t                    }j                             |           j        	                    |           w xY wr   )
r3   rH   r   r   r   r?   listupdater0   r   )r   	remainingend_timefsr6   rL   s     r   result_iteratorz+ClientExecutor.map.<locals>.result_iterator   s      /  . .FM%%f---*2"(--4660A"B"BBBBB+ 2 2 2"$/12 %mmoo----. !HH	$$Y///##I..... !HH	$$Y///##I....s#    C &AC A$$C AD	)	popr   r   r.   r/   r0   mapr5   iter)r6   rI   	iterablesr8   rT   rR   rS   rL   s   `    @@@r   rV   zClientExecutor.mapc   s    2 **Y--%g..G'H&  {# 	R?&..PQQQT\b=9===== "XX	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/     r;   Tc                    | j         s[d| _         t          | j                  }|r"t          | j        j        t          |           dS | j                            |           dS dS )a  Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Parameters
        ----------
        wait : If True then shutdown will not return until all running
            futures have finished executing.  If False then all running
            futures are cancelled immediately.
        TN)r4   rO   r3   r	   r0   rB   r#   r   )r6   waitrS   s      r   shutdownzClientExecutor.shutdown   sp     ~ 	(!DNdm$$B (T\&(8"=====##B'''''	( 	(r;   N)T)__name__
__module____qualname____doc__	frozensetr-   r:   rD   rG   rV   r[    r;   r   r%   r%   .   s           iJJJ O
 
 
   ) ) ) :! :! :!x( ( ( ( ( (r;   r%   )
__future__r   concurrent.futuresr!   r?   r1   tlzr   tornador   
dask.utilsr   distributed.metricsr   distributed.utilsr   r	   	coroutiner   r#   Executorr%   ra   r;   r   <module>rk      s   " " " " " "                    & & & & & & $ $ $ $ $ $ 0 0 0 0 0 0 0 0 ) ) )*   C( C( C( C( C(R[ C( C( C( C( C(r;   