o
    hQ                     @   sX  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlZddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddlm!Z! ddlm"Z" ddlm#Z# ddlm$Z$ ddlm%Z% ddlm&Z& dd lm'Z' e( Z)e*ed!e+ Z,e#G d"d# d#eZ-G d$d% d%e-Z.e#G d&d' d'e-Z/e#G d(d) d)e-Z0G d*d+ d+e-Z1e2ed,G d-d. d.e-Z3G d/d0 d0eZ4e5d1kr*dd2l6m7Z7 e7e8 dS dS )3z;Tests for net_connections() and Process.connections() APIs.    N)closing)AF_INET)AF_INET6)
SOCK_DGRAM)SOCK_STREAM)FREEBSD)LINUX)MACOS)NETBSD)OPENBSD)POSIX)SUNOS)WINDOWS)supports_ipv6)PY3)AF_UNIX)HAS_CONNECTIONS_UNIX)SKIP_SYSCONS)PsutilTestCase)bind_socket)bind_unix_socket)check_connection_ntuple)create_sockets)reap_children)retry_on_failure)	serialrun)skip_on_access_denied)tcp_socketpair)unix_socketpair)wait_for_fileSOCK_SEQPACKETc                   @   s&   e Zd Zdd Zdd Zd	ddZdS )
ConnectionTestCasec                 C   ,   t ststjdd}|rJ |d S d S d S Nallkind)r
   r   thisprocconnectionsselfcons r,   t/Users/merlin/projects/employee-monitoring-system/venv/lib/python3.10/site-packages/psutil/tests/test_connections.pysetUp6   s
   zConnectionTestCase.setUpc                 C   r"   r#   )r   r
   r'   r(   r)   r,   r,   r-   tearDown<   s
   zConnectionTestCase.tearDownr$   c                    s`   zt j|d}W n t jy   trY dS  w  fdd|D }|  |  | || dS )zGiven a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        r%   Nc                    s"   g | ]}|j  kr|d d qS )Npid.0cr1   r,   r-   
<listcomp>R   s   " zBConnectionTestCase.compare_procsys_connections.<locals>.<listcomp>)psutilnet_connectionsZAccessDeniedr	   sortassertEqual)r*   r2   Z	proc_consr&   Zsys_consr,   r1   r-   compare_procsys_connectionsC   s   z.ConnectionTestCase.compare_procsys_connectionsN)r$   )__name__
__module____qualname__r.   r/   r;   r,   r,   r,   r-   r!   3   s    r!   c                   @   s0   e Zd Zeeddd Zdd Zdd ZdS )	TestBasicOperationsrequires rootc                 C   sF   t   tjddD ]}t| q
W d    d S 1 sw   Y  d S r#   )r   r7   r8   r   r*   connr,   r,   r-   test_systemZ   s
   
"zTestBasicOperations.test_systemc                 C   sJ   t   t jddD ]}t| qW d    d S 1 sw   Y  d S r#   )r   r7   Processr(   r   rA   r,   r,   r-   test_process`   s
   
"z TestBasicOperations.test_processc                 C   s(   | j ttjdd | j ttjdd d S )Nz???r%   )assertRaises
ValueErrorr'   r(   r7   r8   r*   r,   r,   r-   test_invalid_kinde   s   z%TestBasicOperations.test_invalid_kindN)	r<   r=   r>   unittestskipIfr   rC   rE   rI   r,   r,   r,   r-   r?   X   s
    

r?   c                   @   s   e Zd ZdZdd Zdd Zdd Zee	  dd	d
 Z
dd Zee	  ddd Zee ddd Zee ddd ZdS )TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.c                 C   sr   t jdd}tdd |D }tstr||  S | t|d |d jdkr5| ||  j|  |d S )Nr$   r%   c                 S   s   g | ]}|j |fqS r,   )fdr3   r,   r,   r-   r6   p   s    z=TestUnconnectedSockets.get_conn_from_sock.<locals>.<listcomp>   r   r0   )	r'   r(   dictr
   r   filenor:   lenrM   )r*   sockr+   Zsmapr,   r,   r-   get_conn_from_sockn   s   z)TestUnconnectedSockets.get_conn_from_sockc                 C   s   |  |}t| |jdkr| |j|  | |j|j | |j|tj	tj
 | }|s=tr=t|tr=| }|jtkrH|dd }| |j| |jtkrftrftjdd}| jt |dd |S )zGiven a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        r0   N   r$   r%   )rS   r   rM   r:   rP   familytype
getsockoptsocket
SOL_SOCKETSO_TYPEgetsocknamer   
isinstancebytesdecoder   laddrr   r   r'   r(   r;   osgetpid)r*   rR   rB   r_   r+   r,   r,   r-   check_socket{   s$   


z#TestUnconnectedSockets.check_socketc                 C   `   d}t ttt|d}| |}|jrJ | |jtj	 W d    d S 1 s)w   Y  d S N	127.0.0.1r   addr)
r   r   r   r   rb   raddrr:   statusr7   CONN_LISTENr*   rh   rR   rB   r,   r,   r-   test_tcp_v4      

