
    >ie                        d dl mZ d dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZmZ  G d d	          Zd
 Zd ZddZddZd Zd Zd Zd Z	 	 	 	 	 	 ddZd Zd Z ddZ!dS )    )annotationsN)expand_paths_if_neededget_fs_token_pathsstringify_path)AbstractFileSystem)config)_is_local_fs)natural_sort_keyparse_bytesc                      e Zd ZdZed             Ze	 	 	 	 	 	 dd            Zed             Ze	 dd            Ze	 	 	 	 dd            Z	ed	             Z
edd
            Zed             Zed             ZdS )Enginez8The API necessary to provide a new Parquet reader/writerc                    ||                     dd          nd|v rt          d          |dv rt          |d|          \  }}|||fS t          t                    st          d           |rt          d	|           t          |t
          t          t          f          r|st          d
          d |D             }nt          |          g}t          |ddd          }fd|D             ||fS )a  Extract filesystem object from urlpath or user arguments

        This classmethod should only be overridden for engines that need
        to handle filesystem implementations other than ``fsspec``
        (e.g. ``pyarrow.fs.S3FileSystem``).

        Parameters
        ----------
        urlpath: str or List[str]
            Source directory for data, or path(s) to individual parquet files.
        filesystem: "fsspec" or fsspec.AbstractFileSystem
            Filesystem backend to use. Default is "fsspec"
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        storage_options: dict
            Options to be passed on to the file-system backend.

        Returns
        -------
        fs: Any
            A global filesystem object to be used for metadata
            processing and file-opening by the engine.
        paths: List[str]
            List of data-source paths.
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        NfsfsspeczPCannot specify a filesystem argument if the 'fs' dataset option is also defined.)Nr   rb)modestorage_optionsz4Expected fsspec.AbstractFileSystem or 'fsspec'. Got zUCannot specify storage_options when an explicit filesystem object is specified. Got: zempty urlpath sequencec                ,    g | ]}t          |          S  )r   ).0us     ?lib/python3.11/site-packages/dask/dataframe/io/parquet/utils.py
<listcomp>z-Engine.extract_filesystem.<locals>.<listcomp>`   s     >>>>!,,>>>       c                :    g | ]}                     |          S r   )_strip_protocol)r   r   r   s     r   r   z-Engine.extract_filesystem.<locals>.<listcomp>g   s'    6661##A&&666r   )
pop
ValueErrorr   
isinstancer   listtuplesetr   r   )	clsurlpath
filesystemdataset_optionsopen_file_optionsr   _pathsr   s	           @r   extract_filesystemzEngine.extract_filesystem   s   T  $$T844BB&& ;   B!!!-dO  LB5 uo/@@@ b"455  O2OO     !N<KN N  
 'D%#566 4 ?$%=>>>>>g>>>)'223*7D!RFFE6666666!	 r   Nc	                    t                      )a  Gather metadata about a Parquet Dataset to prepare for a read

        This function is called once in the user's Python session to gather
        important metadata about the parquet dataset.

        Parameters
        ----------
        fs: FileSystem
        paths: List[str]
            A list of paths to files (or their equivalents)
        categories: list, dict or None
            Column(s) containing categorical data.
        index: str, List[str], or False
            The column name(s) to be used as the index.
            If set to ``None``, pandas metadata (if available) can be used
            to reset the value in this function
        use_nullable_dtypes: boolean
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        gather_statistics: bool
            Whether or not to gather statistics to calculate divisions
            for the output DataFrame collection.
        filters: list
            List of filters to apply, like ``[('x', '>', 0), ...]``.
        **kwargs: dict (of dicts)
            User-specified arguments to pass on to backend.
            Top level key can be used by engine to select appropriate dict.

        Returns
        -------
        meta: pandas.DataFrame
            An empty DataFrame object to use for metadata.
            Should have appropriate column names and dtypes but need not have
            any actual data
        statistics: Optional[List[Dict]]
            Either None, if no statistics were found, or a list of dictionaries
            of statistics data, one dict for every partition (see the next
            return value).  The statistics should look like the following:

            [
                {'num-rows': 1000, 'columns': [
                    {'name': 'id', 'min': 0, 'max': 100},
                    {'name': 'x', 'min': 0.0, 'max': 1.0},
                    ]},
                ...
            ]
        parts: List[object]
            A list of objects to be passed to ``Engine.read_partition``.
            Each object should represent a piece of data (usually a row-group).
            The type of each object can be anything, as long as the
            engine's read_partition function knows how to interpret it.
        NotImplementedError)
r$   r   r*   
categoriesindexuse_nullable_dtypesdtype_backendgather_statisticsfilterskwargss
             r   read_metadatazEngine.read_metadatal   s    B "###r   c                    dS )Nz256 MiBr   )r$   s    r   default_blocksizezEngine.default_blocksize   s    yr   Fc                    t                      )a  Read a single piece of a Parquet dataset into a Pandas DataFrame

        This function is called many times in individual tasks

        Parameters
        ----------
        fs: FileSystem
        piece: object
            This is some token that is returned by Engine.read_metadata.
            Typically it represents a row group in a Parquet dataset
        columns: List[str]
            List of column names to pull out of that row group
        index: str, List[str], or False
            The index name(s).
        use_nullable_dtypes: boolean
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        dtype_backend: {"numpy_nullable", "pyarrow"}
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        convert_string: boolean
            Whether to use pyarrow strings when reading parquet files.
        **kwargs:
            Includes `"kwargs"` values stored within the `parts` output
            of `engine.read_metadata`. May also include arguments to be
            passed to the backend (if stored under a top-level `"read"` key).

        Returns
        -------
        A Pandas DataFrame
        r-   )r$   r   piececolumnsr0   r1   r5   s          r   read_partitionzEngine.read_partition   s    F "###r   c                    t           )a#  Perform engine-specific initialization steps for this dataset

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        fs: FileSystem
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        append: bool
            If True, may use existing metadata (if any) and perform checks
            against the new data being stored.
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        ignore_divisions: bool
            Whether or not to ignore old divisions when appending.  Otherwise,
            overlapping divisions will lead to an error being raised.
        division_info: dict
            Dictionary containing the divisions and corresponding column name.
        **kwargs: dict
            Other keyword arguments (including `index_cols`)

        Returns
        -------
        tuple:
            engine-specific instance
            list of filenames, one per partition
        r-   )	r$   dfr   pathappendpartition_onignore_divisionsdivision_infor5   s	            r   initialize_writezEngine.initialize_write   s    P "!r   c                    t           )a
  
        Output a partition of a dask.DataFrame. This will correspond to
        one output file, unless partition_on is set, in which case, it will
        correspond to up to one file in each sub-directory.

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        fs: FileSystem
        filename: str
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        return_metadata : bool
            Whether to return list of instances from this write, one for each
            output file. These will be passed to write_metadata if an output
            metadata file is requested.
        **kwargs: dict
            Other keyword arguments (including `fmd` and `index_cols`)

        Returns
        -------
        List of metadata-containing instances (if `return_metadata` is `True`)
        or empty list
        r-   )r$   r>   r?   r   filenamerA   return_metadatar5   s           r   write_partitionzEngine.write_partition  s
    > "!r   c                    t                      )a;  
        Write the shared metadata file for a parquet dataset.

        Parameters
        ----------
        parts: List
            Contains metadata objects to write, of the type undrestood by the
            specific implementation
        meta: non-chunk metadata
            Details that do not depend on the specifics of each chunk write,
            typically the schema and pandas metadata, in a format the writer
            can use.
        fs: FileSystem
        path: str
            Output file to write to, usually ``"_metadata"`` in the root of
            the output dataset
        append: boolean
            Whether or not to consolidate new metadata with existing (True)
            or start from scratch (False)
        **kwargs: dict
            Other keyword arguments (including `compression`)
        r-   )r$   partsmetar   r?   r@   r5   s          r   write_metadatazEngine.write_metadata#  s    0 "###r   c                    t                      )a  
        Collect parquet metadata from a file and set the file_path.

        Parameters
        ----------
        path: str
            Parquet-file path to extract metadata from.
        fs: FileSystem
        file_path: str
            Relative path to set as `file_path` in the metadata.

        Returns
        -------
        A metadata object.  The specific type should be recognized
        by the aggregate_metadata method.
        r-   )r$   r?   r   	file_paths       r   collect_file_metadatazEngine.collect_file_metadata=  s    $ "###r   c                    t                      )a  
        Aggregate a list of metadata objects and optionally
        write out the final result as a _metadata file.

        Parameters
        ----------
        meta_list: list
            List of metadata objects to be aggregated into a single
            metadata object, and optionally written to disk. The
            specific element type can be engine specific.
        fs: FileSystem
        out_path: str or None
            Directory to write the final _metadata file. If None
            is specified, the aggregated metadata will be returned,
            and nothing will be written to disk.

        Returns
        -------
        If out_path is None, an aggregate metadata object is returned.
        Otherwise, None is returned.
        r-   )r$   	meta_listr   out_paths       r   aggregate_metadatazEngine.aggregate_metadataQ  s    . "###r   )NNNNNNF)FNFN)__name__
__module____qualname____doc__classmethodr+   r6   r8   r<   rD   rH   rL   rO   rS   r   r   r   r   r      sP       BBV V [Vp 
  @$ @$ @$ [@$D   [ <A"$ "$ "$ ["$H  '" '" '" ['"R " " ["@ $ $ $ [$2 $ $ [$& $ $ [$ $ $r   r   c                (   d | d         D             }t          j        d          }d | d         D             }g }|D ]5\  }}|r|                    |          rd}|                    ||f           6d |D             }|                     dd	dig          }d
 |D             }|sL|rt          |d         t                    rg }t          |          }t          |          fd|D             }	nd |D             }	t          |          }
||	|
|fS )aP  Get the set of names from the pandas metadata section

    Parameters
    ----------
    pandas_metadata : dict
        Should conform to the pandas parquet metadata spec

    Returns
    -------
    index_names : list
        List of strings indicating the actual index names
    column_names : list
        List of strings indicating the actual column names
    storage_name_mapping : dict
        Pairs of storage names (e.g. the field names for
        PyArrow) and actual names. The storage and field names will
        differ for index names for certain writers (pyarrow > 0.8).
    column_indexes_names : list
        The names for ``df.columns.name`` or ``df.columns.names`` for
        a MultiIndex in the columns

    Notes
    -----
    This should support metadata written by at least

    * fastparquet>=0.1.3
    * pyarrow>=0.7.0
    c                L    g | ]!}t          |t                    r|d          n|"S name)r    dict)r   ns     r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s?         4((/&		a  r   index_columnsz__index_level_\d+__c                V    g | ]&}|                     d |d                   |d         f'S )
field_namer]   )getr   xs     r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s@       89|QvY	'	'63  r   r;   Nc                $    g | ]\  }}||k    |S r   r   r   storage_namer]   s      r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s'    SSS0\4dl>R>R4>R>R>Rr   column_indexesr]   c                    g | ]
}|d          S r\   r   rd   s     r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s    @@@!F)@@@r   r   c                "    g | ]\  }}|v	|S r   r   )r   rh   r]   index_storage_names2s      r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s/     
 
 
)lDdBV6V6VD6V6V6Vr   c                $    g | ]\  }}||k    |S r   r   rg   s      r   r   z*_parse_pandas_metadata.<locals>.<listcomp>  s'    XXX!5,4<CWCWCWCWCWr   )	recompilematchr@   rc   r    r^   r!   r#   )pandas_metadataindex_storage_namesindex_name_xprpairspairs2rh   	real_nameindex_namescolumn_index_namescolumn_namesstorage_name_mappingrl   s              @r   _parse_pandas_metadatar{   k  s   :  1   Z 677N =LY=W  E F#( 1 1i 	--i88 	I|Y/0000SSFSSSK
 ),,-=?OPP@@-?@@@  Y  	%:.A!.Dd#K#K 	%"$.//"#677
 
 
 
-2
 
 
 YXXXX<<&:<NNNr   c                   | du}|du}| t          |          } n(t          | t                    r| g} nt          |           } ||}n4|du rg }||z   }n(t          |t                    r|g}nt          |          }|r|s|fd|D             nS|r|s| fd|D             n>|r8|r6| |t                                                  rt          d          n||fS )aD  Normalize user and file-provided column and index names

    Parameters
    ----------
    user_columns : None, str or list of str
    data_columns : list of str
    user_index : None, str, or list of str
    data_index : list of str

    Returns
    -------
    column_names : list of str
    index_names : list of str
    NFc                    g | ]}|v|	S r   r   )r   re   rw   s     r   r   z,_normalize_index_columns.<locals>.<listcomp>  s#    HHHa1K3G3G3G3G3Gr   c                    g | ]}|v|	S r   r   )r   re   ry   s     r   r   z,_normalize_index_columns.<locals>.<listcomp>  s#    FFFQ0E0Eq0E0E0Er   z3Specified index and column names must not intersect)r!   r    strr#   intersectionr   )user_columnsdata_columns
user_index
data_indexspecified_columnsspecified_indexry   rw   s         @@r   _normalize_index_columnsr     su    %D0 ,OL))	L#	&	& *$~L))

	u		 
!L0	J	$	$ & \

*%%
 !0 ! !HHHH<HHH	 !? ! $FFFF*FFF	 
!. 
! $ |))+66 	TRSSS	T $ $$r   Fc                b    t          | t                    } t          | ||          \  }}| ||fS )N)key)root)sortedr
   _analyze_paths)	file_listr   r   basefnss        r   _sort_and_analyze_pathsr     s:    y&6777Iy"4888ID#dCr   c                \  
 fd

fd| D             }|du rq|d         dd         |D ]N}t          |          dz
  }t          t          |                    D ]\  }\  }}||k    r|} nd|         Ot                    nS 
|                              d          t                    t	          fd	|D                       s
J d
            g }	|D ]2}|	                    d                    |d                              3d                              |	fS )zConsolidate list of file-paths into parquet relative paths

    Note: This function was mostly copied from dask/fastparquet to
    use in both `FastParquetEngine` and `ArrowEngine`.c                 @   	fd}d}| r| d         r| d         d         dk    r%d}t          |           } | d         dd          | d<   n`	j        dk    rU| d         dd                              d          r2| d         dd         }t          |           } | d         dd          | d<   g }t          |           D ]7\  }}|                     |||                              d                     8g }|D ]}|d	k    r	|d
