o
    adfT                     @   sd  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlZddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddlm!Z! ddlm"Z" ddlm#Z# ddlm$Z$ ddlm%Z% ddlm&Z& dd lm'Z' dd!lm(Z( e)ed"e* Z+d#d$ Z,e$G d%d& d&eZ-G d'd( d(e-Z.e$G d)d* d*e-Z/e$G d+d, d,e-Z0G d-d. d.e-Z1e2ed/G d0d1 d1e-Z3G d2d3 d3eZ4e5d4kr0dd5l6m7Z7 e7e8 dS dS )6z;Tests for net_connections() and Process.connections() APIs.    N)closing)AF_INET)AF_INET6)
SOCK_DGRAM)SOCK_STREAM)FREEBSD)LINUX)MACOS)NETBSD)OPENBSD)POSIX)SUNOS)WINDOWS)supports_ipv6)PY3)AF_UNIX)HAS_CONNECTIONS_UNIX)SKIP_SYSCONS)PsutilTestCase)bind_socket)bind_unix_socket)check_connection_ntuple)create_sockets)filter_proc_connections)reap_children)retry_on_failure)	serialrun)skip_on_access_denied)tcp_socketpair)unix_socketpair)wait_for_fileSOCK_SEQPACKETc                 C   s$   t  j| d}| dv rt|S |S )Nkind)allunix)psutilProcessconnectionsr   )r#   cons r*   f/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/psutil/tests/test_connections.pythis_proc_connections3   s   r,   c                   @   s&   e Zd Zdd Zdd Zd	ddZdS )
ConnectionTestCasec                 C      |  tddg  d S Nr$   r"   assertEqualr,   selfr*   r*   r+   setUp<   s   zConnectionTestCase.setUpc                 C   r.   r/   r0   r2   r*   r*   r+   tearDown?   s   zConnectionTestCase.tearDownr$   c                    s`   zt j|d}W n t jy   trY dS  w  fdd|D }|  |  | || dS )zGiven a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        r"   Nc                    s"   g | ]}|j  kr|d d qS )Npid.0cr7   r*   r+   
<listcomp>R   s   " zBConnectionTestCase.compare_procsys_connections.<locals>.<listcomp>)r&   net_connectionsZAccessDeniedr	   sortr1   )r3   r8   Z	proc_consr#   Zsys_consr*   r7   r+   compare_procsys_connectionsC   s   z.ConnectionTestCase.compare_procsys_connectionsN)r$   )__name__
__module____qualname__r4   r5   r?   r*   r*   r*   r+   r-   :   s    r-   c                   @   s0   e Zd Zeeddd Zdd Zdd ZdS )	TestBasicOperationsrequires rootc                 C   sF   t   tjddD ]}t| q
W d    d S 1 sw   Y  d S r/   )r   r&   r=   r   r3   connr*   r*   r+   test_systemY   s
   
"zTestBasicOperations.test_systemc                 C   sD   t   tddD ]}t| q	W d    d S 1 sw   Y  d S r/   )r   r,   r   rE   r*   r*   r+   test_process_   s
   
"z TestBasicOperations.test_processc                 C   s&   | j ttdd | j ttjdd d S )Nz???r"   )assertRaises
ValueErrorr,   r&   r=   r2   r*   r*   r+   test_invalid_kindd   s   z%TestBasicOperations.test_invalid_kindN)	r@   rA   rB   unittestskipIfr   rG   rH   rK   r*   r*   r*   r+   rC   X   s
    

rC   c                   @   s   e Zd ZdZdd Zdd Zdd Zee	  dd	d
 Z
dd Zee	  ddd Zee ddd Zee ddd ZdS )TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.c                 C   sp   t dd}tdd |D }tstr||  S | t|d |d jdkr4| ||  j|  |d S )Nr$   r"   c                 S   s   g | ]}|j |fqS r*   )fdr9   r*   r*   r+   r<   o   s    z=TestUnconnectedSockets.get_conn_from_sock.<locals>.<listcomp>   r   r6   )r,   dictr
   r   filenor1   lenrO   )r3   sockr)   Zsmapr*   r*   r+   get_conn_from_sockm   s   
z)TestUnconnectedSockets.get_conn_from_sockc                 C   s   |  |}t| |jdkr| |j|  | |j|j | |j|tj	tj
 | }|s=tr=t|tr=| }|jtkrH|dd }| |j| |jtkretretdd}| jt |dd |S )zGiven a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        r6   N   r$   r"   )rU   r   rO   r1   rR   familytype
getsockoptsocket
SOL_SOCKETSO_TYPEgetsocknamer   
isinstancebytesdecoder   laddrr   r   r,   r?   osgetpid)r3   rT   rF   ra   r)   r*   r*   r+   check_socketz   s$   



