o
    Nrf#                     @   sL   d dl Zd dlZd dlZd dlZd dlmZ	 G dd dZ
G dd dZdS )    Nc                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!S )"ParallelDatac                 C   s
   || _ d S Ndata)selfr    r   V/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/clodius/fpark.py__init__	      
zParallelData.__init__c                    s   t  fdd| jD S )Nc                    s   g | ]} |qS r   r   ).0dfuncr   r   
<listcomp>   s    z$ParallelData.map.<locals>.<listcomp>)r   r   r   r   r   r   r   map   s   zParallelData.mapc                 C   s
   t | jS r   )lenr   r   r   r   r   count   r
   zParallelData.countc                 C   s   | j |j  | _ | S r   r   )r   Z
other_datar   r   r   union   s   zParallelData.unionc                 C   s   dS )N   r   r   r   r   r   getNumPartitions      zParallelData.getNumPartitionsc                 C   s.   t t}| jD ]}||d   d7  < q|S )zE
        Count the number of elements with a particular key.
        r   r   )coldefaultdictintr   )r   countsr   r   r   r   
countByKey   s   

zParallelData.countByKeyc                 C   s   | S r   r   )r   numr   r   r   coalesce$   r   zParallelData.coalescec                 C   s8   t t}| jD ]}||d  |d  qt| S )zh
        When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
        r   r   )r   r   listr   appendr   items)r   bucketsr   r   r   r   
groupByKey'   s   

zParallelData.groupByKeyc                 C   s    t || j}tttj|S )zd
        Flatten a list of results mapped to the data
        and return as one large list.
        )r   r   r   r    itchainfrom_iterable)r   r   resultr   r   r   flatMap2   s   zParallelData.flatMapc                 C   s   | j D ]}|| qd S r   r   )r   r   r   r   r   r   foreach;   s   

zParallelData.foreachc                 C   s   || j  d S r   r   r   r   r   r   foreachPartition?      zParallelData.foreachPartitionc                 C   s\   t t}| jD ]}||d  |d  qt }|D ]}t||| ||< qt|	 S )aV  
        When called on a dataset of (K, V) pairs, returns a dataset of (K, V)
        pairs where the values for each key are aggregated using the given
        reduce function func, which must be of type (V,V) => V. Like in
        groupByKey, the number of reduce tasks is configurable through an
        optional second argument.
        r   r   )
r   r   r    r   r!   dictftreducer   r"   )r   r   r#   r   reduced_bucketskeyr   r   r   reduceByKeyB   s   

zParallelData.reduceByKeyc           
      C   sp   t t}| jD ]}||d  |d  qt }|D ]}| }|| D ]}	|||	}q%|||< qt| S )Nr   r   )	r   r   r    r   r!   r-   copyr   r"   )
r   Z	start_valZseq_funcZ	comb_funcr#   r   r0   r1   Zcomb_valvalr   r   r   aggregateByKeyT   s   


zParallelData.aggregateByKeyc                 C   s   t || jS r   )r.   r/   r   r   r   r   r   r/   b   r,   zParallelData.reducec                 C   s   | j S r   r   r   r   r   r   collecte   s   zParallelData.collectc                 C   s   | j d | S r   r   )r   nr   r   r   takeh   r,   zParallelData.takeN)__name__
__module____qualname__r	   r   r   r   r   r   r   r$   r)   r*   r+   r2   r5   r/   r6   r8   r   r   r   r   r      s"    	r   c                   @   s<   e Zd ZdZdd Zedd Zedd Zedd	 Zd
S )FakeSparkContextz6
    Emulate a SparkContext for local processing.
    c                 C   s   d S r   r   r   r   r   r   r	   q   r   zFakeSparkContext.__init__c                 C   s   t | S r   )r   r   r   r   r   parallelizet   s   zFakeSparkContext.parallelizec                 C   sH   t | d}tttdd | W  d   S 1 sw   Y  dS )z
        Load a single file as a ParallelData set.

        :param filename: A file to be loaded line by line
        @return: A ParallelData object wrapping the lines in the file.
        rc                 S   s   |   S r   )strip)xr   r   r   <lambda>   s    z1FakeSparkContext.singleTextFile.<locals>.<lambda>N)openr   r    r   	readlines)filenamefr   r   r   singleTextFilex   s   $zFakeSparkContext.singleTextFilec                 C   sN   t | r"tt | d}tg }|D ]
} |t| }q|S t| S )a{  
        Load a filename as a text file. Filename can be either a single file
        or a directory containing a multitude of 'part-*' files.

        :param filename: The name of the file (or directory) containing the data which
                         we want to load one by one
        :return: A ParallelData object containing all of the lines of the file or files
        zpart-*)opisdirglobjoinr<   r=   r   rF   )rD   Zparts_filespr   r   r   textFile   s   



zFakeSparkContext.textFileN)	r9   r:   r;   __doc__r	   staticmethodr=   rF   rL   r   r   r   r   r<   l   s    


r<   )collectionsr   	functoolsr.   rI   	itertoolsr%   Zos.pathpathrG   r   r<   r   r   r   r   <module>   s    d