o
    Dfb<                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
 ddlmZmZ dZdejdddfdddd	d
Zdd ZG dd dejZG dd dejZdS )    N)ListOptionalTuple   )igzip	isal_zlibi   rb   )threads
block_sizec          	   	   C   s   |dkrt | |||||S |dk r-z	ttd}W n   zt }W n   d}Y Y d|v r;tt	| |d}ntj
t| |dd|||d|d}d|v rYt||||S |S )	a  
    Utilize threads to read and write gzip objects and escape the GIL.
    Comparable to gzip.open. This method is only usable for streamed reading
    and writing of objects. Seeking is not supported.

    threads == 0 will defer to igzip.open. A threads < 0 will attempt to use
    the number of threads in the system.

    :param filename: str, bytes or file-like object (supporting read or write
                    method)
    :param mode: the mode with which the file should be opened.
    :param compresslevel: Compression level, only used for gzip writers.
    :param encoding: Passed through to the io.TextIOWrapper, if applicable.
    :param errors: Passed through to the io.TextIOWrapper, if applicable.
    :param newline: Passed through to the io.TextIOWrapper, if applicable.
    :param threads: If 0 will defer to igzip.open, if < 0 will use all threads
                    available to the system. Reading gzip can only
                    use one thread.
    :param block_size: Determines how large the blocks in the read/write
                       queues are for threaded reading and writing.
    :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
             depending on the mode.
    r   r   r)r   tb)r   levelr
   )buffer_size)r   openlenossched_getaffinitymultiprocessing	cpu_countioBufferedReader_ThreadedGzipReaderBufferedWriter_ThreadedGzipWriterreplaceTextIOWrapper)	filenamemodecompresslevelencodingerrorsnewliner
   r   Z	gzip_file r$   \/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/isal/igzip_threaded.pyr      s<   


r   c                 C   s\   t | ttfst| drt| |}d}||fS t| ds"t| dr*| }d}||fS td)N
__fspath__TreadwriteFz1filename must be a str or bytes object, or a file)
isinstancestrbyteshasattrbuiltinsr   	TypeError)r   Z	open_modebinary_fileclosefdr$   r$   r%   open_as_binary_streamN   s   r1   c                   @   sh   e Zd ZdddZdddZdd	 Zd
d ZdefddZde	fddZ
dddZedefddZdS )r      r	   c                 C   s   t |d\| _| _tj| jd| d| _d| _d| _t	|| _d| _
d | _t | _|| _tj| jd| _d| _d| _| j  d S )Nr      )
buffersizer   FtargetT)r1   rawr0   r   Z_IGzipReaderfileobjpos	read_filequeueQueueeof	exceptionr   BytesIObufferr   	threadingThread_decompressworker_closedrunningstart)selfr   
queue_sizer   r$   r$   r%   __init__[   s   
z_ThreadedGzipReader.__init__Nc                 C      | j rtdd S NzI/O operation on closed filerE   
ValueErrorrH   msgr$   r$   r%   _check_closedj      z!_ThreadedGzipReader._check_closedc              
   C   s   | j }| j}| jrKz| j|}W n ty' } z
|| _W Y d }~d S d }~ww |s,d S | jrFz	|j|dd W n tjyB   Y nw | js/| js	d S d S )N皙?timeout)	r   r;   rF   r8   r'   	Exceptionr>   putFull)rH   r   Zblock_queuedataer$   r$   r%   rC   n   s*   z_ThreadedGzipReader._decompressc                 C   s   |    | j|}|dkr>	 z	| jjdd}W n tjy0   | j s.| jr+| jY dS Y nw qt	
|| _| j|}|  j|7  _|S )Nr   Tg{Gz?rT   )rQ   r@   readintor;   getEmptyrD   is_aliver>   r   r?   r9   )rH   r   resultZdata_from_queuer$   r$   r%   r[      s&   

z_ThreadedGzipReader.readintoreturnc                 C      dS NTr$   rH   r$   r$   r%   readable      z_ThreadedGzipReader.readablec                 C   s   |    | jS N)rQ   r9   rc   r$   r$   r%   tell   s   z_ThreadedGzipReader.tellc                 C   s>   | j rd S d| _| j  | j  | jr| j  d| _ d S )NFT)rE   rF   rD   joinr8   closer0   r7   rc   r$   r$   r%   ri      s   