"z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedc                 C   rc   N)::1r   rg   )
r   r   r   r   rb   ri   r:   rj   r7   rk   rl   r,   r,   r-   test_tcp_v6      

"z"TestUnconnectedSockets.test_tcp_v6c                 C   rc   rd   )
r   r   r   r   rb   ri   r:   rj   r7   	CONN_NONErl   r,   r,   r-   test_udp_v4   rn   z"TestUnconnectedSockets.test_udp_v4c                 C   rc   ro   )
r   r   r   r   rb   ri   r:   rj   r7   rs   rl   r,   r,   r-   test_udp_v6   rr   z"TestUnconnectedSockets.test_udp_v6
POSIX onlyc                 C   b   |   }tt|td}| |}|jrJ | |jtj	 W d    d S 1 s*w   Y  d S N)rV   

get_testfnr   r   r   rb   ri   r:   rj   r7   rs   r*   testfnrR   rB   r,   r,   r-   test_unix_tcp      

"z$TestUnconnectedSockets.test_unix_tcpc                 C   rw   rx   ry   r{   r,   r,   r-   test_unix_udp   r~   z$TestUnconnectedSockets.test_unix_udpN)r<   r=   r>   __doc__rS   rb   rm   rJ   rK   r   rq   rt   ru   r   r}   r   r,   r,   r,   r-   rL   j   s    


rL   c                   @   s:   e Zd ZdZeeddd Zee ddd Z	dS )	TestConnectedSocketzJTest socket pairs which are are actually connected to
    each other.
    zunreliable on SUONSc                 C   s   d}t jddr
J tt|d\}}z-t jdd}| t|d | |d jtj | |d jtj W |	  |	  d S |	  |	  w )Nre   tcp4r%   rg   rT   r   rN   )
r'   r(   r   r   r:   rQ   rj   r7   ZCONN_ESTABLISHEDclose)r*   rh   serverclientr+   r,   r,   r-   test_tcp   s   
zTestConnectedSocket.test_tcprv   c                 C   s(  |   }t|\}}ztjdd}|d jr|d jrJ |d jr)|d jr)J ts-tr4dd |D }| jt	|d|d t
sFtsFtsFtrg| |d jd	 | |d jd	 | ||d jpd|d j n| |d jpr|d j| W |  |  d S W |  |  d S |  |  w )
Nunixr%   r   rN   c                 S   s   g | ]	}|j d kr|qS )z/var/run/log)ri   r3   r,   r,   r-   r6          z1TestConnectedSocket.test_unix.<locals>.<listcomp>rT   msg )rz   r   r'   r(   r_   ri   r
   r   r:   rQ   r   r   r   r   )r*   r|   r   r   r+   r,   r,   r-   	test_unix   s*   
zTestConnectedSocket.test_unixN)
r<   r=   r>   r   rJ   rK   r   r   r   r   r,   r,   r,   r-   r      s    

