
    0Fie              	      D   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
Z
d dlZd dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ  ej        e          Z G d	 d
eej                  Z G d de          Z G d de          Zdd ddddddej        f	d$d#ZdS )%    )annotationsN)Any)	CPU_COUNT)WINDOWS)ProcessInterfaceSpecCluster)nprocesses_nthreads)parse_memory_limitc                  d     e Zd ZU ded<    fdZd	 fdZej        d	d            Zd	 fdZ	 xZ
S )

Subprocessz!asyncio.subprocess.Process | Noneprocessc                    t           rt          d          d | _        t                                                       d S )Nz$Subprocess does not support Windows.)r   RuntimeErrorr   super__init__self	__class__s    =lib/python3.11/site-packages/distributed/deploy/subprocess.pyr   zSubprocess.__init__   s>     	GEFFF    returnNonec                   K   |                                   d {V  t                                                       d {V  d S N)_startr   startr   s    r   r   zSubprocess.start"   sQ      kkmmggmmoor   c                
   K   dS )zStart the subprocessN )r   s    r   r   zSubprocess._start&   s
        r   c                  K   | j         r| j         j        t          j        | j         j                                      d          D ]}|                                 | j                                          | j                                          d {V  d | _         t                      	                                 d {V  d S )NT)	recursive)
r   
returncodepsutilProcesspidchildrenkillcommunicater   close)r   childr   s     r   r(   zSubprocess.close*   s      < 	-DL3;(899BBTBRR  

L,**,,,,,,,,,ggmmoor   r   r   )__name__
__module____qualname____annotations__r   r   abcabstractmethodr   r(   __classcell__r   s   @r   r   r      s         ....          	# # # #         r   r   c                  D     e Zd ZU dZded<   ded<   	 d
d fdZd	 Z xZS )SubprocessSchedulerzA local Dask scheduler running in a dedicated subprocess

    Parameters
    ----------
    scheduler_kwargs:
        Keywords to pass on to the ``Scheduler`` class constructor
    dictscheduler_kwargs
str | NoneaddressNdict | Nonec                Z    |pi | _         t                                                       d S r   )r6   r   r   )r   r6   r   s     r   r   zSubprocessScheduler.__init__@   s.     !1 6Br   c                  K   dddt          j        di | j        d          g}t                              d                    |                     t          j        |dt          j        j	        i d {V | _
        	 | j
        j                                         d {V                                 }|                                st          d	          t                              |                                           d
|v r3|                    d          d                                         | _        nt                              |           d S )Ndaskspec--speczdistributed.Schedulerclsopts stderrTzScheduler failed to startzScheduler atzScheduler at:   )jsondumpsr6   loggerinfojoinasynciocreate_subprocess_exec
subprocessPIPEr   rC   readlinedecodestripr   splitr8   debug)r   cmdlines      r   r   zSubprocessScheduler._startG   s^     J/9RD<Q9RSS 	
 	CHHSMM"""$;
%*
 
 
 
 
 
 
 

	,-6688888888@@BBD::<< @"#>???KK

%%%%%#zz/::1=CCEE	 	Tr   r   )r6   r9   r+   r,   r-   __doc__r.   r   r   r1   r2   s   @r   r4   r4   4   s|            )-            r   r4   c                  ^     e Zd ZU dZded<   ded<   ded<   ded<   	 	 	 dd fdZddZ xZS )SubprocessWorkerae  A local Dask worker running in a dedicated subprocess

    Parameters
    ----------
    scheduler:
        Address of the scheduler
    worker_class:
        Python class to use to create the worker, defaults to 'distributed.Nanny'
    name:
        Name of the worker
    worker_kwargs:
        Keywords to pass on to the ``Worker`` class constructor
    r7   namestr	schedulerworker_classr5   worker_kwargsdistributed.NannyNr9   r   r   c                    || _         || _        || _        t          j        |pi           | _        t                                                       d S r   )rY   r[   r\   copyr]   r   r   )r   r[   r\   rY   r]   r   s        r   r   zSubprocessWorker.__init__u   sN     	"(!Y}':;;r   c           	        K   dd| j         dt          j        | j        i | j        d          g}t
                              d                    |                     t          j	        |  d {V | _
        d S )Nr<   r=   r>   r?   rB   )r[   rE   rF   r\   r]   rG   rH   rI   rJ   rK   r   )r   rS   s     r   r   zSubprocessWorker._start   s      NJt0:PT=O:PQQRR
 	CHHSMM"""$;SAAAAAAAr   )r^   NN)
