Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
if batch_size == 0 {
return config_err!("batch_size must be greater than 0");
}
config_options.execution.batch_size = batch_size;
config_options.execution.batch_size =
datafusion_common::config::ConfigNonZeroUsize::try_new(batch_size)?;
};

// use easier to understand "tree" mode by default
Expand Down
85 changes: 83 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::error::{_config_datafusion_err, _config_err};
use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle};
Expand All @@ -33,6 +33,7 @@ use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::num::NonZeroUsize;
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;
Expand Down Expand Up @@ -582,6 +583,86 @@ impl Display for SpillCompression {
}
}

/// A `usize` configuration value that rejects zero when set from strings.
///
/// Use this for options where zero is never a meaningful runtime value.
/// Invalid values return a configuration error through [`ConfigField`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConfigNonZeroUsize(NonZeroUsize);

/// Private helper for hard-coded defaults in `config_namespace!`, which cannot
/// use `?`. All external construction should use [`ConfigNonZeroUsize::try_new`].
const fn non_zero_usize_default(value: usize) -> ConfigNonZeroUsize {
match NonZeroUsize::new(value) {
Some(value) => ConfigNonZeroUsize(value),
None => panic!("value must be greater than 0"),
}
}

impl ConfigNonZeroUsize {
/// Creates a [`ConfigNonZeroUsize`], returning a configuration error if
/// `value` is zero.
pub fn try_new(value: usize) -> Result<Self> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe new with Result<Self> is more obvious choice?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment.

NonZeroUsize::new(value)
.map(Self)
.ok_or_else(|| _config_datafusion_err!("value must be greater than 0"))
}

/// Returns the wrapped `usize`.
pub const fn get(self) -> usize {
self.0.get()
}
}

impl From<ConfigNonZeroUsize> for usize {
fn from(value: ConfigNonZeroUsize) -> Self {
value.get()
}
}

impl FromStr for ConfigNonZeroUsize {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::try_new(default_config_transform(s)?)
}
}

impl ConfigField for ConfigNonZeroUsize {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
if !key.is_empty() {
return _config_err!(
"Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field \"{}\"",
key
);
}

*self = ConfigNonZeroUsize::from_str(value)?;
Ok(())
}

fn reset(&mut self, key: &str) -> Result<()> {
if key.is_empty() {
Ok(())
} else {
_config_err!(
"Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field \"{}\"",
key
)
}
}
}

impl Display for ConfigNonZeroUsize {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.get())
}
}

