o
    tfL0                     @   s   d dl Z d dlZ d dlZd dlZd dlZd dlZ	 e jdddkZ	e jddZ
e
dkZdZG dd	 d	ejZed
krBe  dS dS )    NTEST_WITH_INTERNET01LOCAL_WS_SERVER_PORTz-1Tc                   @   s6  e Zd ZG dd dZdd Zdd Zdd Zee	d	d
d Z
edddd Zee	d	dd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd  Zee	d	d!d" Zee	d	d#d$ Zee	d	d%d& Zd'S )(WebSocketAppTestc                   @   s   e Zd ZdZdS )zWebSocketAppTest.NotSetYetz?A marker class for signalling that a value hasn't been set yet.N)__name__
__module____qualname____doc__ r   r   a/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/websocket/tests/test_app.py	NotSetYet'   s    r   c                 C   s6   t t t t_t t_t t_t t_d S N)	wsZenableTrace	TRACEABLEr   r   keep_running_openkeep_running_closeget_mask_key_idon_error_dataselfr   r   r   setUp*   s
   



zWebSocketAppTest.setUpc                 C   s,   t  t _t  t _t  t _t  t _d S r   )r   r   r   r   r   r   r   r   r   r   tearDown2   s   


zWebSocketAppTest.tearDownc                 C   s   d S r   r   r   r   r   r   close8      zWebSocketAppTest.closez/Tests using local websocket server are disabledc                    s@   dd } fdd}dd }t jdt |||d}|  d	S )
{A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        c                 _   s   |  d | jt_d| _dS )zmSet the keep_running flag for later inspection and immediately
            close the connection.
            hello!FN)sendkeep_runningr   r   r   argskwargsr   r   r   on_openC   s   

z3WebSocketAppTest.test_keep_running.<locals>.on_openc                       t |    d S r   printr   _messager   r   r   
on_messageK      z6WebSocketAppTest.test_keep_running.<locals>.on_messagec                 _   s   | j t_dS )z.Set the keep_running flag for the test to use.N)r   r   r   r   r   r   r   on_closeO   s   z4WebSocketAppTest.test_keep_running.<locals>.on_closews://127.0.0.1:)r"   r+   r)   Nr   WebSocketAppr   run_forever)r   r"   r)   r+   appr   r   r   test_keep_running;   s   z"WebSocketAppTest.test_keep_runningFz$Test disabled for now (requires rel)c                    s:   dd } fdd}t jdt ||d}|jdd d	S )
r   c                 _   s    |  d |   |  d dS )z*Send a message, receive, and send one morer   zgoodbye!N)r   recvr   r   r   r   r"   b   s   
z=WebSocketAppTest.test_run_forever_dispatcher.<locals>.on_openc                    r#   r   r$   r&   r   r   r   r)   h   r*   z@WebSocketAppTest.test_run_forever_dispatcher.<locals>.on_messager,   )r"   r)   Z
Dispatcher)
dispatcherNr-   )r   r"   r)   r0   r   r   r   test_run_forever_dispatcher\   s   z,WebSocketAppTest.test_run_forever_dispatcherc                 C   s<   t dt }tjd|jd  | }| |d dS )zaThe WebSocketApp.run_forever() method should return `False` when the application ends gracefully.r,   g?)intervalfunctionFN)	r   r.   r   	threadingTimerr   startr/   assertEqual)r   r0   Zteardownr   r   r   $test_run_forever_teardown_clean_exitv   s   z5WebSocketAppTest.test_run_forever_teardown_clean_exitz%Internet-requiring tests are disabledc                 C   s0   dd }t jd|d}| t|jt| dS )zhA WebSocketApp should forward the received mask_key function down
        to the actual socket.
        c                   S   s   dS )Nz    r   r   r   r   r   my_mask_key_func   r   z=WebSocketAppTest.test_sock_mask_key.<locals>.my_mask_key_funcwss://api-pub.bitfinex.com/ws/1)get_mask_keyN)r   r.   r:   idr>   )r   r<   r0   r   r   r   test_sock_mask_key   s
   z#WebSocketAppTest.test_sock_mask_keyc                 C   sB   dd }dd }t jd||d}| jt j|jddd	tjid
 dS )z7Test exception handling if ping_interval < ping_timeoutc                 S      t d |   d S NzGot a ping!r$   r0   r'   r   r   r   on_ping   r*   zIWebSocketAppTest.test_invalid_ping_interval_ping_timeout.<locals>.on_pingc                 S   rA   NzGot a pong! No need to respondr$   rC   r   r   r   on_pong   r*   zIWebSocketAppTest.test_invalid_ping_interval_ping_timeout.<locals>.on_pongr=   rD   rF         	cert_reqsping_intervalping_timeoutssloptNr   r.   assertRaisesZWebSocketExceptionr/   ssl	CERT_NONEr   rD   rF   r0   r   r   r   'test_invalid_ping_interval_ping_timeout   s   
z8WebSocketAppTest.test_invalid_ping_interval_ping_timeoutc                 C   s:   dd }dd }t jd||d}|jddd	tjid
 dS )z+Test WebSocketApp proper ping functionalityc                 S   rA   rB   r$   rC   r   r   r   rD      r*   z4WebSocketAppTest.test_ping_interval.<locals>.on_pingc                 S   rA   rE   r$   rC   r   r   r   rF      r*   z4WebSocketAppTest.test_ping_interval.<locals>.on_pongr=   rG   rI   rH   rJ   rK   N)r   r.   r/   rQ   rR   rS   r   r   r   test_ping_interval   s   
