o
    Df"~                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZm	Z	m
Z
mZmZmZ d dlmZmZ d dlmZmZmZ d dlmZ d dlZd dlmZ d dlZd dlZddlmZmZ dd	lmZm Z m!Z!m"Z" dd
l#m$Z$ ddl%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6 e7 Z8G dd dZ9G dd dZ:G dd dZ;G dd deZ<G dd de<Z=G dd de<Z>G dd deZ?G dd deZ@G dd dZAG dd deZBG d d! d!eBZCG d"d# d#eBZDdS )$    N)ListOptionalBinaryIOTextIOAnyTupleDict)ABCabstractmethod)ProcessPipeQueue)Path)
Connection   )Progress
FileOpener)SingleEndModifierPairedEndModifierPairedEndModifierWrapperModificationInfo)
Statistics)
RedirectorPairedRedirectorNoFilterPairedNoFilterInfoFileWriterRestFileWriterWildcardFileWriterTooShortReadFilterTooLongReadFilterNContentFilterMaximumExpectedErrorsFilterCasavaFilterDiscardTrimmedFilterDiscardUntrimmedFilterDemultiplexerPairedDemultiplexerCombinatorialDemultiplexerc                   @   s:   e Zd Zddedee defddZdd	 ZdddZdS )
InputFilesNFfile1file2interleavedc                 C      || _ || _|| _d S N)r*   r+   r,   )selfr*   r+   r,    r0   Z/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/cutadapt/pipeline.py__init__      
zInputFiles.__init__c                 C   s   t j| j| j| jddS )Nr)r+   r,   mode)dnaioopenr*   r+   r,   r/   r0   r0   r1   r7   #   s   zInputFiles.openreturnc                 C   &   | j   | jd ur| j  d S d S r.   )r*   closer+   r8   r0   r0   r1   r;   &      

zInputFiles.closeNFr9   N)	__name__
__module____qualname__r   r   boolr2   r7   r;   r0   r0   r0   r1   r)      s    r)   c                   @   s:   e Zd Zddedee defddZded	efd
dZ	dS )
InputPathsNFpath1path2r,   c                 C   r-   r.   )rD   rE   r,   )r/   rD   rE   r,   r0   r0   r1   r2   -   r3   zInputPaths.__init__file_openerr9   c                 C   s$   | | j| jd\}}t||| jS )Nrb)Z
xopen_pairrD   rE   r)   r,   )r/   rF   r*   r+   r0   r0   r1   r7   2   s   zInputPaths.openr=   )
r?   r@   rA   strr   rB   r2   r   r)   r7   r0   r0   r0   r1   rC   ,   s    rC   c                #   @   s  e Zd ZdZ																ddee dee dee dee dee dee d	ee d
ee dee dee dee deeeef  deeeef  deeeeef ef  deeeeef ef  dee	 f ddZ
dd ZdddZdddZdS )OutputFilesz
    The attributes are either None or open file-like objects except for demultiplex_out
    and demultiplex_out2, which are dictionaries that map an adapter name
    to file-like objects.
    Noutout2	untrimmed
untrimmed2	too_short
too_short2too_long	too_long2inforestwildcarddemultiplex_outdemultiplex_out2combinatorial_outcombinatorial_out2force_fastac                 C   sd   || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	|| _
|| _|| _|| _|| _|| _d S r.   )rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   rU   rV   rW   rX   rY   )r/   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   rU   rV   rW   rX   rY   r0   r0   r1   r2   =   s    
zOutputFiles.__init__c                 c   s    | j | j| j| j| j| j| j| j| j| j	| j
fD ]	}|d ur"|V  q| j| j| j| jfD ]}|d urC| D ]}|d us?J |V  q7q-d S r.   )rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   rU   rV   rW   rX   values)r/   fZoutsr0   r0   r1   __iter__a   s2   zOutputFiles.__iter__r9   c                 C   s   t | jd}dD ]}t| |durt||t  qdD ]%}t| |durAt||t  t| | D ]\}}t t|||< q3q|S )zl
        Create a new OutputFiles instance that has BytesIO instances for each non-None output file
        rY   )rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   N)rU   rV   rW   rX   )rI   rY   getattrsetattrioBytesIOdictitems)r/   resultattrkvr0   r0   r1   
