
    0FieW2                    (   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZmZ erd d	lmZ  ej        e          Zd
 Zd Zd Z d Z! G d d          Z" G d d          Z#d Z$dS )    )annotationsN)Callable)Queue)TYPE_CHECKING)Future)IOLoop)get_mp_contextwait_for)Selfc                    	  | j         |g|R   dS # t          $ r.}t          j        dt	          |                    s Y d}~dS d}~ww xY w)zQ
    Helper to silence "IOLoop is closing" exception on IOLoop.add_callback.
    zIOLoop is clos(ed|ing)N)add_callbackRuntimeErrorresearchstr)loopfuncargsexcs       3lib/python3.11/site-packages/distributed/process.py_loop_add_callbackr      s    $&&&&&&&   y13s88<< 		 	 	 	 	 	s    
A
#AA
c                \    |                                  s|                     |           d S d S N)	cancelled
set_result)futurevalues     r   #_future_set_result_unless_cancelledr   )   s:     !%     ! !    c                    |                                  s|                     |           d S t                              d|           d S )Nz$Exception after Future was cancelled)exc_info)r   set_exceptionloggererror)r   r   s     r   &_future_set_exception_unless_cancelledr%   .   sN     KS!!!!!;cJJJJJr   c                    	  ||i |}t          | t          ||           d S # t          $ r"}t          | t          ||           Y d }~d S d }~ww xY wr   )r   r   	Exceptionr%   )r   r   r   r   kwargsresr   s          r   _call_and_set_futurer*   5   s    SdD#F## 	4!DfcRRRRR  V V V 	4!GQTUUUUUUUUUVs   # 
AA

Ac                      e Zd ZdZdZdZdS )_ProcessStateFN)__name__
__module____qualname__is_alivepidexitcode r   r   r,   r,   @   s        H
CHHHr   r,   c                  D   e Zd ZU dZded<   d%dZd Zd Zd	 Zd&dZ	e
d             Ze
d             Ze
d'd            Ze
d             Zd Zd(dZd(dZd)dZd Zd*dZd Zed              Zed!             Zed"             Zed#             Zej        d$             ZdS )+AsyncProcessz
    A coroutine-compatible multiprocessing.Process-alike.
    All normally blocking methods are wrapped in Tornado coroutines.
    multiprocessing.Process_processNr3   c           
        |pi }t          |          st          dt          |                    t                      | _        |pt          j        d          | _        t                      	                    d          \  }| _
        t                                          | j        |||||| j
        t          j        j        f          | _        | j        j        | _        t'          j        | t*          | j                  | _        t/                      | _        t3                      | _        d | _        d| _        |                                  d S )Nz#`target` needs to be callable, not F)instance)duplextargetnamer   )callable	TypeErrortyper,   _stater   current_loopr	   Pipe_keep_child_aliveProcess_rundaskconfigglobal_configr7   r=   _nameweakreffinalize_asyncprocess_finalizer_proc_finalizerPyQueue_watch_qr   _exit_future_exit_callback_closed_start_threads)selfr   r<   r=   r   r(   parent_alive_pipes          r   __init__zAsyncProcess.__init__N   s<   2 	TR$v,,RRSSS#oo;V^U;;;
 5C4D4D4I4IQV4I4W4W141&((009!&) 1 
 
 ]'
&/)4= 
  
  		"HH"r   c                2    d| j         j         d| j         dS )N< >)	__class__r-   rK   rV   s    r   __repr__zAsyncProcess.__repr__x   s"    :4>*::TZ::::r   c                2    | j         rt          d          d S )Nz(invalid operation on closed AsyncProcess)rT   
ValueErrorr^   s    r   _check_closedzAsyncProcess._check_closed{   s(    < 	IGHHH	I 	Ir   c           
     d   t          j        | j        d| j        z  t	          j        |           | j        | j        | j        | j	        | j
        f          | _        d| j        _        | j                                         d }t	          j        | || j	                  | _        d| j        _        d S )Nz#AsyncProcess %s watch message queuer;   Tc                4    |                      ddi           d S )Nopstop)
