Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
return {};
}

Status ToAvroNodeVisitor::Visit(const VariantType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg variant type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const GeometryType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg geometry type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const GeographyType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg geography type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodeRecord>();

Expand Down Expand Up @@ -631,6 +643,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
break;
case TypeId::kUnknown:
return {};
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
return NotSupported("Reading Iceberg type {} from Avro is not supported",
expected_type);
default:
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class ToAvroNodeVisitor {
Status Visit(const FixedType& type, ::avro::NodePtr* node);
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
Status Visit(const UnknownType&, ::avro::NodePtr*);
Status Visit(const VariantType&, ::avro::NodePtr*);
Status Visit(const GeometryType&, ::avro::NodePtr*);
Status Visit(const GeographyType&, ::avro::NodePtr*);
Status Visit(const StructType& type, ::avro::NodePtr* node);
Status Visit(const ListType& type, ::avro::NodePtr* node);
Status Visit(const MapType& type, ::avro::NodePtr* node);
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/delete_file_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status EqualityDeleteFile::ConvertBoundsIfNeeded() const {
}

const auto& schema_field = field.value().get();
if (schema_field.type()->is_nested()) {
if (!schema_field.type()->is_primitive()) {
continue;
}

Expand Down Expand Up @@ -103,7 +103,7 @@ Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,
}

const auto& field = found_field.value().get();
if (field.type()->is_nested()) {
if (!field.type()->is_primitive()) {
continue;
}

Expand Down
122 changes: 88 additions & 34 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ nlohmann::json ToJson(const Type& type) {
return "uuid";
case TypeId::kUnknown:
return "unknown";
case TypeId::kVariant:
return "variant";
case TypeId::kGeometry:
return type.ToString();
case TypeId::kGeography:
return type.ToString();
}
std::unreachable();
}
Expand Down Expand Up @@ -459,9 +465,10 @@ Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto element_required,
GetJsonValue<bool>(json, kElementRequired));

return std::make_unique<ListType>(
SchemaField(element_id, std::string(ListType::kElementName),
std::move(element_type), !element_required));
ICEBERG_ASSIGN_OR_RAISE(auto type, ListType::Make(SchemaField(
element_id, std::string(ListType::kElementName),
std::move(element_type), !element_required)));
return std::unique_ptr<Type>(std::move(type));
}

Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
Expand All @@ -478,79 +485,126 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
/*optional=*/false);
SchemaField value_field(value_id, std::string(MapType::kValueName),
std::move(value_type), !value_required);
return std::make_unique<MapType>(std::move(key_field), std::move(value_field));
ICEBERG_ASSIGN_OR_RAISE(auto type,
MapType::Make(std::move(key_field), std::move(value_field)));
return std::unique_ptr<Type>(std::move(type));
}

} // namespace

Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
if (json.is_string()) {
std::string type_str = json.get<std::string>();
if (type_str == "boolean") {
const auto type_name = json.get<std::string>();
const auto normalized_type_name = StringUtils::ToLower(type_name);
if (normalized_type_name == "boolean") {
return std::make_unique<BooleanType>();
} else if (type_str == "int") {
} else if (normalized_type_name == "int") {
return std::make_unique<IntType>();
} else if (type_str == "long") {
} else if (normalized_type_name == "long") {
return std::make_unique<LongType>();
} else if (type_str == "float") {
} else if (normalized_type_name == "float") {
return std::make_unique<FloatType>();
} else if (type_str == "double") {
} else if (normalized_type_name == "double") {
return std::make_unique<DoubleType>();
} else if (type_str == "date") {
} else if (normalized_type_name == "date") {
return std::make_unique<DateType>();
} else if (type_str == "time") {
} else if (normalized_type_name == "time") {
return std::make_unique<TimeType>();
} else if (type_str == "timestamp") {
} else if (normalized_type_name == "timestamp") {
return std::make_unique<TimestampType>();
} else if (type_str == "timestamptz") {
} else if (normalized_type_name == "timestamptz") {
return std::make_unique<TimestampTzType>();
} else if (type_str == "timestamp_ns") {
} else if (normalized_type_name == "timestamp_ns") {
return std::make_unique<TimestampNsType>();
} else if (type_str == "timestamptz_ns") {
} else if (normalized_type_name == "timestamptz_ns") {
return std::make_unique<TimestampTzNsType>();
} else if (type_str == "string") {
} else if (normalized_type_name == "string") {
return std::make_unique<StringType>();
} else if (type_str == "binary") {
} else if (normalized_type_name == "binary") {
return std::make_unique<BinaryType>();
} else if (type_str == "uuid") {
} else if (normalized_type_name == "uuid") {
return std::make_unique<UuidType>();
} else if (type_str == "unknown") {
} else if (normalized_type_name == "unknown") {
return std::make_unique<UnknownType>();
} else if (type_str.starts_with("fixed")) {
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
} else if (normalized_type_name == "variant") {
return std::make_unique<VariantType>();
} else if (normalized_type_name.starts_with("fixed")) {
static const std::regex kFixedRegex(R"(fixed\[\s*(\d+)\s*\])");
std::smatch match;
if (std::regex_match(type_str, match, fixed_regex)) {
if (std::regex_match(normalized_type_name, match, kFixedRegex)) {
ICEBERG_ASSIGN_OR_RAISE(auto length,
StringUtils::ParseNumber<int32_t>(match[1].str()));
return std::make_unique<FixedType>(length);
}
return JsonParseError("Invalid fixed type: {}", type_str);
} else if (type_str.starts_with("decimal")) {
std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
return JsonParseError("Invalid fixed type: {}", type_name);
} else if (normalized_type_name.starts_with("decimal")) {
static const std::regex kDecimalRegex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
std::smatch match;
if (std::regex_match(type_str, match, decimal_regex)) {
if (std::regex_match(normalized_type_name, match, kDecimalRegex)) {
ICEBERG_ASSIGN_OR_RAISE(auto precision,
StringUtils::ParseNumber<int32_t>(match[1].str()));
ICEBERG_ASSIGN_OR_RAISE(auto scale,
StringUtils::ParseNumber<int32_t>(match[2].str()));
return std::make_unique<DecimalType>(precision, scale);
}
return JsonParseError("Invalid decimal type: {}", type_str);
return JsonParseError("Invalid decimal type: {}", type_name);
} else if (normalized_type_name.starts_with("geometry")) {
static const std::regex kGeometryRegex(R"(geometry\s*(?:\(\s*([^)]*?)\s*\))?)",
std::regex_constants::icase);
std::smatch match;
if (std::regex_match(type_name, match, kGeometryRegex)) {
if (match[1].matched) {
auto crs = match[1].str();
if (crs.empty()) {
return JsonParseError("Invalid geometry type: {}", type_name);
}
ICEBERG_ASSIGN_OR_RAISE(auto type, GeometryType::Make(std::move(crs)));
return std::unique_ptr<Type>(std::move(type));
}
ICEBERG_ASSIGN_OR_RAISE(auto type, GeometryType::Make());
return std::unique_ptr<Type>(std::move(type));
}
return JsonParseError("Invalid geometry type: {}", type_name);
} else if (normalized_type_name.starts_with("geography")) {
static const std::regex kGeographyRegex(
R"(geography\s*(?:\(\s*([^,]*?)\s*(?:,\s*(\w*)\s*)?\))?)",
std::regex_constants::icase);
std::smatch match;
if (std::regex_match(type_name, match, kGeographyRegex)) {
auto crs = match[1].str();
if (match[1].matched && crs.empty()) {
return JsonParseError("Invalid geography type: {}", type_name);
}
if (match[2].matched) {
ICEBERG_ASSIGN_OR_RAISE(auto algorithm,
EdgeAlgorithmFromString(match[2].str()));
ICEBERG_ASSIGN_OR_RAISE(auto type,
GeographyType::Make(std::move(crs), algorithm));
return std::unique_ptr<Type>(std::move(type));
}
if (match[1].matched) {
ICEBERG_ASSIGN_OR_RAISE(auto type, GeographyType::Make(std::move(crs)));
return std::unique_ptr<Type>(std::move(type));
}
ICEBERG_ASSIGN_OR_RAISE(auto type, GeographyType::Make());
return std::unique_ptr<Type>(std::move(type));
}
return JsonParseError("Invalid geography type: {}", type_name);
} else {
return JsonParseError("Unknown primitive type: {}", type_str);
return JsonParseError("Cannot parse type string: {}", type_name);
}
}

// For complex types like struct, list, and map
ICEBERG_ASSIGN_OR_RAISE(auto type_str, GetJsonValue<std::string>(json, kType));
if (type_str == kStruct) {
ICEBERG_ASSIGN_OR_RAISE(auto complex_type_name, GetJsonValue<std::string>(json, kType));
if (complex_type_name == kStruct) {
return StructTypeFromJson(json);
} else if (type_str == kList) {
} else if (complex_type_name == kList) {
return ListTypeFromJson(json);
} else if (type_str == kMap) {
} else if (complex_type_name == kMap) {
return MapTypeFromJson(json);
} else {
return JsonParseError("Unknown complex type: {}", type_str);
return JsonParseError("Unknown complex type: {}", complex_type_name);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
Status Visit(const Type& type) {
if (type.is_nested()) {
return VisitNested(internal::checked_cast<const NestedType&>(type));
} else if (type.is_variant()) {
return {};
} else {
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
}
Expand All @@ -207,8 +209,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
if (!ShouldContinue()) {
break;
}
// TODO(zhuo.wang): variant type should also be handled here
if (field.type()->is_primitive()) {
if (!field.type()->is_nested()) {
ids_.insert(field.field_id());
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/parquet/parquet_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ class CollectMetricsVisitor {

Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) { return {}; }

Status VisitVariant(const VariantType& /*type*/, const std::string& /*prefix*/) {
return {};
}

Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& /*prefix*/) {
return {};
}
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/parquet/parquet_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ Status ValidateParquetSchemaEvolution(
break;
case TypeId::kUnknown:
return {};
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
return NotSupported("Reading Iceberg type {} from Parquet is not supported",
expected_type);
case TypeId::kStruct:
if (arrow_type->id() == ::arrow::Type::STRUCT) {
return {};
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ class FieldMetricsCollector {

Status VisitMap(const MapType& /*type*/, const ::arrow::Array& /*array*/) { return {}; }

Status VisitVariant(const VariantType& /*type*/, const ::arrow::Array& /*array*/) {
return {};
}

Status VisitPrimitive(const PrimitiveType& type, const ::arrow::Array& array) {
switch (type.type_id()) {
case TypeId::kFloat:
Expand Down
35 changes: 35 additions & 0 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/schema_internal.h"

#include <cerrno>
#include <charconv>
#include <cstring>
#include <optional>
Expand All @@ -39,6 +40,33 @@ constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
constexpr int32_t kUnknownFieldId = -1;

Status CheckArrowCompatible(const Type& type) {
switch (type.type_id()) {
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
return NotSupported("Iceberg type {} is not supported by Arrow conversion",
type.ToString());
case TypeId::kStruct:
for (const auto& field : static_cast<const StructType&>(type).fields()) {
ICEBERG_RETURN_UNEXPECTED(CheckArrowCompatible(*field.type()));
}
break;
case TypeId::kList:
ICEBERG_RETURN_UNEXPECTED(
CheckArrowCompatible(*static_cast<const ListType&>(type).element().type()));
break;
case TypeId::kMap: {
const auto& map_type = static_cast<const MapType&>(type);
ICEBERG_RETURN_UNEXPECTED(CheckArrowCompatible(*map_type.key().type()));
ICEBERG_RETURN_UNEXPECTED(CheckArrowCompatible(*map_type.value().type()));
} break;
default:
break;
}
return {};
}

// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
std::optional<int32_t> field_id, ArrowSchema* schema) {
Expand Down Expand Up @@ -153,6 +181,11 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
case TypeId::kUnknown:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_NA));
break;
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
ArrowBufferReset(&metadata_buffer);
return EINVAL;
}

if (!name.empty()) {
Expand All @@ -179,6 +212,8 @@ Status ToArrowSchema(const Schema& schema, ArrowSchema* out) {
return InvalidArgument("Output Arrow schema cannot be null");
}

ICEBERG_RETURN_UNEXPECTED(CheckArrowCompatible(schema));

ArrowSchemaInit(out);

if (ArrowErrorCode errorCode = ToArrowSchema(schema, /*optional=*/false, /*name=*/"",
Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int64_t kInitialRowId = 0;

static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {
{TypeId::kTimestampNs, 3},
{TypeId::kTimestampTzNs, 3},
{TypeId::kUnknown, 3},
{TypeId::kTimestampNs, 3}, {TypeId::kTimestampTzNs, 3}, {TypeId::kUnknown, 3},
{TypeId::kVariant, 3}, {TypeId::kGeometry, 3}, {TypeId::kGeography, 3},
};

/// An integer version number for the format
Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/test/arrow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ INSTANTIATE_TEST_SUITE_P(
ToArrowSchemaParam{.iceberg_type = iceberg::unknown(),
.arrow_type = ::arrow::null()}));

TEST(ToArrowSchemaTest, UnsupportedV3Types) {
const std::vector<std::shared_ptr<Type>> unsupported_types = {
iceberg::variant(), iceberg::geometry(), iceberg::geography()};

for (const auto& unsupported_type : unsupported_types) {
Schema schema(
{SchemaField::MakeOptional(/*field_id=*/1, "unsupported", unsupported_type)},
/*schema_id=*/0);
ArrowSchema arrow_schema;
ASSERT_THAT(ToArrowSchema(schema, &arrow_schema),
HasErrorMessage("is not supported by Arrow conversion"));
}
}

namespace {

void CheckArrowField(const ::arrow::Field& field, ::arrow::Type::type type_id,
Expand Down
Loading
Loading