o
    Nrfk+                     @  s&  d dl mZ d dlZd dlZd dlmZ d dlmZ d dlZ	d dl
mZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZ d dlmZmZ d dlmZ 									d"ddZdd Z e!deddddddddde	j"ddfddZ"	d#ddZ#dd Z$d d! Z%dS )$    )annotationsN)partial)zip_longest)
open_files)compute)
read_bytes)flatten)PANDAS_GE_200PANDAS_VERSION)dataframe_creation_dispatch)from_delayed)insert_meta_param_description	make_meta)delayedrecordsTutf-8strictc                   s   |du r|dk}|dkr|rt d| d< |o|dk d< t|df|||
| j|d|p-i } fdd	t||  D }|rP|	du rGt }	tt|i |	S |S )
a  Write dataframe into JSON text files

    This utilises ``pandas.DataFrame.to_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    produces the kind of JSON output that is most common in big-data
    applications, and which can be chunked when reading (see ``read_json()``).

    Parameters
    ----------
    df: dask.DataFrame
        Data to save
    url_path: str, list of str
        Location to write to. If a string, and there are more than one
        partitions in df, should include a glob character to expand into a
        set of file names, or provide a ``name_function=`` parameter.
        Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8" and how to respond
        to errors in the conversion (see ``str.encode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    compute: bool
        If true, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    compression : string or None
        String like 'gzip' or 'xz'.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions.
    Nr   <Line-delimited JSON is only available with orient="records".orientlineswt)encodingerrorsname_functionnumcompressionc                   s    g | ]\}}t t|| qS  )r   write_json_partition).0outfiledkwargsr   _/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/dask/dataframe/io/json.py
<listcomp>Y   s    zto_json.<locals>.<listcomp>)
ValueErrorr   ZnpartitionszipZ
to_delayeddictlistdask_compute)dfurl_pathr   r   storage_optionsr   r   r   r   Zcompute_kwargsr   r"   Zoutfilespartsr   r!   r#   to_json   s4   4