as_bytesioz   s   zOutputFiles.as_bytesioc                 C   s.   | D ]}|t ju s|t jju rq|  qdS )z*Close all output files that are not stdoutN)sysstdoutbufferr;   r/   r[   r0   r0   r1   r;      s
   
zOutputFiles.close)NNNNNNNNNNNNNNNN)r9   rI   r>   )r?   r@   rA   __doc__r   r   r   rH   r   rB   r2   r\   rh   r;   r0   r0   r0   r1   rI   7   sl    	

$
rI   c                
   @   s  e Zd ZdZdZdZdefddZdede	d	d
fddZ
e	
	
d)dedee dee fddZde	d	d
fddZd*ddZd*ddZd*ddZd*ddZed	efddZed+ded	eeeee f fdd Zed!d" Zed#d$ Zede	fd%d&Zede	fd'd(Zd
S ),PipelinezU
    Processing pipeline that loops over reads and applies modifiers and filters
    r   FrF   c                 C   sX   d | _ g | _d | _d | _d | _g | _d | _d | _d | _d | _	d| _
d| _d| _|| _d S r=   )_reader_filters_infiles	_outfiles_demultiplexer_textiowrappers_minimum_length_maximum_lengthmax_nmax_expected_errorsdiscard_casavadiscard_trimmeddiscard_untrimmedrF   r/   rF   r0   r0   r1   r2      s   
zPipeline.__init__infilesoutfilesr9   Nc                 C   s   || _ | | _| | d S r.   )rq   r7   ro   _set_output)r/   r}   r~   r0   r0   r1   
connect_io   s   
zPipeline.connect_iofiler+   rY   c                 C      d S r.   r0   r/   r   r+   rY   r0   r0   r1   _open_writer   s   zPipeline._open_writerc                 C   s  g | _ || _|  }t|jft|jft|jffD ]\}}|r5t	
|}| j| | j |d ||d  q| j|j|jtf| j|j|jtffD ]C\}}}}|d u rSqH|r[| ||nd }	|d d uri||d nd }
t|dkr~|d d ur~||d }nd }| j ||	|
|d qH| jd urt| j }
}| j |d |
| | jd ur| jjstd nt| j }
}| j |d |
| | jrt   }
}| j |d |
| t!| j"t!| j# t!|j$d u dkrt%d|j&d us|j'd ur| (|| _)| j | j) nG| * }| j"r| j |d t+ t+  n)| j#r*| j |d t, t,  n|j$rB| |j$|j-}| j ||t, t,  | j | .| t/d| j  d S )Nr      r   )filterZfilter2zAIgnoring option --max-ee as input does not contain quality valueszXdiscard_trimmed, discard_untrimmed and outfiles.untrimmed must not be set simultaneouslyzFilters: %s)0rp   rr   _filter_wrapperr   rS   r   rR   r   rT   r`   TextIOWrapperrt   appendru   rN   rO   r   rv   rP   rQ   r    r   lenrw   r!   rx   ro   delivers_qualitiesloggerwarningr"   ry   r#   intrz   r{   rL   
ValueErrorrU   rW   _create_demultiplexerrs   _untrimmed_filter_wrapperr$   r%   rM   _final_filterdebug)r/   r~   Zfilter_wrapperZfilter_classoutfileZtextiowrapperlengthsr*   r+   writerf1f2Zuntrimmed_filter_wrapperZuntrimmed_writerr0   r0   r1   r      sn   



&zPipeline._set_outputc                 C   s:   | j D ]}|  q| jd usJ | jD ]}|  qd S r.   )rt   flushrr   rl   r0   r0   r1   r     s   