k    r`|r7|d         d
k    r|                    |           3|                                 H|rt          d          |                    |           o|                    |           |s|r|}nd	}n|d	                    |          z   }|S )Nc                    |                     j        d          }|dk    rdS |d         dk    r
|d d         }| dk    r|d         dk    r
|dd          }|S )N/ .r   r   )replacesep)ipr   s     r   _scrubz2_analyze_paths.<locals>._join_path.<locals>._scrub
  si     		"&#&&ABwwsu||crcF1uu1abbEHr   r   r   r   r   \z:/   r   z..r   zcan not get parent of root)
r!   r   
startswith	enumerateextendsplitr@   r   	Exceptionjoin)
r?   r   
abs_prefix	_scrubbedr   r   simplersjoinedr   s
            r   
_join_pathz"_analyze_paths.<locals>._join_path	  s   
	 
	 
	 
	 
	 
 		&DG 		&AwqzS   
Dzzq'!""+Q4DGABBK$:$:4$@$@!!WQqS\
Dzzq'!""+Q	dOO 	6 	6DAqVVAq\\//445555 	" 	"ACxxd &r{d**q)))) &#$@AAANN1%%%%q!!!! 	6 #388G#4#45Fr   c                L    g | ] } |                               d           !S )r   )r   )r   fnr   s     r   r   z"_analyze_paths.<locals>.<listcomp>?  s/    EEERzz"~~++C00EEEr   Fr   Nr   r   r   c              3  4   K   | ]}|d          k    V  d S )Nr   )r   r   basepathls     r   	<genexpr>z!_analyze_paths.<locals>.<genexpr>M  sA       
 
"#AbqbEX
 
 
 
 
 
r   z(All paths must begin with the given root)lenr   zipr   allr@   r   )r   r   r   path_parts_list
path_partsjk	base_part	path_partout_listr   r   r   s    `        @@@r   r   r     s   4 4 4 4 4l FEEE9EEEOu}}"1%crc*) 	$ 	$JJ!#A-6s8Z7P7P-Q-Q  ))Iy	))AE *  |HHMM:d##))#..MM 
 
 
 
 
'6
 
 
 
 
 	6 	65	6 	6 	6 H% 
 