z_ThreadedGzipReader.closec                 C      | j S rf   rE   rc   r$   r$   r%   closed      z_ThreadedGzipReader.closed)r2   r	   rf   r`   N)__name__
__module____qualname__rJ   rQ   rC   r[   boolrd   intrg   ri   propertyrl   r$   r$   r$   r%   r   Z   s    



r   c                   @   s   e Zd ZdZdejdddfdedededed	ef
d
dZd)ddZ	dd Z
dd Zdd ZdefddZdd Zd*ddZedefddZdefdd Zd!d" Zd#d$ Zd%d& Zdefd'd(ZdS )+r   a  
    Write a gzip file using multiple threads.

    This class is heavily inspired by pigz from Mark Adler
    (https://github.com/madler/pigz). It works similarly.

    Each thread gets its own input and output queue. The program performs a
    round robin using an index. The writer thread reads from the output
    queues in a round robin using an index. This way all the blocks will be
    written to the output stream in order while still allowing independent
    compression for each thread.

    Writing to the ThreadedGzipWriter happens on the main thread in a
    io.BufferedWriter. The BufferedWriter will offer a memoryview of its
    buffer. Using the bytes constructor this is made into an immutable block of
    data.

    A reference to the previous block is used to create a memoryview of the
    last 32k of that block. This is used as a dictionary for the compression
    allowing for better compression rates.

    The current block and the dictionary are pushed into an input queue. They
    are picked up by a compression worker that calculates the crc32, the
    length of the data and compresses the block. The compressed block, checksum
    and length are pushed into an output queue.

    The writer thread reads from output queues and uses the crc32_combine
    function to calculate the total crc. It also writes the compressed block.

    When only one thread is requested, only the input queue is used and
    compressing and output is handled in one thread.
    wbr   r	   r   r   r
   rI   r   c                    sj  d_ d|v sd|v rtdd|vr|d7 }t _d __d_|t|d d  |_	 fd	d
t
|D _|dkrofdd
t
|D _fdd
t
|D _tjjd_fdd
t
|D _n!|dkrtg_g _g _tjjd_ntd| |_d_d_d_d_t||\__d_     d S )NTr   r   z Only binary writing is supportedr       
   i  c                    s   g | ]	}t j d qS ))r4   r   )r   Z_ParallelCompress.0_)compress_buffer_sizer   r$   r%   
<listcomp>   s    z0_ThreadedGzipWriter.__init__.<locals>.<listcomp>r   c                       g | ]}t  qS r$   r;   r<   rx   rI   r$   r%   r|          
c                    r}   r$   r~   rx   r   r$   r%   r|      r   r5   c                    s   g | ]}t j j|fd qS ))r6   args)rA   rB   	_compress)ry   irc   r$   r%   r|      s    z"threads should be at least 1, got r   F)rE   rN   rA   Locklockr>   r   previous_blockmaxr   rangecompressorsinput_queuesoutput_queuesrB   _writeoutput_workercompression_workersr;   r<   _compress_and_writer
   index_crcrF   _sizer1   r7   r0   _write_gzip_headerrG   )rH   r   r   r   r
   rI   r   r$   )r{   r   rI   rH   r%   rJ      sT   






z_ThreadedGzipWriter.__init__Nc                 C   rK   rL   rM   rO   r$   r$   r%   rQ     rR   z!_ThreadedGzipWriter._check_closedc                 C   sN   d}d}d}d}d}d}| j dkrdnd}| jtd||||||| dS )	z<Simple gzip header. Only xfl flag is set according to level.      r3   r         ZBBBBIBBN)r   r7   r(   structpack)rH   Zmagic1Zmagic2methodflagsmtimer   xflr$   r$   r%   r     s   

z&_ThreadedGzipWriter._write_gzip_headerc                 C   s(   d| _ | j  | jD ]}|  qd S rb   )rF   r   rG   r   rH   rD   r$   r$   r%   rG     s
   


z_ThreadedGzipWriter.startc                 C   s(   d| _ | jD ]}|  q| j  dS )z(Stop, but do not care for remaining workFN)rF   r   rh   r   r   r$   r$   r%   stop  s   

z_ThreadedGzipWriter.stopr`   c           
      C   s  |    | j | jr| jW d    n1 sw   Y  t|tr%|jnt|}|| jkrSt|}d}d}||k rQ|| |||| j  7 }|| j7 }||k s:|S t	|}| j
}t| jt d  }|| _|  j
d7  _
|| j }	| j|	 ||f t|S )Nr   r   )rQ   r   r>   r)   
memoryviewnbytesr   r   r(   r+   r   r   DEFLATE_WINDOW_SIZEr
   r   rW   )
