
    0Fie                        d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	  ej
        e          Z G d de          Zd Zd	 Zd
 ZdS )    )annotationsN)coerce_to_addressconnect)SchedulerPlugin)dumps_functionc                       e Zd ZdZddZd ZdS )EventStreamz Maintain a copy of worker eventsNc                T    d| _         g | _        |r|                    |            d S d S )Nr	   )namebuffer
add_plugin)self	schedulers     Clib/python3.11/site-packages/distributed/diagnostics/eventstream.py__init__zEventStream.__init__   s<    !	 	'  &&&&&	' 	'    c                p    |dk    r-||d<   |dk    s|dk    r| j                             |           d S d S d S )N
processingkeymemoryerred)r   append)r   r   startfinishargskwargss         r   
transitionzEventStream.transition   sU    L  F5M!!Vw%6%6""6***** ! %6%6r   N)__name__
__module____qualname____doc__r   r    r   r   r	   r	      s=        **' ' ' '+ + + + +r   r	   c                $    g |j         c|_         }|S r   )r   )r   esr   s      r   swap_bufferr&      s    BIBIvMr   c                <    |                      |j                   d S )N)r   )remove_pluginr   )r   r%   s     r   teardownr)   !   s!    )))))r   c           	        K   t          |           } t          |            d{V }|                    dt          t                    t          t
                    |t          t                    d           d{V  |S )a  Open a TCP connection to scheduler, receive batched task messages

    The messages coming back are lists of dicts.  Each dict is of the following
    form::

        {'key': 'mykey', 'worker': 'host:port', 'status': status,
         'compute_start': time(), 'compute_stop': time(),
         'transfer_start': time(), 'transfer_stop': time(),
         'disk_load_start': time(), 'disk_load_stop': time(),
         'other': 'junk'}

    Where ``status`` is either 'OK', or 'error'

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = await eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(await read(stream))  # doctest: +SKIP
    [{'key': 'x', 'status': 'OK', 'worker': '192.168.0.1:54684', ...},
     {'key': 'y', 'status': 'error', 'worker': '192.168.0.1:54684', ...}]
    Nfeed)opsetupfunctionintervalr)   )r   r   writer   r	   r&   r)   )addressr/   comms      r   eventstreamr3   %   s      4  ((G!!!!!!!!D
**#K00&{33 &x00	
 	
         Kr   )
__future__r   loggingdistributed.corer   r   distributed.diagnostics.pluginr   distributed.workerr   	getLoggerr   loggerr	   r&   r)   r3   r#   r   r   <module>r;      s    " " " " " "  7 7 7 7 7 7 7 7 : : : : : : - - - - - -		8	$	$+ + + + +/ + + +   
* * *% % % % %r   