HHZ^$$	
 	
 	
 	

 	 r   c                   t          |          dk     ri S t          |          dk    rt          |          dk    sJ |d         S t          |          dk    ret          j        |          }| |d                                         |d                                         |d                                         g d}n | |d         d         d|d         d         g d}d}t          |          dk    rt          j        |          }t          |          D ]^\  }}|dz  }	|w|d         |	         }
|d         |	dz            }|d         |	dz            }|
|k    r |r|d	                             d
|i           c|d	                             ||
||d           |j        dd|	f                                         	                                }
|j        dd|	dz   f                                         
                                }|j        dd|	dz   f                                         }|
|k    r!|r|d	                             d
|i           >|d	                             ||
||d           `|S )z~Utility to aggregate the statistics for N row-groups
    into a single dictionary.

    Used by `Engine._construct_parts`
    r   r   num-rowstotal_byte_size)file_path_0r   znum-row-groupsr   r;   Nr      r;   
null_count)r]   minmaxr   )r   pd	DataFramesumcountr   r@   ilocdropnar   r   )rN   file_row_group_statsfile_row_group_column_statsstat_col_indicesdf_rgsr   df_colsindr]   r   minvalmaxvalr   s                r   _aggregate_statsr   \  s      1$$		(	)	)Q	.	.'((A----#A&& #$$q((\"677F(":.2244"("4":":"<"<#)*;#<#@#@#B#B AA  )03J?"##7#:;L#M A *++a//l#>??G"#344  	  	ICaA4Q7:4Q7A>8;AEB
V##
#iL''z(BCCCCiL''$(#)#)*4	     !aaad+224488:: aaaQh/6688<<>>$\!!!QU(37799
V##
#iL''z(BCCCCiL''$(#)#)*4	     r   c	           	     &   g }	g }
|rt          |          }d}|                                D ]\  }}t          |          }|r#dgt          t	          |||                    z   }nt          t	          |||                    }|D ]}||z   }|du r|r
|dk    r|}d}||z
  }|dk    r|}|||         } |||fi |}|;|	                    |           | rCt          |||         ||         ||         ||         |          }|
                    |           nq|                                D ]\\  }} |||fi |}||	                    |           | r3t          |||         ||         |          }|
                    |           ]|	|
fS )Nr   T)intitemsr   r!   ranger@   r   )r3   split_row_groupsaggregation_depthfile_row_groupsr   r   r   make_part_funcmake_part_kwargsrJ   statsresidualrF   
row_groupsrow_group_count_rgsr   i_end	_residualrg_listpartstats                         r   _row_groups_to_partsr     sS    EE ;# /00$3$9$9$;$; #	' #	' Hj!*ooO PsT%/CS"T"TUUUE(O=MNNOO ' ',,$,, %AFF (#$ % 7I 1}}#,$QuW-%~  ' 
 <T"""$ '+ ,X6qw?3H=agF(	 D LL&&&9'#	'J %4$9$9$;$; 	# 	# Hj!>  # D
 |LL  #'(2/9$	  T"""%<r   c                    | }t          | t                    r<| |v r&t          |          |                    |           z
  }nt	          |  d          |S )Nz) is not a recognized directory partition.)r    r   r   r0   r   )aggregate_filespartition_namesr   s      r   _get_aggregation_depthr     s|      (/3'' o-- !$O 4 47L7L8 8 ! "MMM   r   c                `    | +dt          |          rdndz   }t          j        |d          S | S )Nz%dataframe.parquet.metadata-task-size-localremoter   )r	   r   rc   )metadata_task_sizer   