zPipeline.flushc                 C   s   |    |   d S r.   )_close_input_close_outputr8   r0   r0   r1   r;   
  s   zPipeline.closec                 C   r:   r.   )ro   r;   rq   r8   r0   r0   r1   r     r<   zPipeline._close_inputc                 C   s0   | j D ]}|  q| jd usJ | j  d S r.   )rt   r;   rr   rl   r0   r0   r1   r     s   

zPipeline._close_outputc                 C   s   | j d usJ | j jS r.   )ro   r   r8   r0   r0   r1   uses_qualities  s   zPipeline.uses_qualitiesprogressc                 C   r   r.   r0   )r/   r   r0   r0   r1   process_reads      zPipeline.process_readsc                 C   r   r.   r0   r8   r0   r0   r1   r   $  r   zPipeline._filter_wrapperc                 C   r   r.   r0   r8   r0   r0   r1   r   (  r   z"Pipeline._untrimmed_filter_wrapperc                 C   r   r.   r0   r/   r~   r0   r0   r1   r   ,  r   zPipeline._final_filterc                 C   r   r.   r0   r   r0   r0   r1   r   0  r   zPipeline._create_demultiplexerNNr>   r.   )r?   r@   rA   rm   Z
n_adapterspairedr   r2   r)   rI   r   r
   r   r   rB   r   r   r   r;   r   r   propertyr   r   r   r   r   r   r   r   r   r0   r0   r0   r1   rn      sB    
H


$

rn   c                	       s   e Zd ZdZdef fddZdefddZd!d	ed
e	e
e
ee
 f fddZ		d"dedee dee fddZdd Zdd ZdefddZdefddZedd Zejdd Zedd Zejd d Z  ZS )#SingleEndPipelinez2
    Processing pipeline for single-end reads
    rF   c                    s   t  | g | _d S r.   )superr2   
_modifiersr|   	__class__r0   r1   r2   9  s   
zSingleEndPipeline.__init__modifierc                 C   s    |d u rt d| j| d S )NzModifier must not be None)r   r   r   r/   r   r0   r0   r1   add=  s   zSingleEndPipeline.addNr   r9   c                 C   s   d}d}| j D ]5}|d7 }|d dkr|r|| |t|7 }t|}| jD ]}|||}q'| jD ]	}|||r; nq2q||dfS )z#Run the pipeline. Return statisticsr   r   '  Nro   updater   r   r   rp   )r/   r   nZtotal_bpreadrR   r   filter_r0   r0   r1   r   B  s    





zSingleEndPipeline.process_readsr   r+   rY   c                 C   sB   |d u sJ t |tttfrJ | jj|d| j|rddS d dS )Nwfasta)r5   	qualities
fileformat
isinstancerH   bytesr   rF   Zdnaio_open_raise_limitr   r   r0   r0   r1   r   S  s   zSingleEndPipeline._open_writerc                 C      t S r.   r   r8   r0   r0   r1   r   ^     z!SingleEndPipeline._filter_wrapperc                 C   r   r.   r   r8   r0   r0   r1   r   a  r   z+SingleEndPipeline._untrimmed_filter_wrapperr~   c                 C   s2   |j d u r
|jd usJ | j|j|jd}t|S Nr]   )rK   rJ   r   rY   r   r/   r~   r   r0   r0   r1   r   d  s   zSingleEndPipeline._final_filterc                 C   sd   t  }|jd ur| j|j|jd|d < |jd usJ |j D ]\}}| j||jd||< qt|S r   )rb   rL   r   rY   rU   rc   r&   )r/   r~   writersnamer   r0   r0   r1   r   i  s   
z'SingleEndPipeline._create_demultiplexerc                 C      | j S r.   ru   r8   r0   r0   r1   minimum_lengthr     z SingleEndPipeline.minimum_lengthc                 C   "   |d u st |dksJ || _d S Nr   r   ru   r/   valuer0   r0   r1   r   v     
c                 C   r   r.   rv   r8   r0   r0   r1   maximum_length{  r   z SingleEndPipeline.maximum_lengthc                 C   r   r   r   rv   r   r0   r0   r1   r     r   r.   r   )r?   r@   rA   rm   r   r2   r   r   r   r   r   r   r   r   rB   r   r   r   rI   r   r   r   r   setterr   __classcell__r0   r0   r   r1   r   5  s4    "
	


