o
    NrfS                     @  s  d dl mZ d dlZd dlZd dlZd dlmZ	 d dl
mZ d dlmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ 				
			d$ddZ					
				d%ddZdd Zd&ddZd&ddZ										d'd(dd Zedd!d"d#ZdS ))    )annotationsN)compute)methods)PANDAS_GE_300)from_delayedfrom_pandas)pyarrow_strings_enabled)delayedtokenize)parse_bytes256 MiB   c
                 K  s  ddl }t|tstdtt| d |du rtdt|t|j|jjj	fs3tdtt| |dksK|du r?td|du rK|du rKtd|rS|rStd	|	du rYi n|	}	|j
|fi |	}t|trn||n||j|j}|j|
d
< |dkr| |}tj||fi |
}t|dkrt|ddS t rddlm}m} |  ||}|jddd | }|du r|jdd }|du ri|du r|j|jj||jj||  }t||}|jd \}}|jd }n
|\}}t|j }|du r,|j|jj!||  }t||d d }t"t#|| t$| p+d}|j%dkrNt&'tj(||d|| ) |  d}||d< ||d< n|j%dv rbt*j+|||d |d' }ntd,|g }|dd |dd }}t-t.||D ]5\}\}}|t|d kr||kn||k }| /|j0||k|}|1t2t3|||fd|	i|
 q|4  t5|||dS )ar	  
    Read SQL query into a DataFrame.

    If neither ``divisions`` or ``npartitions`` is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.

    Parameters
    ----------
    sql : SQLAlchemy Selectable
        SQL query to be executed. TextClause is not supported
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be a indexed column in the SQL server, and any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override ``npartitions`` and ``bytes_per_chunk``. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if ``divisions`` is not given. Will split the values
        of the index column linearly between ``limits``, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with ``npartitions``;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    bytes_per_chunk : str or int
        If both ``divisions`` and ``npartitions`` is None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, and memory per row
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    r   Nz'con' must be of type str, not 8Note: Dask does not support SQLAlchemy connectables herez)Must specify index column to partition onz2'index_col' must be of type str or sa.Column, not z'Must provide 'meta' if 'head_rows' is 0z=Must provide 'divisions' or 'npartitions' if 'head_rows' is 0z9Must supply either 'divisions' or 'npartitions', not both	index_col   )npartitions)check_pyarrow_string_supportedto_pyarrow_stringT)deepindexZmax_1Zcount_1Mz%is)startendfreq)iuf)dtypezwProvided index column is of type "{}".  If divisions is not provided the index column type must be numeric or datetime.engine_kwargs)	divisions)6
sqlalchemy
isinstancestr	TypeErrortype
ValueErrorColumnsqlelementsZColumnClausecreate_enginenamelimitpdread_sqllenr   r   Zdask.dataframe._pyarrowr   r   Zmemory_usagesumZilocselectfuncmaxminselect_fromZsubquerydtypesZSeriesr   countintroundr   kindr   tolistZ
date_rangetotal_secondsnpZlinspaceformat	enumeratezipwhereand_appendr	   _read_sql_chunkdisposer   )r(   conr   r    r   limitsbytes_per_chunk	head_rowsmetar   kwargssaenginer   qheadr   r   Zbytes_per_rowZminmaxZmaxiZminir   r7   partsZlowersZuppersr   lowerupperZcond rS   ^/var/www/html/software/conda/envs/catlas/lib/python3.10/site-packages/dask/dataframe/io/sql.pyread_sql_query   s   D






 
"rU   c                   s  ddl  ddl m} d|v rtdt |dd|v r(tdt |d}tts7tdtt	 |durR|D ]}t| j
tfsQtd	tt	| q=t|tsctd
tt	| d |du rii n|} j|fi |}  }ttr j|||	dn
tdtt	 |  |r fdd|D n	 fddjD }t|tr 
|j| j	n 
|j|j	}|jdd |D vr|| |j| }td|||||||||
|d
|S )a  
    Read SQL database table into a DataFrame.

    If neither ``divisions`` or ``npartitions`` is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be a indexed column in the SQL server, and any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.
    columns : sequence of str or SqlAlchemy column or None
        Which columns to select; if None, gets all. Note can be a mix of str and SqlAlchemy columns
    schema : str or None
        Pass this to sqlalchemy to select which DB schema to use within the
        URI connection
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override ``npartitions`` and ``bytes_per_chunk``. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if ``divisions`` is not given. Will split the values
        of the index column linearly between ``limits``, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with ``npartitions``;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    bytes_per_chunk : str or int
        If both ``divisions`` and ``npartitions`` is None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, and memory per row
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    >>> df = dd.read_sql_table('accounts', 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP
    r   N)r(   tablezWThe `table` keyword has been replaced by `table_name`. Please use `table_name` instead.urizGThe `uri` keyword has been replaced by `con`. Please use `con` instead.z&`table_name` must be of type str, not z8`columns` must be of type List[str], and cannot contain z`con` must be of type str, not r   )Zautoload_withschemac                   s:   g | ]}t |tr |j| jn |j|jqS rS   )r"   r#   r'   columnsr%   r+   .0crL   
