Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions python/pyspark/sql/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def to_pandas(
ndarray_as_list: bool = False,
prefer_int_ext_dtype: bool = False,
df_for_struct: bool = False,
arrow_dtype_types: Optional[tuple] = None,
) -> List[Union["pd.Series", "pd.DataFrame"]]:
"""
Convert a RecordBatch or Table to a list of pandas Series.
Expand All @@ -208,6 +209,10 @@ def to_pandas(
Whether to convert integers to Pandas ExtensionDType.
df_for_struct : bool
If True, convert struct columns to DataFrame instead of Series.
arrow_dtype_types : tuple of DataType classes, optional
If provided, columns whose Spark type matches one of these classes will be
converted via convert_pyarrow (ArrowDtype-backed). Unsupported types fall
through to convert_numpy/convert_legacy. Default is None (disabled).

Returns
-------
Expand All @@ -232,6 +237,7 @@ def to_pandas(
ndarray_as_list=ndarray_as_list,
prefer_int_ext_dtype=prefer_int_ext_dtype,
df_for_struct=df_for_struct,
arrow_dtype_types=arrow_dtype_types,
)
for i in range(batch.num_columns)
]
Expand Down Expand Up @@ -1459,6 +1465,29 @@ class ArrowArrayToPandasConversion:
where Arrow data needs to be converted to pandas for Python UDF processing.
"""

# Types supported by convert_pyarrow (ArrowDtype-backed pandas Series).
# This tuple controls which types are routed to the pyarrow path when
# arrow_cast is enabled. Expand as more types are supported.
ARROW_DTYPE_TYPES = (
NullType,
BinaryType,
BooleanType,
FloatType,
DoubleType,
ByteType,
ShortType,
IntegerType,
LongType,
DecimalType,
StringType,
DateType,
TimeType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
YearMonthIntervalType,
)