r   c                	       s  e Zd ZdZdZdef fddZdee dee dd	fd
dZ	dedd	fddZ
dedd	fddZd'dedeeeee f fddZ				d(dedee dee fddZd'ddZdd Zdd Zdd  Zed!d" Zejd#d" Zed$d% Zejd&d% Z  ZS ))PairedEndPipelinez3
    Processing pipeline for paired-end reads.
    TrF   c                    s(   t  | g | _|| _d | _d| _d S r=   )r   r2   r   _pair_filter_modero   override_untrimmed_pair_filter)r/   pair_filter_moderF   r   r0   r1   r2     s
   
zPairedEndPipeline.__init__	modifier1	modifier2r9   Nc                 C   s.   |du r|du rt d| jt|| dS )z
        Add a modifier for R1 and R2. One of them can be None, in which case the modifier
        will only be added for the respective read.
        NzNot both modifiers can be None)r   r   r   r   )r/   r   r   r0   r0   r1   r     s   zPairedEndPipeline.addr   c                 C   s(   |dusJ | j t|t| dS )z5
        Add one modifier for both R1 and R2
        N)r   r   r   copyr   r0   r0   r1   add_both  s   zPairedEndPipeline.add_bothc                 C   s   | j | dS )zBAdd a Modifier (without wrapping it in a PairedEndModifierWrapper)N)r   r   r   r0   r0   r1   add_paired_modifier  s   z%PairedEndPipeline.add_paired_modifierr   c                 C   s   d}d}d}| j d usJ | j D ]G\}}|d7 }|d dkr%|r%|| |t|7 }|t|7 }t|}t|}| jD ]}	|	||||\}}q<| jD ]}
|
||||rV nqKq|||fS )Nr   r   r   r   )r/   r   r   	total1_bp	total2_bpread1Zread2Zinfo1Zinfo2r   r   r0   r0   r1   r     s(   



zPairedEndPipeline.process_readsr   r+   rY   c                 C   sH   ||fD ]}t |tttfrJ q| jj||d| j|rdnd |d u dS )Nr   r   )r+   r5   r   r   r,   r   )r/   r   r+   rY   r[   r0   r0   r1   r     s   
zPairedEndPipeline._open_writerc                 C   s   |d u r| j }tjt|dS )Nr   )r   	functoolspartialr   )r/   r   r0   r0   r1   r     s   z!PairedEndPipeline._filter_wrapperc                 C   s   | j r	| jddS |  S )z
        Return a different filter wrapper when adapters were given only for R1
        or only for R2 (then override_untrimmed_pair_filter will be set)
        Zbothr   )r   r   r8   r0   r0   r1   r     s   z+PairedEndPipeline._untrimmed_filter_wrapperc                 C   s   | j |j|j|jd}t|S r   )r   rJ   rK   rY   r   r   r0   r0   r1   r     s   zPairedEndPipeline._final_filterc                    s    fdd} j d ur3 jd u r jd u sJ t } j  D ]\}}|| j| ||< q t|S t } jd urD| j j|d <  j D ]\}}|| j| ||< qIt	|S )Nc                    s   j | | jdS r   )r   rY   )r   r+   r~   r/   r0   r1   open_writer  s   z<PairedEndPipeline._create_demultiplexer.<locals>.open_writer)