config_strs      r   _set_metadata_task_sizer     sD     ! =#B''5GGX

 z*a(((r   	readaheadTc           	        | pi                                  } |                     di                                            }|si }d| vr|                    dd           dk    rH|                     dd          | d<   |                    ||||                    d|          d           n2|                     d|          | d<   |                     d	d
          | d	<   || fS )Nprecache_optionsopen_file_funcmethodparquet
cache_typerJ   engine)metadatar;   r   r   r   r   )copyr   rc   update)r(   r   r;   r   default_enginedefault_cacheallow_precacher   s           r   _process_open_file_optionsr   (  s&    +0b6688(,,-?DDIIKK  000$//9<<.?.C.Cg/ /l+ ## (&",.228^LL	     /@.C.Cm/ /l+ ):(=(=fd(K(Kf%...r   c                    |                                  }d|v rt          j        dt                     i |                    di                                            |                    di                                            }|                    di                                            }|                    di                                            }||||fS )Nfilez^Passing user options with the 'file' argument is now deprecated. Please use 'dataset' instead.datasetreadr(   )r   warningswarnFutureWarningr   )r5   user_kwargsr'   read_optionsr(   s        r   _split_user_optionsr  N  s     ++--K-	
 	
 	

//&"
%
%
*
*
,
,
//)R
(
(
-
-
/
/O ??62..3355L#(;R@@EEGG	 r   c                    |r|du s*t          |          dk    r|s|                    |          rd} n|sd} t          |           S )NTr   F)r   r   bool)r3   	blocksizer   r   filter_columnsstat_columnss         r   _set_gather_statisticsr  i  sq     
"'4// !!A%%*;%&&|44 & ! " "!"""r   c                f    | r.t          |          }|st          j        |           d|z  k    rdS dS )Nr   adaptiveF)r   npr   )row_group_sizesr  r   s      r   _infer_split_row_groupsr    sC     	**	 	bf_55IEE :5r   rT   )NNNNr   T)"
__future__r   rn   r  numpyr  pandasr   fsspec.corer   r   r   fsspec.specr   daskr   dask.dataframe.io.utilsr	   
dask.utilsr
   r   r   r{   r   r   r   r   r   r   r   r   r  r  r  r   r   r   <module>r     s   " " " " " " 				          R R R R R R R R R R * * * * * *       0 0 0 0 0 0 4 4 4 4 4 4 4 4X$ X$ X$ X$ X$ X$ X$ X$v
QO QO QOh;% ;% ;%|       V V V VrL L L^K K K\  B    #/ #/ #/ #/L  6# # #B     r   