o
    tf0                     @   s$  d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZmZmZ ddlZejrCddlmZmZmZ edZg d	ZG d
d deZG dd deZdededeejf ddfddZG dd dee ZG dd dee ZG dd deZ G dd deZ!dS )a  Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

    N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric	AwaitableOptional)DequeTupleAny_T)QueuePriorityQueue	LifoQueue	QueueFull
QueueEmptyc                   @      e Zd ZdZdS )r   z:Raised by `.Queue.get_nowait` when the queue has no items.N__name__
__module____qualname____doc__ r   r   W/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/tornado/queues.pyr   /       r   c                   @   r   )r   zBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nr   r   r   r   r   r   5   r   r   futuretimeoutreturnc                    sD   |r d fdd}t j || fdd d S d S )Nr    c                      s      s t  d S d S N)doneset_exceptionr   TimeoutErrorr   )r   r   r   
on_timeout@   s   z _set_timeout.<locals>.on_timeoutc                    s
     S r!   )Zremove_timeout)_)io_looptimeout_handler   r   <lambda>F   s   
 z_set_timeout.<locals>.<lambda>r    N)r   ZIOLoopcurrentZadd_timeoutadd_done_callback)r   r   r%   r   )r   r'   r(   r   _set_timeout;   s   
r-   c                   @   s(   e Zd Zd	ddZdee fddZdS )
_QueueIteratorq	Queue[_T]r    Nc                 C   s
   || _ d S r!   )r/   )selfr/   r   r   r   __init__J      
z_QueueIterator.__init__c                 C   
   | j  S r!   )r/   getr1   r   r   r   	__anext__M   r3   z_QueueIterator.__anext__)r/   r0   r    N)r   r   r   r2   r
   r   r7   r   r   r   r   r.   I   s    
r.   c                   @   s  e Zd ZdZdZd1deddfddZedefdd	Zdefd
dZ	de
fddZde
fddZ	d2dedeeeejf  ddfddZdeddfddZ	d2deeeejf  dee fddZdefddZd3ddZ	d2deeeejf  ded fddZdee fdd Zd3d!d"Zdefd#d$Zdeddfd%d&Zdeddfd'd(Zd3d)d*Z de!fd+d,Z"de!fd-d.Z#de!fd/d0Z$dS )4r   a  Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()     # Wait for producer to put all tasks.
            await q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    Nr   maxsizer    c                 C   sb   |d u rt d|dk rtd|| _|   tg | _tg | _d| _t	 | _
| j
  d S )Nzmaxsize can't be Noner   zmaxsize can't be negative)	TypeError
ValueError_maxsize_initcollectionsdeque_getters_putters_unfinished_tasksr   	_finishedset)r1   r8   r   r   r   r2      s   zQueue.__init__c                 C   s   | j S )z%Number of items allowed in the queue.)r;   r6   r   r   r   r8      s   zQueue.maxsizec                 C   s
   t | jS )zNumber of items in the queue.)len_queuer6   r   r   r   qsize   s   
zQueue.qsizec                 C   s   | j  S r!   rE   r6   r   r   r   empty      zQueue.emptyc                 C   s   | j dkrdS |  | j kS )Nr   F)r8   rF   r6   r   r   r   full   s   
z
Queue.fullitemr   zFuture[None]c                 C   sR   t  }z| | W n ty!   | j||f t|| Y |S w |d |S )a  Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        N)r   
put_nowaitr   r@   appendr-   
set_result)r1   rK   r   r   r   r   r   put   s   
z	Queue.putc                 C   s^   |    | jr"|  sJ d| j }| | t||   dS |  r(t| | dS )z{Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        z)queue non-empty, why are getters waiting?N)	_consume_expiredr?   rH   popleft_Queue__put_internalr   _getrJ   r   )r1   rK   getterr   r   r   rL      s   