rW   rL   rM   rb   rc   rX   r(   rU   rV   r'   )r/   r~   r   r   keyrJ   r   r   r0   r   r1   r     s   

z'PairedEndPipeline._create_demultiplexerc                 C   r   r.   r   r8   r0   r0   r1   r     r   z PairedEndPipeline.minimum_lengthc                 C   r   Nr   r   r   r0   r0   r1   r     r   c                 C   r   r.   r   r8   r0   r0   r1   r     r   z PairedEndPipeline.maximum_lengthc                 C   r   r   r   r   r0   r0   r1   r     r   r.   r   )r?   r@   rA   rm   r   r   r2   r   r   r   r   r   r   r   r   r   r   r   rB   r   r   r   r   r   r   r   r   r   r   r0   r0   r   r1   r     s:    	"





r   c                       sD   e Zd ZdZdedee def fddZdd Zdd
dZ	  Z
S )ReaderProcessa>  
    Read chunks of FASTA or FASTQ data (single-end or paired) and send to a worker.

    The reader repeatedly

    - reads a chunk from the file(s)
    - reads a worker index from the Queue
    - sends the chunk to connections[index]

    and finally sends the stop token -1 ("poison pills") to all connections.
    pathrE   openerc                    s8   t    || _|| _|| _|| _|| _|| _|| _dS )z
        queue -- a Queue of worker indices. A worker writes its own index into this
            queue to notify the reader that it is ready to receive more data.
        connections -- a list of Connection objects, one for each worker.
        N)	r   r2   r   rE   connectionsqueuebuffer_sizestdin_fd_opener)r/   r   rE   r   r   r   r   r   r   r0   r1   r2     s   

zReaderProcess.__init__c              
   C   sf  | j dkrtj  t| j t_zz| j| jdP}| j	rO| j| j	d }t
t||| jD ]\}\}}| ||| q1W d    n1 sIw   Y  nt
t|| jD ]
\}}| || qXW d    n1 smw   Y  tt| jD ]}| j }| j| d qyW d S  ty }	 z| jD ]}
|
d |
|	t f qW Y d }	~	d S d }	~	ww )NrG   )r   ri   stdinr;   osfdopenr   Zxopenr   rE   	enumerater6   Zread_paired_chunksr   send_to_workerread_chunksranger   r   r   getsend	Exception	traceback
format_exc)r/   r[   r   chunk_indexchunk1chunk2chunk_worker_indexe
connectionr0   r0   r1   run#  s:   




zReaderProcess.runNc                 C   sB   | j  }| j| }|| || |d ur|| d S d S r.   )r   r   r   r   
send_bytes)r/   r   r   r   r   r   r0   r0   r1   r   <  s   



zReaderProcess.send_to_workerr.   )r?   r@   rA   rm   rH   r   r   r2   r   r   r   r0   r0   r   r1   r     s
    r   c                       sn   e Zd ZdZdededededededed	e	f fd
dZ
dd ZdefddZdededefddZ  ZS )WorkerProcessa7  
    The worker repeatedly reads chunks of data from the read_pipe, runs the pipeline on it
    and sends the processed chunks to the write_pipe.

    To notify the reader process that it wants data, it puts its own identifier into the
    need_work_queue before attempting to read data from the read_pipe.
    id_pipelinetwo_input_filesinterleaved_inputorig_outfiles	read_pipe
write_pipeneed_work_queuec	           	         sB   t    || _|| _|| _|| _|| _|| _|| _|	 | _
d S r.   )r   r2   _id	_pipeline_two_input_files_interleaved_input
_read_pipe_write_pipe_need_work_queuerh   _original_outfiles)	r/   r  r  r  r  r  r  r	  r
  r   r0   r1   r2   M  s   
zWorkerProcess.__init__c              
   C   sP  zt  }	 | j| j | j }|dkrnH|dkr)| j \}}td| ||  }| j	
 }| j|| | j \}}}	| j  t  |||	g | jj}