table_namerS   rT   
<listcomp>C  s    z"read_sql_table.<locals>.<listcomp>c                   s   g | ]
}  |j|jqS rS   )r'   r+   r%   rZ   )rL   rS   rT   r_   L  s    c                 S  s   g | ]}|j qS rS   )r+   rZ   rS   rS   rT   r_   T  s    )
r(   rF   r   r    r   rG   rH   rI   rJ   r   rS   )r!   r(   warningswarnDeprecationWarningpopr"   r#   r$   r%   r'   r*   ZMetaDataTablerE   rY   r+   rC   r1   r5   rU   )r^   rF   r   r    r   rG   rY   rH   rI   rX   rJ   r   rK   r(   colrM   mr   queryrS   r]   rT   read_sql_table   s   P






	
rh   c                 K  s2   t | trt| ||fi |S t| ||fi |S )a  
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query``. It will delegate to the specific function depending
    on the provided input. A SQL query will be routed to ``read_sql_query``,
    while a database table name will be routed to ``read_sql_table``.
    Note that the delegated function might have more specific notes about
    their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable
        Name of SQL table in database or SQL query to be executed. TextClause is not supported
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be a indexed column in the SQL server, and any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.
    )r"   r#   rh   rU   )r(   rF   r   rK   rS   rS   rT   r.   h  s   