z#TestUnconnectedSockets.check_socketc                 C   d   d}t ttt|d}| |}| |jd | |jtj	 W d    d S 1 s+w   Y  d S N	127.0.0.1r   addrr*   )
r   r   r   r   rd   r1   raddrstatusr&   CONN_LISTENr3   rj   rT   rF   r*   r*   r+   test_tcp_v4      
"z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedc                 C   re   N)::1r   ri   r*   )
r   r   r   r   rd   r1   rk   rl   r&   rm   rn   r*   r*   r+   test_tcp_v6      
"z"TestUnconnectedSockets.test_tcp_v6c                 C   re   rf   )
r   r   r   r   rd   r1   rk   rl   r&   	CONN_NONErn   r*   r*   r+   test_udp_v4   rp   z"TestUnconnectedSockets.test_udp_v4c                 C   re   rq   )
r   r   r   r   rd   r1   rk   rl   r&   ru   rn   r*   r*   r+   test_udp_v6   rt   z"TestUnconnectedSockets.test_udp_v6
POSIX onlyc                 C   f   |   }tt|td}| |}| |jd | |jtj	 W d    d S 1 s,w   Y  d S N)rX    

get_testfnr   r   r   rd   r1   rk   rl   r&   ru   r3   testfnrT   rF   r*   r*   r+   test_unix_tcp      
"z$TestUnconnectedSockets.test_unix_tcpc                 C   ry   rz   r|   r~   r*   r*   r+   test_unix_udp   r   z$TestUnconnectedSockets.test_unix_udpN)r@   rA   rB   __doc__rU   rd   ro   rL   rM   r   rs   rv   rw   r   r   r   r*   r*   r*   r+   rN   i   s     


rN   c                   @   s:   e Zd ZdZeeddd Zee ddd Z	dS )	TestConnectedSocketzFTest socket pairs which are actually connected to
    each other.
    zunreliable on SUONSc                 C   s   d}|  tddg  tt|d\}}z,tdd}|  t|d |  |d jtj |  |d jtj W |  |  d S |  |  w )Nrg   tcp4r"   ri   rV   r   rP   )	r1   r,   r   r   rS   rl   r&   ZCONN_ESTABLISHEDclose)r3   rj   serverclientr)   r*   r*   r+   test_tcp   s   

zTestConnectedSocket.test_tcprx   c                 C   s.  |   }t|\}}ztdd}|d jr|d jrJ ||d jr,|d jr,J |ts0tr7dd |D }| jt|d|d t	sItsIt
sItrj| |d jd	 | |d jd	 | ||d jpg|d j n| |d jpu|d j| W |  |  d S W |  |  d S |  |  w )
Nr%   r"   r   rP   c                 S   s   g | ]	}|j d kr|qS )z/var/run/log)rk   r9   r*   r*   r+   r<          z1TestConnectedSocket.test_unix.<locals>.<listcomp>rV   msgr{   )r}   r   r,   ra   rk   r
   r   r1   rS   r   r   r   r   )r3   r   r   r   r)   r*   r*   r+   	test_unix   s*   

zTestConnectedSocket.test_unixN)
r@   rA   rB   r   rL   rM   r   r   r   r   r*   r*   r*   r+   r      s    

r   c                   @   s.   e Zd Zdd Zeeddd Zdd ZdS )	TestFiltersc                    s   fdd}t  n |dtttgtttg |dttgttg |dtgttg |dttgtg |dtgtg |dtgtg |d	ttgtg |d
tgtg |dtgtg tro|dtgtttg W d    d S W d    d S 1 szw   Y  d S )Nc                    sf   t | dD ]} |j|  |j| qts/tj| dD ]} |j|  |j| qd S d S )Nr"   )r,   assertInrW   rX   r   r&   r=   )r#   familiestypesrF   r2   r*   r+   check  s   z'TestFilters.test_filters.<locals>.checkr$   inetinet4tcpr   tcp6udpudp4udp6r%   )r   r   r   r   r   r   r!   r   )r3   r   r*   r2   r+   test_filters  s0   	"zTestFilters.test_filters)Zonly_ifc                    s  t    fdd}td}td}tj jt d}|jt	t
d|d}|jt	t
d|d}|jt	td|d}|jt	td|d} |}	tt|d	d
}
 |}tt|d	d
}t r |}tt|d	d
} |}tt|d	d
}nd }d }d }d }t  D ]d}| } t|d |D ]S}|j|	jkr|||t
t|
dtjd q|j|jkr|||t
t|dtjd q|jt|dd kr|||tt|dtjd q|jt|dd kr|||tt|dtjd qqd S )Nc                    s   d}t |  |j|  |j|  |j|  |j|  |j| |D ]}	| j|	d}
|	|v r> |
g  q+ |
g  q+t	rQ 
| j|g d S d S )N)
r$   r   r   inet6r   r   r   r   r   r   r"   )r   r1   rW   rX   ra   rk   rl   r(   ZassertNotEqualr   r?   r8   )procrF   rW   rX   ra   rk   rl   kindsZ	all_kindsr#   r)   r2   r*   r+   
check_conn%  s   z+TestFilters.test_combos.<locals>.check_conna  
            import socket, time
            s = socket.socket({family}, socket.SOCK_STREAM)
            s.bind(('{addr}', 0))
            s.listen(5)
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            a  
            import socket, time
            s = socket.socket({family}, socket.SOCK_DGRAM)
            s.bind(('{addr}', 0))
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            )dirrh   )rW   rj   r   rr   T)deleterP   r*   )r$   r   r   r   r   )r$   r   r   r   r   r8   )r$   r   r   r   r   )r$   r   r   r   r   )r   textwrapdedentrb   pathbasenamer}   getcwdformatintr   r   pyrunevalr    r   r&   r'   childrenr(   r1   rS   r8   r   rm   r   ru   getattr)r3   r   Ztcp_templateZudp_templateZtestfileZtcp4_templateZudp4_templateZtcp6_templateZudp6_templateZ	tcp4_procZ	tcp4_addrZ	udp4_procZ	udp4_addrZ	tcp6_procZ	tcp6_addrZ	udp6_procZ	udp6_addrpr)   rF   r*   r2   r+   test_combos!  s   