||
7 }| ||| q| jj}t  dd| jjrmdnd |g }||7 }| jd | j| W d S  ty } z| jd | j|t f W Y d }~d S d }~ww )NTr   r   %sr   )r   r  putr  r  recvr   error_make_input_filesr  rh   r  r   r   r   collectrp   _send_outfilesr   r   r  r   r   r   r   )r/   statsr   r   tb_strr}   r~   r   Zbp1Zbp2	cur_statsmZmodifier_statsr0   r0   r1   r   d  s<   


 "zWorkerProcess.runr9   c                 C   sD   | j  }t|}| jr| j  }t|}nd }t||| jdS )N)r,   )r  
recv_bytesr`   ra   r  r)   r  )r/   datainputZinput2r0   r0   r1   r    s   


zWorkerProcess._make_input_filesr~   r   n_readsc                 C   sR   | j | | j | |D ]}|  t|tjsJ | }| j | qd S r.   )r  r   r   r   r`   ra   getvaluer  )r/   r~   r   r!  r[   Zprocessed_chunkr0   r0   r1   r    s   zWorkerProcess._send_outfiles)r?   r@   rA   rm   r   rn   rB   rI   r   r   r2   r   r)   r  r  r   r0   r0   r   r1   r  E  s,    	"r  c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	OrderedChunkWriterz
    We may receive chunks of processed data from worker processes
    in any order. This class writes them to an output file in
    the correct order.
    c                 C   s   t  | _d| _|| _d S )Nr   )rb   _chunks_current_index_outfile)r/   r   r0   r0   r1   r2     s   
zOrderedChunkWriter.__init__c                 C   sV   || j |< | j| j v r)| j| j | j  | j | j= |  jd7  _| j| j v sdS dS )z	
        r   N)r$  r%  r&  write)r/   r  indexr0   r0   r1   r'    s   

zOrderedChunkWriter.writec                 C   s   | j  S r.   )r$  r8   r0   r0   r1   wrote_everything  s   z#OrderedChunkWriter.wrote_everythingN)r?   r@   rA   rm   r2   r'  r)  r0   r0   r0   r1   r#    s
    	r#  c                   @   sJ   e Zd ZdZdedefddZedd Zedd	 Z	d
d Z
dd ZdS )PipelineRunnerz$
    A read processing pipeline
    r  r   c                 C   s   || _ || _d S r.   )r  	_progress)r/   r  r   r0   r0   r1   r2     s   
zPipelineRunner.__init__c                 C   r   r.   r0   r8   r0   r0   r1   r     r   zPipelineRunner.runc                 C   r   r.   r0   r8   r0   r0   r1   r;     r   zPipelineRunner.closec                 C   s   | S r.   r0   r8   r0   r0   r1   	__enter__  r   zPipelineRunner.__enter__c                 G   s   |    d S r.   )r;   )r/   argsr0   r0   r1   __exit__  s   zPipelineRunner.__exit__N)r?   r@   rA   rm   rn   r   r2   r
   r   r;   r,  r.  r0   r0   r0   r1   r*    s    

r*  c                       s   e Zd ZdZ	ddededededede	d	e	f fd
dZ
		ddedee deddfddZdeee ee f fddZdefddZdddZ  ZS )ParallelPipelineRunnera  
    Run a Pipeline in parallel

    - When connect_io() is called, a reader process is spawned.
    - When run() is called, as many worker processes as requested are spawned.
    - In the main process, results are written to the output files in the correct
      order, and statistics are aggregated.

    If a worker needs work, it puts its own index into a Queue() (_need_work_queue).
    The reader process listens on this queue and sends the raw data to the
    worker that has requested work. For sending the data from reader to worker,
    a Connection() is used. There is one such connection for each worker (self._pipes).

    For sending the processed data from the worker to the main process, there
    is a second set of connections, again one for each worker.

    When the reader is finished, it sends 'poison pills' to all workers.
    When a worker receives this, it sends a poison pill to the main process,
    followed by a Statistics object that contains statistics about all the reads
    processed by that worker.
      @ r  r}   r~   r   r   	n_workersr   c                    sF   t  || || _t | _|| _|| _|| _| |j	|j