put_nowaitqs    r   stop_threadz0AsyncProcess._start_threads.<locals>.stop_thread   s    LL$(((((r   rh   F)	threadingThread_watch_message_queuer=   rL   refr7   rC   rA   rQ   rR   _watch_message_threaddaemonstartrM   _thread_finalizeratexit)rV   rj   s     r   rU   zAsyncProcess._start_threads   s    %.%5,6BD!!
!&
 &
 &
" -1")"((***	) 	) 	)
 ")!1$t}!U!U!U(-%%%r   r2   intreturnNonec                    d | _         | j        |                     |            | j                            |           d S r   )r7   rS   rR   r   )rV   r2   s     r   _on_exitzAsyncProcess._on_exit   sC    *%%%$$X.....r   c                r    fd}t          j        |          }d|_        |                                 dS )zP
        Immediately exit the process when parent_alive_pipe is closed.
        c                     	                                    t          d          # t          $ r t          j        d           Y d S w xY w)Nz'unexpected state: should be unreachable)recvr   EOFErroros_exit)rW   s   r   monitor_parentz@AsyncProcess._immediate_exit_when_closed.<locals>.monitor_parent   s`    N "&&((( ##LMMM  
 
 
 
s   & AA)r<   TN)rk   rl   rp   rq   )clsrW   r   ts    `  r   _immediate_exit_when_closedz(AsyncProcess._immediate_exit_when_closed   sN    	N 	N 	N 	N 	N, N333						r   c                    |                                  |                     |           dt          j                    _        t
          j                            t
          j        j        |d            ||i | d S )N
MainThreadold)priority)	closer   rk   current_threadr=   rH   rI   updaterJ   )r   r<   r   r(   rW   rE   inherit_configs          r   rG   zAsyncProcess._run   s{     	!!! 	''(9:::*6	 ""'4;4nuUUUr   processc                  
 t                                              j        

fd}	                                 }t                              d d|           |d         }	|	dk    rt          ||d         |           nX|	dk    rt          ||d         j                   n5|	d	k    rt          ||d         j                   n|	d
k    rd S J |            )Nc                 &                                     t          j        t          j        dz  f          } d| _        |                                   d_        j        _        t          	                    d dj                   d S )Nz"AsyncProcess %s watch process joinr;   T[z] created process with pid )
rq   rk   rl   r5   _watch_processrp   r0   r1   r#   debug)threadr=   r   ri   rselfrefstates    r   _startz1AsyncProcess._watch_message_queue.<locals>._start   s    MMOOO%#29D@wq1  F
 !FMLLNNN!ENEILLHQHH59HHIIIIIr   Tr   z] got message re   rq   r   	terminatekillrf   )reprr=   getr#   r   r*   r   r   )r   r   r   r   r   ri   exit_futurer   msgre   r=   r   s    `` ``    @@r   rm   z!AsyncProcess._watch_message_queue   s-    OOwyy~	J 	J 	J 	J 	J 	J 	J 	J 	J 	J	%%''CLL5Q55c55666TBW}}$T3x=&AAAA{""$T3x=':KLLLLv$T3x=',GGGGv#!	r   c                   t           |                      }|                                 |j        x}}|d}d|_        ||_         |            }	 |t	          |j        |j        |           d }n# d }w xY w|#t                              d||j	                   d S t          
                    d||j	        |           d S )N   FzE[%s] process %r exit status was already read will report exitcode 255z#[%s] process %r exited with code %r)r   joinr2   r0   r   rC   rx   r#   warningr1   r   )	r   r   r   r   ri   r   r2   original_exit_coderV   s	            r   r   zAsyncProcess._watch_process   s    OO(/(88% H! wyy	"4:t}hGGGDD4DKKKK %NNW	     LL>59hWWWWWs   A2 2A6c                    |                                   t                      }| j                            d|d           |S )zQ
        Start the child process.

        This method returns a future.
        rq   re   r   rb   r   rQ   rg   rV   futs     r   rq   zAsyncProcess.start  sB     	hh  3!?!?@@@
r   asyncio.Future[None]c                    |                                   t                      }| j                            d|d           |S )zTerminate the child process.

        This method returns a future.

        See also
        --------
        multiprocessing.Process.terminate
        r   r   r   r   s     r   r   zAsyncProcess.terminate!  sB     	"HH  s!C!CDDD
r   c                    |                                   t                      }| j                            d|d           |S )zSend SIGKILL to the child process.
        On Windows, this is the same as terminate().

        This method returns a future.

        See also
        --------
        multiprocessing.Process.kill
        r   r   r   r   s     r   r   zAsyncProcess.kill/  sB     	"HH  #!>!>???
r   c                   K   |                                   | j        j        
J d            | j        j        dS t	          t          j        | j                  |           d{V  dS )z_
        Wait for the child process to exit.

        This method returns a coroutine.
        Nzcan only join a started process)rb   rA   r1   r2   r
   asyncioshieldrR   )rV   timeouts     r   r   zAsyncProcess.join>  sz       	{**,M***;+F w~d&788'BBBBBBBBBBBr   c                \    | j         s$|                                  d| _        d| _         dS dS )z
        Stop helper thread and release resources.  This method returns
        immediately and does not ensure the child process has exited.
        NT)rT   rr   r7   r^   s    r   r   zAsyncProcess.closeL  s<    
 | 	 ""$$$ DMDLLL	  	 r   rV   r   r   Callable[[Self], None]c                    t          j        |          r
J d            t          |          s
J d            | j        j        
J d            || _        dS )z
        Set a function to be called by the event loop when the process exits.
        The function is called with the AsyncProcess as sole argument.

        The function may not be a coroutine function.
        z-exit callback may not be a coroutine functionz exit callback should be callableNz5cannot set exit callback when process already started)inspectiscoroutinefunctionr>   rA   r1   rS   )rV   r   s     r   set_exit_callbackzAsyncProcess.set_exit_callbackV  sy     .
 
 	; 	;:	; 	; 	; ~~AAAAAAKO##B $##"r   c                    | j         j        S r   )rA   r0   r^   s    r   r0   zAsyncProcess.is_aliveg  s    {##r   c                    | j         j        S r   )rA   r1   r^   s    r   r1   zAsyncProcess.pidj  s    {r   c                    | j         j        S r   )rA   r2   r^   s    r   r2   zAsyncProcess.exitcoden  s    {##r   c                    | j         S r   )rK   r^   s    r   r=   zAsyncProcess.namer  s
    zr   c                    | j         j        S r   r7   rp   r^   s    r   rp   zAsyncProcess.daemonv  s    }##r   c                    || j         _        d S r   r   )rV   r   s     r   rp   zAsyncProcess.daemonz  s    $r   )NNNr3   N)r2   rt   ru   rv   )r   r6   )ru   r   r   )rV   r   r   r   ru   rv   )r-   r.   r/   __doc____annotations__rX   r_   rb   rU   rx   classmethodr   rG   rm   r   rq   r   r   r   r   r   r0   propertyr1   r2   r=   rp   setterr3   r   r   r5   r5   F   s         
 &%%%( ( ( (T; ; ;I I I. . .0/ / / /   [>     [  ( ( ( [(T X X [X<	 	 	      C C C C     # # # #"$ $ $   X $ $ X$   X $ $ X$ ]% % ]% % %r   r5   c                    |                                  rE	 t                              d|             |                                  d S # t          $ r Y d S w xY wd S )Nzreaping stray process )r0   r#   infor   OSError)procs    r   rN   rN     st    }} 	KK777888NN 	 	 	DD		 s   1A	 	
AA)%
__future__r   r   r   loggingmultiprocessingr~   r   rk   rL   collections.abcr   queuer   rP   typingr   tornado.concurrentr   tornado.ioloopr   rH   distributed.utilsr	   r
   typing_extensionsr   	getLoggerr-   r#   r   r   r%   r*   r,   r5   rN   r3   r   r   <module>r      s   " " " " " "        				 				      $ $ $ $ $ $ " " " " " "             % % % % % % ! ! ! ! ! !  6 6 6 6 6 6 6 6 '&&&&&& 
	8	$	$  ! ! !
K K KS S S       v% v% v% v% v% v% v% v%r	    r   