o
    º¼tf`  ã                   @   s^   d dl Z d dlmZ d dlZd dlmZ d dlmZ G dd„ dejƒZe	dkr-e 
¡  dS dS )é    N)ÚEmpty)Úflaky)ÚQtKernelManagerc                   @   sJ   e Zd Zdd„ Zdd„ Zddd„Zeddd	d
„ ƒZedddd„ ƒZdS )ÚTestsc                 C   s¨   t ƒ | _| j ¡  | j ¡ | _| jjddd | j ¡ | _| jjddd | jj| _| j d¡ z|  	¡  |  	¡  W dS  t
yS   | j d¡ |  	¡  |  	¡  Y dS w )zOpen a kernel.T)ÚshellZiopubzprint(0)N)r   Úkernel_managerZstart_kernelÚclientÚkernel_clientZstart_channelsÚblocking_clientÚcomm_managerÚexecuteÚ_get_next_msgÚTimeoutError©Úself© r   úc/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/qtconsole/tests/test_comms.pyÚsetUp   s    

üzTests.setUpc                 C   s,   | j r
| j jdd | jr| j ¡  dS dS )zClose the kernel.T)ÚnowN)r   Zshutdown_kernelr	   Úshutdownr   r   r   r   ÚtearDown!   s
   ÿzTests.tearDowné
   c                 C   sf   t   ¡ | }d}|dkr1|t   ¡ k rt‚z| jjdd}|d d }W n	 ty,   Y nw |dks|S )NÚstatusé   )ÚtimeoutÚheaderÚmsg_type)Útimer   r
   Zget_iopub_msgr   )r   r   Ztimeout_timer   Úmsgr   r   r   r   (   s   ÿúzTests._get_next_msg)Zmax_runsc                    sN  | j ‰ | j}G ‡ fdd„dƒ}|ƒ }| d¡ |  ¡ }|d d dks%J ‚|  ¡ }|d d dks3J ‚ˆ  |¡ |jdks?J ‚|jj|d	 d
 ksKJ ‚|  ¡ }|d d dksYJ ‚ˆ  |¡ |jdkseJ ‚|jj|d	 d
 ksqJ ‚|  ¡ }|d d dksJ ‚ˆ  |¡ |jdks‹J ‚|jj|d	 d
 ks—J ‚|  ¡ }|d d dks¥J ‚dS )z,Communicate from the kernel to the frontend.c                       s(   e Zd Z‡ fdd„Zdd„ Zdd„ ZdS )z7Tests.test_kernel_to_frontend.<locals>.DummyCommHandlerc                    s   ˆ   d| j¡ d | _d S )NÚtest_api)Zregister_targetÚ	comm_openÚlast_msgr   ©r   r   r   Ú__init__=   s   
z@Tests.test_kernel_to_frontend.<locals>.DummyCommHandler.__init__c                 S   s0   |  | j¡ | | j¡ |d d | _|| _d S ©NÚcontentÚdata)Zon_msgÚcomm_messageZon_closer!   Úcomm)r   r(   r   r   r   r   r    A   s   
zATests.test_kernel_to_frontend.<locals>.DummyCommHandler.comm_openc                 S   s   |d d | _ d S r$   )r!   )r   r   r   r   r   r'   G   s   zDTests.test_kernel_to_frontend.<locals>.DummyCommHandler.comm_messageN)Ú__name__Ú
__module__Ú__qualname__r#   r    r'   r   r"   r   r   ÚDummyCommHandler<   s    r,   z‘from ipykernel.comm import Comm
comm = Comm(target_name='test_api', data='open')
comm.send('message')
comm.close('close')
del comm
print('Done')
r   r   Úexecute_inputr    Úopenr%   Úcomm_idZcomm_msgÚmessageÚ
comm_closeÚcloseÚstreamN)r   r
   r   r   Ú	_dispatchr!   r(   r/   )r   r
   r,   Úhandlerr   r   r"   r   Útest_kernel_to_frontend6   s4   ÿ	


zTests.test_kernel_to_frontendc                 C   sø   | j }| j}| d¡ |  ¡ }|d d dksJ ‚|jddd}|  ¡ }|d d dks.J ‚|d	 d
 dks8J ‚| d¡ |  ¡ }|d d dksKJ ‚|d	 d
 dksUJ ‚| d¡ |  ¡ }|d d dkshJ ‚|d dkspJ ‚|d	 d
 dkszJ ‚dS )z,Communicate from the frontend to the kernel.a‘  class DummyCommHandler():
    def __init__(self):
        get_ipython().kernel.comm_manager.register_target(
            'test_api', self.comm_open)
    def comm_open(self, comm, msg):
        comm.on_msg(self.comm_message)
        comm.on_close(self.comm_message)
        print(msg['content']['data'])
    def comm_message(self, msg):
        print(msg['content']['data'])
dummy = DummyCommHandler()
r   r   r-   r   r.   )r&   r3   r%   Útextzopen
r0   zmessage
r2   Zparent_headerr1   zclose
N)r   r
   r   r   Znew_commÚsendr2   )r   r   r
   r   r(   r   r   r   Útest_frontend_to_kernell   s(   ÿ

zTests.test_frontend_to_kernelN)r   )	r)   r*   r+   r   r   r   r   r6   r9   r   r   r   r   r   
   s    

5r   Ú__main__)r   Úqueuer   Zunittestr   Zqtconsole.managerr   ZTestCaser   r)   Úmainr   r   r   r   Ú<module>   s     ÿ