rH   r   lengthZmemviewrG   Ztotal_writtenrY   r   zdictZworker_indexr$   r$   r%   r(     s4   


z_ThreadedGzipWriter.writec                 C   s>   |    | jD ]}|  q| jD ]}|  q| j  d S rf   )rQ   r   rh   r   r7   flush)rH   Zin_qZout_qr$   r$   r%   r   7  s   



z_ThreadedGzipWriter.flushc                 C   s   | j rd S |   |   | jr| j  d| _ | j| jtjddd t	
d| j| jd@ }| j| | j  | jrD| j  d| _ d S )NTrv   i)wbitsz<IIl    )rE   r   r   r>   r7   ri   r(   r   compressr   r   r   r   r0   )rH   trailerr$   r$   r%   ri   A  s   



z_ThreadedGzipWriter.closec                 C   rj   rf   rk   rc   r$   r$   r%   rl   S  rm   z_ThreadedGzipWriter.closedr   c              
   C   s   | j | }| j| }| j| }	 z
|jdd\}}W n tjy*   | js(Y d S Y qw z
|||\}}W n tyR }	 z|	  | 
|	| W Y d }	~	d S d }	~	ww t|}
||||
f |	  q)NTrS   rT   )r   r   r   r\   r;   r]   rF   compress_and_crcrV   	task_done_set_error_and_empty_queuer   rW   )rH   r   in_queueZ	out_queue
compressorrY   r   
compressedcrcrZ   data_lengthr$   r$   r%   r   W  s.   


z_ThreadedGzipWriter._compressc                 C   s   d}| j }| j}d}d}	 || j }|| }z|jdd\}}	}
W n tjy7   | js5|| _|| _Y d S Y qw t	
||	|
}||
7 }|| |  |d7 }q)Nr   TrS   rT   r   )r   r7   r
   r\   r;   r]   rF   r   r   r   crc32_combiner(   r   )rH   r   r   fp	total_crcsizeZ	out_indexZoutput_queuer   r   r   r$   r$   r%   r   l  s.   

z_ThreadedGzipWriter._writec              
   C   s   | j dks	td| j}d}d}| jd }| jd }	 z
|jdd\}}W n tjy;   | js9|| _	|| _
Y d S Y qw z
|||\}}	W n tyc }
 z|  | |
| W Y d }
~
d S d }
~
ww t|}t||	|}||7 }|| |  q)Nr   z)Compress_and_write is for one thread onlyr   TrS   rT   )r
   SystemErrorr7   r   r   r\   r;   r]   rF   r   r   r   rV   r   r   r   r   r   r(   )rH   r   r   r   r   r   rY   r   r   r   rZ   r   r$   r$   r%   r     s>   



z'_ThreadedGzipWriter._compress_and_writec              	   C   sh   | j ' || _d| _	 z|jdd}|  W n tjy(   Y W d    d S w q1 s-w   Y  d S )NFTrS   rT   )r   r>   rF   r\   r   r;   r]   )rH   errorqrz   r$   r$   r%   r     s   z._ThreadedGzipWriter._set_error_and_empty_queuec                 C   ra   rb   r$   rc   r$   r$   r%   writable  re   z_ThreadedGzipWriter.writablerf   rn   )ro   rp   rq   __doc__r   ZISAL_DEFAULT_COMPRESSIONr*   rs   rJ   rQ   r   rG   r   r(   r   ri   rt   rr   rl   r   r   r   r   r   r$   r$   r$   r%   r      s@    "

7

r   )r-   r   r   r   r;   r   rA   typingr   r   r    r   r   r   _COMPRESS_LEVEL_TRADEOFFr   r1   	RawIOBaser   r   r$   r$   r$   r%   <module>   s&   8O