Commit d021e6a

timsaucer and claude authored
feat: import user-defined physical optimizer rules over FFI (#1557)
* feat: user-defined OptimizerRule and AnalyzerRule from Python

  Expose `SessionContext.add_optimizer_rule` and `SessionContext.add_analyzer_rule` symmetric with the existing `remove_optimizer_rule`. Each accepts a Python subclass of the new `datafusion.optimizer.OptimizerRule` / `AnalyzerRule` ABCs.

  Implementation:

  * New `crates/core/src/optimizer_rules.rs` wraps user Python instances in `PyOptimizerRuleAdapter` / `PyAnalyzerRuleAdapter`, which implement the upstream `OptimizerRule` / `AnalyzerRule` traits.
  * `OptimizerRule.rewrite(plan)` returns `None` for "no change" or a new `LogicalPlan`. The adapter maps that to `Transformed::no` / `Transformed::yes` so the upstream optimizer's fixed-point loop terminates correctly.
  * `AnalyzerRule.analyze(plan)` must always return a `LogicalPlan`; returning `None` surfaces a `DataFusionError::Execution` naming the offending rule.
  * The upstream `&dyn OptimizerConfig` / `&ConfigOptions` arguments are not surfaced to Python in this MVP; rules that need configuration should capture it at construction time (for example by holding a `SessionContext` reference) or be implemented in Rust.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: import FFI physical optimizer rules; drop Python logical rules

  Replace the Python-defined OptimizerRule/AnalyzerRule approach with FFI-imported physical optimizer rules. The Python logical-rule approach could observe plans but not transform them: there are no Python constructors for LogicalPlan node variants, so a rule could only return None or the input plan unchanged. The audience for custom rules also overlaps strongly with people who can write Rust.

  DataFusion exposes no FFI bridge for the logical OptimizerRule/AnalyzerRule traits, but it does export FFI_PhysicalOptimizerRule for the physical PhysicalOptimizerRule trait. This commit imports those instead.

  Changes:

  * Remove crates/core/src/optimizer_rules.rs, python/datafusion/optimizer.py, python/tests/test_optimizer.py, and the SessionContext.add_optimizer_rule / add_analyzer_rule methods. remove_optimizer_rule is unchanged (pre-existing).
  * New crates/core/src/physical_optimizer.rs reads a __datafusion_physical_optimizer_rule__ capsule and converts it via Arc<dyn PhysicalOptimizerRule>::from(&FFI_PhysicalOptimizerRule).
  * SessionContext gains a physical_optimizer_rules constructor argument. Upstream offers no API to add physical rules to a live context, so they are appended to the builder at construction time only.
  * The datafusion-ffi-example crate gains MyPhysicalOptimizerRule, a counter-backed rule used by _test_physical_optimizer_rule.py to prove the rule fires over FFI during physical planning.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: type physical_optimizer_rules with an Exportable Protocol

  Replace the `list[Any]` hint on the SessionContext `physical_optimizer_rules` argument with a `PhysicalOptimizerRuleExportable` Protocol, matching the existing `TableProviderExportable` / `*Exportable` pattern used for other FFI-capsule objects.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: reference PhysicalOptimizerRuleExportable in SessionContext docstring

  Point the `physical_optimizer_rules` argument docs at the new `PhysicalOptimizerRuleExportable` Protocol instead of describing the duck type inline.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: move FFI capsule detail to PhysicalOptimizerRuleExportable

  The PyCapsule / FFI_PhysicalOptimizerRule mechanics describe the Protocol, not the SessionContext constructor. Move that detail onto PhysicalOptimizerRuleExportable and leave the constructor argument docs focused on behavior.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: drop redundant comment in SessionContext constructor

  Remove the explanatory comment about FFI bridge availability; the same information already lives on PhysicalOptimizerRuleExportable.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: drop module-level doc comment from physical_optimizer

  Sibling FFI-import modules (udf, udaf, catalog, table) carry no module-level docs, and the rst-style markup did not match Rust conventions. The function doc comment already states intent.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: import physical optimizer rule via from_pycapsule! macro

  Replace the hand-written crates/core/src/physical_optimizer.rs with a `from_pycapsule!` invocation in the util crate, matching `physical_codec_from_pycapsule` and the other FFI capsule importers. The macro already handles the hasattr/getattr/cast/validate/pointer_checked sequence and the infallible `Arc::from(&FFI)` conversion, so the dedicated module is no longer needed.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: trim PhysicalOptimizerRuleExportable docstring

  Drop the sentence about logical-rule FFI availability; it is background, not type-hint information, and keeps the Protocol docstring in line with the other *Exportable hints.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Minor refactor

* refactor: register physical optimizer rules via live add method

  Drop the `physical_optimizer_rules` constructor argument on `SessionContext` and replace it with `add_physical_optimizer_rule`, matching the existing `register_*` shape on the same class. The new method rebuilds the session state via `SessionStateBuilder::new_from_existing` so previously registered tables, UDFs, and catalogs are preserved.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: drop redundant FFI physical optimizer rule export test

  Coverage subsumed by test_ffi_physical_optimizer_rule_runs_during_planning, which exercises the same capsule export via add_physical_optimizer_rule.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
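For readers unfamiliar with the first iteration described in the commit message (the Python logical rules that were later dropped), the following is a minimal pure-Python sketch of how a `None` return can be mapped to a "no change" signal so that a fixed-point loop terminates. It is a toy model, not DataFusion code: `Transformed`, `adapt`, `optimize`, `drop_noop`, and the tuple-based plan are all hypothetical stand-ins invented for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional

Plan = tuple  # hypothetical stand-in for a logical plan: a tuple of step names


@dataclass(frozen=True)
class Transformed:
    """Hypothetical mirror of a "plan plus changed flag" result."""

    data: Plan
    changed: bool


def adapt(rewrite: Callable[[Plan], Optional[Plan]], plan: Plan) -> Transformed:
    """Map a rule's None to "no change" and any returned plan to "changed"."""
    new_plan = rewrite(plan)
    if new_plan is None:
        return Transformed(plan, changed=False)
    return Transformed(new_plan, changed=True)


def optimize(plan: Plan, rules: list, max_passes: int = 8) -> tuple[Plan, int]:
    """Apply every rule repeatedly until one full pass changes nothing."""
    for passes in range(1, max_passes + 1):
        changed = False
        for rule in rules:
            result = adapt(rule, plan)
            plan = result.data
            changed = changed or result.changed
        if not changed:
            return plan, passes
    return plan, max_passes


def drop_noop(plan: Plan) -> Optional[Plan]:
    """Toy rule: remove one "noop" step, or return None when none is left."""
    if "noop" not in plan:
        return None
    i = plan.index("noop")
    return plan[:i] + plan[i + 1 :]


final_plan, passes = optimize(("scan", "noop", "filter", "noop"), [drop_noop])
```

In this sketch a rule that always returned a plan, even an identical one, would keep the loop running until `max_passes`, which is the failure mode the `None` convention avoids.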
1 parent af38866 commit d021e6a

6 files changed

Lines changed: 202 additions & 1 deletion

crates/core/src/context.rs

Lines changed: 13 additions & 1 deletion
@@ -59,7 +59,8 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_python_util::{
     create_logical_extension_capsule, create_physical_extension_capsule,
     ffi_logical_codec_from_pycapsule, get_global_ctx, get_tokio_runtime,
-    physical_codec_from_pycapsule, spawn_future, wait_for_future,
+    physical_codec_from_pycapsule, physical_optimizer_rule_from_pycapsule, spawn_future,
+    wait_for_future,
 };
 use object_store::ObjectStore;
 use pyo3::IntoPyObjectExt;
@@ -1195,6 +1196,17 @@ impl PySessionContext {
         self.ctx.remove_optimizer_rule(name)
     }
 
+    pub fn add_physical_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
+        let rule = physical_optimizer_rule_from_pycapsule(&rule)?;
+        let state_ref = self.ctx.state_ref();
+        let mut guard = state_ref.write();
+        let new_state = SessionStateBuilder::new_from_existing(guard.clone())
+            .with_physical_optimizer_rule(rule)
+            .build();
+        *guard = new_state;
+        Ok(())
+    }
+
     pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
         let provider = wait_for_future(py, self.ctx.table_provider(name))
             // Outer error: runtime/async failure
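
The `add_physical_optimizer_rule` method above takes the write lock, clones the current state, rebuilds it with one extra rule, and swaps the result in. A rough pure-Python model of that read-rebuild-swap pattern might look like the following. `State` and `Context` are hypothetical stand-ins, not the DataFusion types, and rules are represented by plain strings.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from threading import RLock


@dataclass(frozen=True)
class State:
    """Hypothetical stand-in for a session state: immutable once built."""

    tables: tuple[str, ...] = ()
    physical_rules: tuple[str, ...] = ()


class Context:
    """Hypothetical stand-in for a context that owns a shared state."""

    def __init__(self) -> None:
        self._lock = RLock()
        self._state = State()

    def register_table(self, name: str) -> None:
        with self._lock:
            self._state = replace(self._state, tables=self._state.tables + (name,))

    def add_physical_optimizer_rule(self, rule_name: str) -> None:
        # Hold the lock across the whole read-rebuild-swap so a concurrent
        # registration cannot be lost between the copy and the assignment.
        with self._lock:
            current = self._state
            rebuilt = replace(
                current, physical_rules=current.physical_rules + (rule_name,)
            )
            self._state = rebuilt

    def state(self) -> State:
        with self._lock:
            return self._state


ctx = Context()
ctx.register_table("t")
ctx.add_physical_optimizer_rule("counting_physical_optimizer_rule")
snapshot = ctx.state()
```

Because the rebuilt state starts from a copy of the existing one, the table registered before the rule was added is still present afterwards, which is the property the commit message calls out.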

crates/util/src/lib.rs

Lines changed: 9 additions & 0 deletions
@@ -24,7 +24,9 @@ use datafusion::datasource::TableProvider;
 use datafusion::execution::TaskContext;
 use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::Volatility;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion_ffi::execution::FFI_TaskContextProvider;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
 use datafusion_ffi::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
 use datafusion_ffi::table_provider::FFI_TableProvider;
@@ -332,6 +334,13 @@ from_pycapsule!(
     dyn PhysicalExtensionCodec
 );
 
+from_pycapsule!(
+    physical_optimizer_rule_from_pycapsule,
+    "datafusion_physical_optimizer_rule",
+    FFI_PhysicalOptimizerRule,
+    dyn PhysicalOptimizerRule + Send + Sync
+);
+
 try_from_pycapsule!(
     task_context_from_pycapsule,
     "datafusion_task_context_provider",
Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pyarrow as pa
+from datafusion import SessionContext
+from datafusion_ffi_example import MyPhysicalOptimizerRule
+
+
+def test_ffi_physical_optimizer_rule_runs_during_planning():
+    """A rule added via add_physical_optimizer_rule is invoked while the
+    physical plan is built, and the query still returns correct results."""
+    rule = MyPhysicalOptimizerRule()
+    ctx = SessionContext()
+    ctx.add_physical_optimizer_rule(rule)
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3])],
+        names=["a"],
+    )
+    ctx.register_record_batches("t", [[batch]])
+
+    before = rule.optimize_calls()
+    result = ctx.sql("SELECT a FROM t").collect()
+    after = rule.optimize_calls()
+
+    assert after > before, (
+        f"Expected user FFI physical optimizer rule to fire, "
+        f"before={before} after={after}"
+    )
+    assert result[0].column(0).to_pylist() == [1, 2, 3]
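
The before/after assertion in this test relies on the Python-visible handle and the rule handed to the engine sharing a single counter. The following pure-Python sketch illustrates that pattern without DataFusion installed. `Counter`, `CountingRule`, and `RuleHandle` are hypothetical names, and the lock-protected integer merely stands in for the shared atomic counter in the example crate.

```python
from __future__ import annotations

import threading


class Counter:
    """Thread-safe counter, standing in for a shared atomic integer."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._value = 0

    def increment(self) -> None:
        with self._lock:
            self._value += 1

    def load(self) -> int:
        with self._lock:
            return self._value


class CountingRule:
    """Hypothetical rule: returns the plan unchanged and counts each call."""

    def __init__(self, calls: Counter) -> None:
        self._calls = calls

    def optimize(self, plan: list[str]) -> list[str]:
        self._calls.increment()
        return plan


class RuleHandle:
    """Hypothetical handle whose exported rules all share one counter."""

    def __init__(self) -> None:
        self._calls = Counter()

    def export(self) -> CountingRule:
        return CountingRule(self._calls)

    def optimize_calls(self) -> int:
        return self._calls.load()


handle = RuleHandle()
rule = handle.export()  # what the engine would hold
before = handle.optimize_calls()
plan = rule.optimize(["scan t"])
after = handle.optimize_calls()
```

Comparing `after > before` rather than asserting an exact count keeps such a test robust if the engine invokes the rule more than once per query.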

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP
 use crate::config::MyConfig;
 use crate::logical_extension_codec::MyLogicalExtensionCodec;
 use crate::physical_extension_codec::MyPhysicalExtensionCodec;
+use crate::physical_optimizer::MyPhysicalOptimizerRule;
 use crate::scalar_udf::IsNullUDF;
 use crate::table_function::MyTableFunction;
 use crate::table_provider::MyTableProvider;
@@ -33,6 +34,7 @@ pub(crate) mod catalog_provider;
 pub(crate) mod config;
 pub(crate) mod logical_extension_codec;
 pub(crate) mod physical_extension_codec;
+pub(crate) mod physical_optimizer;
 pub(crate) mod scalar_udf;
 pub(crate) mod table_function;
 pub(crate) mod table_provider;
@@ -55,5 +57,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<MyConfig>()?;
     m.add_class::<MyLogicalExtensionCodec>()?;
     m.add_class::<MyPhysicalExtensionCodec>()?;
+    m.add_class::<MyPhysicalOptimizerRule>()?;
     Ok(())
 }
Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use datafusion::common::Result;
+use datafusion::common::config::ConfigOptions;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
+use datafusion_python_util::get_tokio_runtime;
+use pyo3::prelude::*;
+use pyo3::types::PyCapsule;
+
+/// A physical optimizer rule that leaves every plan unchanged but bumps a
+/// shared counter each time it runs. Tests use the counter to prove that a
+/// session built with this rule actually routed physical planning through a
+/// user-supplied [`PhysicalOptimizerRule`] over FFI.
+#[derive(Debug)]
+struct CountingPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+impl PhysicalOptimizerRule for CountingPhysicalOptimizerRule {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.optimize_calls.fetch_add(1, Ordering::SeqCst);
+        Ok(plan)
+    }
+
+    fn name(&self) -> &str {
+        "counting_physical_optimizer_rule"
+    }
+
+    fn schema_check(&self) -> bool {
+        // The plan is returned unchanged, so the schema is preserved.
+        true
+    }
+}
+
+/// Python-visible handle that produces an [`FFI_PhysicalOptimizerRule`] and
+/// exposes the shared call counter.
+#[pyclass(
+    from_py_object,
+    name = "MyPhysicalOptimizerRule",
+    module = "datafusion_ffi_example",
+    subclass
+)]
+#[derive(Debug, Default, Clone)]
+pub(crate) struct MyPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+#[pymethods]
+impl MyPhysicalOptimizerRule {
+    #[new]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn optimize_calls(&self) -> usize {
+        self.optimize_calls.load(Ordering::SeqCst)
+    }
+
+    fn __datafusion_physical_optimizer_rule__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(CountingPhysicalOptimizerRule {
+                optimize_calls: Arc::clone(&self.optimize_calls),
+            });
+
+        let runtime = get_tokio_runtime().handle().clone();
+        let ffi = FFI_PhysicalOptimizerRule::new(rule, Some(runtime));
+
+        let name = cr"datafusion_physical_optimizer_rule".into();
+        PyCapsule::new(py, ffi, Some(name))
+    }
+}

python/datafusion/context.py

Lines changed: 34 additions & 0 deletions
@@ -133,6 +133,16 @@ class TableProviderExportable(Protocol):
     def __datafusion_table_provider__(self, session: Any) -> object: ... # noqa: D105
 
 
+class PhysicalOptimizerRuleExportable(Protocol):
+    """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule.
+
+    The method returns a PyCapsule wrapping an ``FFI_PhysicalOptimizerRule``,
+    typically produced by a separate compiled extension.
+    """
+
+    def __datafusion_physical_optimizer_rule__(self) -> object: ... # noqa: D105
+
+
 class SessionConfig:
     """Session configuration options."""
 
@@ -1566,6 +1576,30 @@ def remove_optimizer_rule(self, name: str) -> bool:
         """
         return self.ctx.remove_optimizer_rule(name)
 
+    def add_physical_optimizer_rule(
+        self, rule: PhysicalOptimizerRuleExportable
+    ) -> None:
+        """Append a user-defined physical optimizer rule to the session.
+
+        The rule is imported via its ``__datafusion_physical_optimizer_rule__``
+        PyCapsule, typically produced by a separate compiled extension. The
+        underlying :class:`SessionState` is rebuilt from its current state
+        with the new rule appended, so previously registered tables, UDFs,
+        and catalogs are preserved.
+
+        Args:
+            rule: Object exposing ``__datafusion_physical_optimizer_rule__``,
+                a :class:`PhysicalOptimizerRuleExportable`.
+
+        Examples:
+            >>> from datafusion import SessionContext
+            >>> ctx = SessionContext()
+            >>> from my_extension import MyPhysicalOptimizerRule # doctest: +SKIP
+            >>> rule = MyPhysicalOptimizerRule() # doctest: +SKIP
+            >>> ctx.add_physical_optimizer_rule(rule) # doctest: +SKIP
+        """
+        self.ctx.add_physical_optimizer_rule(rule)
+
     def table_provider(self, name: str) -> Table:
         """Return the :py:class:`~datafusion.catalog.Table` for the given table name.
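
As an illustration of the duck-typed `*Exportable` pattern used by `PhysicalOptimizerRuleExportable`, here is a small stand-alone sketch. It assumes a `@runtime_checkable` decorator, which the Protocol in this diff is not shown to carry, purely so the structural check can be demonstrated with `isinstance`. `RuleExportable`, `FakeRule`, and `check_rule` are hypothetical names, and `FakeRule` returns a plain object where a real exporter would return a PyCapsule.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable  # assumption: added here only to enable isinstance checks
class RuleExportable(Protocol):
    """Hypothetical copy of the Exportable shape: one capsule-returning method."""

    def __datafusion_physical_optimizer_rule__(self) -> object: ...


class FakeRule:
    """Hypothetical exporter; a compiled extension would return a PyCapsule."""

    def __datafusion_physical_optimizer_rule__(self) -> object:
        return object()


def check_rule(obj: object) -> None:
    """Raise TypeError early if the object lacks the export method."""
    if not isinstance(obj, RuleExportable):
        raise TypeError(
            f"{type(obj).__name__} does not define "
            "__datafusion_physical_optimizer_rule__"
        )


accepted = isinstance(FakeRule(), RuleExportable)
rejected = isinstance("not a rule", RuleExportable)
```

The check is structural: `FakeRule` never inherits from the Protocol, yet it is accepted because it defines the method, which is the property that lets a separately compiled extension satisfy the type hint without importing anything from `datafusion`.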