|j d S r.   )r   r2   
_n_workersr   r  _buffer_sizerr   r   _assign_inputrD   rE   r,   )r/   r  r}   r~   r   r   r1  r   r   r0   r1   r2     s   
zParallelPipelineRunner.__init__NFrD   rE   r,   r9   c                 C   s   |d u| _ || _dd t| jD }t| \| _}ztj }W n t	j
y,   d}Y nw t||| j|| j| j|| _d| j_| j  d S )Nc                 S   s   g | ]}t d dqS )FZduplex)r   ).0r   r0   r0   r1   
<listcomp>  s    z8ParallelPipelineRunner._assign_input.<locals>.<listcomp>r   T)r  r  r   r2  zip_connectionsri   r   filenor`   UnsupportedOperationr   r   r  r3  _reader_processdaemonstart)r/   rD   rE   r,   r   Zconnwr:  r0   r0   r1   r4    s   

z$ParallelPipelineRunner._assign_inputc              
   C   sv   g }g }t | jD ]-}tdd\}}|| t|| j| j| j| j| j	| || j
}d|_|  || q	||fS )NFr5  T)r   r2  r   r   r  r  r  r  rr   r9  r  r=  r>  )r/   workersr   r(  Zconn_rZconn_wZworkerr0   r0   r1   _start_workers
  s   
z%ParallelPipelineRunner._start_workersc                 C   sf  |   \}}g }| jD ]	}|t| qt }d}|rtj|}|D ]k}| }	|	dkrN| }
|dkrD| \}}t	
d| |||
7 }|| q$|	dkr`| \}}t	
d| || }|dkrv| \}}t	
d| |||7 }| j| |D ]}| }|||	 qq$|s|D ]}| sJ q|D ]}|  q| j  | j| |S )Nr   r   r   r  )r@  rr   r   r#  r   multiprocessingr   waitr  r   r  remover+  r   r  r'  r)  joinr<  stop)r/   r?  r   r   r[   r  r   Zready_connectionsr   r   r  r   r  Zchunk_nr   r  r   r0   r0   r1   r     sT   

%

zParallelPipelineRunner.runc                 C      | j   d S r.   )rr   r;   r8   r0   r0   r1   r;   N     zParallelPipelineRunner.close)r0  r=   r>   )r?   r@   rA   rm   rn   rC   rI   r   r   r   r2   rH   r   rB   r4  r   r   r  r   r@  r   r   r;   r   r0   r0   r   r1   r/    s@    
4r/  c                       sJ   e Zd ZdZdedededef fddZde	fd	d
Z
dddZ  ZS )SerialPipelineRunnerz)
    Run a Pipeline on a single core
    r  r}   r~   r   c                    s    t  || | j|| d S r.   )r   r2   r  r   )r/   r  r}   r~   r   r   r0   r1   r2   W  s   zSerialPipelineRunner.__init__r9   c                 C   sZ   | j j| jd\}}}| jr| j| t| j dd }|d us!J t ||||| j jS )N)r   r   )r  r   r+  rE  r^   r   r  rp   )r/   r   r   r   	modifiersr0   r0   r1   r   a  s   zSerialPipelineRunner.runNc                 C   rF  r.   )r  r;   r8   r0   r0   r1   r;   j  rG  zSerialPipelineRunner.closer>   )r?   r@   rA   rm   rn   r)   rI   r   r2   r   r   r;   r   r0   r0   r   r1   rH  R  s    
	rH  )Er`   r   ri   r   loggingr   typingr   r   r   r   r   r   r   abcr	   r
   rA  r   r   r   pathlibr   Zmultiprocessing.connectionr   r   r6   utilsr   r   rI  r   r   r   r   reportr   filtersr   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   	getLoggerr   r)   rC   rI   rn   r   r   r   r  r#  r*  r/  rH  r0   r0   r0   r1   <module>   sD    $L] "P >W 