!r.   c                 K  s   dd l }|pi }|j|fi |}tj| |fi |}|  t|dkr'|S t|j dkr2|S tr6i nddi}|j	|j fi |S )Nr   copyF)
r!   r*   r-   r.   rE   r/   r6   to_dictr   Zastype)rN   rW   rJ   r   rK   rL   rM   dfrS   rS   rT   rD     s   rD   c                 K  sB   dd l }|pi }|j|fi |}| jdd|i|}|  |S )Nr   rF   rS   )r!   r*   to_sqlrE   )drW   r   rK   rL   rM   rN   rS   rS   rT   _to_sql_chunk  s   rn   failTFr+   r#   rW   	if_existsr   boolc                   s   t |tstdt| dt||||||||||	d
}tt| jfi | t|dd|r= fdd|  D }n)g } }|  D ] }|	t
t|fd|id	d
t|fi  i |d }qEt|}|
rrt| dS |S )a  Store Dask Dataframe to a SQL table

    An empty table is created based on the "meta" DataFrame (and conforming to the caller's "if_exists" preference), and
    then each block calls pd.DataFrame.to_sql (with `if_exists="append"`).

    Databases supported by SQLAlchemy [1]_ are supported. Tables can be
    newly created, appended to, or overwritten.

    Parameters
    ----------
    name : str
        Name of SQL table.
    uri : string
        Full sqlalchemy URI for the database connection
    schema : str, optional
        Specify the schema (if database flavor supports this). If None, use
        default schema.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        How to behave if the table already exists.

        * fail: Raise a ValueError.
        * replace: Drop the table before inserting new values.
        * append: Insert new values to the existing table.

    index : bool, default True
        Write DataFrame index as a column. Uses `index_label` as the column
        name in the table.
    index_label : str or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 legacy mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        * None : Uses standard SQL ``INSERT`` clause (one per row).
        * 'multi': Pass multiple values in a single ``INSERT`` clause.
        * callable with signature ``(pd_table, conn, keys, data_iter)``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    compute : bool, default True
        When true, call dask.compute and perform the load into SQL; otherwise, return a Dask object (or array of
        per-block objects when parallel=True)
    parallel : bool, default False
        When true, have each block append itself to the DB table concurrently. This can result in DB rows being in a
        different order than the source DataFrame's corresponding rows. When false, load each block into the SQL DB in
        sequence.
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy

    Raises
    ------
    ValueError
        When the table already exists and `if_exists` is 'fail' (the
        default).

    See Also
    --------
    read_sql : Read a DataFrame from a table.

    Notes
    -----
    Timezone aware datetime columns will be written as
    ``Timestamp with timezone`` type with SQLAlchemy if supported by the
    database. Otherwise, the datetimes will be stored as timezone unaware
    timestamps local to the original timezone.

    .. versionadded:: 0.24.0

    References
    ----------
    .. [1] https://docs.sqlalchemy.org
    .. [2] https://www.python.org/dev/peps/pep-0249/

    Examples
    --------
    Create a table from scratch with 4 rows.

    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> df = pd.DataFrame([ {'i':i, 's':str(i)*2 } for i in range(4) ])
    >>> ddf = dd.from_pandas(df, npartitions=2)
    >>> ddf  # doctest: +SKIP
    Dask DataFrame Structure:
                       i       s
    npartitions=2
    0              int64  object
    2                ...     ...
    3                ...     ...
    Dask Name: from_pandas, 2 tasks

    >>> from dask.utils import tmpfile
    >>> from sqlalchemy import create_engine, text
    >>> with tmpfile() as f:
    ...     db = 'sqlite:///%s' %f
    ...     ddf.to_sql('test', db)
    ...     engine = create_engine(db, echo=False)
    ...     with engine.connect() as conn:
    ...         result = conn.execute(text("SELECT * FROM test")).fetchall()
    >>> result
    [(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]
    z!Expected URI to be a string, got .)
r+   rW   r   rX   rp   r   index_label	chunksizer   methodrC   )rp   c                   s:   g | ]}t t|fd  iddt|fi  iqS )extrasdask_key_name	to_sql-%s)_extra_depsrn   r
   )r[   rm   Z	meta_taskZworker_kwargsrS   rT   r_   B  s    zto_sql.<locals>.<listcomp>rv   rw   rx   r   N)r"   r#   r&   r%   dictr	   rn   _metaZ
to_delayedrC   ry   r
   dask_compute)rk   r+   rW   rX   rp   r   rs   rt   r   ru   r   Zparallelr   rK   resultlastrm   rS   rz   rT   rl     sP   
}
	rl   )rv   c                O  s   | |i |S NrS   )r2   rv   argsrK   rS   rS   rT   ry   c  s   ry   )NNNr   r   NN)	NNNNr   r   NNNr   )
Nro   TNNNNTFN)r+   r#   rW   r#   rp   r#   r   rq   )
__future__r   r`   numpyr=   Zpandasr-   Z	dask.baser   r}   Zdask.dataframer   Zdask.dataframe._compatr   Zdask.dataframe.io.ior   r   Zdask.dataframe.utilsr   Zdask.delayedr	   r
   Z
dask.utilsr   rU   rh   r.   rD   rn   rl   ry   rS   rS   rS   rT   <module>   s`    
 8
 %
'
 7