r   c                   @   s.   e Zd Zdd Zeeddd Zdd ZdS )	TestFiltersc                    s   fdd}t  n |dtttgtttg |dttgttg |dtgttg |dttgtg |dtgtg |dtgtg |d	ttgtg |d
tgtg |dtgtg tro|dtgtttg W d    d S W d    d S 1 szw   Y  d S )Nc                    sh   t j| dD ]} |j|  |j| qts0tj| dD ]} |j|  |j| qd S d S )Nr%   )r'   r(   assertInrU   rV   r   r7   r8   )r&   familiestypesrB   rH   r,   r-   check  s   z'TestFilters.test_filters.<locals>.checkr$   inetinet4tcpr   tcp6udpudp4udp6r   )r   r   r   r   r   r   r    r   )r*   r   r,   rH   r-   test_filters  s\   	"zTestFilters.test_filters)Zonly_ifc                    s  t    fdd}td}td}tj jt d}|jt	t
d|d}|jt	t
d|d}|jt	td|d}|jt	td|d} |}	tt|d	d
}
 |}tt|d	d
}t r |}tt|d	d
} |}tt|d	d
}nd }d }d }d }t D ]d}| } t|d |D ]S}|j|	jkr|||t
t|
dtjd q|j|jkr|||t
t|dtjd q|jt|dd kr|||tt|dtjd q|jt|dd kr|||tt|dtjd qqd S )Nc                    s   d}t |  |j|  |j|  |j|  |j|  |j| |D ]}	| j|	d}
|	|v r<|
s;J q+|
rBJ |
q+trO 	| j
|g d S d S )N)
r$   r   r   inet6r   r   r   r   r   r   r%   )r   r:   rU   rV   r_   ri   rj   r(   r   r;   r2   )procrB   rU   rV   r_   ri   rj   kindsZ	all_kindsr&   r+   rH   r,   r-   
check_conn2  s   
z+TestFilters.test_combos.<locals>.check_conna  
            import socket, time
            s = socket.socket({family}, socket.SOCK_STREAM)
            s.bind(('{addr}', 0))
            s.listen(5)
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            a  
            import socket, time
            s = socket.socket({family}, socket.SOCK_DGRAM)
            s.bind(('{addr}', 0))
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            )dirrf   )rU   rh   r|   rp   T)deleterN   r,   )r$   r   r   r   r   )r$   r   r   r   r   r2   )r$   r   r   r   r   )r$   r   r   r   r   )r   textwrapdedentr`   pathbasenamerz   getcwdformatintr   r   pyrunevalr   r   r'   childrenr(   r:   rQ   r2   r   r7   rk   r   rs   getattr)r*   r   Ztcp_templateZudp_templateZtestfileZtcp4_templateZudp4_templateZtcp6_templateZudp6_templateZ	tcp4_procZ	tcp4_addrZ	udp4_procZ	udp4_addrZ	tcp6_procZ	tcp6_addrZ	udp6_procZ	udp6_addrpr+   rB   r,   rH   r-   test_combos.  sr   











zTestFilters.test_combosc                 C   s
  t  v tjdd}| t|t rdnd |D ]}| |jtt	f | |j
t qtjdd}| t|d | |d jt | |d j
t t rptjdd}| t|d | |d jt	 | |d j
t tjdd}| t|t rdnd |D ]}| |jtt	f | |j
t qtjd	d}| t|d | |d jt | |d j
t t rtjd
d}| t|d | |d jt	 | |d j
t tjdd}| t|t rdnd |D ]}| |jtt	f | |j
ttf qt r-tjdd}| t|d |D ]}| |jt	 | |j
ttf qtrZtsbtsjtjdd}| t|d |D ]+}| |jt | |j
ttf qFW d    d S W d    d S W d    d S W d    d S 1 s~w   Y  d S )Nr   r%   rT   rN   r   r   r   r   r   r   r      r   r      )r   r'   r(   r:   rQ   r   r   rU   r   r   rV   r   r   r   r   r
   r   )r*   r+   rB   r,   r,   r-   
test_count  sl   
225$zTestFilters.test_countN)r<   r=   r>   r   r   r	   r   r   r,   r,   r,   r-   r     s
    +
`r   r@   c                   @   s&   e Zd ZdZdd Ze dd ZdS )TestSystemWideConnectionszTests for net_connections().c                    s    fdd}t  : ddlm} | D ]&\}}|dkrtsq|\}}t|} t|tt	| |||| qW d    d S 1 sFw   Y  d S )Nc                    sD   | D ]} j |j||d |jtkr j |j||d t| qd S )Nr   )r   rU   r   rV   r   )r+   r   types_rB   rH   r,   r-   r     s   

z0TestSystemWideConnections.test_it.<locals>.checkr   )	conn_tmapr   )
r   psutil._commonr   itemsr   r7   r8   r:   rQ   set)r*   r   r   r&   groupsr   r   r+   r,   rH   r-   test_it  s   
"z!TestSystemWideConnections.test_itc                    s   t  }t|}W d    n1 sw   Y  g d}g }t|D ]}|  }|| td| }| |}|j q!|D ]}t	| qAfddt
jddD }	D ]! | t fdd|	D | t
 }
| t|
d| qWd S )N
   a                  import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write("hello")
                    time.sleep(60)
                c                    s   g | ]	}|j  v r|qS r,   r1   r4   x)pidsr,   r-   r6      s    zFTestSystemWideConnections.test_multi_sockets_procs.<locals>.<listcomp>r$   r%   c                    s   g | ]	}|j  kr|qS r,   r1   r   r1   r,   r-   r6     r   )r   rQ   rangerz   appendr   r   r   r2   r   r7   r8   r:   rD   r(   )r*   socksexpectedtimesfnames_fnamesrcZsprocZsysconsr   r,   )r2   r   r-   test_multi_sockets_procs  s0   




z2TestSystemWideConnections.test_multi_sockets_procsN)r<   r=   r>   r   r   r   r   r,   r,   r,   r-   r     s
    r   c                   @   s   e Zd Zdd ZdS )TestMiscc                 C   s   g }g }t tD ].}|dr6tt|}t|}| s J || t| | || || || qtr?tj	 tj
 trFtj d S d S )NZCONN_)r   r7   
startswithr   strisupperZassertNotInr   r   Z	CONN_IDLEZ
CONN_BOUNDr   ZCONN_DELETE_TCB)r*   ZintsstrsnamenumZstr_r,   r,   r-   test_connection_constants  s$   




z"TestMisc.test_connection_constantsN)r<   r=   r>   r   r,   r,   r,   r-   r   	  s    r   __main__)run_from_name)9r   r`   rX   r   rJ   
contextlibr   r   r   r   r   r7   r   r   r	   r
   r   r   r   r   r   r   Zpsutil._compatr   Zpsutil.testsr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rD   r'   r   objectr    r!   r?   rL   r   r   rK   r   r   r<   Zpsutil.tests.runnerr   __file__r,   r,   r,   r-   <module>   sl   $^7 
J>