/// Policy for handling duplicate keys in Spark-compatible map-construction
/// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors
/// Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961).
Expand Down Expand Up @@ -649,7 +730,7 @@ config_namespace! {
/// Default batch size while creating new batches, it's especially useful for
/// buffer-in-memory batches since creating tiny batches would result in too much
/// metadata memory consumption
pub batch_size: usize, default = 8192
pub batch_size: ConfigNonZeroUsize, default = non_zero_usize_default(8192)

/// A perfect hash join (see `HashJoinExec` for more details) will be considered
/// if the range of keys (max - min) on the build side is < this threshold.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn from_env() {
// for valid testing
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env().unwrap();
assert_eq!(config.execution.batch_size, 4096);
assert_eq!(config.execution.batch_size.get(), 4096);

// for invalid testing
env::set_var(env_key, "abc");
Expand All @@ -57,6 +57,6 @@ fn from_env() {

env::remove_var(env_key);
let config = ConfigOptions::from_env().unwrap();
assert_eq!(config.execution.batch_size, 8192); // set to its default value
assert_eq!(config.execution.batch_size.get(), 8192); // set to its default value
}
}
3 changes: 2 additions & 1 deletion datafusion/core/tests/execution/datasource_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn datasource_splits_large_batches() -> datafusion_common::Result<()> {
.options()
.execution
.batch_size
.get()
);
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, batch_size);
Expand All @@ -70,7 +71,7 @@ async fn datasource_splits_large_batches() -> datafusion_common::Result<()> {
#[tokio::test]
async fn datasource_exact_batch_size_no_split() -> datafusion_common::Result<()> {
let session_config = datafusion_execution::config::SessionConfig::new();
let configured_batch_size = session_config.options().execution.batch_size;
let configured_batch_size = session_config.options().execution.batch_size.get();

let batches = create_and_collect_batches(configured_batch_size).await?;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
// force to get multiple batches to trigger repeated metric compound bug
config.options_mut().execution.batch_size = 1;
config.options_mut().execution.batch_size =
datafusion_common::config::ConfigNonZeroUsize::try_new(1)?;
let ctx = SessionContext::new_with_config(config);
// The cache is on by default, and used when filter pushdown is enabled
PredicateCacheTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,9 @@ fn test_suite_default_config_options() -> ConfigOptions {
config.execution.target_partitions = 10;

// Use a small batch size, to trigger RoundRobin in tests
config.execution.batch_size = 1;
config.execution.batch_size =
datafusion_common::config::ConfigNonZeroUsize::try_new(1)
.expect("test batch size must be greater than zero");

config
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn test_multiple_configs() {
assert!(result.is_ok(), "Should not fail due to memory limit");

let state = ctx.state();
let batch_size = state.config().options().execution.batch_size;
let batch_size = state.config().options().execution.batch_size.get();
assert_eq!(batch_size, 2048);
}

Expand Down
26 changes: 14 additions & 12 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{collections::HashMap, sync::Arc};

use datafusion_common::{
Result, ScalarValue,
config::{ConfigExtension, ConfigOptions, SpillCompression},
config::{ConfigExtension, ConfigNonZeroUsize, ConfigOptions, SpillCompression},
extensions::Extensions,
};

Expand Down Expand Up @@ -51,7 +51,7 @@ use datafusion_common::{
/// .set_bool("datafusion.execution.parquet.pushdown_filters", true);
///
/// assert_eq!(config.batch_size(), 1234);
/// assert_eq!(config.options().execution.batch_size, 1234);
/// assert_eq!(config.options().execution.batch_size.get(), 1234);
/// assert_eq!(config.options().execution.parquet.pushdown_filters, true);
/// ```
///
Expand All @@ -60,15 +60,16 @@ use datafusion_common::{
///
/// ```
/// # use datafusion_execution::config::SessionConfig;
/// # use datafusion_common::ScalarValue;
/// # use datafusion_common::config::ConfigNonZeroUsize;
/// #
/// let mut config = SessionConfig::new();
/// config.options_mut().execution.batch_size = 1234;
/// config.options_mut().execution.batch_size = ConfigNonZeroUsize::try_new(1234)?;
/// config.options_mut().execution.parquet.pushdown_filters = true;
/// #
/// # assert_eq!(config.batch_size(), 1234);
/// # assert_eq!(config.options().execution.batch_size, 1234);
/// # assert_eq!(config.options().execution.batch_size.get(), 1234);
/// # assert_eq!(config.options().execution.parquet.pushdown_filters, true);
/// # datafusion_common::Result::<()>::Ok(())
/// ```
///
/// ## Built-in options
Expand Down Expand Up @@ -137,7 +138,7 @@ impl SessionConfig {
/// use datafusion_execution::config::SessionConfig;
///
/// let config = SessionConfig::new();
/// assert!(config.options().execution.batch_size > 0);
/// assert!(config.options().execution.batch_size.get() > 0);
/// ```
pub fn options(&self) -> &Arc<ConfigOptions> {
&self.options
Expand All @@ -148,11 +149,13 @@ impl SessionConfig {
/// Can be used to set configuration options.
///
/// ```
/// use datafusion_common::config::ConfigNonZeroUsize;
/// use datafusion_execution::config::SessionConfig;
///
/// let mut config = SessionConfig::new();
/// config.options_mut().execution.batch_size = 1024;
/// assert_eq!(config.options().execution.batch_size, 1024);
/// config.options_mut().execution.batch_size = ConfigNonZeroUsize::try_new(1024)?;
/// assert_eq!(config.options().execution.batch_size.get(), 1024);
/// # datafusion_common::Result::<()>::Ok(())
/// ```
pub fn options_mut(&mut self) -> &mut ConfigOptions {
Arc::make_mut(&mut self.options)
Expand Down Expand Up @@ -186,9 +189,8 @@ impl SessionConfig {

/// Customize batch size
pub fn with_batch_size(mut self, n: usize) -> Self {
// batch size must be greater than zero
assert!(n > 0);
self.options_mut().execution.batch_size = n;
self.options_mut().execution.batch_size =
ConfigNonZeroUsize::try_new(n).expect("batch size must be greater than zero");
self
}

Expand Down Expand Up @@ -391,7 +393,7 @@ impl SessionConfig {

/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.options.execution.batch_size
self.options.execution.batch_size.get()
}

/// Enables or disables the coalescence of small batches into larger batches
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ impl TableProvider for GenerateSeriesTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let batch_size = state.config_options().execution.batch_size;
let batch_size = state.config_options().execution.batch_size.get();
let generator = self.as_generator(batch_size)?;
let mut exec = LazyMemoryExec::try_new(self.schema(), vec![generator])?
.with_projection(projection.cloned());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ pub fn ensure_distribution(
// When `false`, round robin repartition will not be added to increase parallelism
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let batch_size = config.execution.batch_size;
let batch_size = config.execution.batch_size.get();
let should_use_estimates = config
.execution
.use_row_number_estimates_to_optimize_partitioning;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl ExecutionPlan for AsyncFuncExec {
input_stream,
batch_coalescer: LimitedBatchCoalescer::new(
Arc::clone(&self.input.schema()),
config_options_ref.execution.batch_size,
config_options_ref.execution.batch_size.get(),
None,
),
};
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ mod tests {
.with_memory_limit(20_000_000, 1.0)
.build_arc()?;
let mut config = SessionConfig::new();
config.options_mut().execution.batch_size = target_batch_size;
config.options_mut().execution.batch_size =
datafusion_common::config::ConfigNonZeroUsize::try_new(target_batch_size)?;
let task_ctx = TaskContext::default()
.with_runtime(runtime)
.with_session_config(config);
Expand Down
10 changes: 7 additions & 3 deletions datafusion/sqllogictest/test_files/set_variable.slt
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ statement ok
set datafusion.catalog.information_schema = true

statement ok
SET datafusion.execution.batch_size to 0
SET datafusion.execution.batch_size to 310104

query TT
SHOW datafusion.execution.batch_size
----
datafusion.execution.batch_size 0
datafusion.execution.batch_size 310104

statement ok
SET datafusion.execution.batch_size to '1'
Expand Down Expand Up @@ -382,7 +382,7 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v
RESET datafusion.execution.batches_size

# reset invalid variable - extra suffix on valid field
statement error DataFusion error: Invalid or Unsupported Configuration: Config field is a scalar usize and does not have nested field "bar"
statement error DataFusion error: Invalid or Unsupported Configuration: Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field "bar"
RESET datafusion.execution.batch_size.bar

#############################################
Expand Down Expand Up @@ -707,6 +707,10 @@ SET datafusion.runtime.list_files_cache_ttl = '1m18446744073709551555s'
statement error DataFusion error: Error during planning: Duration has overflowed allowed maximum limit due to 'mins \* 60 \+ secs' when setting 'datafusion\.runtime\.list_files_cache_ttl'
SET datafusion.runtime.list_files_cache_ttl = '1m18446744073709551556s'

# Set invalid value and ensures error
statement error DataFusion error: Invalid or Unsupported Configuration: value must be greater than 0
SET datafusion.execution.batch_size = 0

# Config reset
statement ok
RESET datafusion.catalog.create_default_catalog_and_schema
Expand Down
Loading