feat: metrics integration for commit and scan#701

Open
evindj wants to merge 4 commits into
apache:mainfrom
evindj:metrics_integration

Conversation

@evindj evindj commented Jun 7, 2026

Contributor

This change implements part 2 of the core metrics. It covers the RestCatalog spec and integration with the commit and scan workflows. The logging metrics reporter is out of scope and will be added in a subsequent diff.
Another difference is that some of the HTTP calls for the RestCatalogMetrics reporter are still synchronous; a subsequent change will cover that.

@evindj evindj marked this pull request as draft June 7, 2026 00:12
@evindj evindj marked this pull request as ready for review June 13, 2026 18:07
@wgtmac wgtmac requested a review from Copilot June 18, 2026 15:58

namespace iceberg {

class MetricsReporter;

Member

Should we add this forward declaration to iceberg/type_fwd.h?


  RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
- std::unique_ptr<HttpClient> client,
+ std::shared_ptr<HttpClient> client,

Member

Why is this change required?

Contributor Author

Because MakeTableReporter() hands the same client to a per-table RestMetricsReporter that must keep posting metrics independently of the catalog's lifetime. Alternatively, we could make the table reporter own an HttpClient, but I feel the burden of initialization could be high.

Copilot AI left a comment

Pull request overview

Adds core metrics reporting to Iceberg C++ commit and scan workflows, including wiring metrics reporters through catalogs/tables and emitting REST-spec-compatible reports.

Changes:

  • Emit CommitReport from Transaction::Commit() when a commit produces a new snapshot, and propagate table reporters into update operations.
  • Collect and emit ScanReport (with scan planning counters/timers) from DataTableScan::PlanFiles(), including new manifest/file skip counters.
  • Introduce RestMetricsReporter and catalog-level wiring (REST/SQL/InMemory) plus new end-to-end/unit tests for scan planning and REST reporter behavior.

Reviewed changes

Copilot reviewed 28 out of 29 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| src/iceberg/update/snapshot_update.h | Adds per-snapshot-update reporter plumbing (ReportWith) and stores a reporter pointer. |
| src/iceberg/transaction.cc | Adds commit timing/attempt metrics and emits CommitReport after successful snapshot-producing commits; propagates reporter to FastAppend in transactions. |
| src/iceberg/test/scan_planning_metrics_test.cc | New end-to-end tests validating ScanReport emission and scan planning counters. |
| src/iceberg/test/rest_metrics_reporter_test.cc | New tests asserting REST reporter suppresses HTTP failures. |
| src/iceberg/test/fast_append_test.cc | Adds integration tests verifying CommitReport emission behavior for snapshot vs property-only commits. |
| src/iceberg/test/CMakeLists.txt | Registers the new test sources in the build. |
| src/iceberg/table.h | Extends Table::Make/StagedTable::Make to accept an optional reporter; adds reporter accessors and combining API. |
| src/iceberg/table.cc | Stores reporter on tables; wires reporter + table name into scans and propagates reporter to FastAppend. |
| src/iceberg/table_scan.h | Extends scan context and builder API to include table name + metrics reporter. |
| src/iceberg/table_scan.cc | Collects scan planning metrics, attaches them to manifest planning, and emits ScanReport. |
| src/iceberg/manifest/manifest_reader.h | Adds SkipCounter() hook to count per-entry skips during manifest reading. |
| src/iceberg/manifest/manifest_reader.cc | Implements skip counting for partition/metrics/partition-set filtering. |
| src/iceberg/manifest/manifest_reader_internal.h | Stores skip_counter_ in the reader implementation. |
| src/iceberg/manifest/manifest_group.h | Adds ability to attach scan metrics to manifest planning. |
| src/iceberg/manifest/manifest_group.cc | Increments scan counters for manifests/files and delete-file result accounting; wires reader skip counter. |
| src/iceberg/delete_file_index.h | Adds optional scan metrics hook (non-owning) for delete-manifest accounting. |
| src/iceberg/delete_file_index.cc | Counts scanned/skipped delete manifests and skipped delete files while building the delete index. |
| src/iceberg/catalog/sql/sql_catalog.h | Stores a catalog-level reporter for tables loaded/created by SQL catalog. |
| src/iceberg/catalog/sql/sql_catalog.cc | Loads configured reporter and passes it into created/loaded tables. |
| src/iceberg/catalog/rest/rest_metrics_reporter.h | Introduces REST metrics reporter interface for POSTing reports to the REST endpoint. |
| src/iceberg/catalog/rest/rest_metrics_reporter.cc | Implements JSON serialization + POST and suppresses failures (fire-and-forget). |
| src/iceberg/catalog/rest/rest_catalog.h | Switches REST catalog HTTP client ownership to shared_ptr; adds per-table reporter builder. |
| src/iceberg/catalog/rest/rest_catalog.cc | Loads configured reporter and optionally combines it with a per-table REST reporter when enabled/supported. |
| src/iceberg/catalog/rest/meson.build | Adds new REST metrics reporter source to Meson build. |
| src/iceberg/catalog/rest/CMakeLists.txt | Adds new REST metrics reporter source to CMake build. |
| src/iceberg/catalog/rest/catalog_properties.h | Adds rest-metrics-reporting-enabled property. |
| src/iceberg/catalog/memory/in_memory_catalog.h | Stores a catalog-level reporter for in-memory catalog tables. |
| src/iceberg/catalog/memory/in_memory_catalog.cc | Loads configured reporter and passes it into created/loaded tables. |
| .gitignore | Ignores additional build output directories. |

Comment thread src/iceberg/table_scan.cc
Comment on lines +543 to +545
auto metrics_context = MetricsContext::Default();
std::shared_ptr<ScanMetrics> scan_metrics = ScanMetrics::Make(*metrics_context);
auto timed = scan_metrics->total_planning_duration->Start();
Comment on lines +50 to +67
Status RestMetricsReporter::Report(const MetricsReport& report) {
// Serialize the report variant to JSON.
Result<nlohmann::json> json_result = std::visit(
[](const auto& r) -> Result<nlohmann::json> { return ToJson(r); }, report);
if (!json_result) {
return {};
}

// Inject "report-type" required by the REST spec (not included in core ToJson).
auto& json = json_result.value();
json[kReportType] =
std::holds_alternative<ScanReport>(report) ? kScanReportType : kCommitReportType;

// POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior.
std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{},
*DefaultErrorHandler::Instance(), *session_);
return {};
}
Comment on lines +227 to +238
CapturingReporter* g_capturing_reporter = nullptr;

void RegisterCapturingReporter() {
static std::once_flag flag;
std::call_once(flag, [] {
(void)MetricsReporters::Register(
"fast.append.test.reporter",
[](const auto&) -> Result<std::unique_ptr<MetricsReporter>> {
auto ptr = std::make_unique<CapturingReporter>();
g_capturing_reporter = ptr.get();
return ptr;
});
Comment thread src/iceberg/table_scan.cc Outdated
ScanReport report{
.table_name = context_.table_name,
.snapshot_id = snapshot->snapshot_id,
.filter = context_.filter,

Member

This sends the raw scan filter to metrics reporters. Java sanitizes the filter before reporting, so literal predicate values are not exposed. With the current code, a scan like email = 'x' posts that value to the REST metrics endpoint or any custom reporter.
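
A minimal sketch of the idea, not the PR's or Java's actual sanitizer: render the predicate for the report with its literal replaced by a placeholder. The `Predicate` struct, `ToSanitizedString`, and the `(redacted)` placeholder are hypothetical names chosen for illustration.

```cpp
#include <string>

// Hypothetical, simplified predicate: column, operator, and literal as text.
struct Predicate {
  std::string column;
  std::string op;
  std::string literal;
};

// Renders the predicate for a metrics report without exposing the literal.
// The "(redacted)" placeholder is an assumption made for this sketch.
std::string ToSanitizedString(const Predicate& p) {
  return p.column + " " + p.op + " (redacted)";
}
```

With this shape, a predicate like email = 'x' would reach the reporter as `email = (redacted)`, so the operator and column stay visible while the value does not.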

/// \param reporter The metrics reporter to use.
/// \return Reference to this for method chaining.
auto& ReportWith(this auto& self, std::shared_ptr<MetricsReporter> reporter) {
static_cast<SnapshotUpdate&>(self).reporter_ = std::move(reporter);

Member

This stores the reporter on SnapshotUpdate, but nothing reads reporter_ when building the CommitReport; Transaction::Commit uses ctx_->table->reporter() instead. A caller that does fast_append->ReportWith(custom_reporter) will not receive a commit report unless the table already has that reporter. Java's SnapshotProducer.reportWith overrides the operation reporter.
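
One way the override could look, as a sketch only: pick the reporter set on the operation when there is one, and fall back to the table's reporter otherwise. `EffectiveReporter` and the empty `MetricsReporter` stand-in are hypothetical and not part of the PR.

```cpp
#include <memory>

// Stand-in for the real reporter interface (assumption for this sketch).
struct MetricsReporter {};

// Prefers the reporter set on the operation (e.g. via ReportWith) and falls
// back to the table-level reporter when the operation has none.
std::shared_ptr<MetricsReporter> EffectiveReporter(
    const std::shared_ptr<MetricsReporter>& operation_reporter,
    const std::shared_ptr<MetricsReporter>& table_reporter) {
  return operation_reporter ? operation_reporter : table_reporter;
}
```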

Comment thread src/iceberg/table_scan.cc Outdated
const auto& schema_ptr = projected_schema.get();
std::vector<int32_t> projected_field_ids;
std::vector<std::string> projected_field_names;
for (const auto& field : schema_ptr->fields()) {

Member

This only reports top-level projected fields. For nested projections, Java uses TypeUtil.getProjectedIds(schema()) and includes nested child IDs/names. The C++ scan already uses GetProjectedIdsVisitor when resolving selected columns; the report should use the same recursive IDs plus FindColumnNameById.

Comment thread src/iceberg/manifest/manifest_group.cc Outdated
if (scan_metrics_) {
for (const auto& task : file_tasks) {
for (const auto& df : task->delete_files()) {
scan_metrics_->result_delete_files->Increment(1);

Member

These counters are updated per FileScanTask, so the same delete file is counted once for every data file it applies to. Java increments indexed/type delete counters once while building DeleteFileIndex. A global or partition delete file that matches N data files will inflate indexed_delete_files and the delete type counters by N here.
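
To illustrate the inflation, here is a sketch that counts each delete file once no matter how many tasks reference it. Note this deduplicates by path after planning, which is a different technique from counting while building the index as Java does. `FileTask` and `CountDistinctDeleteFiles` are hypothetical names.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, simplified scan task: only the delete file paths it references.
struct FileTask {
  std::vector<std::string> delete_file_paths;
};

// Counts each delete file once, even when it applies to many data files.
std::size_t CountDistinctDeleteFiles(const std::vector<FileTask>& tasks) {
  std::unordered_set<std::string> seen;
  for (const auto& task : tasks) {
    for (const auto& path : task.delete_file_paths) {
      seen.insert(path);  // duplicates are ignored by the set
    }
  }
  return seen.size();
}
```

A per-task loop over the same input would count a shared delete file once per task, while the set-based count stays at the number of distinct files.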

Comment thread src/iceberg/delete_file_index.cc Outdated
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
files.emplace_back(std::move(entry));
} else {
if (scan_metrics_) scan_metrics_->skipped_delete_files->Increment(1);

Member

This counts delete files older than min_sequence_number_ as skipped_delete_files. Java drops those files in the same sequence-number filter without counting them as skipped. Snapshots with old delete files will report extra skipped delete files.

auto it = props.find(std::string(kMetricsReporterImpl));
if (it != props.end() && !it->second.empty() &&
it->second != kMetricsReporterTypeNoop) {
if (auto r = MetricsReporters::Load(props); r.has_value()) {

Member

MetricsReporters::Load errors are silently ignored. If metrics-reporter-impl is misspelled or the factory fails, the REST catalog still initializes with no custom reporter and no diagnostic. Java fails catalog initialization for an invalid metrics reporter; this should propagate the load error.
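
A rough sketch of what propagating the failure could look like, using `std::variant` in place of the project's `Result` type. Everything here is hypothetical: `LoadReporter`, `MakeCatalog`, the `"example"` implementation name, and the assumption that the no-op type is spelled `"noop"`.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <variant>

using Properties = std::map<std::string, std::string>;
struct Reporter {};
struct Error { std::string message; };
struct Catalog { std::shared_ptr<Reporter> reporter; };

// Returns a null reporter when nothing is configured, and an Error when the
// configured implementation is unknown ("example" is the only known one here).
std::variant<std::shared_ptr<Reporter>, Error> LoadReporter(const Properties& props) {
  auto it = props.find("metrics-reporter-impl");
  if (it == props.end() || it->second.empty() || it->second == "noop") {
    return std::shared_ptr<Reporter>{};
  }
  if (it->second != "example") {
    return Error{"unknown metrics reporter: " + it->second};
  }
  return std::make_shared<Reporter>();
}

// Catalog creation fails when the reporter fails to load, instead of
// silently continuing without one.
std::variant<Catalog, Error> MakeCatalog(const Properties& props) {
  auto loaded = LoadReporter(props);
  if (auto* error = std::get_if<Error>(&loaded)) {
    return *error;
  }
  return Catalog{std::get<std::shared_ptr<Reporter>>(std::move(loaded))};
}
```

The point of the shape is that a misspelled implementation name surfaces as an error at catalog creation, while an absent or no-op setting still yields a working catalog.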

auto it = properties_.find(std::string(kMetricsReporterImpl));
if (it != properties_.end() && !it->second.empty() &&
it->second != kMetricsReporterTypeNoop) {
if (auto r = MetricsReporters::Load(properties_); r.has_value()) {

Member

Same issue here: MetricsReporters::Load errors are ignored, so an invalid metrics-reporter-impl silently disables reporting for the in-memory catalog. This should propagate the load failure.

Comment thread src/iceberg/catalog/sql/sql_catalog.cc Outdated
auto it = config_.props.find(std::string(kMetricsReporterImpl));
if (it != config_.props.end() && !it->second.empty() &&
it->second != kMetricsReporterTypeNoop) {
if (auto r = MetricsReporters::Load(config_.props); r.has_value()) {

Member

Same issue here: MetricsReporters::Load errors are ignored, so an invalid metrics-reporter-impl silently disables reporting for the SQL catalog. This should propagate the load failure.

Comment thread src/iceberg/table.cc Outdated
  Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const {
- return DataTableScanBuilder::Make(metadata_, io_);
+ ICEBERG_ASSIGN_OR_RAISE(auto builder, DataTableScanBuilder::Make(metadata_, io_));
+ builder->TableName(identifier_.ToString());

Member

This uses the table identifier, not the fully qualified table name. Java reports BaseTable.name(), and REST builds that as catalog.namespace.table. Scan reports from two catalogs with the same identifier will collide and differ from Java REST payloads.

Comment thread src/iceberg/transaction.cc Outdated
const auto& snapshot = snapshot_result.value();
const auto op = snapshot->Operation();
CommitReport report{
.table_name = ctx_->table->name().ToString(),

Member

Same table-name issue for commit reports: this uses TableIdentifier::ToString(), so the catalog name is missing. Java commit reports use the table's full name (catalog.namespace.table), which avoids collisions across catalogs.

@evindj evindj force-pushed the metrics_integration branch from 1b32f3d to 5d8344f Compare June 25, 2026 20:46
@evindj evindj requested a review from wgtmac June 30, 2026 06:52
auto it = props.find(std::string(kMetricsReporterImpl));
if (it != props.end() && !it->second.empty() &&
it->second != kMetricsReporterTypeNoop) {
if (auto r = MetricsReporters::Load(props); r.has_value()) {

Member

The REST catalog loads the custom metrics reporter twice: once in the constructor here and again in RestCatalog::Make after construction. That means a valid reporter factory is constructed and initialized twice, and the first instance is immediately discarded. Reporter initialization can have side effects or allocate resources; Java loads and initializes the configured reporter once. The constructor load should be removed and the Make path should be the single source of truth.

auto path = paths_->Metrics(identifier);
if (path.has_value()) {
auto rest_reporter =
std::make_shared<RestMetricsReporter>(client_, *path, catalog_session_);

Member

This RestMetricsReporter still uses the catalog-level auth session even after MakeTableFromLoadResult creates a table-scoped session for the table. Java wires the REST metrics reporter with the table REST client, so metrics POSTs use the same table-scoped credentials/headers as table operations. With a REST catalog that returns table-specific credentials, scan/commit metrics can be posted with the wrong auth and get silently dropped.

}
return {};
}
if (scan_metrics_) {

Member

scanned_data_manifests is incremented before the later ignore_deleted_ / ignore_existing_ manifest filters run. Java counts scanned data manifests only after those manifest-level filters, so a manifest skipped by either filter will be reported as scanned here.

if (ignore_deleted_) {
// only scan manifests that have entries other than deletes
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1);

Member

This is a manifest-level skip, not a skipped data file. Java increments skippedDataManifests for ignoreDeleted manifest pruning, so a manifest containing only deleted entries should not add to skipped_data_files.

if (ignore_existing_) {
// only scan manifests that have entries other than existing
if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1);

Member

Same counter issue for ignoreExisting: this skips the whole manifest, so it should increment skipped_data_manifests instead of skipped_data_files.
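
The ordering described in these manifest-counter comments can be sketched as follows: apply the manifest-level filters first, count a pruned manifest as skipped, and count it as scanned only if it survives. `ManifestInfo`, `ManifestCounters`, and `CountManifests` are hypothetical simplifications, not the PR's types.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical summary of what a manifest contains.
struct ManifestInfo {
  bool has_added_files;
  bool has_existing_files;
  bool has_deleted_files;
};

struct ManifestCounters {
  int64_t scanned_data_manifests = 0;
  int64_t skipped_data_manifests = 0;
};

// Runs the manifest-level filters before counting a manifest as scanned.
// A pruned manifest bumps the manifest counter, never a data-file counter.
ManifestCounters CountManifests(const std::vector<ManifestInfo>& manifests,
                                bool ignore_deleted, bool ignore_existing) {
  ManifestCounters counters;
  for (const auto& m : manifests) {
    const bool only_deleted = !m.has_added_files && !m.has_existing_files;
    const bool only_existing = !m.has_added_files && !m.has_deleted_files;
    if ((ignore_deleted && only_deleted) || (ignore_existing && only_existing)) {
      ++counters.skipped_data_manifests;
      continue;
    }
    ++counters.scanned_data_manifests;
  }
  return counters;
}
```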

.CaseSensitive(case_sensitive_)
.Select(std::move(columns));

if (scan_metrics_) {

Member

SkipCounter only covers filtering done inside ManifestReader. The later local filters in ReadEntries (ignore_existing_, data_file_evaluator, and manifest_entry_predicate_) still continue without incrementing skipped_data_files, unlike Java's entry-level filters wrapped with skippedDataFiles.

'resource_paths.cc',
'rest_catalog.cc',
'rest_file_io.cc',
'rest_metrics_reporter.cc',

Member

Since this adds RestMetricsReporter, the Meson install list should also include rest_metrics_reporter.h; otherwise CMake installs the new public header but Meson installs do not.

endpoint_test.cc
rest_file_io_test.cc
rest_json_serde_test.cc
rest_metrics_reporter_test.cc

Member

CMake now builds rest_metrics_reporter_test.cc, but src/iceberg/test/meson.build is not updated. Meson test runs will miss this coverage.

Comment thread src/iceberg/meson.build
'expression/projections.cc',
'expression/residual_evaluator.cc',
'expression/rewrite_not.cc',
'expression/sanitize_expression.cc',

Member

sanitize_expression.h is a new public header, but src/iceberg/expression/meson.build does not install it. Meson installs will miss this API even though the source is built.

std::holds_alternative<ScanReport>(report) ? kScanReportType : kCommitReportType;

// POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior.
std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{},

Member

Report() must not throw, but json.dump() and the HTTP call can throw here. Since scan/commit callers ignore the returned status, this should catch exceptions locally and keep the fire-and-forget metrics path from unwinding user operations.

constexpr int64_t kNinetyDaysInHours = 90LL * 24LL;

std::string is_past = now > micros ? "ago" : "from-now";
int64_t diff = std::abs(now - micros);

Member

This subtraction can overflow before std::abs when the sanitized timestamp is near the int64_t limits. It is an edge case, but using a saturating or unsigned distance would avoid undefined behavior in the sanitizer.
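
One possible shape for the unsigned-distance option, as a sketch: do the subtraction in `uint64_t`, where wraparound is well defined and the true distance always fits. `AbsDistance` is a hypothetical helper name, not something in the PR.

```cpp
#include <cstdint>
#include <limits>

// Absolute distance between two int64_t values, computed in unsigned
// arithmetic so the subtraction cannot trigger signed overflow.
constexpr uint64_t AbsDistance(int64_t a, int64_t b) {
  // Converting to uint64_t is modular, and subtracting the smaller value
  // from the larger one yields the exact distance modulo 2^64.
  return a >= b ? static_cast<uint64_t>(a) - static_cast<uint64_t>(b)
                : static_cast<uint64_t>(b) - static_cast<uint64_t>(a);
}

static_assert(AbsDistance(5, 3) == 2, "small positive distance");
static_assert(AbsDistance(std::numeric_limits<int64_t>::min(),
                          std::numeric_limits<int64_t>::max()) ==
                  std::numeric_limits<uint64_t>::max(),
              "full int64_t range fits in uint64_t");
```

The sign of the difference can still be taken from `now > micros` as the existing code does; only the magnitude needs the unsigned path.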

Comment thread src/iceberg/table_scan.cc
projected_field_names.reserve(projected_field_ids.size());
for (int32_t field_id : projected_field_ids) {
ICEBERG_ASSIGN_OR_RAISE(auto field_name, schema_ptr->FindColumnNameById(field_id));
projected_field_names.emplace_back(field_name.value_or(""));

Member

field_id comes from the projected schema, so failing to resolve its name would indicate a schema/cache bug. Emitting an empty field name would hide that invariant violation in the scan report; this should return an error instead of value_or("").


3 participants