@classmethod
def convert(
cls,
Expand All @@ -1471,6 +1500,7 @@ def convert(
ndarray_as_list: bool = False,
prefer_int_ext_dtype: bool = False,
df_for_struct: bool = False,
arrow_dtype_types: Optional[tuple] = None,
) -> Union["pd.Series", "pd.DataFrame"]:
"""
Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
Expand All @@ -1495,13 +1525,24 @@ def convert(
df_for_struct : bool, optional
If True, convert struct columns to a DataFrame with columns corresponding
to struct fields instead of a Series. Default is False.
arrow_dtype_types : tuple of DataType classes, optional
If provided, columns whose Spark type matches one of these classes will be
converted via convert_pyarrow (ArrowDtype-backed). Unsupported types fall
through to convert_numpy/convert_legacy. Default is None (disabled).

Returns
-------
pd.Series or pd.DataFrame
Converted pandas Series. If df_for_struct is True and the type is StructType,
returns a DataFrame with columns corresponding to struct fields.
"""
if arrow_dtype_types is not None and isinstance(spark_type, arrow_dtype_types):
return cls.convert_pyarrow(
arr,
spark_type,
ser_name=ser_name,
)

if cls._prefer_convert_numpy(spark_type, df_for_struct):
return cls.convert_numpy(
arr,
Expand Down Expand Up @@ -1780,3 +1821,89 @@ def convert_numpy(
assert False, f"Need converter for {spark_type} but failed to find one."

return series.rename(ser_name)

@classmethod
def convert_pyarrow(
    cls,
    arr: Union["pa.Array", "pa.ChunkedArray"],
    spark_type: DataType,
    *,
    ser_name: Optional[str] = None,
) -> "pd.Series":
    """
    Convert a PyArrow Array or ChunkedArray to a pandas Series backed by ArrowDtype.

    This is similar to :meth:`convert_numpy`, but instead of producing
    numpy-backed pandas Series, it produces ArrowDtype-backed Series via
    ``arr.to_pandas(types_mapper=pd.ArrowDtype)``.

    Parameters
    ----------
    arr : pa.Array or pa.ChunkedArray
        The Arrow column to convert.
    spark_type : DataType
        The target Spark type for the column to be converted to. Must be an
        instance of one of the classes in :attr:`ARROW_DTYPE_TYPES`.
    ser_name : str, optional
        The name of returned pd.Series. If not set, will try to get it from arr._name.

    Returns
    -------
    pd.Series
        Converted pandas Series backed by ArrowDtype.
    """
    import pyarrow as pa
    import pandas as pd

    assert isinstance(arr, (pa.Array, pa.ChunkedArray))

    if ser_name is None:
        # NOTE(review): relies on the private ChunkedArray/Array ``_name``
        # attribute — confirm this is stable across supported pyarrow versions.
        ser_name = arr._name

    arr = ArrowArrayConversion.preprocess_time(arr)

    series: pd.Series

    # Route through the single source of truth for supported types instead of
    # duplicating the type tuple here: ARROW_DTYPE_TYPES is documented as the
    # list of types convert_pyarrow supports, so keeping one copy prevents the
    # two from drifting apart as new types are added.
    if isinstance(spark_type, cls.ARROW_DTYPE_TYPES):
        series = arr.to_pandas(types_mapper=pd.ArrowDtype)
    # TODO: Support UserDefinedType
    # TODO: Support VariantType
    # TODO: Support GeographyType
    # TODO: Support GeometryType
    # TODO: Support complex types (ArrayType, MapType, StructType)
    else:  # pragma: no cover
        assert False, f"Need converter for {spark_type} but failed to find one."

    return series.rename(ser_name)
43 changes: 30 additions & 13 deletions python/pyspark/sql/pandas/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
ArrayType,
MapType,
TimestampType,
StructField,
StructType,
_has_type,
DataType,
Expand Down Expand Up @@ -184,6 +185,7 @@ def _convert_arrow_table_to_pandas(
struct_handling_mode: Optional[str] = None,
date_as_object: bool = False,
self_destruct: bool = False,
arrow_dtype: bool = False,
) -> "PandasDataFrameLike":
"""
Helper function to convert Arrow table columns to a pandas DataFrame.
Expand All @@ -207,6 +209,9 @@ def _convert_arrow_table_to_pandas(
Whether to convert date values to Python datetime.date objects (default: False)
self_destruct : bool
Whether to enable memory-efficient self-destruct mode for large tables (default: False)
arrow_dtype : bool
Whether to produce ArrowDtype-backed pandas Series for supported types
(default: False)

Returns
-------
Expand Down Expand Up @@ -254,23 +259,32 @@ def _convert_arrow_table_to_pandas(
error_on_duplicated_field_names = True
struct_handling_mode = "dict"

# Convert arrow columns to pandas Series
column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in arrow_table.columns)
if arrow_dtype:
from pyspark.sql.conversion import ArrowArrayToPandasConversion

arrow_dtype_types = ArrowArrayToPandasConversion.ARROW_DTYPE_TYPES

def _convert_column(arrow_col: "pa.ChunkedArray", field: "StructField") -> "pd.Series":
if arrow_dtype and isinstance(field.dataType, arrow_dtype_types):
return ArrowArrayToPandasConversion.convert_pyarrow(
arrow_col, field.dataType, ser_name=field.name
)
series = arrow_col.to_pandas(**pandas_options)
return _create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_handling_mode,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(series)

# Apply Spark-specific type converters to each column
pdf = pd.concat(
objs=cast(
Sequence[pd.Series],
(
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_handling_mode,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(series)
for series, field in zip(column_data, schema.fields)
),
[
_convert_column(arrow_table.column(i), schema.fields[i])
for i in range(len(schema.fields))
],
),
axis="columns",
)
Expand Down Expand Up @@ -306,6 +320,7 @@ def _to_pandas(self, **kwargs: Any) -> "PandasDataFrameLike":
arrowPySparkFallbackEnabled,
arrowPySparkSelfDestructEnabled,
pandasStructHandlingMode,
arrowPySparkArrowDtypeEnabled,
) = self.sparkSession._jconf.getConfs(
[
"spark.sql.session.timeZone",
Expand All @@ -314,6 +329,7 @@ def _to_pandas(self, **kwargs: Any) -> "PandasDataFrameLike":
"spark.sql.execution.arrow.pyspark.fallback.enabled",
"spark.sql.execution.arrow.pyspark.selfDestruct.enabled",
"spark.sql.execution.pandas.structHandlingMode",
"spark.sql.execution.arrow.pyspark.arrowDtype.enabled",
]
)

Expand Down Expand Up @@ -386,6 +402,7 @@ def _to_pandas(self, **kwargs: Any) -> "PandasDataFrameLike":
struct_handling_mode=pandasStructHandlingMode,
date_as_object=True,
self_destruct=arrowPySparkSelfDestructEnabled == "true",
arrow_dtype=arrowPySparkArrowDtypeEnabled == "true",
)

return pdf
Expand Down
Loading