r.   c                 C  sD   |}| j |fi | W d    n1 sw   Y  tj|jS N)r.   ospathnormpath)r*   Zopenfiler"   fr   r   r#   r   e   s   r   pandasi   ZinferFc              	     s  du rdkdkrrt d|rdksst d|p!i }du r(ddu r0dd ttrHtsAt d	tt d
ttjd|rt| df|||d|}r|\}}}|d }t	fdd|D 	t
fddt||D }n
|\}}d}d}d	t
|}du rt| |	t 	fddt||D }n(t| df |d|}t	fdd|D 		fdd|D }t|dS )a  Create a dataframe from a set of JSON files

    This utilises ``pandas.read_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    is appropriate for line-delimited "JSON-lines" data, the kind of JSON output
    that is most common in big-data scenarios, and which can be chunked when
    reading (see ``read_json()``). All other options require blocksize=None,
    i.e., one partition per input file.

    Parameters
    ----------
    url_path: str, list of str
        Location to read from. If a string, can include a glob character to
        find a set of file names.
        Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8" and how to respond
        to errors in the conversion (see ``str.encode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    blocksize: None or int
        If None, files are not blocked, and you get one partition per input
        file. If int, which can only be used for line-delimited JSON files,
        each partition will be approximately this size in bytes, to the nearest
        newline character.
    sample: int
        Number of bytes to pre-load, to provide an empty dataframe structure
        to any blocks without data. Only relevant when using blocksize.
    encoding, errors:
        Text conversion, ``see bytes.decode()``
    compression : string or None
        String like 'gzip' or 'xz'.
    engine : callable or str, default ``pd.read_json``
        The underlying function that dask will use to read JSON files. By
        default, this will be the pandas JSON reader (``pd.read_json``).
        If a string is specified, this value will be passed under the ``engine``
        key-word argument to ``pd.read_json`` (only supported for pandas>=2.0).
    include_path_column : bool or str, optional
        Include a column with the file path where each row in the dataframe
        originated. If ``True``, a new column is added to the dataframe called
        ``path``. If ``str``, sets new column name. Default is ``False``.
    path_converter : function or None, optional
        A function that takes one argument and returns a string. Used to convert
        paths in the ``path`` column, for instance, to strip a common prefix from
        all the paths.
    $META

    Returns
    -------
    dask.DataFrame

    Examples
    --------
    Load single file

    >>> dd.read_json('myfile.1.json')  # doctest: +SKIP

    Load multiple files

    >>> dd.read_json('myfile.*.json')  # doctest: +SKIP

    >>> dd.read_json(['myfile.1.json', 'myfile.2.json'])  # doctest: +SKIP

    Load large line-delimited JSON files using partitions of approx
    256MB size

    >> dd.read_json('data/file*.csv', blocksize=2**28)
    Nr   r   zSJSON file chunking only allowed for JSON-linesinput (orient='records', lines=True).Tr1   c                 S  s   | S r/   r   )xr   r   r#   <lambda>   s    zread_json.<locals>.<lambda>zYPandas>=2.0 is required to pass a string to the `engine` argument of `read_json` (pandas=z is currently installed).)engine   
)	blocksizesampler   Zinclude_pathr   c                 3  s    | ]} |V  qd S r/   r   )r   ppath_converterr   r#   	<genexpr>   s    zread_json.<locals>.<genexpr>c                 3  s&    | ]\}} |gt | V  qd S r/   )len)r   r;   chunkr<   r   r#   r>      s    
r/   c                   s.   g | ]\}}t t| |d 	qS )meta)r   read_json_chunk)r   r@   r1   )r   r7   r   include_path_columnr"   rB   
path_dtyper   r#   r$     s    zread_json.<locals>.<listcomp>rt)r   r   r   c                 3  s    | ]} |j V  qd S r/   )r1   r   r3   r<   r   r#   r>     s    c                   s,   g | ]}t t| |jqS r   )r   read_json_filer1   rG   )r7   rD   r"   r   r   r=   rE   r   r#   r$     s    rA   )r%   
isinstancestrr	   r
   r   pd	read_jsonr   ZCategoricalDtyper   r&   rC   r   r   r   r   )r+   r   r   r,   r9   r:   r   r   r   rB   r7   rD   r=   r"   Zb_outfirstchunkspathsZ
first_pathZ
flat_pathsZflat_chunksr-   filesr   )
r   r7   r   rD   r"   r   rB   r   r=   rE   r#   rL   k   s   [
	


rL   c	                 C  sZ   t | ||}	|	d ||	fddd|}
|d ur"|
jr"|S |r+t|
|||}
|
S )Nr   r   Tr   r   )ioStringIOdecodeseekemptyadd_path_column)r@   r   r   r7   column_namer1   rE   r"   rB   sr*   r   r   r#   rC   -  s   
rC   c           
      C  sP   | }||f||d|}	W d    n1 sw   Y  |r&t |	|||}	|	S )NrQ   )rW   )
r3   r   r   r7   rX   r1   rE   r"   	open_filer*   r   r   r#   rH   <  s   rH   c                 C  sB   || j v rtd| d| jdi |tj|gt|  |diS )Nz(Files already contain the column name: 'z^', so the path column cannot use this name. Please set `include_path_column` to a unique name.)dtyper   )columnsr%   ZassignrK   ZSeriesr?   )r*   rX   r1   r[   r   r   r#   rW   D  s
   

(rW   )	r   NNTr   r   NNNr/   )&
__future__r   rR   r0   	functoolsr   	itertoolsr   r4   rK   Zfsspec.corer   Z	dask.baser   r)   Z
dask.bytesr   Z	dask.corer   Zdask.dataframe._compatr	   r
   Zdask.dataframe.backendsr   Zdask.dataframe.io.ior   Zdask.dataframe.utilsr   r   Zdask.delayedr   r.   r   Zregister_inplacerL   rC   rH   rW   r   r   r   r#   <module>   s\    
P B
