diff --git a/CMakeLists.txt b/CMakeLists.txt index e7281fb11..675e8e98f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,7 +24,7 @@ endif() list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules") project(Iceberg - VERSION 0.3.0 + VERSION 0.3.1 DESCRIPTION "Iceberg C++ Project" LANGUAGES CXX) diff --git a/conanfile.py b/conanfile.py index f5f280294..a0a650566 100644 --- a/conanfile.py +++ b/conanfile.py @@ -30,7 +30,7 @@ class IcebergCppConan(ConanFile): name = "iceberg-cpp" description = "Apache Iceberg C++ client library" - license = "Apache-2.0" f + license = "Apache-2.0" homepage = "https://github.com/redpanda-data/iceberg-cpp" url = "https://github.com/redpanda-data/iceberg-cpp" package_type = "static-library" diff --git a/src/iceberg/catalog/rest/auth/auth_manager.cc b/src/iceberg/catalog/rest/auth/auth_manager.cc index 47370bd3b..7a50f3281 100644 --- a/src/iceberg/catalog/rest/auth/auth_manager.cc +++ b/src/iceberg/catalog/rest/auth/auth_manager.cc @@ -128,9 +128,9 @@ class OAuth2Manager : public AuthManager { if (init_token_response_.has_value()) { auto token_response = std::move(*init_token_response_); init_token_response_.reset(); - return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(), - config.client_id(), config.client_secret(), - config.scope(), client); + return AuthSession::MakeOAuth2( + token_response, config.oauth2_server_uri(), config.client_id(), + config.client_secret(), config.scope(), client, config.expiry_margin_seconds()); } // If token is provided, use it directly. @@ -143,10 +143,9 @@ class OAuth2Manager : public AuthManager { auto base_session = AuthSession::MakeDefault(AuthHeaders(config.token())); OAuthTokenResponse token_response; ICEBERG_ASSIGN_OR_RAISE(token_response, FetchToken(client, *base_session, config)); - // TODO(lishuxu): should we directly pass config to the MakeOAuth2 call? - return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(), - config.client_id(), config.client_secret(), - config.scope(), client); + return AuthSession::MakeOAuth2( + token_response, config.oauth2_server_uri(), config.client_id(), + config.client_secret(), config.scope(), client, config.expiry_margin_seconds()); } return AuthSession::MakeDefault({}); diff --git a/src/iceberg/catalog/rest/auth/auth_properties.h b/src/iceberg/catalog/rest/auth/auth_properties.h index 05a7ea2c6..f6a20baa3 100644 --- a/src/iceberg/catalog/rest/auth/auth_properties.h +++ b/src/iceberg/catalog/rest/auth/auth_properties.h @@ -70,6 +70,8 @@ class ICEBERG_REST_EXPORT AuthProperties : public ConfigBase { inline static Entry kExchangeEnabled{"token-exchange-enabled", true}; inline static Entry kAudience{"audience", ""}; inline static Entry kResource{"resource", ""}; + inline static Entry kExpiryMarginSeconds{"oauth2.token-refresh-margin-seconds", + 300}; /// \brief Build an AuthProperties from a properties map. static Result FromProperties( @@ -87,6 +89,8 @@ class ICEBERG_REST_EXPORT AuthProperties : public ConfigBase { bool keep_refreshed() const { return Get(kKeepRefreshed); } /// \brief Whether token exchange is enabled. bool exchange_enabled() const { return Get(kExchangeEnabled); } + /// \brief Token expiry safety margin in seconds. + int64_t expiry_margin_seconds() const { return Get(kExpiryMarginSeconds); } /// \brief Parsed client_id from credential (empty if no colon). const std::string& client_id() const { return client_id_; } diff --git a/src/iceberg/catalog/rest/auth/auth_session.cc b/src/iceberg/catalog/rest/auth/auth_session.cc index 7251dc4a9..490e009c5 100644 --- a/src/iceberg/catalog/rest/auth/auth_session.cc +++ b/src/iceberg/catalog/rest/auth/auth_session.cc @@ -19,9 +19,14 @@ #include "iceberg/catalog/rest/auth/auth_session.h" +#include +#include #include #include "iceberg/catalog/rest/auth/oauth2_util.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/catalog/rest/types.h" +#include "iceberg/util/macros.h" namespace iceberg::rest::auth { @@ -44,6 +49,101 @@ class DefaultAuthSession : public AuthSession { std::unordered_map headers_; }; +using SteadyClock = std::chrono::steady_clock; +using TimePoint = SteadyClock::time_point; + +/// \brief OAuth2 session with transparent token refresh. +/// +/// Thread-safe: multiple concurrent callers of Authenticate() are serialized +/// through a mutex. Only the first caller that detects an expired token +/// performs the refresh; the rest wait and reuse the result. +class OAuth2AuthSession : public AuthSession { + public: + OAuth2AuthSession(OAuthTokenResponse initial_token, std::string token_endpoint, + std::string client_id, std::string client_secret, std::string scope, + HttpClient& client, std::chrono::seconds expiry_margin) + : token_(std::move(initial_token)), + token_endpoint_(std::move(token_endpoint)), + client_id_(std::move(client_id)), + client_secret_(std::move(client_secret)), + scope_(std::move(scope)), + client_(client), + expiry_margin_(expiry_margin) { + UpdateExpiry(); + } + + Status Authenticate(std::unordered_map& headers) override { + std::lock_guard lock(mutex_); + if (IsExpired()) { + ICEBERG_RETURN_UNEXPECTED(DoRefresh()); + } + headers.insert_or_assign(std::string(kAuthorizationHeader), + std::string(kBearerPrefix) + token_.access_token); + return {}; + } + + private: + bool IsExpired() const { + if (expires_at_ == TimePoint{}) { + return false; // no expiry info → assume valid + } + return SteadyClock::now() >= expires_at_; + } + + void UpdateExpiry() { + if (token_.expires_in_secs.has_value() && *token_.expires_in_secs > 0) { + expires_at_ = SteadyClock::now() + std::chrono::seconds(*token_.expires_in_secs) - + expiry_margin_; + } else { + expires_at_ = {}; // no expiry + } + } + + /// Try refresh_token grant first, fall back to client_credentials. + Status DoRefresh() { + // Use a noop session for the refresh request itself to avoid recursion. + auto noop = AuthSession::MakeDefault({}); + + if (!token_.refresh_token.empty()) { + auto result = RefreshToken(client_, *noop, token_endpoint_, client_id_, + token_.refresh_token, scope_); + if (result.has_value()) { + token_ = std::move(*result); + UpdateExpiry(); + return {}; + } + // refresh_token grant failed, fall through to client_credentials + } + + if (!client_secret_.empty()) { + auto result = + FetchToken(client_, *noop, token_endpoint_, client_id_, client_secret_, scope_); + if (!result.has_value()) { + return AuthenticationFailed("Failed to refresh OAuth2 token: {}", + result.error().message); + } + token_ = std::move(*result); + UpdateExpiry(); + return {}; + } + + return AuthenticationFailed( + "Failed to refresh OAuth2 token: no refresh_token " + "or client_secret available"); + } + + std::mutex mutex_; + OAuthTokenResponse token_; + TimePoint expires_at_{}; + + const std::string token_endpoint_; + const std::string client_id_; + const std::string client_secret_; + const std::string scope_; + HttpClient& client_; + const std::chrono::seconds expiry_margin_; +}; + } // namespace std::shared_ptr AuthSession::MakeDefault( @@ -52,12 +152,12 @@ std::shared_ptr AuthSession::MakeDefault( } std::shared_ptr AuthSession::MakeOAuth2( - const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/, - const std::string& /*client_id*/, const std::string& /*client_secret*/, - const std::string& /*scope*/, HttpClient& /*client*/) { - // TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support. - return MakeDefault({{std::string(kAuthorizationHeader), - std::string(kBearerPrefix) + initial_token.access_token}}); + const OAuthTokenResponse& initial_token, const std::string& token_endpoint, + const std::string& client_id, const std::string& client_secret, + const std::string& scope, HttpClient& client, int64_t expiry_margin_seconds) { + return std::make_shared(initial_token, token_endpoint, client_id, + client_secret, scope, client, + std::chrono::seconds(expiry_margin_seconds)); } } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/auth_session.h b/src/iceberg/catalog/rest/auth/auth_session.h index 26b93877b..89f0c442e 100644 --- a/src/iceberg/catalog/rest/auth/auth_session.h +++ b/src/iceberg/catalog/rest/auth/auth_session.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -86,12 +87,10 @@ class ICEBERG_REST_EXPORT AuthSession { /// \param scope OAuth2 scope for refresh requests. /// \param client HTTP client for making refresh requests. /// \return A new session that manages token lifecycle automatically. - static std::shared_ptr MakeOAuth2(const OAuthTokenResponse& initial_token, - const std::string& token_endpoint, - const std::string& client_id, - const std::string& client_secret, - const std::string& scope, - HttpClient& client); + static std::shared_ptr MakeOAuth2( + const OAuthTokenResponse& initial_token, const std::string& token_endpoint, + const std::string& client_id, const std::string& client_secret, + const std::string& scope, HttpClient& client, int64_t expiry_margin_seconds = 300); }; } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.cc b/src/iceberg/catalog/rest/auth/oauth2_util.cc index 3d209d2bd..2a99b1ce2 100644 --- a/src/iceberg/catalog/rest/auth/oauth2_util.cc +++ b/src/iceberg/catalog/rest/auth/oauth2_util.cc @@ -74,4 +74,54 @@ Result FetchToken(HttpClient& client, AuthSession& session, return token_response; } +Result FetchToken(HttpClient& client, AuthSession& session, + const std::string& token_endpoint, + const std::string& client_id, + const std::string& client_secret, + const std::string& scope) { + std::unordered_map form_data{ + {std::string(kGrantType), std::string(kClientCredentials)}, + {std::string(kClientSecret), client_secret}, + }; + if (!client_id.empty()) { + form_data.emplace(std::string(kClientId), client_id); + } + if (!scope.empty()) { + form_data.emplace(std::string(kScope), scope); + } + + ICEBERG_ASSIGN_OR_RAISE(auto response, + client.PostForm(token_endpoint, form_data, /*headers=*/{}, + *DefaultErrorHandler::Instance(), session)); + + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + ICEBERG_ASSIGN_OR_RAISE(auto token_response, FromJson(json)); + ICEBERG_RETURN_UNEXPECTED(token_response.Validate()); + return token_response; +} + +Result RefreshToken(HttpClient& client, AuthSession& session, + const std::string& token_endpoint, + const std::string& client_id, + const std::string& refresh_token, + const std::string& scope) { + std::unordered_map form_data{ + {std::string(kGrantType), "refresh_token"}, + {"refresh_token", refresh_token}, + {std::string(kClientId), client_id}, + }; + if (!scope.empty()) { + form_data.emplace(std::string(kScope), scope); + } + + ICEBERG_ASSIGN_OR_RAISE(auto response, + client.PostForm(token_endpoint, form_data, /*headers=*/{}, + *DefaultErrorHandler::Instance(), session)); + + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + ICEBERG_ASSIGN_OR_RAISE(auto token_response, FromJson(json)); + ICEBERG_RETURN_UNEXPECTED(token_response.Validate()); + return token_response; +} + } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.h b/src/iceberg/catalog/rest/auth/oauth2_util.h index 39dd12964..063f8aa70 100644 --- a/src/iceberg/catalog/rest/auth/oauth2_util.h +++ b/src/iceberg/catalog/rest/auth/oauth2_util.h @@ -46,6 +46,35 @@ inline constexpr std::string_view kBearerPrefix = "Bearer "; ICEBERG_REST_EXPORT Result FetchToken( HttpClient& client, AuthSession& session, const AuthProperties& properties); +/// \brief Fetch an OAuth2 token using the client_credentials grant type with +/// explicit parameters (not an AuthProperties bundle). +/// +/// \param client HTTP client to use for the request. +/// \param session Auth session for the request headers. +/// \param token_endpoint Full URL of the OAuth2 token endpoint. +/// \param client_id OAuth2 client ID. +/// \param client_secret OAuth2 client secret. +/// \param scope OAuth2 scope to request. +/// \return The token response or an error. +ICEBERG_REST_EXPORT Result FetchToken( + HttpClient& client, AuthSession& session, const std::string& token_endpoint, + const std::string& client_id, const std::string& client_secret, + const std::string& scope); + +/// \brief Refresh an expired access token using a refresh_token grant. +/// +/// \param client HTTP client to use for the request. +/// \param session Auth session for the request headers. +/// \param token_endpoint Full URL of the OAuth2 token endpoint. +/// \param client_id OAuth2 client ID. +/// \param refresh_token The refresh token to use. +/// \param scope OAuth2 scope to request. +/// \return The refreshed token response or an error. +ICEBERG_REST_EXPORT Result RefreshToken( + HttpClient& client, AuthSession& session, const std::string& token_endpoint, + const std::string& client_id, const std::string& refresh_token, + const std::string& scope); + /// \brief Build auth headers from a token string. /// /// \param token Bearer token string (may be empty). diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index a68e9e61e..3d1352402 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -27,11 +27,19 @@ #include #include #include +#include #include +#include +#include #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/expression/literal.h" +#include "iceberg/metrics.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" #include "iceberg/schema_internal.h" +#include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -109,6 +117,8 @@ class ParquetWriter::Impl { ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_, std::move(arrow_writer_properties), &writer_)); + iceberg_schema_ = options.schema; + return {}; } @@ -133,6 +143,7 @@ class ParquetWriter::Impl { for (int i = 0; i < metadata->num_row_groups(); ++i) { split_offsets_.push_back(metadata->RowGroup(i)->file_offset()); } + PopulateMetrics(*metadata); writer_.reset(); ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell()); @@ -155,11 +166,107 @@ class ParquetWriter::Impl { std::vector split_offsets() const { return split_offsets_; } + const Metrics& metrics() const { return metrics_; } + private: + // Populate the Metrics struct from the Parquet file's FileMetaData. + // Column chunk stats are aggregated across row groups, and Parquet physical + // min/max bytes are decoded into Iceberg Literals via the single-value + // binary serialization format (which matches Parquet's physical encoding + // for primitive types). + void PopulateMetrics(const ::parquet::FileMetaData& metadata) { + metrics_ = {}; + metrics_.row_count = metadata.num_rows(); + + if (iceberg_schema_ == nullptr) { + return; + } + + const int num_columns = metadata.num_columns(); + std::vector field_ids(num_columns, -1); + std::vector> field_types(num_columns); + for (int c = 0; c < num_columns; ++c) { + const auto* col_desc = metadata.schema()->Column(c); + auto node = col_desc->schema_node(); + if (!node || node->field_id() < 0) continue; + const int32_t fid = node->field_id(); + auto field_result = iceberg_schema_->FindFieldById(fid); + if (!field_result.has_value() || !field_result->has_value()) continue; + const auto& type = field_result->value().get().type(); + if (!type || !type->is_primitive()) continue; + field_ids[c] = fid; + field_types[c] = std::static_pointer_cast(type); + } + + // Holders for running min/max bytes per field_id. + std::unordered_map min_bounds; + std::unordered_map max_bounds; + + for (int rg = 0; rg < metadata.num_row_groups(); ++rg) { + const auto row_group = metadata.RowGroup(rg); + for (int c = 0; c < num_columns; ++c) { + const int32_t fid = field_ids[c]; + if (fid < 0) continue; + const auto col_chunk = row_group->ColumnChunk(c); + metrics_.column_sizes[fid] += col_chunk->total_compressed_size(); + metrics_.value_counts[fid] += col_chunk->num_values(); + + if (!col_chunk->is_stats_set()) continue; + auto stats = col_chunk->statistics(); + if (!stats) continue; + if (stats->HasNullCount()) { + metrics_.null_value_counts[fid] += stats->null_count(); + } + if (stats->HasMinMax()) { + auto min_bytes = stats->EncodeMin(); + auto max_bytes = stats->EncodeMax(); + auto& cur_min = min_bounds[fid]; + auto& cur_max = max_bounds[fid]; + if (cur_min.empty() || min_bytes < cur_min) { + cur_min = std::move(min_bytes); + } + if (cur_max.empty() || max_bytes > cur_max) { + cur_max = std::move(max_bytes); + } + } + } + } + + for (auto& [fid, bytes] : min_bounds) { + const auto& type = field_types[FindColumnIndex(field_ids, fid)]; + auto lit = Literal::Deserialize( + std::span(reinterpret_cast(bytes.data()), + bytes.size()), + type); + if (lit.has_value()) { + metrics_.lower_bounds.emplace(fid, std::move(*lit)); + } + } + for (auto& [fid, bytes] : max_bounds) { + const auto& type = field_types[FindColumnIndex(field_ids, fid)]; + auto lit = Literal::Deserialize( + std::span(reinterpret_cast(bytes.data()), + bytes.size()), + type); + if (lit.has_value()) { + metrics_.upper_bounds.emplace(fid, std::move(*lit)); + } + } + } + + static int FindColumnIndex(const std::vector& field_ids, int32_t fid) { + for (size_t i = 0; i < field_ids.size(); ++i) { + if (field_ids[i] == fid) return static_cast(i); + } + return -1; + } + // TODO(gangwu): make memory pool configurable ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool(); // Schema to write from the Parquet file. std::shared_ptr<::arrow::Schema> arrow_schema_; + // Iceberg schema, retained for field_id/type lookup when populating metrics. + std::shared_ptr iceberg_schema_; // The output stream to write Parquet file. std::shared_ptr<::arrow::io::OutputStream> output_stream_; // Parquet file writer to write ArrowArray. @@ -168,6 +275,8 @@ class ParquetWriter::Impl { int64_t total_bytes_{0}; // Row group start offsets in the Parquet file. std::vector split_offsets_; + // Computed file metrics, filled in Close(). + Metrics metrics_; }; ParquetWriter::~ParquetWriter() = default; @@ -185,7 +294,7 @@ Result ParquetWriter::metrics() { if (!impl_->Closed()) { return Invalid("ParquetWriter is not closed"); } - return {}; + return impl_->metrics(); } Result ParquetWriter::length() { return impl_->length(); }