[AURON #2321] Support Iceberg column rename and drop-then-add in the native scan#2322
[AURON #2321] Support Iceberg column rename and drop-then-add in the native scan#2322lyne7-sc wants to merge 7 commits into
Conversation
There was a problem hiding this comment.
@lyne7-sc, thanks for contribution. I left some comments for this pull request. PTAL.
| .metadata() | ||
| .get(PARQUET_FIELD_ID_META_KEY) | ||
| .is_some_and(|file_field_id| file_field_id == table_field_id), | ||
| None => table_field.name().eq_ignore_ascii_case(file_field.name()), |
There was a problem hiding this comment.
When table_field has a PARQUET:field_id but file_field does not, is_some_and returns false and there is no name-based fallback — the column simply doesn't match.
For spec-compliant Iceberg Parquet files this is fine (the Iceberg spec mandates field IDs, and arrow-rs populates them into Arrow metadata). But if an older Parquet writer omitted the field_id in the Thrift SchemaElement, or if a non-Iceberg Parquet file happens to be served through this path, every column would fail to match and the scan would produce all-NULL rows.
Consider falling back to name matching when file_field lacks a field ID:
fn fields_match(table_field: &Field, file_field: &Field) -> bool {
match table_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
Some(table_field_id) => match file_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
Some(file_field_id) => file_field_id == table_field_id,
None => table_field.name().eq_ignore_ascii_case(file_field.name()),
},
None => table_field.name().eq_ignore_ascii_case(file_field.name()),
}
}This preserves field-id matching when both sides have IDs, but degrades gracefully to name matching otherwise.
There was a problem hiding this comment.
Updated fields_match to use a nested match. When file_field lacks a field id, it now falls back to case-insensitive name matching.
| @@ -75,6 +76,27 @@ object IcebergScanSupport extends Logging { | |||
| partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), | |||
| "Has unsupported schema type.") | |||
|
|
|||
There was a problem hiding this comment.
The inspected block bundles detectRenameOrDrop and expectedFieldIds in a single try/catch. If detectRenameOrDrop throws (e.g., a catalog timeout in table.schemas()), expectedFieldIds — which is independent and likely to succeed since expectedSchema() is a local field access — is also discarded. The scan falls back to Spark entirely.
Consider separating them:
val fieldIdsByName = try {
AuronIcebergSourceUtil.expectedFieldIds(scan.asInstanceOf[AnyRef])
} catch { case NonFatal(t) => logWarning(...); return None }
val renameOrDrop = try {
AuronIcebergSourceUtil.detectRenameOrDrop(scan.asInstanceOf[AnyRef])
} catch { case NonFatal(t) =>
logWarning(...)
AuronIcebergSourceUtil.RenameOrDrop(topLevel = true, nested = true) // conservative
}This way a transient schema-history failure can still fall back on the ORC/nested guards while preserving field-id matching for Parquet.
There was a problem hiding this comment.
Split this into two independent inspection steps.
expectedFieldIds failure returns None because field-id mapping is required for safe native planning.
detectRenameOrDrop failure returns None because rename/drop safety cannot be determined reliably.
This avoids reporting a misleading nested rename/drop fallback reason when the actual issue is schema-history inspection failure.
| @@ -75,6 +76,27 @@ object IcebergScanSupport extends Logging { | |||
| partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), | |||
| "Has unsupported schema type.") | |||
|
|
|||
There was a problem hiding this comment.
Minor: plan() is called twice per query (once in isSupported, once in convert — pattern from IcebergConvertProvider). This PR adds detectRenameOrDrop inside plan(), which iterates all historical schema versions via table.schemas().values(). For long-lived tables this loading + comparison now happens twice per scan node. Consider caching the plan result (e.g., via a TreeNodeTag on the BatchScanExec) to avoid this.
There was a problem hiding this comment.
Added TreeNodeTag caching on the BatchScanExec. plan(exec) now reuses the cached result on the second call
|
|
||
| def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = { | ||
| val table = asBatchQueryScan(scan).table() | ||
| val currentFields = collectFieldIdToName(table.schema()) |
There was a problem hiding this comment.
Two observations on detectRenameOrDrop:
-
table.schemas().values()includes the current schema. When compared againstcurrentFields(built fromtable.schema()), every field matches itself — the entire iteration is a no-op. Consider filtering it out:table.schemas().asScala.filterNot(_._1 == table.schema().schemaId()). -
collectFieldIdToNamehand-rolls a recursive field-ID collector. Iceberg providesTypeUtil.indexById(schema.asStruct())which returnsMap<Integer, NestedField>covering all nested fields. The only extra piece is thetopLevelflag, which can be derived trivially fromschema.columns(). Using the Iceberg utility would reduce maintenance surface and benefit from upstream fixes for new type variants.
There was a problem hiding this comment.
Addressed both points:
- Skipped the current schema.
- Replaced the local recursive collector with
TypeUtil.indexById(schema.asStruct()).
| fileSchema.fields.forall(field => fieldIdsByName.contains(field.name)), | ||
| "Failed to find field ids for all Iceberg data columns.") | ||
|
|
||
| val partitions = inputPartitions(exec) |
There was a problem hiding this comment.
Nit: the assertion message "Failed to find field ids for all Iceberg data columns." doesn't include which columns are missing, making debugging harder. Consider:
val missing = fileSchema.fields.filterNot(f => fieldIdsByName.contains(f.name)).map(_.name)
assert(missing.isEmpty, s"Missing Iceberg field ids for columns: ${missing.mkString(", ")}")There was a problem hiding this comment.
Updated the assertion to be more explicit.
…berg_rename # Conflicts: # thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
lyne7-sc
left a comment
There was a problem hiding this comment.
@SteNicholas Thanks for the detailed review. I’ve addressed the comments and updated the PR. Please take another look when you get a chance.
| .metadata() | ||
| .get(PARQUET_FIELD_ID_META_KEY) | ||
| .is_some_and(|file_field_id| file_field_id == table_field_id), | ||
| None => table_field.name().eq_ignore_ascii_case(file_field.name()), |
There was a problem hiding this comment.
Updated fields_match to use a nested match. When file_field lacks a field id, it now falls back to case-insensitive name matching.
|
|
||
| def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = { | ||
| val table = asBatchQueryScan(scan).table() | ||
| val currentFields = collectFieldIdToName(table.schema()) |
There was a problem hiding this comment.
Addressed both points:
- Skipped the current schema.
- Replaced the local recursive collector with
TypeUtil.indexById(schema.asStruct()).
| @@ -75,6 +76,27 @@ object IcebergScanSupport extends Logging { | |||
| partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), | |||
| "Has unsupported schema type.") | |||
|
|
|||
There was a problem hiding this comment.
Added TreeNodeTag caching on the BatchScanExec. plan(exec) now reuses the cached result on the second call
| fileSchema.fields.forall(field => fieldIdsByName.contains(field.name)), | ||
| "Failed to find field ids for all Iceberg data columns.") | ||
|
|
||
| val partitions = inputPartitions(exec) |
There was a problem hiding this comment.
Updated the assertion to be more explicit.
| @@ -75,6 +76,27 @@ object IcebergScanSupport extends Logging { | |||
| partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), | |||
| "Has unsupported schema type.") | |||
|
|
|||
There was a problem hiding this comment.
Split this into two independent inspection steps.
expectedFieldIds failure returns None because field-id mapping is required for safe native planning.
detectRenameOrDrop failure returns None because rename/drop safety cannot be determined reliably.
This avoids reporting a misleading nested rename/drop fallback reason when the actual issue is schema-history inspection failure.
Which issue does this PR close?
Closes #2321
Rationale for this change
The native Iceberg scan matches data-file columns by name, but Iceberg tracks them by field-id. After a column rename, old files read as all-NULL; after a drop-then-add of the same name, the new column reads the old column's data.
What changes are included in this PR?
Resolve columns by Iceberg field-id instead of by name:
field_idtoField.AuronIcebergSourceUtil,IcebergScanSupport,NativeConverters): extract top-levelname → field-idfrom the scan'sexpectedSchema()and serialize it into the plan.auron-planner,scan/mod.rs): stamp the id into Arrow field metadata (PARQUET:field_id);fields_matchmatches by id when present, else falls back to case-insensitive name matching (non-Iceberg scans unchanged).Nested-struct evolution and ORC rename/drop fall back to Spark, additive evolution stays native.
Are there any user-facing changes?
Yes. Iceberg queries on renamed or drop-then-added columns now return correct results under the native scan. Unsupported cases fall back to Spark. No API change.
How was this patch tested?
Added cases to
AuronIcebergIntegrationSuite