diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 2a359e509d0767..01bdf240968dce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -1150,20 +1150,46 @@ public long getCountFromSnapshot() throws UserException {
             return 0;
         }
 
-        Map<String, String> summary = snapshot.summary();
-        if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
+        return getCountFromSummary(snapshot.summary(), sessionVariable.ignoreIcebergDanglingDelete);
+    }
+
+    /**
+     * Decide whether the row count can be pushed down from an Iceberg snapshot
+     * summary, returning the count, or -1 if it cannot be pushed down (the
+     * caller then falls back to a normal scan).
+     *
+     * <p>A snapshot summary is NOT guaranteed to carry the {@code total-*}
+     * counters: snapshots produced by compaction/replace (and some writers)
+     * may omit {@code total-equality-deletes} / {@code total-position-deletes}
+     * / {@code total-records}. Previously this method called
+     * {@code summary.get(...).equals(...)} / {@code Long.parseLong(...)}
+     * directly on the map values, which threw a {@link NullPointerException}
+     * (e.g. "Cannot invoke String.equals because Map.get(...) is null") for
+     * {@code SELECT COUNT(*)} on such a table while {@code SELECT *} worked.
+     * When any required counter is absent we now fall back to a scan.
+     */
+    @VisibleForTesting
+    public static long getCountFromSummary(Map<String, String> summary, boolean ignoreDanglingDelete) {
+        String equalityDeletes = summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES);
+        String positionDeletes = summary.get(IcebergUtils.TOTAL_POSITION_DELETES);
+        String totalRecords = summary.get(IcebergUtils.TOTAL_RECORDS);
+        if (equalityDeletes == null || positionDeletes == null || totalRecords == null) {
+            // summary is missing a required counter, can not push down count
+            return -1;
+        }
+        if (!equalityDeletes.equals("0")) {
             // has equality delete files, can not push down count
             return -1;
         }
 
-        long deleteCount = Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
+        long deleteCount = Long.parseLong(positionDeletes);
         if (deleteCount == 0) {
             // no delete files, can push down count directly
-            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS));
+            return Long.parseLong(totalRecords);
         }
-        if (sessionVariable.ignoreIcebergDanglingDelete) {
+        if (ignoreDanglingDelete) {
             // has position delete files, if we ignore dangling delete, can push down count
-            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) - deleteCount;
+            return Long.parseLong(totalRecords) - deleteCount;
         } else {
             // otherwise, can not push down count
             return -1;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java
new file mode 100644
index 00000000000000..213d888f2bc12d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class IcebergCountPushDownTest {
+
+    private static Map<String, String> summary(String equalityDeletes, String positionDeletes, String totalRecords) {
+        ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+        if (equalityDeletes != null) {
+            builder.put(IcebergUtils.TOTAL_EQUALITY_DELETES, equalityDeletes);
+        }
+        if (positionDeletes != null) {
+            builder.put(IcebergUtils.TOTAL_POSITION_DELETES, positionDeletes);
+        }
+        if (totalRecords != null) {
+            builder.put(IcebergUtils.TOTAL_RECORDS, totalRecords);
+        }
+        return builder.build();
+    }
+
+    @Test
+    public void testMissingCounterFallsBackToScan() {
+        // Snapshots written by compaction/replace (and some writers) may omit
+        // total-* counters. The pushdown previously NPE'd on the missing key;
+        // it must now fall back to a normal scan (return -1).
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(summary(null, "0", "100"), false));
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(summary("0", null, "100"), false));
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(summary("0", "0", null), false));
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(Collections.emptyMap(), false));
+    }
+
+    @Test
+    public void testNoDeletesPushesDownTotalRecords() {
+        Assertions.assertEquals(100L, IcebergScanNode.getCountFromSummary(summary("0", "0", "100"), false));
+    }
+
+    @Test
+    public void testEqualityDeletesCannotPushDown() {
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(summary("3", "0", "100"), false));
+    }
+
+    @Test
+    public void testPositionDeletesRespectIgnoreDangling() {
+        // ignoreDanglingDelete = true -> total-records minus position-deletes
+        Assertions.assertEquals(90L, IcebergScanNode.getCountFromSummary(summary("0", "10", "100"), true));
+        // ignoreDanglingDelete = false -> cannot push down (fall back to scan)
+        Assertions.assertEquals(-1L, IcebergScanNode.getCountFromSummary(summary("0", "10", "100"), false));
+    }
+}