z#WebSocketAppTest.test_ping_intervalc                 C   s   t d}|jdddd dS )zTest WebSocketApp close opcode'wss://tsock.us1.twilio.com/v3/wsconnectrI   rH   zPing payload)rL   rM   Zping_payloadN)r   r.   r/   r   r0   r   r   r   test_opcode_close   s   
z"WebSocketAppTest.test_opcode_closec                 C   *   t d}| jt j|jddtjid dS )z1A WebSocketApp handling of negative ping_intervalr=   rJ   )rL   rN   NrO   rW   r   r   r   test_bad_ping_interval      

z'WebSocketAppTest.test_bad_ping_intervalc                 C   rY   )z0A WebSocketApp handling of negative ping_timeoutr=   rJ   )rM   rN   NrO   rW   r   r   r   test_bad_ping_timeout   r\   z&WebSocketAppTest.test_bad_ping_timeoutc                 C   s   dd }t jd|d}t jt jjdd}| ddg|| t jt jjd	d}| d
d
g|| t d}t jt jjd	d}| d
d
g|| | jt j|jdd d
S )zKTest extraction of close frame status code and close reason in WebSocketAppc                 S   s   t d d S )Nzon_close reached)r%   )ZwsappZclose_status_codeZ	close_msgr   r   r   r+         z9WebSocketAppTest.test_close_status_code.<locals>.on_closerV   )r+   s   no-init-from-client)opcodedatai  zno-init-from-client    Nztest if connection is closed)ra   )	r   r.   ABNFZOPCODE_CLOSEr:   Z_get_close_argsrP   Z"WebSocketConnectionClosedExceptionr   )r   r+   r0   Z
closeframeZapp2r   r   r   test_close_status_code   s$   

z'WebSocketAppTest.test_close_status_codec                    sx   d ddd } fdd}dd }t jdt |||d	}|jd
dd | | |  t | t d dS )z)Test callback function exception handlingNc                 S      t dNCallback failedRuntimeError)r0   r   r   r   r"        zBWebSocketAppTest.test_callback_function_exception.<locals>.on_openc                    s   | | d S r   r   )r0   errexc
passed_appr   r   on_error  s   zCWebSocketAppTest.test_callback_function_exception.<locals>.on_errorc                 S   s   |    d S r   r   rC   r   r   r   rF     r_   zBWebSocketAppTest.test_callback_function_exception.<locals>.on_pongr,   r"   ro   rF   rI   rH   rL   rM   rg   )r   r.   r   r/   r:   assertIsInstanceri   str)r   r"   ro   rF   r0   r   rl   r    test_callback_function_exception   s   z1WebSocketAppTest.test_callback_function_exceptionc                 C   sH   G dd d}| }|  |j|j | |jt |  t|jd dS )z'Test callback method exception handlingc                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
zBWebSocketAppTest.test_callback_method_exception.<locals>.Callbacksc                 S   s@   d | _ d | _tjdt | j| j| jd| _| jj	ddd d S )Nr,   rq   rI   rH   rr   )
rm   rn   r   r.   r   r"   ro   rF   r0   r/   r   r   r   r   __init__$  s   zKWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.__init__c                 S   re   rf   rh   )r   r'   r   r   r   r"   /  rj   zJWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_openc                 S   s   || _ || _d S r   )rn   rm   )r   r0   rk   r   r   r   ro   2  s   
zKWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_errorc                 S   s   |   d S r   rp   )r   r0   r'   r   r   r   rF   6  r_   zJWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_pongN)r   r   r	   rv   r"   ro   rF   r   r   r   r   	Callbacks#  s
    rw   rg   N)r:   rn   r0   rs   rm   ri   rt   )r   rw   	callbacksr   r   r   test_callback_method_exception  s
   z/WebSocketAppTest.test_callback_method_exceptionc                    st   dd  fdd}fdd}t jdt ||d}|jd	d
dd | d	 |  t j | t d dS )zTest reconnectr   Nc                    s   | d S r   r   )r'   rk   )rm   r   r   ro   G  s   z1WebSocketAppTest.test_reconnect.<locals>.on_errorc                    s2    d7   dkr| j    dkr|   d S d S )NrH   rI   )sockshutdownr   rC   )
pong_countr   r   rF   K  s   
z0WebSocketAppTest.test_reconnect.<locals>.on_pongr,   )rF   ro   rI   rH      )rL   rM   Z	reconnectzping/pong timed out)r   r.   r   r/   r:   rs   ZWebSocketTimeoutExceptionrt   )r   ro   rF   r0   r   )rm   r|   r   test_reconnect?  s   
zWebSocketAppTest.test_reconnectN)r   r   r	   r   r   r   r   unittestZ
skipUnlessTEST_WITH_LOCAL_SERVERr1   r4   r;   r   r@   rT   rU   rX   r[   r^   rd   ru   ry   r~   r   r   r   r   r   &   sR    





















r   __main__)osZos.pathrQ   r7   r   Z	websocketr   environgetr   r   r   r   ZTestCaser   r   mainr   r   r   r   <module>   s"     ;