zTestFilters.test_combosc                 C   s  t  m tdd}| t|t rdnd |D ]}| |jttf | |j	t
 qtdd}| t|d | |d jt | |d j	t
 t rmtdd}| t|d | |d jt | |d j	t
 tdd}| t|t r|dnd |D ]}| |jttf | |j	t qtd	d}| t|d | |d jt | |d j	t t rtd
d}| t|d | |d jt | |d j	t tdd}| t|t rdnd |D ]}| |jttf | |j	t
tf qt r%tdd}| t|d |D ]}| |jt | |j	t
tf qtrQtsYtsatdd}| t|d |D ]+}| |jt | |j	t
tf q=W d    d S W d    d S W d    d S W d    d S 1 suw   Y  d S )Nr   r"   rV   rP   r   r   r   r   r   r   r      r   r%      )r   r,   r1   rS   r   r   rW   r   r   rX   r   r   r   r   r
   r   )r3   r)   rF   r*   r*   r+   
test_count  sl   









225$zTestFilters.test_countN)r@   rA   rB   r   r   r	   r   r   r*   r*   r*   r+   r     s    
 r   rD   c                   @   s&   e Zd ZdZdd Ze dd ZdS )TestSystemWideConnectionszTests for net_connections().c                    s    fdd}t  : ddlm} | D ]&\}}|dkrtsq|\}}t|} t|tt	| |||| qW d    d S 1 sFw   Y  d S )Nc                    sD   | D ]} j |j||d |jtkr j |j||d t| qd S )Nr   )r   rW   r   rX   r   )r)   r   types_rF   r2   r*   r+   r     s   

z0TestSystemWideConnections.test_it.<locals>.checkr   )	conn_tmapr%   )
r   psutil._commonr   itemsr   r&   r=   r1   rS   set)r3   r   r   r#   groupsr   r   r)   r*   r2   r+   test_it  s   
"z!TestSystemWideConnections.test_itc                    s   t  }t|}W d    n1 sw   Y  g d}g }t|D ]}|  }|| td| }| |}|j q!|D ]}t	| qAfddt
jddD }	D ]! | t fdd|	D | t
 }
| t|
d| qWd S )N
   a                  import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write("hello")
                    time.sleep(60)
                c                    s   g | ]	}|j  v r|qS r*   r7   r:   x)pidsr*   r+   r<     s    zFTestSystemWideConnections.test_multi_sockets_procs.<locals>.<listcomp>r$   r"   c                    s   g | ]	}|j  kr|qS r*   r7   r   r7   r*   r+   r<   #  r   )r   rS   ranger}   appendr   r   r   r8   r    r&   r=   r1   r'   r(   )r3   socksexpectedtimesfnames_fnamesrcZsprocZsysconsr   r*   )r8   r   r+   test_multi_sockets_procs  s4   






z2TestSystemWideConnections.test_multi_sockets_procsN)r@   rA   rB   r   r   r   r   r*   r*   r*   r+   r     s
    r   c                   @   s   e Zd Zdd ZdS )TestMiscc                 C   s   g }g }t tD ].}|dr6tt|}t|}| s J || t| | || || || qtr?tj	 tj
 trFtj d S d S )NZCONN_)r   r&   
startswithr   strisupperZassertNotInr   r   Z	CONN_IDLEZ
CONN_BOUNDr   ZCONN_DELETE_TCB)r3   ZintsstrsnamenumZstr_r*   r*   r+   test_connection_constants*  s$   




z"TestMisc.test_connection_constantsN)r@   rA   rB   r   r*   r*   r*   r+   r   )  s    r   __main__)run_from_name)9r   rb   rZ   r   rL   
contextlibr   r   r   r   r   r&   r   r   r	   r
   r   r   r   r   r   r   Zpsutil._compatr   Zpsutil.testsr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r   objectr!   r,   r-   rC   rN   r   r   rM   r   r   r@   Zpsutil.tests.runnerr   __file__r*   r*   r*   r+   <module>   sn   _7 
gA
