From 2bea13b58d7b1d56f0fbc0c480a7b49db8ad63ef Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 24 Jun 2026 16:29:53 +0100 Subject: [PATCH] [cpp] Encode CreateTable column types losslessly --- bindings/cpp/include/fluss.hpp | 19 +- bindings/cpp/src/admin.cpp | 30 -- bindings/cpp/src/ffi_converter.hpp | 259 +++++++------ bindings/cpp/src/lib.rs | 72 +--- bindings/cpp/src/type_lowering.hpp | 15 +- bindings/cpp/src/types.rs | 382 ++++++++----------- bindings/cpp/test/test_ffi_converter.cpp | 335 ++++++++-------- bindings/cpp/test/test_kv_table.cpp | 300 ++++++++------- bindings/cpp/test/test_log_table.cpp | 221 +++++------ website/docs/user-guide/cpp/api-reference.md | 1 - website/docs/user-guide/cpp/data-types.md | 9 +- 11 files changed, 741 insertions(+), 902 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 6987e683..74800aa5 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -434,10 +434,6 @@ struct Column { struct Schema { std::vector columns; std::vector primary_keys; - /// When set (via FromArrow), the table's column types come from this Arrow - /// schema instead of `columns` — the only way to express nested MAP/ROW - /// columns. `columns` stays empty in that case. - std::shared_ptr arrow_schema; class Builder { public: @@ -459,15 +455,6 @@ struct Schema { }; static Builder NewBuilder() { return Builder(); } - - /// Build a Schema whose column types come from an Arrow schema. Use this - /// for tables with nested MAP/ROW columns (`arrow::map()`, `arrow::struct_()`); - /// `Admin::CreateTable` routes Arrow-backed schemas through the C Data - /// Interface automatically. - static Schema FromArrow(std::shared_ptr arrow_schema, - std::vector primary_keys = {}) { - return Schema{{}, std::move(primary_keys), std::move(arrow_schema)}; - } }; struct TableDescriptor { @@ -1457,9 +1444,9 @@ class Admin { bool Available() const; - /// Creates a table. For nested MAP/ROW columns, build `descriptor`'s schema - /// via `Schema::FromArrow(...)` — `CreateTable` routes Arrow-backed schemas - /// through the C Data Interface automatically. + /// Creates a table. Column types — including nested ARRAY/MAP/ROW built + /// with `DataType::Array`/`Map`/`Row` — are carried to the server exactly + /// as declared (precision, scale, length, nullability, and field names). Result CreateTable(const TablePath& table_path, const TableDescriptor& descriptor, bool ignore_if_exists = false); diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index 4ebc6f45..2459f41c 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -60,36 +60,6 @@ Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor& de } auto ffi_path = utils::to_ffi_table_path(table_path); - - // A MAP/ROW column can't go through the flat FFI encoding, so the schema is - // sent over Arrow instead (explicit via FromArrow, or lowered from native - // columns). Rust derives the columns from it, so the flat columns are dropped - // here; primary keys / metadata still come from the descriptor. - std::shared_ptr arrow_schema = descriptor.schema.arrow_schema; - if (!arrow_schema) { - for (const auto& col : descriptor.schema.columns) { - if (detail::is_compound(col.data_type)) { - arrow_schema = detail::columns_to_arrow_schema(descriptor.schema.columns); - break; - } - } - } - - if (arrow_schema) { - TableDescriptor arrow_desc = descriptor; - arrow_desc.schema.columns.clear(); - auto ffi_desc = utils::to_ffi_table_descriptor(arrow_desc); - size_t schema_ptr = 0; - try { - schema_ptr = detail::export_arrow_schema(*arrow_schema); - } catch (const std::exception& e) { - return utils::make_client_error(e.what()); - } - auto ffi_result = - admin_->create_table_arrow(ffi_path, ffi_desc, schema_ptr, ignore_if_exists); - return utils::from_ffi_result(ffi_result); - } - auto ffi_desc = utils::to_ffi_table_descriptor(descriptor); auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists); return utils::from_ffi_result(ffi_result); diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 89a70182..92115394 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -28,77 +28,159 @@ namespace fluss { namespace utils { -/// Compact FFI representation of a (possibly nested) array type. -/// -/// `nesting` counts the number of ARRAY wrappers stripped to reach the leaf -/// element type. `leaf_type`/`leaf_precision`/`leaf_scale` describe that leaf -/// scalar. A non-array input produces a zero-initialised value (nesting == 0). -/// `array_nullability` has `nesting + 1` entries: one per ARRAY wrapper -/// (outermost first) plus a trailing entry for the leaf scalar's nullability. -/// -/// Using a flat representation — rather than serialising a recursive -/// `DataType` — keeps the cxx bridge contract small while preserving schema -/// fidelity across the FFI boundary when paired with rebuild_array_type(). +/// Flattened view of an `ARRAY>>` element type, used by the +/// data-writer path: `nesting` counts the ARRAY wrappers stripped to reach the +/// leaf scalar, which `leaf_*` describe. A non-array input yields `nesting == 0` +/// and callers use the type's own id/precision/scale. struct FlattenedArrayType { int32_t nesting{0}; int32_t leaf_type{0}; int32_t leaf_precision{0}; int32_t leaf_scale{0}; - std::vector array_nullability; }; -/// Flattens an `ARRAY>>` DataType into a FlattenedArrayType. -/// -/// Contract: -/// - If `data_type` is not an ARRAY, returns a zero-valued FlattenedArrayType -/// and callers must use the column's own `id/precision/scale` instead. -/// - If `data_type` is an ARRAY but has a null element_type() chain (which -/// should only happen on malformed input), returns a zero-valued result to -/// signal the caller to reject the schema. -/// - Otherwise, `nesting >= 1`, array_nullability has `nesting + 1` entries -/// (last = leaf scalar nullability), and leaf_* describe the innermost scalar. inline FlattenedArrayType flatten_array_type(const DataType& data_type) { FlattenedArrayType out; if (data_type.id() != TypeId::Array) { return out; } - const DataType* current = &data_type; while (current && current->id() == TypeId::Array) { out.nesting += 1; - out.array_nullability.push_back(current->nullable() ? 1 : 0); current = current->element_type(); } if (!current) { return FlattenedArrayType{}; } - out.leaf_type = static_cast(current->id()); out.leaf_precision = current->precision(); out.leaf_scale = current->scale(); - out.array_nullability.push_back(current->nullable() ? 1 : 0); return out; } -/// Inverse of flatten_array_type: rebuilds an `ARRAY>>` type -/// from the compact flat form. Requires `flat.nesting >= 1`; callers handle -/// the `nesting == 0` case by using a plain scalar DataType directly. -/// `array_nullability` must have `nesting + 1` entries (last = leaf). -inline DataType rebuild_array_type(const FlattenedArrayType& flat) { - bool leaf_nullable = (static_cast(flat.nesting) < flat.array_nullability.size()) - ? (flat.array_nullability[static_cast(flat.nesting)] != 0) - : true; - DataType dt(static_cast(flat.leaf_type), flat.leaf_precision, flat.leaf_scale, - leaf_nullable); - for (int32_t i = flat.nesting - 1; i >= 0; --i) { - bool nullable = (static_cast(i) < flat.array_nullability.size()) - ? (flat.array_nullability[static_cast(i)] != 0) - : true; - auto arr = DataType::Array(std::move(dt)); - if (!nullable) { - arr = arr.NotNull(); +/// Serialize a type tree into a preorder node arena (see `FfiTypeNode`): the +/// node itself, then ARRAY's element / MAP's key+value / ROW's field subtrees. +/// `field_name` is set only for ROW field nodes. Inverse of `nodes_to_data_type`. +inline void data_type_to_nodes(const DataType& dt, const std::string& field_name, + rust::Vec& out) { + ffi::FfiTypeNode node; + node.type_id = static_cast(dt.id()); + node.nullable = dt.nullable(); + node.precision = dt.precision(); + node.scale = dt.scale(); + node.field_name = rust::String(field_name); + switch (dt.id()) { + case TypeId::Array: + node.child_count = 1; + break; + case TypeId::Map: + node.child_count = 2; + break; + case TypeId::Row: + node.child_count = static_cast(dt.field_count()); + break; + default: + node.child_count = 0; + break; + } + out.push_back(std::move(node)); + + switch (dt.id()) { + case TypeId::Array: + data_type_to_nodes(*dt.element_type(), "", out); + break; + case TypeId::Map: + data_type_to_nodes(*dt.key_type(), "", out); + data_type_to_nodes(*dt.value_type(), "", out); + break; + case TypeId::Row: + for (size_t i = 0; i < dt.field_count(); ++i) { + data_type_to_nodes(*dt.field_type(i), dt.field_name(i), out); + } + break; + default: + break; + } +} + +/// Builds a scalar DataType from a leaf node, restoring precision/scale/length. +inline DataType scalar_node_to_data_type(const ffi::FfiTypeNode& node) { + switch (static_cast(node.type_id)) { + case TypeId::Boolean: + return DataType::Boolean(); + case TypeId::TinyInt: + return DataType::TinyInt(); + case TypeId::SmallInt: + return DataType::SmallInt(); + case TypeId::Int: + return DataType::Int(); + case TypeId::BigInt: + return DataType::BigInt(); + case TypeId::Float: + return DataType::Float(); + case TypeId::Double: + return DataType::Double(); + case TypeId::String: + return DataType::String(); + case TypeId::Bytes: + return DataType::Bytes(); + case TypeId::Date: + return DataType::Date(); + case TypeId::Time: + return DataType::Time(); + case TypeId::Timestamp: + return DataType::Timestamp(node.precision); + case TypeId::TimestampLtz: + return DataType::TimestampLtz(node.precision); + case TypeId::Decimal: + return DataType::Decimal(node.precision, node.scale); + case TypeId::Char: + return DataType::Char(node.precision); + case TypeId::Binary: + return DataType::Binary(node.precision); + default: + throw std::runtime_error("Unknown scalar type id in type tree: " + + std::to_string(node.type_id)); + } +} + +/// Inverse of `data_type_to_nodes`: reconstruct one type from the arena, +/// advancing `cursor` past the nodes it consumes. +inline DataType nodes_to_data_type(const rust::Vec& nodes, size_t& cursor) { + if (cursor >= nodes.size()) { + throw std::runtime_error("type tree ended before all nodes were read"); + } + const ffi::FfiTypeNode& node = nodes[cursor++]; + DataType dt = [&]() -> DataType { + switch (static_cast(node.type_id)) { + case TypeId::Array: { + DataType element = nodes_to_data_type(nodes, cursor); + return DataType::Array(std::move(element)); + } + case TypeId::Map: { + DataType key = nodes_to_data_type(nodes, cursor); + DataType value = nodes_to_data_type(nodes, cursor); + return DataType::Map(std::move(key), std::move(value)); + } + case TypeId::Row: { + std::vector> fields; + fields.reserve(node.child_count); + for (uint32_t i = 0; i < node.child_count; ++i) { + if (cursor >= nodes.size()) { + throw std::runtime_error("ROW field missing from type tree"); + } + std::string fname(nodes[cursor].field_name); + DataType ftype = nodes_to_data_type(nodes, cursor); + fields.emplace_back(std::move(fname), std::move(ftype)); + } + return DataType::Row(std::move(fields)); + } + default: + return scalar_node_to_data_type(node); } - dt = std::move(arr); + }(); + if (!node.nullable) { + dt = dt.NotNull(); } return dt; } @@ -168,25 +250,8 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { inline ffi::FfiColumn to_ffi_column(const Column& col) { ffi::FfiColumn ffi_col; ffi_col.name = rust::String(col.name); - ffi_col.data_type = static_cast(col.data_type.id()); - ffi_col.nullable = col.data_type.nullable(); ffi_col.comment = rust::String(col.comment); - ffi_col.precision = col.data_type.precision(); - ffi_col.scale = col.data_type.scale(); - auto flat = flatten_array_type(col.data_type); - ffi_col.array_nesting = flat.nesting; - for (auto nullable : flat.array_nullability) { - ffi_col.array_nullability.push_back(nullable); - } - if (flat.nesting > 0 && flat.leaf_type != 0) { - ffi_col.element_data_type = flat.leaf_type; - ffi_col.element_precision = flat.leaf_precision; - ffi_col.element_scale = flat.leaf_scale; - } else { - ffi_col.element_data_type = 0; - ffi_col.element_precision = 0; - ffi_col.element_scale = 0; - } + data_type_to_nodes(col.data_type, "", ffi_col.type_nodes); return ffi_col; } @@ -251,72 +316,12 @@ inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& de } inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) { - auto type_id = static_cast(ffi_col.data_type); - if (type_id == TypeId::Array) { - auto is_supported_leaf_type = [](int32_t leaf_type) { - switch (static_cast(leaf_type)) { - case TypeId::Boolean: - case TypeId::TinyInt: - case TypeId::SmallInt: - case TypeId::Int: - case TypeId::BigInt: - case TypeId::Float: - case TypeId::Double: - case TypeId::String: - case TypeId::Bytes: - case TypeId::Date: - case TypeId::Time: - case TypeId::Timestamp: - case TypeId::TimestampLtz: - case TypeId::Decimal: - case TypeId::Char: - case TypeId::Binary: - return true; - default: - return false; - } - }; - // ROW/MAP element schema can't pass through the flat FFI column; give the - // array a non-null element of the right kind so element_type() is safe to deref. - auto element_id = static_cast(ffi_col.element_data_type); - if (element_id == TypeId::Map || element_id == TypeId::Row) { - return Column{std::string(ffi_col.name), DataType::Array(DataType(element_id)), - std::string(ffi_col.comment)}; - } - if (ffi_col.element_data_type == 0) { - throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) + - "': missing element_data_type"); - } - if (ffi_col.array_nesting < 0) { - throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) + - "': array_nesting must be non-negative"); - } - if (ffi_col.element_data_type == static_cast(TypeId::Array)) { - throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) + - "': leaf element_data_type cannot be ARRAY"); - } - if (!is_supported_leaf_type(ffi_col.element_data_type)) { - throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) + - "': unsupported leaf element_data_type " + - std::to_string(ffi_col.element_data_type)); - } - - int32_t nesting = ffi_col.array_nesting > 0 ? ffi_col.array_nesting : 1; - std::vector array_nullability; - for (auto nullable : ffi_col.array_nullability) { - array_nullability.push_back(nullable); - } - auto dt = rebuild_array_type( - FlattenedArrayType{ - nesting, - ffi_col.element_data_type, - ffi_col.element_precision, - ffi_col.element_scale, - std::move(array_nullability), - }); - return Column{std::string(ffi_col.name), std::move(dt), std::string(ffi_col.comment)}; + size_t cursor = 0; + DataType dt = nodes_to_data_type(ffi_col.type_nodes, cursor); + if (cursor != ffi_col.type_nodes.size()) { + throw std::runtime_error("Column '" + std::string(ffi_col.name) + + "': type tree has trailing nodes"); } - DataType dt(type_id, ffi_col.precision, ffi_col.scale, ffi_col.nullable); return Column{std::string(ffi_col.name), std::move(dt), std::string(ffi_col.comment)}; } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 2d62136a..3b59f950 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -90,18 +90,29 @@ mod ffi { table_name: String, } - struct FfiColumn { - name: String, - data_type: i32, + // One node of a type tree, serialized in preorder. A column's type is the + // sequence of nodes starting at its root; ARRAY is followed by its element + // subtree, MAP by its key then value subtrees, ROW by its `child_count` + // field subtrees (each field node carries its `field_name`). This carries + // the full recursive type losslessly — exact precision/scale/length, + // nullability at every level, and ROW field names. + struct FfiTypeNode { + type_id: i32, nullable: bool, - comment: String, + // Decimal/Timestamp/TimestampLtz precision, or Char/Binary length; 0 otherwise. precision: i32, + // Decimal scale; 0 otherwise. scale: i32, - array_nesting: i32, - array_nullability: Vec, - element_data_type: i32, - element_precision: i32, - element_scale: i32, + // Field name when this node is a ROW field; empty otherwise. + field_name: String, + // Immediate children: 0 scalar, 1 ARRAY, 2 MAP, N ROW fields. + child_count: u32, + } + + struct FfiColumn { + name: String, + comment: String, + type_nodes: Vec, } struct FfiSchema { @@ -317,17 +328,6 @@ mod ffi { descriptor: &FfiTableDescriptor, ignore_if_exists: bool, ) -> FfiResult; - // Create a table whose columns come from an Arrow schema (C Data - // Interface pointer), supporting nested MAP/ROW columns. `descriptor` - // supplies primary keys + table-level metadata; its `schema.columns` - // are ignored in favour of `arrow_schema_ptr`. - fn create_table_arrow( - self: &Admin, - table_path: &FfiTablePath, - descriptor: &FfiTableDescriptor, - arrow_schema_ptr: usize, - ignore_if_exists: bool, - ) -> FfiResult; fn drop_table( self: &Admin, table_path: &FfiTablePath, @@ -1078,38 +1078,6 @@ impl Admin { } } - fn create_table_arrow( - &self, - table_path: &ffi::FfiTablePath, - descriptor: &ffi::FfiTableDescriptor, - arrow_schema_ptr: usize, - ignore_if_exists: bool, - ) -> ffi::FfiResult { - let path = fcore::metadata::TablePath::new( - table_path.database_name.clone(), - table_path.table_name.clone(), - ); - - // Safety: C++ exports the schema via `arrow::ExportSchema` into a heap - // `FFI_ArrowSchema` whose pointer is passed here; ownership transfers. - let core_descriptor = - match unsafe { types::arrow_ffi_to_core_descriptor(arrow_schema_ptr, descriptor) } { - Ok(d) => d, - Err(e) => return client_err(e.to_string()), - }; - - let result = RUNTIME.block_on(async { - self.inner - .create_table(&path, &core_descriptor, ignore_if_exists) - .await - }); - - match result { - Ok(_) => ok_result(), - Err(e) => err_from_core_error(&e), - } - } - fn drop_table( &self, table_path: &ffi::FfiTablePath, diff --git a/bindings/cpp/src/type_lowering.hpp b/bindings/cpp/src/type_lowering.hpp index 98e67688..3608a39b 100644 --- a/bindings/cpp/src/type_lowering.hpp +++ b/bindings/cpp/src/type_lowering.hpp @@ -103,8 +103,9 @@ inline std::shared_ptr to_arrow_type(const DataType& dt) { } } -/// True if the type is (or nests) a MAP/ROW the flat FFI column can't express, -/// so it must route through Arrow. Array-of-scalar stays on the flat path. +/// True if the type is (or nests) a MAP/ROW. The data-writer path uses this to +/// route compound element/key/value types through Arrow; array-of-scalar uses +/// the flat writer path. inline bool is_compound(const DataType& dt) { switch (dt.id()) { case TypeId::Map: @@ -119,16 +120,6 @@ inline bool is_compound(const DataType& dt) { } } -inline std::shared_ptr columns_to_arrow_schema(const std::vector& columns) { - std::vector> fields; - fields.reserve(columns.size()); - for (const auto& col : columns) { - fields.push_back( - arrow::field(col.name, to_arrow_type(col.data_type), col.data_type.nullable())); - } - return arrow::schema(std::move(fields)); -} - /// Exports an Arrow schema to a heap FFI_ArrowSchema; the Rust bridge takes /// ownership of the returned pointer. Throws on failure. inline size_t export_arrow_schema(const arrow::Schema& schema) { diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 22a4adef..4d6764ea 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -19,6 +19,9 @@ use crate::ffi; use anyhow::{Result, anyhow}; use arrow::array::Array; use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +use fcore::metadata::{ + ArrayType, Column, DataField, DataType, DataTypes, DecimalType, MapType, RowType, +}; use fcore::row::Datum; use fluss as fcore; use std::borrow::Cow; @@ -44,229 +47,184 @@ pub const DATA_TYPE_ARRAY: i32 = 17; pub const DATA_TYPE_MAP: i32 = 18; pub const DATA_TYPE_ROW: i32 = 19; -/// Separates scalar and array type specs so each variant only carries -/// the fields it actually needs — no zeroed-out placeholders. -enum FfiDataTypeSpec { - Scalar { - data_type: i32, - precision: u32, - scale: u32, - nullable: bool, - }, - Array { - element_data_type: i32, - element_precision: u32, - element_scale: u32, - array_nesting: u32, - /// `nesting` entries for each ARRAY wrapper (outermost first) plus - /// one trailing entry for the leaf scalar. Length = `nesting + 1`. - array_nullability: Vec, - }, +fn ffi_column_to_core_data_type(col: &ffi::FfiColumn) -> Result { + let mut cursor = 0usize; + let dt = nodes_to_data_type(&col.type_nodes, &mut cursor)?; + if cursor != col.type_nodes.len() { + return Err(anyhow!( + "Column '{}': type tree has {} trailing nodes", + col.name, + col.type_nodes.len() - cursor + )); + } + Ok(dt) } -fn ffi_column_to_core_data_type(col: &ffi::FfiColumn) -> Result { - if col.data_type == DATA_TYPE_ARRAY { - ffi_data_type_to_core(FfiDataTypeSpec::Array { - element_data_type: col.element_data_type, - element_precision: col.element_precision as u32, - element_scale: col.element_scale as u32, - array_nesting: col.array_nesting.max(0) as u32, - array_nullability: col.array_nullability.clone(), - }) - } else { - ffi_data_type_to_core(FfiDataTypeSpec::Scalar { - data_type: col.data_type, - precision: col.precision as u32, - scale: col.scale as u32, - nullable: col.nullable, - }) +/// Reconstruct one type from a preorder node arena, advancing `cursor` past +/// the consumed nodes. Mirrors the C++ `data_type_to_nodes` encoder. +fn nodes_to_data_type(nodes: &[ffi::FfiTypeNode], cursor: &mut usize) -> Result { + let node = nodes + .get(*cursor) + .ok_or_else(|| anyhow!("type tree ended before all nodes were read"))?; + *cursor += 1; + + if node.precision < 0 || node.scale < 0 { + return Err(anyhow!( + "type node precision and scale must be non-negative" + )); } + let precision = node.precision as u32; + let scale = node.scale as u32; + + let dt = match node.type_id { + DATA_TYPE_ARRAY => { + let element = nodes_to_data_type(nodes, cursor)?; + DataType::Array(ArrayType::with_nullable(node.nullable, element)) + } + DATA_TYPE_MAP => { + let key = nodes_to_data_type(nodes, cursor)?; + let value = nodes_to_data_type(nodes, cursor)?; + DataType::Map(MapType::with_nullable(node.nullable, key, value)) + } + DATA_TYPE_ROW => { + let mut fields = Vec::with_capacity(node.child_count as usize); + for _ in 0..node.child_count { + let field_name = nodes + .get(*cursor) + .ok_or_else(|| anyhow!("ROW field missing from type tree"))? + .field_name + .clone(); + let field_type = nodes_to_data_type(nodes, cursor)?; + fields.push(DataField::new(field_name, field_type, None)); + } + DataType::Row(RowType::with_nullable(node.nullable, fields)) + } + scalar => return scalar_to_core(scalar, precision, scale, node.nullable), + }; + Ok(dt) } -fn type_precision_scale(dt: &fcore::metadata::DataType) -> (i32, i32) { +fn type_precision_scale(dt: &DataType) -> (i32, i32) { match dt { - fcore::metadata::DataType::Decimal(d) => (d.precision() as i32, d.scale() as i32), - fcore::metadata::DataType::Timestamp(ts) => (ts.precision() as i32, 0), - fcore::metadata::DataType::TimestampLTz(ts) => (ts.precision() as i32, 0), - fcore::metadata::DataType::Char(ch) => (ch.length() as i32, 0), - fcore::metadata::DataType::Binary(bin) => (bin.length() as i32, 0), + DataType::Decimal(d) => (d.precision() as i32, d.scale() as i32), + DataType::Timestamp(ts) => (ts.precision() as i32, 0), + DataType::TimestampLTz(ts) => (ts.precision() as i32, 0), + DataType::Char(ch) => (ch.length() as i32, 0), + DataType::Binary(bin) => (bin.length() as i32, 0), _ => (0, 0), } } -struct FlattenedLeafType { - nesting: i32, - leaf_type: i32, - leaf_precision: i32, - leaf_scale: i32, - /// `nesting` entries for ARRAY wrappers (outermost first) plus one - /// trailing entry for the leaf scalar. Length = `nesting + 1`. - array_nullability: Vec, +fn scalar_to_core(data_type: i32, precision: u32, scale: u32, nullable: bool) -> Result { + let dt = match data_type { + DATA_TYPE_BOOLEAN => DataTypes::boolean(), + DATA_TYPE_TINYINT => DataTypes::tinyint(), + DATA_TYPE_SMALLINT => DataTypes::smallint(), + DATA_TYPE_INT => DataTypes::int(), + DATA_TYPE_BIGINT => DataTypes::bigint(), + DATA_TYPE_FLOAT => DataTypes::float(), + DATA_TYPE_DOUBLE => DataTypes::double(), + DATA_TYPE_STRING => DataTypes::string(), + DATA_TYPE_BYTES => DataTypes::bytes(), + DATA_TYPE_DATE => DataTypes::date(), + DATA_TYPE_TIME => DataTypes::time(), + DATA_TYPE_TIMESTAMP => DataTypes::timestamp_with_precision(precision), + DATA_TYPE_TIMESTAMP_LTZ => DataTypes::timestamp_ltz_with_precision(precision), + DATA_TYPE_DECIMAL => DataType::Decimal(DecimalType::new(precision, scale)?), + DATA_TYPE_CHAR => DataTypes::char(precision), + DATA_TYPE_BINARY => DataTypes::binary(precision as usize), + _ => return Err(anyhow!("Unknown data type: {}", data_type)), + }; + if nullable { + Ok(dt) + } else { + Ok(dt.as_non_nullable()) + } } -fn flatten_array_leaf_type(dt: &fcore::metadata::DataType) -> Result { - let mut nesting = 0_i32; - let mut leaf = dt; - let mut array_nullability = Vec::new(); - while let fcore::metadata::DataType::Array(at) = leaf { - nesting += 1; - array_nullability.push(u8::from(leaf.is_nullable())); - leaf = at.get_element_type(); - } - if nesting == 0 { - return Err(anyhow!("Expected ARRAY data type, got {dt}")); - } - let leaf_type = core_data_type_to_ffi(leaf); - if leaf_type == 0 { - return Err(anyhow!( - "Unsupported ARRAY leaf type for C++ bindings: {leaf}" - )); +/// Serialize a type tree to a preorder node arena. Mirrors the C++ +/// `nodes_to_data_type` decoder; `field_name` is set only for ROW fields. +fn core_data_type_to_nodes(dt: &DataType, field_name: &str, out: &mut Vec) { + let (precision, scale) = type_precision_scale(dt); + let child_count = match dt { + DataType::Array(_) => 1, + DataType::Map(_) => 2, + DataType::Row(rt) => rt.fields().len() as u32, + _ => 0, + }; + out.push(ffi::FfiTypeNode { + type_id: core_data_type_to_ffi(dt), + nullable: dt.is_nullable(), + precision, + scale, + field_name: field_name.to_string(), + child_count, + }); + match dt { + DataType::Array(at) => core_data_type_to_nodes(at.get_element_type(), "", out), + DataType::Map(mt) => { + core_data_type_to_nodes(mt.key_type(), "", out); + core_data_type_to_nodes(mt.value_type(), "", out); + } + DataType::Row(rt) => { + for field in rt.fields() { + core_data_type_to_nodes(field.data_type(), field.name(), out); + } + } + _ => {} } - array_nullability.push(u8::from(leaf.is_nullable())); - let (leaf_precision, leaf_scale) = type_precision_scale(leaf); - Ok(FlattenedLeafType { - nesting, - leaf_type, - leaf_precision, - leaf_scale, - array_nullability, - }) } +/// Build a nested `ARRAY<…>` from a flat leaf description. Used by the +/// data-writer path (`element_type_from_ffi`), which carries array-of-scalar +/// element types without a full node arena. fn build_array_type_from_leaf( element_data_type: i32, element_precision: u32, element_scale: u32, array_nesting: u32, - array_nullability: &[u8], -) -> Result { +) -> Result { if array_nesting == 0 { return Err(anyhow!("ARRAY nesting must be >= 1")); } - let leaf_nullable = array_nullability - .get(array_nesting as usize) - .map(|v| *v != 0) - .unwrap_or(true); - let mut dt = ffi_data_type_to_core(FfiDataTypeSpec::Scalar { - data_type: element_data_type, - precision: element_precision, - scale: element_scale, - nullable: leaf_nullable, - })?; - for i in (0..array_nesting).rev() { - let nullable = array_nullability - .get(i as usize) - .map(|v| *v != 0) - .unwrap_or(true); - dt = fcore::metadata::DataType::Array(fcore::metadata::ArrayType::with_nullable( - nullable, dt, - )); + let mut dt = scalar_to_core(element_data_type, element_precision, element_scale, true)?; + for _ in 0..array_nesting { + dt = DataType::Array(ArrayType::new(dt)); } Ok(dt) } -fn ffi_data_type_to_core(spec: FfiDataTypeSpec) -> Result { - match spec { - FfiDataTypeSpec::Scalar { - data_type, - precision, - scale, - nullable, - } => { - let dt = match data_type { - DATA_TYPE_BOOLEAN => fcore::metadata::DataTypes::boolean(), - DATA_TYPE_TINYINT => fcore::metadata::DataTypes::tinyint(), - DATA_TYPE_SMALLINT => fcore::metadata::DataTypes::smallint(), - DATA_TYPE_INT => fcore::metadata::DataTypes::int(), - DATA_TYPE_BIGINT => fcore::metadata::DataTypes::bigint(), - DATA_TYPE_FLOAT => fcore::metadata::DataTypes::float(), - DATA_TYPE_DOUBLE => fcore::metadata::DataTypes::double(), - DATA_TYPE_STRING => fcore::metadata::DataTypes::string(), - DATA_TYPE_BYTES => fcore::metadata::DataTypes::bytes(), - DATA_TYPE_DATE => fcore::metadata::DataTypes::date(), - DATA_TYPE_TIME => fcore::metadata::DataTypes::time(), - DATA_TYPE_TIMESTAMP => { - fcore::metadata::DataTypes::timestamp_with_precision(precision) - } - DATA_TYPE_TIMESTAMP_LTZ => { - fcore::metadata::DataTypes::timestamp_ltz_with_precision(precision) - } - DATA_TYPE_DECIMAL => { - let dt = fcore::metadata::DecimalType::new(precision, scale)?; - fcore::metadata::DataType::Decimal(dt) - } - DATA_TYPE_CHAR => fcore::metadata::DataTypes::char(precision), - DATA_TYPE_BINARY => fcore::metadata::DataTypes::binary(precision as usize), - _ => return Err(anyhow!("Unknown data type: {}", data_type)), - }; - if nullable { - Ok(dt) - } else { - Ok(dt.as_non_nullable()) - } - } - FfiDataTypeSpec::Array { - element_data_type, - element_precision, - element_scale, - array_nesting, - ref array_nullability, - } => build_array_type_from_leaf( - element_data_type, - element_precision, - element_scale, - array_nesting, - array_nullability, - ), - } -} - -pub fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { +pub fn core_data_type_to_ffi(dt: &DataType) -> i32 { match dt { - fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN, - fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT, - fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT, - fcore::metadata::DataType::Int(_) => DATA_TYPE_INT, - fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT, - fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT, - fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE, - fcore::metadata::DataType::String(_) => DATA_TYPE_STRING, - fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES, - fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE, - fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, - fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, - fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, - fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL, - fcore::metadata::DataType::Char(_) => DATA_TYPE_CHAR, - fcore::metadata::DataType::Binary(_) => DATA_TYPE_BINARY, - fcore::metadata::DataType::Array(_) => DATA_TYPE_ARRAY, - fcore::metadata::DataType::Map(_) => DATA_TYPE_MAP, - fcore::metadata::DataType::Row(_) => DATA_TYPE_ROW, + DataType::Boolean(_) => DATA_TYPE_BOOLEAN, + DataType::TinyInt(_) => DATA_TYPE_TINYINT, + DataType::SmallInt(_) => DATA_TYPE_SMALLINT, + DataType::Int(_) => DATA_TYPE_INT, + DataType::BigInt(_) => DATA_TYPE_BIGINT, + DataType::Float(_) => DATA_TYPE_FLOAT, + DataType::Double(_) => DATA_TYPE_DOUBLE, + DataType::String(_) => DATA_TYPE_STRING, + DataType::Bytes(_) => DATA_TYPE_BYTES, + DataType::Date(_) => DATA_TYPE_DATE, + DataType::Time(_) => DATA_TYPE_TIME, + DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, + DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + DataType::Decimal(_) => DATA_TYPE_DECIMAL, + DataType::Char(_) => DATA_TYPE_CHAR, + DataType::Binary(_) => DATA_TYPE_BINARY, + DataType::Array(_) => DATA_TYPE_ARRAY, + DataType::Map(_) => DATA_TYPE_MAP, + DataType::Row(_) => DATA_TYPE_ROW, } } -fn core_column_to_ffi(col: &fcore::metadata::Column) -> ffi::FfiColumn { - let (precision, scale) = type_precision_scale(col.data_type()); - - let flat = match col.data_type() { - fcore::metadata::DataType::Array(_) => flatten_array_leaf_type(col.data_type()).ok(), - _ => None, - }; - +fn core_column_to_ffi(col: &Column) -> ffi::FfiColumn { + let mut type_nodes = Vec::new(); + core_data_type_to_nodes(col.data_type(), "", &mut type_nodes); ffi::FfiColumn { name: col.name().to_string(), - data_type: core_data_type_to_ffi(col.data_type()), - nullable: col.data_type().is_nullable(), comment: col.comment().unwrap_or("").to_string(), - precision, - scale, - array_nesting: flat.as_ref().map_or(0, |f| f.nesting), - array_nullability: flat - .as_ref() - .map_or_else(Vec::new, |f| f.array_nullability.clone()), - element_data_type: flat.as_ref().map_or(0, |f| f.leaf_type), - element_precision: flat.as_ref().map_or(0, |f| f.leaf_precision), - element_scale: flat.as_ref().map_or(0, |f| f.leaf_scale), + type_nodes, } } @@ -276,12 +234,6 @@ pub fn ffi_descriptor_to_core( let mut schema_builder = fcore::metadata::Schema::builder(); for col in &descriptor.schema.columns { - if col.precision < 0 || col.scale < 0 || col.array_nesting < 0 { - return Err(anyhow!( - "Column '{}': precision, scale, and array_nesting must be non-negative", - col.name - )); - } let dt = ffi_column_to_core_data_type(col)?; schema_builder = schema_builder.column(&col.name, dt); if !col.comment.is_empty() { @@ -298,7 +250,7 @@ pub fn ffi_descriptor_to_core( /// Assemble a core `TableDescriptor` from a pre-built `Schema` plus the /// descriptor's table-level metadata (partition/bucket keys, properties, -/// comment). Shared by the flat-column and Arrow-schema create-table paths. +/// comment). fn build_descriptor( schema: fcore::metadata::Schema, descriptor: &ffi::FfiTableDescriptor, @@ -336,34 +288,6 @@ fn build_descriptor( Ok(builder.build()?) } -/// Build a core `TableDescriptor` whose columns come from an Arrow schema -/// imported over the C Data Interface. Lets C++ define nested MAP/ROW columns -/// the flat `FfiColumn` encoding can't express, reusing core's canonical -/// `from_arrow_field` converter rather than a second conversion copy. -/// -/// # Safety -/// `schema_ptr` must be a valid `FFI_ArrowSchema` heap pointer exported by C++ -/// (e.g. via `arrow::ExportSchema`); ownership is taken and released here. -pub unsafe fn arrow_ffi_to_core_descriptor( - schema_ptr: usize, - descriptor: &ffi::FfiTableDescriptor, -) -> Result { - let ffi_schema = unsafe { Box::from_raw(schema_ptr as *mut arrow::ffi::FFI_ArrowSchema) }; - let arrow_schema = arrow::datatypes::Schema::try_from(ffi_schema.as_ref()) - .map_err(|e| anyhow!("Failed to import Arrow schema: {e}"))?; - - let mut schema_builder = fcore::metadata::Schema::builder(); - for field in arrow_schema.fields() { - let dt = fcore::record::from_arrow_field(field.as_ref())?; - schema_builder = schema_builder.column(field.name(), dt); - } - if !descriptor.schema.primary_keys.is_empty() { - schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone()); - } - - build_descriptor(schema_builder.build()?, descriptor) -} - /// Import a heap `FFI_ArrowSchema` (exported by C++) and return its fields as /// Fluss DataTypes. Lets ArrayWriter/MapWriter carry ROW/MAP element and /// key/value types that the flat FFI encoding cannot express. @@ -474,15 +398,9 @@ pub fn element_type_from_ffi( array_nesting: u32, ) -> Result { if array_nesting == 0 { - ffi_data_type_to_core(FfiDataTypeSpec::Scalar { - data_type: leaf_dt, - precision, - scale, - nullable: true, - }) + scalar_to_core(leaf_dt, precision, scale, true) } else { - let array_nullability = vec![1u8; (array_nesting + 1) as usize]; - build_array_type_from_leaf(leaf_dt, precision, scale, array_nesting, &array_nullability) + build_array_type_from_leaf(leaf_dt, precision, scale, array_nesting) } } diff --git a/bindings/cpp/test/test_ffi_converter.cpp b/bindings/cpp/test/test_ffi_converter.cpp index 2078bdab..49e64114 100644 --- a/bindings/cpp/test/test_ffi_converter.cpp +++ b/bindings/cpp/test/test_ffi_converter.cpp @@ -18,223 +18,220 @@ */ #include + #include #include "ffi_converter.hpp" namespace { -fluss::ffi::FfiColumn MakeArrayColumn(int32_t nesting, int32_t element_type, - bool nullable = true, bool leaf_nullable = true, - std::vector per_level_nullability = {}) { - fluss::ffi::FfiColumn col; - col.name = rust::String("bad_array"); - col.data_type = static_cast(fluss::TypeId::Array); - col.nullable = nullable; - col.comment = rust::String(""); - col.precision = 0; - col.scale = 0; - col.array_nesting = nesting; - if (!per_level_nullability.empty()) { - for (auto v : per_level_nullability) { - col.array_nullability.push_back(v); - } - } else { - for (int32_t i = 0; i < nesting; ++i) { - col.array_nullability.push_back((i == 0 ? nullable : true) ? 1 : 0); - } - col.array_nullability.push_back(leaf_nullable ? 1 : 0); - } - col.element_data_type = element_type; - col.element_precision = 0; - col.element_scale = 0; - return col; -} - -fluss::ffi::FfiColumn MakeScalarColumn(const char* name, fluss::TypeId type_id, - bool nullable = true, int32_t precision = 0, - int32_t scale = 0) { - fluss::ffi::FfiColumn col; - col.name = rust::String(name); - col.data_type = static_cast(type_id); - col.nullable = nullable; - col.comment = rust::String(""); - col.precision = precision; - col.scale = scale; - col.array_nesting = 0; - col.element_data_type = 0; - col.element_precision = 0; - col.element_scale = 0; - return col; -} - -} // namespace +using fluss::Column; +using fluss::DataType; +using fluss::TypeId; -TEST(FfiConverterTest, RejectsArrayWithoutElementType) { - auto col = MakeArrayColumn(1, 0); - EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); +// Encode a column to the FFI node arena and decode it back, exercising the +// full create-table / get-table-info type transport. +Column RoundTrip(const Column& col) { + auto ffi_col = fluss::utils::to_ffi_column(col); + return fluss::utils::from_ffi_column(ffi_col); } -TEST(FfiConverterTest, RejectsArrayWithArrayLeafType) { - auto col = MakeArrayColumn(2, static_cast(fluss::TypeId::Array)); - EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); +fluss::ffi::FfiTypeNode Node(int32_t type_id, uint32_t child_count = 0, bool nullable = true, + int32_t precision = 0, int32_t scale = 0, + const char* field_name = "") { + fluss::ffi::FfiTypeNode n; + n.type_id = type_id; + n.nullable = nullable; + n.precision = precision; + n.scale = scale; + n.field_name = rust::String(field_name); + n.child_count = child_count; + return n; } -TEST(FfiConverterTest, RejectsArrayWithUnknownLeafType) { - auto col = MakeArrayColumn(1, 999); - EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); -} +} // namespace -TEST(FfiConverterTest, SupportsLegacyOneLevelArrayMetadata) { - auto col = MakeArrayColumn(0, static_cast(fluss::TypeId::Int)); - auto converted = fluss::utils::from_ffi_column(col); - EXPECT_EQ(converted.data_type.id(), fluss::TypeId::Array); - ASSERT_NE(converted.data_type.element_type(), nullptr); - EXPECT_EQ(converted.data_type.element_type()->id(), fluss::TypeId::Int); -} +// --- DataType value semantics --- -// --- Nullability tests --- - -TEST(DataTypeTest, DefaultNullable) { - auto dt = fluss::DataType::Int(); - EXPECT_TRUE(dt.nullable()); -} +TEST(DataTypeTest, DefaultNullable) { EXPECT_TRUE(DataType::Int().nullable()); } TEST(DataTypeTest, NotNullMethod) { - auto dt = fluss::DataType::Int().NotNull(); + auto dt = DataType::Int().NotNull(); EXPECT_FALSE(dt.nullable()); - EXPECT_EQ(dt.id(), fluss::TypeId::Int); + EXPECT_EQ(dt.id(), TypeId::Int); } TEST(DataTypeTest, NotNullPreservesPrecisionScale) { - auto dt = fluss::DataType::Decimal(10, 2).NotNull(); + auto dt = DataType::Decimal(10, 2).NotNull(); EXPECT_FALSE(dt.nullable()); EXPECT_EQ(dt.precision(), 10); EXPECT_EQ(dt.scale(), 2); } -TEST(DataTypeTest, ArrayElementNullability) { - auto dt = fluss::DataType::Array(fluss::DataType::Int().NotNull()); - EXPECT_TRUE(dt.nullable()); - ASSERT_NE(dt.element_type(), nullptr); - EXPECT_FALSE(dt.element_type()->nullable()); -} - -TEST(DataTypeTest, NotNullArrayNullableElement) { - auto dt = fluss::DataType::Array(fluss::DataType::Int()).NotNull(); - EXPECT_FALSE(dt.nullable()); - ASSERT_NE(dt.element_type(), nullptr); - EXPECT_TRUE(dt.element_type()->nullable()); -} - -TEST(DataTypeTest, NotNullArrayNotNullElement) { - auto dt = fluss::DataType::Array(fluss::DataType::Int().NotNull()).NotNull(); - EXPECT_FALSE(dt.nullable()); - ASSERT_NE(dt.element_type(), nullptr); - EXPECT_FALSE(dt.element_type()->nullable()); -} +// --- Node-arena round trips --- -TEST(FfiConverterTest, ScalarNullableRoundTrip) { - fluss::Column col{"id", fluss::DataType::Int(), ""}; - auto ffi_col = fluss::utils::to_ffi_column(col); - EXPECT_TRUE(ffi_col.nullable); - auto back = fluss::utils::from_ffi_column(ffi_col); +TEST(FfiConverterTest, ScalarRoundTrip) { + Column col{"id", DataType::Int(), "primary id"}; + auto back = RoundTrip(col); + EXPECT_EQ(back.name, "id"); + EXPECT_EQ(back.comment, "primary id"); + EXPECT_EQ(back.data_type.id(), TypeId::Int); EXPECT_TRUE(back.data_type.nullable()); } TEST(FfiConverterTest, ScalarNotNullRoundTrip) { - fluss::Column col{"id", fluss::DataType::Int().NotNull(), ""}; - auto ffi_col = fluss::utils::to_ffi_column(col); - EXPECT_FALSE(ffi_col.nullable); - auto back = fluss::utils::from_ffi_column(ffi_col); + auto back = RoundTrip(Column{"id", DataType::Int().NotNull(), ""}); + EXPECT_EQ(back.data_type.id(), TypeId::Int); EXPECT_FALSE(back.data_type.nullable()); } -TEST(FfiConverterTest, ArrayNotNullElementRoundTrip) { - fluss::Column col{"tags", fluss::DataType::Array(fluss::DataType::String().NotNull()), ""}; - auto ffi_col = fluss::utils::to_ffi_column(col); - EXPECT_TRUE(ffi_col.nullable); - ASSERT_EQ(ffi_col.array_nullability.size(), 2u); - EXPECT_EQ(ffi_col.array_nullability[1], 0); - auto back = fluss::utils::from_ffi_column(ffi_col); - EXPECT_TRUE(back.data_type.nullable()); - ASSERT_NE(back.data_type.element_type(), nullptr); - EXPECT_FALSE(back.data_type.element_type()->nullable()); +TEST(FfiConverterTest, DecimalPrecisionScalePreserved) { + auto back = RoundTrip(Column{"amount", DataType::Decimal(18, 4), ""}); + EXPECT_EQ(back.data_type.id(), TypeId::Decimal); + EXPECT_EQ(back.data_type.precision(), 18); + EXPECT_EQ(back.data_type.scale(), 4); } -TEST(FfiConverterTest, NotNullArrayNullableElementRoundTrip) { - fluss::Column col{"ids", fluss::DataType::Array(fluss::DataType::Int()).NotNull(), ""}; - auto ffi_col = fluss::utils::to_ffi_column(col); - EXPECT_FALSE(ffi_col.nullable); - ASSERT_EQ(ffi_col.array_nullability.size(), 2u); - EXPECT_EQ(ffi_col.array_nullability[1], 1); - auto back = fluss::utils::from_ffi_column(ffi_col); - EXPECT_FALSE(back.data_type.nullable()); - ASSERT_NE(back.data_type.element_type(), nullptr); - EXPECT_TRUE(back.data_type.element_type()->nullable()); +// Non-canonical precisions are exactly what the old Arrow-schema path quantized; +// the node arena must preserve them. +TEST(FfiConverterTest, TimestampPrecisionPreserved) { + auto back = RoundTrip(Column{"ts", DataType::Timestamp(2), ""}); + EXPECT_EQ(back.data_type.id(), TypeId::Timestamp); + EXPECT_EQ(back.data_type.precision(), 2); + + auto ltz = RoundTrip(Column{"ts_ltz", DataType::TimestampLtz(9), ""}); + EXPECT_EQ(ltz.data_type.id(), TypeId::TimestampLtz); + EXPECT_EQ(ltz.data_type.precision(), 9); } -TEST(FfiConverterTest, NotNullArrayNotNullElementRoundTrip) { - fluss::Column col{ - "strict_ids", - fluss::DataType::Array(fluss::DataType::Int().NotNull()).NotNull(), - "", - }; - auto ffi_col = fluss::utils::to_ffi_column(col); - EXPECT_FALSE(ffi_col.nullable); - ASSERT_EQ(ffi_col.array_nullability.size(), 2u); - EXPECT_EQ(ffi_col.array_nullability[1], 0); - auto back = fluss::utils::from_ffi_column(ffi_col); - EXPECT_FALSE(back.data_type.nullable()); - ASSERT_NE(back.data_type.element_type(), nullptr); - EXPECT_FALSE(back.data_type.element_type()->nullable()); +TEST(FfiConverterTest, CharBinaryLengthPreserved) { + auto ch = RoundTrip(Column{"code", DataType::Char(12), ""}); + EXPECT_EQ(ch.data_type.id(), TypeId::Char); + EXPECT_EQ(ch.data_type.precision(), 12); + + auto bin = RoundTrip(Column{"hash", DataType::Binary(32), ""}); + EXPECT_EQ(bin.data_type.id(), TypeId::Binary); + EXPECT_EQ(bin.data_type.precision(), 32); } -TEST(FfiConverterTest, NestedArrayIntermediateNullabilityRoundTrip) { - fluss::Column col{ - "nested", - fluss::DataType::Array(fluss::DataType::Array(fluss::DataType::Int()).NotNull()), - "", - }; - auto ffi_col = fluss::utils::to_ffi_column(col); - auto back = fluss::utils::from_ffi_column(ffi_col); +TEST(FfiConverterTest, ArrayRoundTrip) { + auto back = RoundTrip(Column{"tags", DataType::Array(DataType::String()), ""}); + EXPECT_EQ(back.data_type.id(), TypeId::Array); + ASSERT_NE(back.data_type.element_type(), nullptr); + EXPECT_EQ(back.data_type.element_type()->id(), TypeId::String); +} +TEST(FfiConverterTest, NestedArrayPerLevelNullabilityRoundTrip) { + // array NOT NULL> (outer nullable) + Column col{"nested", + DataType::Array(DataType::Array(DataType::Int().NotNull()).NotNull()), ""}; + auto back = RoundTrip(col); EXPECT_TRUE(back.data_type.nullable()); ASSERT_NE(back.data_type.element_type(), nullptr); EXPECT_FALSE(back.data_type.element_type()->nullable()); ASSERT_NE(back.data_type.element_type()->element_type(), nullptr); - EXPECT_TRUE(back.data_type.element_type()->element_type()->nullable()); + EXPECT_EQ(back.data_type.element_type()->element_type()->id(), TypeId::Int); + EXPECT_FALSE(back.data_type.element_type()->element_type()->nullable()); } -TEST(FfiConverterTest, NestedArrayAllLevelsNullabilityRoundTrip) { - fluss::Column col{ - "strict_nested", - fluss::DataType::Array( - fluss::DataType::Array(fluss::DataType::Int().NotNull()).NotNull()) - .NotNull(), - "", - }; - auto ffi_col = fluss::utils::to_ffi_column(col); - auto back = fluss::utils::from_ffi_column(ffi_col); +TEST(FfiConverterTest, MapRoundTrip) { + Column col{"attrs", DataType::Map(DataType::String(), DataType::Int().NotNull()), ""}; + auto back = RoundTrip(col); + EXPECT_EQ(back.data_type.id(), TypeId::Map); + ASSERT_NE(back.data_type.key_type(), nullptr); + ASSERT_NE(back.data_type.value_type(), nullptr); + EXPECT_EQ(back.data_type.key_type()->id(), TypeId::String); + EXPECT_EQ(back.data_type.value_type()->id(), TypeId::Int); + EXPECT_FALSE(back.data_type.value_type()->nullable()); +} + +TEST(FfiConverterTest, RowRoundTrip) { + Column col{"profile", + DataType::Row({{"age", DataType::Int().NotNull()}, {"city", DataType::String()}}), + "user profile"}; + auto back = RoundTrip(col); + EXPECT_EQ(back.comment, "user profile"); + EXPECT_EQ(back.data_type.id(), TypeId::Row); + ASSERT_EQ(back.data_type.field_count(), 2u); + EXPECT_EQ(back.data_type.field_name(0), "age"); + ASSERT_NE(back.data_type.field_type(0), nullptr); + EXPECT_EQ(back.data_type.field_type(0)->id(), TypeId::Int); + EXPECT_FALSE(back.data_type.field_type(0)->nullable()); + EXPECT_EQ(back.data_type.field_name(1), "city"); + EXPECT_EQ(back.data_type.field_type(1)->id(), TypeId::String); + EXPECT_TRUE(back.data_type.field_type(1)->nullable()); +} + +// array>> — every kind +// of nesting plus precision-bearing leaves, round-tripped exactly. +TEST(FfiConverterTest, DeeplyNestedRoundTrip) { + auto row = DataType::Row({{"amount", DataType::Decimal(18, 4)}, {"ts", DataType::Timestamp(3)}}); + Column col{"events", DataType::Array(DataType::Map(DataType::String(), std::move(row))), ""}; + auto back = RoundTrip(col); + + ASSERT_EQ(back.data_type.id(), TypeId::Array); + const DataType* map = back.data_type.element_type(); + ASSERT_NE(map, nullptr); + ASSERT_EQ(map->id(), TypeId::Map); + EXPECT_EQ(map->key_type()->id(), TypeId::String); + const DataType* inner = map->value_type(); + ASSERT_NE(inner, nullptr); + ASSERT_EQ(inner->id(), TypeId::Row); + ASSERT_EQ(inner->field_count(), 2u); + EXPECT_EQ(inner->field_name(0), "amount"); + EXPECT_EQ(inner->field_type(0)->precision(), 18); + EXPECT_EQ(inner->field_type(0)->scale(), 4); + EXPECT_EQ(inner->field_name(1), "ts"); + EXPECT_EQ(inner->field_type(1)->precision(), 3); +} + +TEST(FfiConverterTest, NodeArenaShape) { + EXPECT_EQ(fluss::utils::to_ffi_column(Column{"a", DataType::Int(), ""}).type_nodes.size(), 1u); + EXPECT_EQ( + fluss::utils::to_ffi_column(Column{"a", DataType::Array(DataType::Int()), ""}).type_nodes.size(), + 2u); + EXPECT_EQ(fluss::utils::to_ffi_column( + Column{"a", DataType::Map(DataType::String(), DataType::Int()), ""}) + .type_nodes.size(), + 3u); + EXPECT_EQ(fluss::utils::to_ffi_column( + Column{"a", DataType::Row({{"x", DataType::Int()}, {"y", DataType::String()}}), ""}) + .type_nodes.size(), + 3u); +} + +// --- Decoder rejects malformed arenas --- + +TEST(FfiConverterTest, RejectsEmptyTypeTree) { + fluss::ffi::FfiColumn col; + col.name = rust::String("broken"); + col.comment = rust::String(""); + EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); +} - EXPECT_FALSE(back.data_type.nullable()); - ASSERT_NE(back.data_type.element_type(), nullptr); - EXPECT_FALSE(back.data_type.element_type()->nullable()); - ASSERT_NE(back.data_type.element_type()->element_type(), nullptr); - EXPECT_FALSE(back.data_type.element_type()->element_type()->nullable()); +TEST(FfiConverterTest, RejectsUnknownTypeId) { + fluss::ffi::FfiColumn col; + col.name = rust::String("broken"); + col.comment = rust::String(""); + col.type_nodes.push_back(Node(999)); + EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); } -TEST(FfiConverterTest, FfiColumnNonNullableScalarReconstructed) { - auto col = MakeScalarColumn("id", fluss::TypeId::Int, false); - auto converted = fluss::utils::from_ffi_column(col); - EXPECT_FALSE(converted.data_type.nullable()); - EXPECT_EQ(converted.data_type.id(), fluss::TypeId::Int); +TEST(FfiConverterTest, RejectsTrailingNodes) { + fluss::ffi::FfiColumn col; + col.name = rust::String("broken"); + col.comment = rust::String(""); + col.type_nodes.push_back(Node(static_cast(TypeId::Int))); + col.type_nodes.push_back(Node(static_cast(TypeId::Int))); + EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); } -TEST(FfiConverterTest, FfiColumnNonNullableArrayReconstructed) { - auto col = MakeArrayColumn(1, static_cast(fluss::TypeId::String), false, false); - auto converted = fluss::utils::from_ffi_column(col); - EXPECT_FALSE(converted.data_type.nullable()); - ASSERT_NE(converted.data_type.element_type(), nullptr); - EXPECT_FALSE(converted.data_type.element_type()->nullable()); +TEST(FfiConverterTest, RejectsRowMissingField) { + fluss::ffi::FfiColumn col; + col.name = rust::String("broken"); + col.comment = rust::String(""); + // ROW claims two fields but only one follows. + col.type_nodes.push_back(Node(static_cast(TypeId::Row), /*child_count=*/2)); + col.type_nodes.push_back(Node(static_cast(TypeId::Int), 0, true, 0, 0, "x")); + EXPECT_THROW((void)fluss::utils::from_ffi_column(col), std::runtime_error); } diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp index b7902149..20c7fb62 100644 --- a/bindings/cpp/test/test_kv_table.cpp +++ b/bindings/cpp/test/test_kv_table.cpp @@ -23,6 +23,8 @@ #include "test_utils.h" +using fluss::DataType; + class KvTableTest : public ::testing::Test { protected: fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } @@ -39,9 +41,9 @@ TEST_F(KvTableTest, UpsertDeleteAndLookup) { fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("age", DataType::BigInt()) .SetPrimaryKeys({"id"}) .Build(); @@ -164,8 +166,8 @@ TEST_F(KvTableTest, LimitScan) { fluss::TablePath table_path("fluss", "test_limit_scan_pk_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) .SetPrimaryKeys({"id"}) .Build(); @@ -234,9 +236,9 @@ TEST_F(KvTableTest, LookupWithNestedArray) { fluss::TablePath table_path("fluss", "test_lookup_nested_array_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) + .AddColumn("id", DataType::Int()) .AddColumn("matrix", - fluss::DataType::Array(fluss::DataType::Array(fluss::DataType::Int()))) + DataType::Array(DataType::Array(DataType::Int()))) .SetPrimaryKeys({"id"}) .Build(); @@ -258,15 +260,15 @@ TEST_F(KvTableTest, LookupWithNestedArray) { auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter inner1(2, fluss::DataType::Int()); + fluss::ArrayWriter inner1(2, DataType::Int()); inner1.SetInt32(0, 11); inner1.SetInt32(1, 12); - fluss::ArrayWriter inner2(2, fluss::DataType::Int()); + fluss::ArrayWriter inner2(2, DataType::Int()); inner2.SetInt32(0, 21); inner2.SetInt32(1, 22); - fluss::ArrayWriter outer(2, fluss::DataType::Array(fluss::DataType::Int())); + fluss::ArrayWriter outer(2, DataType::Array(DataType::Int())); outer.SetArray(0, std::move(inner1)); outer.SetArray(1, std::move(inner2)); row.Set("matrix", std::move(outer)); @@ -308,45 +310,47 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { fluss::TablePath table_path("fluss", "test_lookup_complex_matrix_cpp"); - auto row_seq_label = arrow::struct_( - {arrow::field("seq", arrow::int32()), arrow::field("label", arrow::utf8())}); - - auto arrow_schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("m_str_int", arrow::map(arrow::utf8(), arrow::int32())), - arrow::field("m_str_row", arrow::map(arrow::utf8(), row_seq_label)), - arrow::field("m_str_map", - arrow::map(arrow::utf8(), arrow::map(arrow::utf8(), arrow::int32()))), - arrow::field("m_str_arr", arrow::map(arrow::utf8(), arrow::list(arrow::int32()))), - arrow::field("arr_map", arrow::list(arrow::map(arrow::utf8(), arrow::int32()))), - arrow::field("arr_row", arrow::list(row_seq_label)), - arrow::field("r_deep", arrow::struct_({arrow::field( - "inner", arrow::struct_({arrow::field("n", arrow::int32())}))})), - arrow::field("r_with_arr", - arrow::struct_({arrow::field("f_int", arrow::int32()), - arrow::field("f_arr", arrow::list(arrow::int32()))})), - // row_rich: every scalar type + an array field in one ROW. - arrow::field("r_rich", - arrow::struct_({ - arrow::field("f_bool", arrow::boolean()), - arrow::field("f_int", arrow::int32()), - arrow::field("f_long", arrow::int64()), - arrow::field("f_float", arrow::float32()), - arrow::field("f_double", arrow::float64()), - arrow::field("f_str", arrow::utf8()), - arrow::field("f_bytes", arrow::binary()), - arrow::field("f_decimal", arrow::decimal128(10, 2)), - arrow::field("f_date", arrow::date32()), - arrow::field("f_time", arrow::time32(arrow::TimeUnit::MILLI)), - arrow::field("f_ts_ntz", arrow::timestamp(arrow::TimeUnit::MICRO)), - arrow::field("f_ts_ltz", arrow::timestamp(arrow::TimeUnit::MICRO, "UTC")), - arrow::field("f_binary", arrow::fixed_size_binary(4)), - arrow::field("f_arr", arrow::list(arrow::int32())), - })), - arrow::field("m_str_tiny", arrow::map(arrow::utf8(), arrow::int8())), - arrow::field("arr_small", arrow::list(arrow::int16())), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema, {"id"}); + auto row_seq_label = DataType::Row({{"seq", DataType::Int()}, {"label", DataType::String()}}); + + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("id", DataType::Int()) + .AddColumn("m_str_int", DataType::Map(DataType::String(), DataType::Int())) + .AddColumn("m_str_row", DataType::Map(DataType::String(), row_seq_label)) + .AddColumn("m_str_map", + DataType::Map(DataType::String(), + DataType::Map(DataType::String(), DataType::Int()))) + .AddColumn("m_str_arr", + DataType::Map(DataType::String(), DataType::Array(DataType::Int()))) + .AddColumn("arr_map", + DataType::Array(DataType::Map(DataType::String(), DataType::Int()))) + .AddColumn("arr_row", DataType::Array(row_seq_label)) + .AddColumn("r_deep", + DataType::Row({{"inner", DataType::Row({{"n", DataType::Int()}})}})) + .AddColumn("r_with_arr", + DataType::Row({{"f_int", DataType::Int()}, + {"f_arr", DataType::Array(DataType::Int())}})) + // r_rich: every scalar type + an array field in one ROW. + .AddColumn("r_rich", DataType::Row({ + {"f_bool", DataType::Boolean()}, + {"f_int", DataType::Int()}, + {"f_long", DataType::BigInt()}, + {"f_float", DataType::Float()}, + {"f_double", DataType::Double()}, + {"f_str", DataType::String()}, + {"f_bytes", DataType::Bytes()}, + {"f_decimal", DataType::Decimal(10, 2)}, + {"f_date", DataType::Date()}, + {"f_time", DataType::Time()}, + {"f_ts_ntz", DataType::Timestamp(6)}, + {"f_ts_ltz", DataType::TimestampLtz(6)}, + {"f_binary", DataType::Binary(4)}, + {"f_arr", DataType::Array(DataType::Int())}, + })) + .AddColumn("m_str_tiny", DataType::Map(DataType::String(), DataType::TinyInt())) + .AddColumn("arr_small", DataType::Array(DataType::SmallInt())) + .SetPrimaryKeys({"id"}) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -366,7 +370,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { // map — second entry has a NULL value. { - fluss::MapWriter m(2, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter m(2, DataType::String(), DataType::Int()); m.SetKeyString("a"); m.SetValueInt32(1); m.Commit(); @@ -375,9 +379,9 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { m.Commit(); row.Set("m_str_int", std::move(m)); } - // map> — value is a ROW, so the Arrow ctor. + // map> { - fluss::MapWriter m(1, arrow::utf8(), row_seq_label); + fluss::MapWriter m(1, DataType::String(), row_seq_label); m.SetKeyString("k"); fluss::GenericRow v(2); v.SetInt32(0, 7); @@ -390,7 +394,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { { fluss::MapWriter m(1, arrow::utf8(), arrow::map(arrow::utf8(), arrow::int32())); m.SetKeyString("k"); - fluss::MapWriter inner(1, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter inner(1, DataType::String(), DataType::Int()); inner.SetKeyString("x"); inner.SetValueInt32(9); inner.Commit(); @@ -400,10 +404,10 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { } // map> — value array fits the flat ctor. { - fluss::MapWriter m(1, fluss::DataType::String(), - fluss::DataType::Array(fluss::DataType::Int())); + fluss::MapWriter m(1, DataType::String(), + DataType::Array(DataType::Int())); m.SetKeyString("k"); - fluss::ArrayWriter v(2, fluss::DataType::Int()); + fluss::ArrayWriter v(2, DataType::Int()); v.SetInt32(0, 10); v.SetInt32(1, 20); m.SetValueArray(std::move(v)); @@ -413,7 +417,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { // array> — element is a MAP, so the Arrow ctor. { fluss::ArrayWriter a(1, arrow::map(arrow::utf8(), arrow::int32())); - fluss::MapWriter e(1, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter e(1, DataType::String(), DataType::Int()); e.SetKeyString("p"); e.SetValueInt32(5); e.Commit(); @@ -445,7 +449,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { { fluss::GenericRow r(2); r.SetInt32(0, 100); - fluss::ArrayWriter arr(3, fluss::DataType::Int()); + fluss::ArrayWriter arr(3, DataType::Int()); arr.SetInt32(0, 1); arr.SetInt32(1, 2); arr.SetInt32(2, 3); @@ -468,7 +472,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { rr.SetTimestampNtz(10, fluss::Timestamp{1769163227123LL, 456000}); rr.SetTimestampLtz(11, fluss::Timestamp{1769163227456LL, 0}); rr.SetBytes(12, {1, 2, 3, 4}); - fluss::ArrayWriter farr(3, fluss::DataType::Int()); + fluss::ArrayWriter farr(3, DataType::Int()); farr.SetInt32(0, 7); farr.SetNull(1); farr.SetInt32(2, 11); @@ -477,7 +481,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { } // map { - fluss::MapWriter m(2, fluss::DataType::String(), fluss::DataType::TinyInt()); + fluss::MapWriter m(2, DataType::String(), DataType::TinyInt()); m.SetKeyString("lo"); m.SetValueInt32(-128); m.Commit(); @@ -488,7 +492,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { } // array { - fluss::ArrayWriter a(2, fluss::DataType::SmallInt()); + fluss::ArrayWriter a(2, DataType::SmallInt()); a.SetInt32(0, 1000); a.SetInt32(1, -2000); row.Set("arr_small", std::move(a)); @@ -616,7 +620,7 @@ TEST_F(KvTableTest, LookupComplexTypesMatrix) { // Row 2 (id=2) — every compound column NULL. { - const int column_count = arrow_schema->num_fields(); + const int column_count = static_cast(schema.columns.size()); auto row = table.NewRow(); row.SetInt32(0, 2); for (int i = 1; i < column_count; ++i) { @@ -647,13 +651,13 @@ TEST_F(KvTableTest, MapWithTimestampValuesNtzAndLtz) { fluss::TablePath table_path("fluss", "test_map_timestamp_values_cpp"); - auto arrow_schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("mn", arrow::map(arrow::utf8(), arrow::timestamp(arrow::TimeUnit::MICRO))), - arrow::field("ml", - arrow::map(arrow::utf8(), arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"))), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema, {"id"}); + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("id", DataType::Int()) + .AddColumn("mn", DataType::Map(DataType::String(), DataType::Timestamp(6))) + .AddColumn("ml", DataType::Map(DataType::String(), DataType::TimestampLtz(6))) + .SetPrimaryKeys({"id"}) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -716,16 +720,16 @@ TEST_F(KvTableTest, NativeNestedBuilderNoArrow) { fluss::TablePath table_path("fluss", "test_native_nested_builder_cpp"); // array>> - auto event = fluss::DataType::Row({ - {"seq", fluss::DataType::Int()}, - {"attrs", fluss::DataType::Map(fluss::DataType::String(), fluss::DataType::Int())}, + auto event = DataType::Row({ + {"seq", DataType::Int()}, + {"attrs", DataType::Map(DataType::String(), DataType::Int())}, }); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("events", fluss::DataType::Array(event)) - .AddColumn("profile", fluss::DataType::Row({ - {"name", fluss::DataType::String()}, - {"score", fluss::DataType::Double()}, + .AddColumn("id", DataType::Int()) + .AddColumn("events", DataType::Array(event)) + .AddColumn("profile", DataType::Row({ + {"name", DataType::String()}, + {"score", DataType::Double()}, })) .SetPrimaryKeys({"id"}) .Build(); @@ -750,8 +754,8 @@ TEST_F(KvTableTest, NativeNestedBuilderNoArrow) { for (int i = 0; i < 2; i++) { fluss::GenericRow ev(2); ev.SetInt32(0, i); - fluss::MapWriter attrs(static_cast(i + 1), fluss::DataType::String(), - fluss::DataType::Int()); + fluss::MapWriter attrs(static_cast(i + 1), DataType::String(), + DataType::Int()); attrs.SetKeyString("a"); attrs.SetValueInt32(i * 10); attrs.Commit(); @@ -805,16 +809,17 @@ TEST_F(KvTableTest, ComplexTypesPartialUpdate) { fluss::TablePath table_path("fluss", "test_complex_partial_update_cpp"); - auto arrow_schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("name", arrow::utf8()), - arrow::field("score", arrow::int64()), - arrow::field("nested", arrow::struct_({arrow::field("seq", arrow::int32()), - arrow::field("label", arrow::utf8())})), - arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int32())), - arrow::field("tags", arrow::list(arrow::utf8())), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema, {"id"}); + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("score", DataType::BigInt()) + .AddColumn("nested", DataType::Row({{"seq", DataType::Int()}, + {"label", DataType::String()}})) + .AddColumn("attrs", DataType::Map(DataType::String(), DataType::Int())) + .AddColumn("tags", DataType::Array(DataType::String())) + .SetPrimaryKeys({"id"}) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -837,7 +842,7 @@ TEST_F(KvTableTest, ComplexTypesPartialUpdate) { nested.SetInt32(0, 10); nested.SetString(1, "alpha"); row.SetRow(3, std::move(nested)); - fluss::MapWriter attrs(2, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter attrs(2, DataType::String(), DataType::Int()); attrs.SetKeyString("a"); attrs.SetValueInt32(1); attrs.Commit(); @@ -845,7 +850,7 @@ TEST_F(KvTableTest, ComplexTypesPartialUpdate) { attrs.SetValueInt32(2); attrs.Commit(); row.SetMap(4, std::move(attrs)); - fluss::ArrayWriter tags(2, fluss::DataType::String()); + fluss::ArrayWriter tags(2, DataType::String()); tags.SetString(0, "x"); tags.SetString(1, "y"); row.SetArray(5, std::move(tags)); @@ -925,14 +930,15 @@ TEST_F(KvTableTest, PartitionedComplexTypes) { fluss::TablePath table_path("fluss", "test_partitioned_complex_cpp"); - auto arrow_schema = arrow::schema({ - arrow::field("region", arrow::utf8()), - arrow::field("user_id", arrow::int32()), - arrow::field("nested", arrow::struct_({arrow::field("seq", arrow::int32()), - arrow::field("label", arrow::utf8())})), - arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int32())), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema, {"region", "user_id"}); + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("region", DataType::String()) + .AddColumn("user_id", DataType::Int()) + .AddColumn("nested", DataType::Row({{"seq", DataType::Int()}, + {"label", DataType::String()}})) + .AddColumn("attrs", DataType::Map(DataType::String(), DataType::Int())) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -964,7 +970,7 @@ TEST_F(KvTableTest, PartitionedComplexTypes) { nested.SetInt32(0, d.seq); nested.SetString(1, d.label); row.SetRow(2, std::move(nested)); - fluss::MapWriter attrs(1, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter attrs(1, DataType::String(), DataType::Int()); attrs.SetKeyString(d.label); attrs.SetValueInt32(d.seq); attrs.Commit(); @@ -1002,8 +1008,8 @@ TEST_F(KvTableTest, LookupArrayValidationErrors) { fluss::TablePath table_path("fluss", "test_lookup_array_validation_errors_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("vals", fluss::DataType::Array(fluss::DataType::Int())) + .AddColumn("id", DataType::Int()) + .AddColumn("vals", DataType::Array(DataType::Int())) .SetPrimaryKeys({"id"}) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1020,7 +1026,7 @@ TEST_F(KvTableTest, LookupArrayValidationErrors) { auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter vals(2, fluss::DataType::Int()); + fluss::ArrayWriter vals(2, DataType::Int()); vals.SetInt32(0, 99); vals.SetNull(1); row.Set("vals", std::move(vals)); @@ -1070,9 +1076,9 @@ TEST_F(KvTableTest, CompositePrimaryKeys) { fluss::TablePath table_path("fluss", "test_composite_pk_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("region", fluss::DataType::String()) - .AddColumn("score", fluss::DataType::BigInt()) - .AddColumn("user_id", fluss::DataType::Int()) + .AddColumn("region", DataType::String()) + .AddColumn("score", DataType::BigInt()) + .AddColumn("user_id", DataType::Int()) .SetPrimaryKeys({"region", "user_id"}) .Build(); @@ -1167,10 +1173,10 @@ TEST_F(KvTableTest, PrefixLookupByBucketKey) { // Bucket key (a, b) is a strict prefix of the PK (a, b, c), enabling prefix lookup on (a, b). auto schema = fluss::Schema::NewBuilder() - .AddColumn("a", fluss::DataType::Int()) - .AddColumn("b", fluss::DataType::String()) - .AddColumn("c", fluss::DataType::BigInt()) - .AddColumn("d", fluss::DataType::String()) + .AddColumn("a", DataType::Int()) + .AddColumn("b", DataType::String()) + .AddColumn("c", DataType::BigInt()) + .AddColumn("d", DataType::String()) .SetPrimaryKeys({"a", "b", "c"}) .Build(); @@ -1265,9 +1271,9 @@ TEST_F(KvTableTest, PrefixLookupValidationErrors) { fluss::TablePath table_path("fluss", "test_prefix_lookup_validation_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("a", fluss::DataType::Int()) - .AddColumn("b", fluss::DataType::String()) - .AddColumn("c", fluss::DataType::BigInt()) + .AddColumn("a", DataType::Int()) + .AddColumn("b", DataType::String()) + .AddColumn("c", DataType::BigInt()) .SetPrimaryKeys({"a", "b", "c"}) .Build(); @@ -1311,11 +1317,11 @@ TEST_F(KvTableTest, PrefixLookupPartitioned) { // Partitioned by region; bucket key (a, b) is a prefix of the PK minus the partition column. auto schema = fluss::Schema::NewBuilder() - .AddColumn("region", fluss::DataType::String()) - .AddColumn("a", fluss::DataType::Int()) - .AddColumn("b", fluss::DataType::String()) - .AddColumn("c", fluss::DataType::BigInt()) - .AddColumn("d", fluss::DataType::String()) + .AddColumn("region", DataType::String()) + .AddColumn("a", DataType::Int()) + .AddColumn("b", DataType::String()) + .AddColumn("c", DataType::BigInt()) + .AddColumn("d", DataType::String()) .SetPrimaryKeys({"region", "a", "b", "c"}) .Build(); @@ -1415,10 +1421,10 @@ TEST_F(KvTableTest, PartialUpdate) { fluss::TablePath table_path("fluss", "test_partial_update_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("age", fluss::DataType::BigInt()) - .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("age", DataType::BigInt()) + .AddColumn("score", DataType::BigInt()) .SetPrimaryKeys({"id"}) .Build(); @@ -1505,10 +1511,10 @@ TEST_F(KvTableTest, PartialUpdateByIndex) { fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("age", fluss::DataType::BigInt()) - .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("age", DataType::BigInt()) + .AddColumn("score", DataType::BigInt()) .SetPrimaryKeys({"id"}) .Build(); @@ -1596,10 +1602,10 @@ TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) { // Create a partitioned KV table with region as partition key auto schema = fluss::Schema::NewBuilder() - .AddColumn("region", fluss::DataType::String()) - .AddColumn("user_id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("region", DataType::String()) + .AddColumn("user_id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("score", DataType::BigInt()) .SetPrimaryKeys({"region", "user_id"}) .Build(); @@ -1738,23 +1744,23 @@ TEST_F(KvTableTest, AllSupportedDatatypes) { // Create a table with all supported datatypes auto schema = fluss::Schema::NewBuilder() - .AddColumn("pk_int", fluss::DataType::Int()) - .AddColumn("col_boolean", fluss::DataType::Boolean()) - .AddColumn("col_tinyint", fluss::DataType::TinyInt()) - .AddColumn("col_smallint", fluss::DataType::SmallInt()) - .AddColumn("col_int", fluss::DataType::Int()) - .AddColumn("col_bigint", fluss::DataType::BigInt()) - .AddColumn("col_float", fluss::DataType::Float()) - .AddColumn("col_double", fluss::DataType::Double()) - .AddColumn("col_char", fluss::DataType::Char(10)) - .AddColumn("col_string", fluss::DataType::String()) - .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) - .AddColumn("col_date", fluss::DataType::Date()) - .AddColumn("col_time", fluss::DataType::Time()) - .AddColumn("col_timestamp", fluss::DataType::Timestamp()) - .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) - .AddColumn("col_bytes", fluss::DataType::Bytes()) - .AddColumn("col_binary", fluss::DataType::Binary(20)) + .AddColumn("pk_int", DataType::Int()) + .AddColumn("col_boolean", DataType::Boolean()) + .AddColumn("col_tinyint", DataType::TinyInt()) + .AddColumn("col_smallint", DataType::SmallInt()) + .AddColumn("col_int", DataType::Int()) + .AddColumn("col_bigint", DataType::BigInt()) + .AddColumn("col_float", DataType::Float()) + .AddColumn("col_double", DataType::Double()) + .AddColumn("col_char", DataType::Char(10)) + .AddColumn("col_string", DataType::String()) + .AddColumn("col_decimal", DataType::Decimal(10, 2)) + .AddColumn("col_date", DataType::Date()) + .AddColumn("col_time", DataType::Time()) + .AddColumn("col_timestamp", DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", DataType::TimestampLtz()) + .AddColumn("col_bytes", DataType::Bytes()) + .AddColumn("col_binary", DataType::Binary(20)) .SetPrimaryKeys({"pk_int"}) .Build(); diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp index 9078b27e..4a46ee86 100644 --- a/bindings/cpp/test/test_log_table.cpp +++ b/bindings/cpp/test/test_log_table.cpp @@ -29,6 +29,8 @@ #include "test_utils.h" +using fluss::DataType; + class LogTableTest : public ::testing::Test { protected: fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } @@ -45,8 +47,8 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { fluss::TablePath table_path("fluss", "test_append_record_batch_and_scan_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("c1", fluss::DataType::Int()) - .AddColumn("c2", fluss::DataType::String()) + .AddColumn("c1", DataType::Int()) + .AddColumn("c2", DataType::String()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -178,8 +180,8 @@ TEST_F(LogTableTest, LimitScan) { fluss::TablePath table_path("fluss", "test_limit_scan_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("c1", fluss::DataType::Int()) - .AddColumn("c2", fluss::DataType::String()) + .AddColumn("c1", DataType::Int()) + .AddColumn("c2", DataType::String()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -253,9 +255,9 @@ TEST_F(LogTableTest, LimitScanProjection) { fluss::TablePath table_path("fluss", "test_limit_scan_projection_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("c1", fluss::DataType::Int()) - .AddColumn("c2", fluss::DataType::String()) - .AddColumn("c3", fluss::DataType::BigInt()) + .AddColumn("c1", DataType::Int()) + .AddColumn("c2", DataType::String()) + .AddColumn("c3", DataType::BigInt()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -319,7 +321,7 @@ TEST_F(LogTableTest, LimitScanErrors) { fluss::TablePath table_path("fluss", "test_limit_scan_errors_cpp"); - auto schema = fluss::Schema::NewBuilder().AddColumn("c1", fluss::DataType::Int()).Build(); + auto schema = fluss::Schema::NewBuilder().AddColumn("c1", DataType::Int()).Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) .SetBucketCount(1) @@ -384,8 +386,8 @@ TEST_F(LogTableTest, ListOffsets) { fluss::TablePath table_path("fluss", "test_list_offsets_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -476,9 +478,9 @@ TEST_F(LogTableTest, TestProject) { fluss::TablePath table_path("fluss", "test_project_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("col_a", fluss::DataType::Int()) - .AddColumn("col_b", fluss::DataType::String()) - .AddColumn("col_c", fluss::DataType::Int()) + .AddColumn("col_a", DataType::Int()) + .AddColumn("col_b", DataType::String()) + .AddColumn("col_c", DataType::Int()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -591,8 +593,8 @@ TEST_F(LogTableTest, TestPollBatches) { fluss::TablePath table_path("fluss", "test_poll_batches_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -711,22 +713,22 @@ TEST_F(LogTableTest, AllSupportedDatatypes) { // Create a log table with all supported datatypes auto schema = fluss::Schema::NewBuilder() - .AddColumn("col_tinyint", fluss::DataType::TinyInt()) - .AddColumn("col_smallint", fluss::DataType::SmallInt()) - .AddColumn("col_int", fluss::DataType::Int()) - .AddColumn("col_bigint", fluss::DataType::BigInt()) - .AddColumn("col_float", fluss::DataType::Float()) - .AddColumn("col_double", fluss::DataType::Double()) - .AddColumn("col_boolean", fluss::DataType::Boolean()) - .AddColumn("col_char", fluss::DataType::Char(10)) - .AddColumn("col_string", fluss::DataType::String()) - .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) - .AddColumn("col_date", fluss::DataType::Date()) - .AddColumn("col_time", fluss::DataType::Time()) - .AddColumn("col_timestamp", fluss::DataType::Timestamp()) - .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) - .AddColumn("col_bytes", fluss::DataType::Bytes()) - .AddColumn("col_binary", fluss::DataType::Binary(4)) + .AddColumn("col_tinyint", DataType::TinyInt()) + .AddColumn("col_smallint", DataType::SmallInt()) + .AddColumn("col_int", DataType::Int()) + .AddColumn("col_bigint", DataType::BigInt()) + .AddColumn("col_float", DataType::Float()) + .AddColumn("col_double", DataType::Double()) + .AddColumn("col_boolean", DataType::Boolean()) + .AddColumn("col_char", DataType::Char(10)) + .AddColumn("col_string", DataType::String()) + .AddColumn("col_decimal", DataType::Decimal(10, 2)) + .AddColumn("col_date", DataType::Date()) + .AddColumn("col_time", DataType::Time()) + .AddColumn("col_timestamp", DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", DataType::TimestampLtz()) + .AddColumn("col_bytes", DataType::Bytes()) + .AddColumn("col_binary", DataType::Binary(4)) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -862,9 +864,9 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { // Create a partitioned log table auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("region", fluss::DataType::String()) - .AddColumn("value", fluss::DataType::BigInt()) + .AddColumn("id", DataType::Int()) + .AddColumn("region", DataType::String()) + .AddColumn("value", DataType::BigInt()) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1049,9 +1051,9 @@ TEST_F(LogTableTest, AppendAndScanWithArray) { fluss::TablePath table_path("fluss", "test_append_scan_with_array_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("tags", fluss::DataType::Array(fluss::DataType::String())) - .AddColumn("scores", fluss::DataType::Array(fluss::DataType::Int())) + .AddColumn("id", DataType::Int()) + .AddColumn("tags", DataType::Array(DataType::String())) + .AddColumn("scores", DataType::Array(DataType::Int())) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1084,12 +1086,12 @@ TEST_F(LogTableTest, AppendAndScanWithArray) { auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter tags(2, fluss::DataType::String()); + fluss::ArrayWriter tags(2, DataType::String()); tags.SetString(0, "hello"); tags.SetString(1, "world"); row.SetArray(1, std::move(tags)); - fluss::ArrayWriter scores(3, fluss::DataType::Int()); + fluss::ArrayWriter scores(3, DataType::Int()); scores.SetInt32(0, 10); scores.SetInt32(1, 20); scores.SetInt32(2, 30); @@ -1101,11 +1103,11 @@ TEST_F(LogTableTest, AppendAndScanWithArray) { auto row = table.NewRow(); row.Set("id", 2); - fluss::ArrayWriter tags(1, fluss::DataType::String()); + fluss::ArrayWriter tags(1, DataType::String()); tags.SetNull(0); row.SetArray(1, std::move(tags)); - fluss::ArrayWriter scores(0, fluss::DataType::Int()); + fluss::ArrayWriter scores(0, DataType::Int()); row.SetArray(2, std::move(scores)); ASSERT_OK(append_writer.Append(row)); @@ -1178,14 +1180,13 @@ TEST_F(LogTableTest, AppendAndScanWithMapAndRow) { fluss::TablePath table_path("fluss", "test_append_scan_map_row_cpp"); - // MAP / ROW columns can't be built with the flat schema builder. - auto arrow_schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int32())), - arrow::field("nested", arrow::struct_({arrow::field("seq", arrow::int32()), - arrow::field("label", arrow::utf8())})), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema); + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("id", DataType::Int()) + .AddColumn("attrs", DataType::Map(DataType::String(), DataType::Int())) + .AddColumn("nested", DataType::Row({{"seq", DataType::Int()}, + {"label", DataType::String()}})) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -1203,7 +1204,7 @@ TEST_F(LogTableTest, AppendAndScanWithMapAndRow) { auto row = table.NewRow(); row.Set("id", 1); - fluss::MapWriter attrs(2, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter attrs(2, DataType::String(), DataType::Int()); attrs.SetKeyString("a"); attrs.SetValueInt32(1); attrs.Commit(); @@ -1267,15 +1268,15 @@ TEST_F(LogTableTest, ProjectionWithCompoundTypes) { fluss::TablePath table_path("fluss", "test_log_projection_compound_cpp"); - auto arrow_schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("nested", arrow::struct_({arrow::field("seq", arrow::int32()), - arrow::field("label", arrow::utf8())})), - arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int32())), - arrow::field("tags", arrow::list(arrow::utf8())), - arrow::field("extra", arrow::utf8()), - }); - auto schema = fluss::Schema::FromArrow(arrow_schema); + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("id", DataType::Int()) + .AddColumn("nested", DataType::Row({{"seq", DataType::Int()}, + {"label", DataType::String()}})) + .AddColumn("attrs", DataType::Map(DataType::String(), DataType::Int())) + .AddColumn("tags", DataType::Array(DataType::String())) + .AddColumn("extra", DataType::String()) + .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -1296,7 +1297,7 @@ TEST_F(LogTableTest, ProjectionWithCompoundTypes) { nested.SetInt32(0, 42); nested.SetString(1, "hello"); row.SetRow(1, std::move(nested)); - fluss::MapWriter attrs(2, fluss::DataType::String(), fluss::DataType::Int()); + fluss::MapWriter attrs(2, DataType::String(), DataType::Int()); attrs.SetKeyString("x"); attrs.SetValueInt32(1); attrs.Commit(); @@ -1304,7 +1305,7 @@ TEST_F(LogTableTest, ProjectionWithCompoundTypes) { attrs.SetValueInt32(2); attrs.Commit(); row.SetMap(2, std::move(attrs)); - fluss::ArrayWriter tags(2, fluss::DataType::String()); + fluss::ArrayWriter tags(2, DataType::String()); tags.SetString(0, "alpha"); tags.SetString(1, "beta"); row.SetArray(3, std::move(tags)); @@ -1365,9 +1366,9 @@ TEST_F(LogTableTest, AppendAndScanWithNestedArray) { auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) + .AddColumn("id", DataType::Int()) .AddColumn("matrix", - fluss::DataType::Array(fluss::DataType::Array(fluss::DataType::Int()))) + DataType::Array(DataType::Array(DataType::Int()))) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1389,15 +1390,15 @@ TEST_F(LogTableTest, AppendAndScanWithNestedArray) { auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter inner1(2, fluss::DataType::Int()); + fluss::ArrayWriter inner1(2, DataType::Int()); inner1.SetInt32(0, 1); inner1.SetInt32(1, 2); - fluss::ArrayWriter inner2(2, fluss::DataType::Int()); + fluss::ArrayWriter inner2(2, DataType::Int()); inner2.SetInt32(0, 3); inner2.SetInt32(1, 4); - fluss::ArrayWriter outer(2, fluss::DataType::Array(fluss::DataType::Int())); + fluss::ArrayWriter outer(2, DataType::Array(DataType::Int())); outer.SetArray(0, std::move(inner1)); outer.SetArray(1, std::move(inner2)); @@ -1460,12 +1461,12 @@ TEST_F(LogTableTest, AppendAndScanWithArrayRichTypes) { auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("arr_bytes", fluss::DataType::Array(fluss::DataType::Bytes())) - .AddColumn("arr_date", fluss::DataType::Array(fluss::DataType::Date())) - .AddColumn("arr_time", fluss::DataType::Array(fluss::DataType::Time())) - .AddColumn("arr_ts", fluss::DataType::Array(fluss::DataType::Timestamp(6))) - .AddColumn("arr_decimal", fluss::DataType::Array(fluss::DataType::Decimal(10, 2))) + .AddColumn("id", DataType::Int()) + .AddColumn("arr_bytes", DataType::Array(DataType::Bytes())) + .AddColumn("arr_date", DataType::Array(DataType::Date())) + .AddColumn("arr_time", DataType::Array(DataType::Time())) + .AddColumn("arr_ts", DataType::Array(DataType::Timestamp(6))) + .AddColumn("arr_decimal", DataType::Array(DataType::Decimal(10, 2))) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1485,28 +1486,28 @@ TEST_F(LogTableTest, AppendAndScanWithArrayRichTypes) { auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter arr_bytes(2, fluss::DataType::Bytes()); + fluss::ArrayWriter arr_bytes(2, DataType::Bytes()); arr_bytes.SetBytes(0, std::vector{0x10, 0x20, 0x30}); arr_bytes.SetNull(1); row.SetArray(1, std::move(arr_bytes)); - fluss::ArrayWriter arr_date(2, fluss::DataType::Date()); + fluss::ArrayWriter arr_date(2, DataType::Date()); auto d0 = fluss::Date::FromDays(20000); arr_date.SetDate(0, d0); arr_date.SetNull(1); row.SetArray(2, std::move(arr_date)); - fluss::ArrayWriter arr_time(1, fluss::DataType::Time()); + fluss::ArrayWriter arr_time(1, DataType::Time()); auto t0 = fluss::Time::FromMillis(3600000); arr_time.SetTime(0, t0); row.SetArray(3, std::move(arr_time)); - fluss::ArrayWriter arr_ts(1, fluss::DataType::Timestamp(6)); + fluss::ArrayWriter arr_ts(1, DataType::Timestamp(6)); auto ts0 = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); arr_ts.SetTimestampNtz(0, ts0); row.SetArray(4, std::move(arr_ts)); - fluss::ArrayWriter arr_decimal(2, fluss::DataType::Decimal(10, 2)); + fluss::ArrayWriter arr_decimal(2, DataType::Decimal(10, 2)); arr_decimal.SetDecimal(0, "123.45"); arr_decimal.SetNull(1); row.SetArray(5, std::move(arr_decimal)); @@ -1566,7 +1567,7 @@ TEST_F(LogTableTest, AppendAndScanWithArrayRichTypes) { TEST_F(LogTableTest, ArrayApiValidationErrors) { // Type mismatch setter should fail through FFI Result propagation. { - fluss::ArrayWriter bool_array(1, fluss::DataType::Boolean()); + fluss::ArrayWriter bool_array(1, DataType::Boolean()); bool threw = false; try { bool_array.SetInt32(0, 42); @@ -1581,8 +1582,8 @@ TEST_F(LogTableTest, ArrayApiValidationErrors) { fluss::TablePath table_path("fluss", "test_array_api_validation_errors_cpp"); auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("vals", fluss::DataType::Array(fluss::DataType::Int())) + .AddColumn("id", DataType::Int()) + .AddColumn("vals", DataType::Array(DataType::Int())) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -1598,7 +1599,7 @@ TEST_F(LogTableTest, ArrayApiValidationErrors) { ASSERT_OK(table.NewAppend().CreateWriter(append_writer)); auto row = table.NewRow(); row.Set("id", 1); - fluss::ArrayWriter vals(2, fluss::DataType::Int()); + fluss::ArrayWriter vals(2, DataType::Int()); vals.SetInt32(0, 7); vals.SetNull(1); row.SetArray(1, std::move(vals)); @@ -1659,13 +1660,13 @@ TEST_F(LogTableTest, AppendAndScanWithArrayEncodingEdgeCases) { auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("arr_long_str", fluss::DataType::Array(fluss::DataType::String())) - .AddColumn("arr_big_decimal", fluss::DataType::Array(fluss::DataType::Decimal(22, 5))) - .AddColumn("arr_ts_nano", fluss::DataType::Array(fluss::DataType::Timestamp(9))) - .AddColumn("arr_float", fluss::DataType::Array(fluss::DataType::Float())) - .AddColumn("arr_double", fluss::DataType::Array(fluss::DataType::Double())) - .AddColumn("arr_binary", fluss::DataType::Array(fluss::DataType::Binary(4))) + .AddColumn("id", DataType::Int()) + .AddColumn("arr_long_str", DataType::Array(DataType::String())) + .AddColumn("arr_big_decimal", DataType::Array(DataType::Decimal(22, 5))) + .AddColumn("arr_ts_nano", DataType::Array(DataType::Timestamp(9))) + .AddColumn("arr_float", DataType::Array(DataType::Float())) + .AddColumn("arr_double", DataType::Array(DataType::Double())) + .AddColumn("arr_binary", DataType::Array(DataType::Binary(4))) .Build(); auto table_descriptor = fluss::TableDescriptor::NewBuilder() @@ -1686,38 +1687,38 @@ TEST_F(LogTableTest, AppendAndScanWithArrayEncodingEdgeCases) { row.Set("id", 1); // >= 8 bytes forces the heap-pointer variable-length path (threshold: 7) - fluss::ArrayWriter arr_long_str(2, fluss::DataType::String()); + fluss::ArrayWriter arr_long_str(2, DataType::String()); arr_long_str.SetString(0, "abcdefgh"); arr_long_str.SetString(1, "this is a much longer string that definitely exceeds inline"); row.SetArray(1, std::move(arr_long_str)); // precision > 18 forces non-compact decimal encoding - fluss::ArrayWriter arr_big_decimal(2, fluss::DataType::Decimal(22, 5)); + fluss::ArrayWriter arr_big_decimal(2, DataType::Decimal(22, 5)); arr_big_decimal.SetDecimal(0, "12345678901234567.12345"); arr_big_decimal.SetDecimal(1, "-99999999999999999.99999"); row.SetArray(2, std::move(arr_big_decimal)); // precision > 3 forces non-compact timestamp (millis + nanos-of-millis) - fluss::ArrayWriter arr_ts_nano(1, fluss::DataType::Timestamp(9)); + fluss::ArrayWriter arr_ts_nano(1, DataType::Timestamp(9)); auto ts_nano = fluss::Timestamp::FromMillisNanos(1769163227123, 456789); arr_ts_nano.SetTimestampNtz(0, ts_nano); row.SetArray(3, std::move(arr_ts_nano)); // IEEE 754 special values: NaN, +Infinity, -Infinity - fluss::ArrayWriter arr_float(3, fluss::DataType::Float()); + fluss::ArrayWriter arr_float(3, DataType::Float()); arr_float.SetFloat32(0, std::numeric_limits::quiet_NaN()); arr_float.SetFloat32(1, std::numeric_limits::infinity()); arr_float.SetFloat32(2, -std::numeric_limits::infinity()); row.SetArray(4, std::move(arr_float)); - fluss::ArrayWriter arr_double(3, fluss::DataType::Double()); + fluss::ArrayWriter arr_double(3, DataType::Double()); arr_double.SetFloat64(0, std::numeric_limits::quiet_NaN()); arr_double.SetFloat64(1, std::numeric_limits::infinity()); arr_double.SetFloat64(2, -std::numeric_limits::infinity()); row.SetArray(5, std::move(arr_double)); // Fixed-length binary - fluss::ArrayWriter arr_binary(2, fluss::DataType::Binary(4)); + fluss::ArrayWriter arr_binary(2, DataType::Binary(4)); arr_binary.SetBytes(0, std::vector{0xDE, 0xAD, 0xBE, 0xEF}); arr_binary.SetNull(1); row.SetArray(6, std::move(arr_binary)); @@ -1795,7 +1796,7 @@ TEST_F(LogTableTest, AppendAndScanWithArrayEncodingEdgeCases) { TEST_F(LogTableTest, ArrayWriterOverflowDetection) { // SetInt32 on TINYINT array must throw when value overflows i8 range (-128..127) { - fluss::ArrayWriter tinyint_arr(1, fluss::DataType::TinyInt()); + fluss::ArrayWriter tinyint_arr(1, DataType::TinyInt()); EXPECT_EQ(tinyint_arr.Size(), 1u); bool threw = false; try { @@ -1810,7 +1811,7 @@ TEST_F(LogTableTest, ArrayWriterOverflowDetection) { // SetInt32 on SMALLINT array must throw when value overflows i16 range (-32768..32767) { - fluss::ArrayWriter smallint_arr(1, fluss::DataType::SmallInt()); + fluss::ArrayWriter smallint_arr(1, DataType::SmallInt()); bool threw = false; try { smallint_arr.SetInt32(0, 40000); @@ -1824,7 +1825,7 @@ TEST_F(LogTableTest, ArrayWriterOverflowDetection) { // Negative overflow: -200 doesn't fit TINYINT { - fluss::ArrayWriter tinyint_arr(1, fluss::DataType::TinyInt()); + fluss::ArrayWriter tinyint_arr(1, DataType::TinyInt()); bool threw = false; try { tinyint_arr.SetInt32(0, -200); @@ -1836,15 +1837,15 @@ TEST_F(LogTableTest, ArrayWriterOverflowDetection) { // Values within range must succeed { - fluss::ArrayWriter tinyint_arr(1, fluss::DataType::TinyInt()); + fluss::ArrayWriter tinyint_arr(1, DataType::TinyInt()); EXPECT_NO_THROW(tinyint_arr.SetInt32(0, 127)); } { - fluss::ArrayWriter tinyint_arr(1, fluss::DataType::TinyInt()); + fluss::ArrayWriter tinyint_arr(1, DataType::TinyInt()); EXPECT_NO_THROW(tinyint_arr.SetInt32(0, -128)); } { - fluss::ArrayWriter smallint_arr(1, fluss::DataType::SmallInt()); + fluss::ArrayWriter smallint_arr(1, DataType::SmallInt()); EXPECT_NO_THROW(smallint_arr.SetInt32(0, 32767)); } } @@ -1855,7 +1856,7 @@ TEST_F(LogTableTest, MapWriterOverflowDetection) { // TINYINT map value overflowing i8 (-128..127) must throw. { - fluss::MapWriter m(1, fluss::DataType::String(), fluss::DataType::TinyInt()); + fluss::MapWriter m(1, DataType::String(), DataType::TinyInt()); bool threw = false; try { m.SetValueInt32(1000); @@ -1868,7 +1869,7 @@ TEST_F(LogTableTest, MapWriterOverflowDetection) { // SMALLINT map value overflowing i16 must throw. { - fluss::MapWriter m(1, fluss::DataType::String(), fluss::DataType::SmallInt()); + fluss::MapWriter m(1, DataType::String(), DataType::SmallInt()); bool threw = false; try { m.SetValueInt32(40000); @@ -1881,7 +1882,7 @@ TEST_F(LogTableTest, MapWriterOverflowDetection) { // Keys are checked the same way: a TINYINT key out of range throws. { - fluss::MapWriter m(1, fluss::DataType::TinyInt(), fluss::DataType::Int()); + fluss::MapWriter m(1, DataType::TinyInt(), DataType::Int()); bool threw = false; try { m.SetKeyInt32(-200); @@ -1894,7 +1895,7 @@ TEST_F(LogTableTest, MapWriterOverflowDetection) { // In-range key and value must succeed. { - fluss::MapWriter m(1, fluss::DataType::TinyInt(), fluss::DataType::SmallInt()); + fluss::MapWriter m(1, DataType::TinyInt(), DataType::SmallInt()); EXPECT_NO_THROW(m.SetKeyInt32(127)); EXPECT_NO_THROW(m.SetValueInt32(32767)); } @@ -1908,13 +1909,13 @@ TEST_F(LogTableTest, NullabilityPreservedInTableInfo) { auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("tags", fluss::DataType::Array(fluss::DataType::String().NotNull())) - .AddColumn("ids", fluss::DataType::Array(fluss::DataType::Int()).NotNull()) + .AddColumn("id", DataType::Int()) + .AddColumn("name", DataType::String()) + .AddColumn("tags", DataType::Array(DataType::String().NotNull())) + .AddColumn("ids", DataType::Array(DataType::Int()).NotNull()) .AddColumn("nested", - fluss::DataType::Array( - fluss::DataType::Array(fluss::DataType::Int()).NotNull())) + DataType::Array( + DataType::Array(DataType::Int()).NotNull())) .SetPrimaryKeys({"id"}) .Build(); diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index c845010f..ff5201a9 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -461,7 +461,6 @@ Read-only result of a prefix lookup — zero or more matched rows. Each row is a | Method | Description | |-----------------------------------|-----------------------------| | `NewBuilder() -> Schema::Builder` | Create a new schema builder | -| `FromArrow(std::shared_ptr schema, std::vector primary_keys = {}) -> Schema` | Build a schema from an existing Arrow schema (escape hatch; prefer `DataType::Map` / `DataType::Row`) | ## `Schema::Builder` diff --git a/website/docs/user-guide/cpp/data-types.md b/website/docs/user-guide/cpp/data-types.md index 9b8ab0c6..106db41a 100644 --- a/website/docs/user-guide/cpp/data-types.md +++ b/website/docs/user-guide/cpp/data-types.md @@ -85,12 +85,9 @@ auto schema = fluss::Schema::NewBuilder() .Build(); ``` -:::note Arrow escape hatch -If you already have an `arrow::Schema`, pass it directly with -`fluss::Schema::FromArrow(arrow_schema, /*primary_keys=*/{"id"})`. It's -equivalent — the native factories above lower to the same Arrow types -internally, without pulling Arrow into your code. -::: +Column types are sent to the server exactly as declared: precision, scale, +length, per-level nullability, and `ROW` field names all round-trip losslessly +through `CreateTable` and `GetTableInfo`. ## GenericRow Setters