
    0Fie                        d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ erd dlZd dlZd'dZd(dZd)dZd*dZd+dZd,dZd-dZd.d!Zd/d%Zd0d&ZdS )1    )annotations)Iterable)Path)TYPE_CHECKING)parse)parse_bytesN
meta_inputpd.DataFramereturnNonec                    dd l }| D ]m}| |         }|j        j                            |          rt	          d|j         d| d          t          |j        |j                  rt	          d          nd S )Nr   z#p2p does not support data of type 'z' found in column 'z'.z9p2p does not support sparse data found in column '{name}')pandasapitypesis_complex_dtype	TypeErrordtype
isinstanceSparseDtype)r	   pdnamecolumns       :lib/python3.11/site-packages/distributed/shuffle/_arrow.pycheck_dtype_supportr      s     	Y 	YD!6<((00 	_fl__W[___   flBN33 	YWXXX	Y	Y 	Y    c                     d} 	 ddl }n # t          $ r t          d|            w xY wt          |j                  t          |           k     rt	          d|  d|j                   dS )zVerify that the the correct version of pyarrow is installed to support
    the P2P extension.

    Raises a ModuleNotFoundError if pyarrow is not installed or an
    ImportError if the installed version is not recent enough.
    z7.0.0r   Nz P2P shuffling requires pyarrow>=z but only found )pyarrowModuleNotFoundErrorr   __version__ImportError)
minversionpas     r   check_minimal_arrow_versionr#      s     JS S S S!"QZ"Q"QRRRSR^uZ0000[z[[2>[[
 
 	
 10s   	 &tablesIterable[pa.Table]pa.Tablec                   dd l }t          |j                  t          d          k    r |j        | d          S 	  |j        | d          S # |j        $ r-}t          |j                  t          d          k    r| d }~ww xY w)Nr   z14.0.0
permissive)promote_optionsT)promote12.0.0)r   r   r   concat_tablesArrowNotImplementedError)r$   r"   es      r   r,   r,   1   s    R^h//rEEEEr5555&     E(OO33Gs   A 
B(BBshardslist[pa.Table]metac                   dd l }ddlm} ddlm} t          |           } |||d          }i }|j                                        D ]\  }}	||         j        }
|
|	k    rt          |
|j
                  rt          |	|j
                  r|	||<   It          |
|j                  rt          |	|j                  rt ||
|	g          ||<   |                    |d          S )Nr   )find_common_type)from_pyarrow_table_dispatchT)self_destructF)copy)r   pandas.core.dtypes.castr3   dask.dataframe.dispatchr4   r,   dtypesitemsr   r   StringDtypeCategoricalDtypeastype)r/   r1   r   r3   r4   tabledfreconciled_dtypesr   r   actuals              r   convert_shardsrB   >   s5   888888CCCCCC&!!E	$	$T5	E	E	EB**,, F FF!U??fbn-- 	*UBN2S2S 	(-f% fb122 	z2&8
 8
 	 $4$4fe_$E$E&!!99&U9333r   datalist[bytes]c                8    d | D             }t          |          S )z>Convert a list of arrow buffers and a schema to an Arrow Tablec              3  4   K   | ]}t          |          V  d S )N)deserialize_table).0buffers     r   	<genexpr>z+list_of_buffers_to_table.<locals>.<genexpr>]   s+      ;;F'';;;;;;r   )r,   )rC   r$   s     r   list_of_buffers_to_tablerK   Z   s%     <;d;;;F   r   r>   bytesc                   dd l } |j                    }|j                            || j                  5 }|                    |            d d d            n# 1 swxY w Y   |                                                                S Nr   )r   BufferOutputStreamipc
new_streamschemawrite_tablegetvalue
to_pybytes)r>   r"   streamwriters       r   serialize_tablerX   a   s    "R"$$F			65<	0	0 "F5!!!" " " " " " " " " " " " " " "??'')))s   AAArI   c                    dd l }|j                             |j        |                     5 }|                                cd d d            S # 1 swxY w Y   d S rN   )r   rP   open_stream	py_bufferread_all)rI   r"   readers      r   rG   rG   j   s    			LBL00	1	1 !V  ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !s   AAApathr   tuple[list[pa.Table], int]c                   dd l }t          d          }g }g } |j        t          |           d          5 }|                    dd          }|                    d           d}|                                }||k     r |j        |          }	|	                                }
|                                }|                    |
           ||z
  |k    r5t          |          }|                    t          |                     g }|}||k     d d d            n# 1 swxY w Y   |r1t          |          }|                    t          |                     ||fS )Nr   z1 MiBrb)mode   )whence)r   r   OSFilestrseektellRecordBatchStreamReaderr\   appendr,   _copy_table)r^   r"   
batch_sizebatchr/   fsizeprevoffsetsrshardr>   s               r   read_from_diskrt   q   s   W%%JEF	3t994	(	(	( Avvav""	q			tmm++A..BKKMMEVVXXFLL}
**%e,,k%00111 tmm                 *e$$k%(()))4<s   CDD!$D!arraysIterable[pa.Array]pa.Arrayc                    dd l }	  |j        |           S # |j        $ r\}t          |j                  t          d          k    r |j        d                             d          rt          d          | d }~ww xY w)Nr   r+   zconcatenation of extensionzBP2P shuffling requires pyarrow>=12.0.0 to support extension types.)r   concat_arraysr-   r   r   args
startswithRuntimeError)ru   r"   r.   s      r   ry   ry      s    	r'''&     E(OO336!9 <== 	T  	s    
A<AA77A<c                Z    dd l }d | j        D             } |j        || j                  S )Nr   c                6    g | ]}t          |j                  S  )ry   chunks)rH   r   s     r   
<listcomp>z_copy_table.<locals>.<listcomp>   s"    EEEVM&-((EEEr   )rC   rR   )r   columnsr>   rR   )r>   r"   arrss      r   rk   rk      s<    EEu}EEED28el3333r   )r	   r
   r   r   )r   r   )r$   r%   r   r&   )r/   r0   r1   r
   r   r
   )rC   rD   r   r&   )r>   r&   r   rL   )rI   rL   r   r&   )r^   r   r   r_   )ru   rv   r   rw   )r>   r&   r   r&   )
__future__r   collections.abcr   pathlibr   typingr   packaging.versionr   
dask.utilsr   r   r   r   r"   r   r#   r,   rB   rK   rX   rG   rt   ry   rk   r   r   r   <module>r      sg   " " " " " " $ $ $ $ $ $                   # # # # # # " " " " " " Y Y Y Y
 
 
 
$
 
 
 
4 4 4 48! ! ! !* * * *! ! ! !   :   4 4 4 4 4 4r   