r[   rZ   r\   rZ   rY   r7   r]   r9   r   r   r*   rU   r2   s   @r   rX   rX   a   s           NNN
 0%)      	B 	B 	B 	B 	B 	B 	B 	Br   rX   z:8787r^   hostr7   scheduler_portintr6   r9   dashboard_addressrZ   r\   	n_workers
int | Nonethreads_per_workerr]   silence_logskwargsr   r   r   c	           	     `   t           rt          d          | sd} |pi }|pi }||t                      \  }}||t          dt          |z            }|r9|7t          dt          t          j        t          |z                                }|rd|vrt          dd|t                    |d<   |J t          j        | ||du|d|          }t          j        | ||d	|          }t          d
|id}
t          ||ddfdt          |          D             }t          d||
d|d|	S )aL  Create a scheduler and workers that run in dedicated subprocesses

    This creates a "cluster" of a scheduler and workers running in dedicated subprocesses.

    .. warning::

       This function is experimental

    Parameters
    ----------
    host:
        Host address on which the scheduler will listen, defaults to localhost
    scheduler_port:
        Port fo the scheduler, defaults to 0 to choose a random port
    scheduler_kwargs:
            Keywords to pass on to scheduler
    dashboard_address:
        Address on which to listen for the Bokeh diagnostics server like
        'localhost:8787' or '0.0.0.0:8787', defaults to ':8787'

        Set to ``None`` to disable the dashboard.
        Use ':0' for a random port.
    worker_class:
        Worker class to instantiate workers from, defaults to 'distributed.Nanny'
    n_workers:
        Number of workers to start
    threads:
        Number of threads per each worker
    worker_kwargs:
        Keywords to pass on to the ``Worker`` class constructor
    silence_logs:
        Level of logs to print out to stdout, defaults to ``logging.WARN``

        Use a falsy value like False or None to disable log silencing.

    Examples
    --------
    >>> cluster = SubprocessCluster()  # Create a subprocess cluster  #doctest: +SKIP
    >>> cluster  # doctest: +SKIP
    SubprocessCluster(SubprocessCluster, 'tcp://127.0.0.1:61207', workers=5, threads=10, memory=16.00 GiB)

    >>> c = Client(cluster)  # connect to subprocess cluster  # doctest: +SKIP

    Scale the cluster to three workers

    >>> cluster.scale(3)  # doctest: +SKIP
    z+SubprocessCluster does not support Windows.z	127.0.0.1NrD   memory_limitauto)rG   )rb   port	dashboardre   )rb   nthreadsri   r6   )r@   options)r\   r]   c                    i | ]}|S r   r   ).0iworkers     r   
<dictcomp>z%SubprocessCluster.<locals>.<dictcomp>   s    333Qq&333r   SubprocessCluster)workersr[   ru   rY   ri   r   )r   r   r	   maxr   rd   mathceilr
   rG   toolzmerger4   rX   ranger   )rb   rc   r6   re   r\   rf   rh   r]   ri   rj   r[   rx   ru   s               @r   rw   rw      s   v  JHIII !'RM'-2/7(;(=(=%	%/;9(::;;	 K'/ C	)i2G(H(H$I$IJJ 
^=88(:Ay)
 )
 )
n%    {"*$6!2		
 	
 	  K*(	
 	

 	 M # 0
 I  $0=QQ F 4333%	"2"2333G  !    r   )rb   r7   rc   rd   r6   r9   re   rZ   r\   rZ   rf   rg   rh   rg   r]   r9   ri   rd   rj   r   r   r   ) 
__future__r   r/   rJ   r`   rE   loggingrz   typingr   r"   r|   dask.systemr   distributed.compatibilityr   distributed.deploy.specr   r   distributed.deploy.utilsr	   distributed.worker_memoryr
   	getLoggerr+   rG   ABCr   r4   rX   WARNrw   r   r   r   <module>r      s   " " " " " " 



              ! ! ! ! ! ! - - - - - - A A A A A A A A 8 8 8 8 8 8 8 8 8 8 8 8		8	$	$    !37   8* * * * ** * * *Z*B *B *B *B *Bz *B *B *B\ $($+ %)!%t t t t t t tr   