zQueue.put_nowaitc                 C   sF   t  }z
||   W |S  ty"   | j| t|| Y |S w )a.  Remove and return an item from the queue.

        Returns an awaitable which resolves once an item is available, or raises
        `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        )r   rN   
get_nowaitr   r?   rM   r-   )r1   r   r   r   r   r   r5      s   z	Queue.getc                 C   s\   |    | jr$|  sJ d| j \}}| | t|d |  S |  r,|  S t)zRemove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        z(queue not full, why are putters waiting?N)	rP   r@   rJ   rQ   rR   r   rS   rF   r   )r1   rK   putterr   r   r   rU      s   

zQueue.get_nowaitc                 C   s<   | j dkr	td|  j d8  _ | j dkr| j  dS dS )a  Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        r   z!task_done() called too many times   N)rA   r:   rB   rC   r6   r   r   r   	task_done  s   

zQueue.task_donec                 C   s   | j |S )zBlock until all items in the queue are processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )rB   wait)r1   r   r   r   r   join$  s   z
Queue.joinc                 C   s   t | S r!   )r.   r6   r   r   r   	__aiter__.  rI   zQueue.__aiter__c                 C   s   t  | _d S r!   )r=   r>   rE   r6   r   r   r   r<   2  s   zQueue._initc                 C   r4   r!   )rE   rQ   r6   r   r   r   rS   5  r3   z
Queue._getc                 C      | j | d S r!   rE   rM   r1   rK   r   r   r   _put8     z
Queue._putc                 C   s&   |  j d7  _ | j  | | d S )NrW   )rA   rB   clearr_   r^   r   r   r   Z__put_internal=  s   
zQueue.__put_internalc                 C   s|   | j r| j d d  r| j   | j r| j d d  s| jr8| jd  r<| j  | jr:| jd  s'd S d S d S d S )Nr   rW   )r@   r"   rQ   r?   r6   r   r   r   rP   B  s   

$zQueue._consume_expiredc                 C   s    dt | jtt| |  f S )Nz<%s at %s %s>)typer   hexid_formatr6   r   r   r   __repr__J  s    zQueue.__repr__c                 C   s   dt | j|  f S )Nz<%s %s>)rb   r   re   r6   r   r   r   __str__M  s   zQueue.__str__c                 C   sn   d| j f }t| dd r|d| j 7 }| jr|dt| j 7 }| jr+|dt| j 7 }| jr5|d| j 7 }|S )Nz
maxsize=%rrE   z	 queue=%rz getters[%s]z putters[%s]z	 tasks=%s)r8   getattrrE   r?   rD   r@   rA   )r1   resultr   r   r   re   P  s   zQueue._format)r   r!   r*   )%r   r   r   r   rE   intr2   propertyr8   rF   boolrH   rJ   r   r   r   floatdatetime	timedeltarO   rL   r
   r5   rU   rX   rZ   r.   r[   r<   rS   r_   rR   rP   strrf   rg   re   r   r   r   r   r   Q   sR    E






r   c                   @   :   e Zd ZdZdddZdeddfddZdefd	d
ZdS )r   a  A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    r    Nc                 C   
   g | _ d S r!   rG   r6   r   r   r   r<   z  r3   zPriorityQueue._initrK   c                 C   s   t | j| d S r!   )heapqheappushrE   r^   r   r   r   r_   }  s   zPriorityQueue._putc                 C   s   t | jS r!   )rs   heappoprE   r6   r   r   r   rS     s   zPriorityQueue._getr*   r   r   r   r   r<   r   r_   rS   r   r   r   r   r   ]  s
    
r   c                   @   rq   )r   a  A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    r    Nc                 C   rr   r!   rG   r6   r   r   r   r<     r3   zLifoQueue._initrK   c                 C   r\   r!   r]   r^   r   r   r   r_     r`   zLifoQueue._putc                 C   r4   r!   )rE   popr6   r   r   r   rS     r3   zLifoQueue._getr*   rv   r   r   r   r   r     s
    
r   )"r   r=   rn   rs   tornador   r   Ztornado.concurrentr   r   Ztornado.locksr   typingr   r   r	   r
   r   TYPE_CHECKINGr   r   r   r   __all__	Exceptionr   r   rm   ro   r-   r.   r   r   r   r   r   r   r   <module>   s8   
  '