o
    Nrfm                  	   @  s   d dl mZ d dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZmZ ed
dZddeddd
dddf	ddZdddZdd Zdd ZdS )    )annotationsN)partial)
open_files)concat)from_delayed)
read_bytes)delayed)parse_bytessystem_encodingT)ZpureZinferstrictFc
                   s  |dur|durt dt|trt|}|du rndv r"}
dnd}
t| fd ||
d|p1i }|du rCfdd|D }nqg }td	t||D ]}||||  }tttt	t
td
|}|| qMnFt| fdury nd|d|d|pi }|d  fddtD }rttfddt|d D }dd t||D }|st d| |rt|}|S )a\	  Read lines from text files

    Parameters
    ----------
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    blocksize: None, int, or str
        Size (in bytes) to cut up larger files.  Streams by default.
        Can be ``None`` for streaming, an integer number of bytes, or a string
        like "128MiB"
    compression: string
        Compression format like 'gzip' or 'xz'.  Defaults to 'infer'
    encoding: string
    errors: string
    linedelimiter: string or None
    collection: bool, optional
        Return dask.bag if True, or list of delayed values if false
    storage_options: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.
    files_per_partition: None or int
        If set, group input files into partitions of the requested size,
        instead of one partition per file. Mutually exclusive with blocksize.
    include_path: bool
        Whether or not to include the path in the bag.
        If true, elements are tuples of (line, path).
        Default is False.

    Examples
    --------
    >>> b = read_text('myfiles.1.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt.gz')  # doctest: +SKIP
    >>> b = read_text('s3://bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  # doctest: +SKIP

    Parallelize a large file by providing the number of uncompressed bytes to
    load into each partition.

    >>> b = read_text('largefile.txt', blocksize='10MB')  # doctest: +SKIP

    Get file paths of the bag by setting include_path=True

    >>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP
    >>> b.take(1) # doctest: +SKIP
    (('first line of the first file', '/home/dask/myfiles.0.txt'),)

    Returns
    -------
    dask.bag.Bag or list
        dask.bag.Bag if collection is True or list of Delayed lists otherwise.

    See Also
    --------
    from_sequence: Build bag from Python sequence
    Nz7Only one of blocksize or files_per_partition can be setN 
z
r   rt)modeencodingerrorscompressionnewlinec              	     s*   g | ]}t tt tt d |qS )	delimiter)r   listr   file_to_blocks).0Zfil)include_pathlinedelimiter V/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/dask/bag/text.py
<listcomp>n   s    zread_text.<locals>.<listcomp>r   r      
F)r   	blocksizesampler   r      c                   s   g | ]}t t| qS r   )r   decode)r   b)r   r   r   r   r   r      s    c                   s"   g | ]\}}|gt  |  qS r   )len)r   ipath)
raw_blocksr   r   r      s   "    c                 S  s   g | ]\}}t t||qS r   )r   attach_path)r   entryr(   r   r   r   r      s    zNo files found)
ValueError
isinstancestrr	   r   ranger&   r   r   mapr   r   appendr   encoder   	enumeratezipr   )Zurlpathr!   r   r   r   r   Z
collectionZstorage_optionsZfiles_per_partitionr   r   filesblocksstartZblock_filesZblock_linesopathsr   )r   r   r   r   r)   r   	read_text   s|   H
			
r;   c                 #  s    W} d ur;|  }|sg W  d    S | }fdd fdd|d d D |dd   D E d H  n|D ]}rF|jfn|V  q=W d    d S W d    d S 1 s]w   Y  d S )Nc                 3  s"    | ]} r|j fn|V  qd S N)r(   r   line)r   	lazy_filer   r   	<genexpr>   s
    
z!file_to_blocks.<locals>.<genexpr>c                      g | ]}|  qS r   r   r=   r   r   r   r          z"file_to_blocks.<locals>.<listcomp>)readsplitr(   )r   r?   r   ftextpartsr>   r   )r   r   r?   r   r      s"   
$"r   c                 c  s    | D ]}||fV  qd S r<   r   )blockr(   pr   r   r   r+      s   r+   c                   sz   |  ||} dv rtj| d}t|S |sg S | } fdd|d d D | s8|dd   }|S g  }|S )Nr   )r   c                   rA   r   r   )r   tline_delimiterr   r   r      rB   zdecode.<locals>.<listcomp>rC   )r$   ioStringIOr   rE   endswith)rI   r   r   rM   rG   linesrH   outr   rL   r   r$      s   
r$   r<   )
__future__r   rN   	functoolsr   Zfsspec.corer   Ztlzr   Zdask.bag.corer   Z
dask.bytesr   Zdask.delayedr   Z
dask.utilsr	   r
   r;   r   r+   r$   r   r   r   r   <module>   s0    

 
