diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc b/src/iceberg/arrow/s3/arrow_s3_file_io.cc index cffd95840..a7e98620e 100644 --- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc +++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc @@ -18,9 +18,13 @@ */ #include +#include #include #include #include +#include +#include +#include #include #if ICEBERG_S3_ENABLED @@ -36,9 +40,10 @@ namespace iceberg::arrow { +#if ICEBERG_S3_ENABLED + namespace { -#if ICEBERG_S3_ENABLED const std::string* FindProperty( const std::unordered_map& properties, std::string_view key) { @@ -74,6 +79,24 @@ Status EnsureS3Initialized() { return {}; } +// Splits any URI scheme off `endpoint` into `options.scheme`, returning the bare +// host[:port] that Arrow's `endpoint_override` expects. +std::string SplitEndpointScheme(std::string_view endpoint, + ::arrow::fs::S3Options& options) { + if (const auto pos = endpoint.find("://"); pos != std::string_view::npos) { + options.scheme = std::string(endpoint.substr(0, pos)); + endpoint = endpoint.substr(pos + 3); + } + return std::string(endpoint); +} + +bool IsS3FileIOCredentialPrefix(std::string_view prefix) { + return prefix == "s3" || prefix.starts_with("s3://") || prefix.starts_with("s3a://") || + prefix.starts_with("s3n://"); +} + +} // namespace + /// \brief Configure S3Options from a properties map. /// /// \param properties The configuration properties map. @@ -100,7 +123,7 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( } // Configure region - if (const auto* region = FindProperty(properties, S3Properties::kRegion); + if (const auto* region = FindProperty(properties, S3Properties::kClientRegion); region != nullptr) { options.region = *region; } @@ -108,18 +131,13 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( // Configure endpoint (for MinIO, LocalStack, etc.) if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint); endpoint != nullptr) { - options.endpoint_override = *endpoint; - } else { - // Fall back to AWS standard environment variables for endpoint override - const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); - if (s3_endpoint_env != nullptr) { - options.endpoint_override = s3_endpoint_env; - } else { - const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL"); - if (endpoint_env != nullptr) { - options.endpoint_override = endpoint_env; - } - } + options.endpoint_override = SplitEndpointScheme(*endpoint, options); + } else if (const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); + s3_endpoint_env != nullptr) { + options.endpoint_override = SplitEndpointScheme(s3_endpoint_env, options); + } else if (const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL"); + endpoint_env != nullptr) { + options.endpoint_override = SplitEndpointScheme(endpoint_env, options); } ICEBERG_ASSIGN_OR_RAISE(const auto path_style_access, @@ -128,11 +146,11 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( options.force_virtual_addressing = !*path_style_access; } - // Configure SSL + // Explicit `s3.ssl.enabled` overrides any endpoint-derived scheme. ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled, ParseOptionalBool(properties, S3Properties::kSslEnabled)); - if (ssl_enabled.has_value() && !*ssl_enabled) { - options.scheme = "http"; + if (ssl_enabled.has_value()) { + options.scheme = *ssl_enabled ? "https" : "http"; } // Configure timeouts @@ -152,33 +170,160 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( return options; } -#endif -} // namespace +namespace { -Result> MakeS3FileIO( +Result> BuildArrowS3FileSystem( const std::unordered_map& properties) { -#if ICEBERG_S3_ENABLED ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized()); - - // Configure S3 options from properties (uses default credentials if empty) ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties)); ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options)); + return std::shared_ptr<::arrow::fs::FileSystem>(std::move(fs)); +} - return std::make_unique(std::move(fs)); -#else - return NotSupported("Arrow S3 support is not enabled"); -#endif +std::string CanonicalizeS3Scheme(std::string_view location) { + for (std::string_view scheme : {"s3a://", "s3n://", "oss://"}) { + if (location.starts_with(scheme)) { + return std::string("s3://").append(location.substr(scheme.size())); + } + } + return std::string(location); +} + +class ArrowS3FileIO final : public FileIO, public SupportsStorageCredentials { + public: + ArrowS3FileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs, + std::unordered_map default_properties) + : default_file_io_(std::move(arrow_fs)), + default_properties_(std::move(default_properties)) {} + + Result> NewInputFile(std::string file_location) override; + + Result> NewInputFile(std::string file_location, + size_t length) override; + + Result> NewOutputFile(std::string file_location) override; + + Status DeleteFile(const std::string& file_location) override; + + Status DeleteFiles(const std::vector& file_locations) override; + + Status SetStorageCredentials( + const std::vector& storage_credentials) override; + + const std::vector& credentials() const override { + return storage_credentials_; + } + + SupportsStorageCredentials* AsSupportsStorageCredentials() override { return this; } + + private: + ArrowFileSystemFileIO& FileIOForPath(std::string_view location); + + ArrowFileSystemFileIO default_file_io_; + std::unordered_map default_properties_; + std::vector storage_credentials_; + std::vector>> + file_io_by_prefix_; +}; + +Status ArrowS3FileIO::SetStorageCredentials( + const std::vector& storage_credentials) { + std::vector>> + file_io_by_prefix; + file_io_by_prefix.reserve(storage_credentials.size()); + // TODO(gangwu): Refresh vended credentials via credentials.uri before tokens expire. + for (const auto& credential : storage_credentials) { + ICEBERG_RETURN_UNEXPECTED(credential.Validate()); + if (!IsS3FileIOCredentialPrefix(credential.prefix)) { + return NotSupported( + "Storage credential prefix '{}' is unsupported by Arrow S3 FileIO", + credential.prefix); + } + auto properties = default_properties_; + for (const auto& [key, value] : credential.config) { + properties[key] = value; + } + ICEBERG_ASSIGN_OR_RAISE(auto fs, BuildArrowS3FileSystem(properties)); + file_io_by_prefix.emplace_back( + CanonicalizeS3Scheme(credential.prefix), + std::make_unique(std::move(fs))); + } + file_io_by_prefix_ = std::move(file_io_by_prefix); + storage_credentials_ = storage_credentials; + return {}; +} + +ArrowFileSystemFileIO& ArrowS3FileIO::FileIOForPath(std::string_view location) { + if (file_io_by_prefix_.empty()) { + return default_file_io_; + } + const std::string canonical = CanonicalizeS3Scheme(location); + ArrowFileSystemFileIO* best = &default_file_io_; + size_t best_len = 0; + for (const auto& [prefix, file_io] : file_io_by_prefix_) { + if (prefix.size() > best_len && canonical.starts_with(prefix)) { + best = file_io.get(); + best_len = prefix.size(); + } + } + return *best; +} + +Result> ArrowS3FileIO::NewInputFile( + std::string file_location) { + return FileIOForPath(file_location).NewInputFile(std::move(file_location)); +} + +Result> ArrowS3FileIO::NewInputFile(std::string file_location, + size_t length) { + return FileIOForPath(file_location).NewInputFile(std::move(file_location), length); +} + +Result> ArrowS3FileIO::NewOutputFile( + std::string file_location) { + return FileIOForPath(file_location).NewOutputFile(std::move(file_location)); +} + +Status ArrowS3FileIO::DeleteFile(const std::string& file_location) { + return FileIOForPath(file_location).DeleteFile(file_location); +} + +Status ArrowS3FileIO::DeleteFiles(const std::vector& file_locations) { + std::unordered_map> locations_by_io; + for (const auto& file_location : file_locations) { + locations_by_io[&FileIOForPath(file_location)].push_back(file_location); + } + for (auto& [file_io, locations] : locations_by_io) { + ICEBERG_RETURN_UNEXPECTED(file_io->DeleteFiles(locations)); + } + return {}; +} + +} // namespace + +Result> MakeS3FileIO( + const std::unordered_map& properties) { + // Uses default credentials if properties are empty. + ICEBERG_ASSIGN_OR_RAISE(auto fs, BuildArrowS3FileSystem(properties)); + return std::make_unique(std::move(fs), properties); } Status FinalizeS3() { -#if ICEBERG_S3_ENABLED auto status = ::arrow::fs::FinalizeS3(); ICEBERG_ARROW_RETURN_NOT_OK(status); return {}; +} + #else + +Result> MakeS3FileIO( + [[maybe_unused]] const std::unordered_map& properties) { return NotSupported("Arrow S3 support is not enabled"); -#endif } +Status FinalizeS3() { return NotSupported("Arrow S3 support is not enabled"); } + +#endif + } // namespace iceberg::arrow diff --git a/src/iceberg/arrow/s3/s3_properties.h b/src/iceberg/arrow/s3/s3_properties.h index 53657743d..7b76968a6 100644 --- a/src/iceberg/arrow/s3/s3_properties.h +++ b/src/iceberg/arrow/s3/s3_properties.h @@ -37,8 +37,8 @@ struct S3Properties { static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key"; /// AWS session token (for temporary credentials) static constexpr std::string_view kSessionToken = "s3.session-token"; - /// AWS region - static constexpr std::string_view kRegion = "s3.region"; + /// AWS region, standard Iceberg client property. + static constexpr std::string_view kClientRegion = "client.region"; /// Custom endpoint override (for MinIO, LocalStack, etc.) static constexpr std::string_view kEndpoint = "s3.endpoint"; /// Whether to use path-style access (needed for MinIO) diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index 49dbcdd4c..4dcfad399 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -71,6 +71,8 @@ constexpr std::string_view kSource = "source"; constexpr std::string_view kDestination = "destination"; constexpr std::string_view kMetadata = "metadata"; constexpr std::string_view kConfig = "config"; +constexpr std::string_view kStorageCredentials = "storage-credentials"; +constexpr std::string_view kPrefix = "prefix"; constexpr std::string_view kIdentifiers = "identifiers"; constexpr std::string_view kOverrides = "overrides"; constexpr std::string_view kDefaults = "defaults"; @@ -133,6 +135,23 @@ constexpr std::string_view kResidualFilter = "residual-filter"; constexpr std::string_view kMapKeys = "keys"; constexpr std::string_view kMapValues = "values"; +Result StorageCredentialToJson(const StorageCredential& credential) { + ICEBERG_RETURN_UNEXPECTED(credential.Validate()); + nlohmann::json json; + json[kPrefix] = credential.prefix; + json[kConfig] = credential.config; + return json; +} + +Result StorageCredentialFromJson(const nlohmann::json& json) { + StorageCredential credential; + ICEBERG_ASSIGN_OR_RAISE(credential.prefix, GetJsonValue(json, kPrefix)); + ICEBERG_ASSIGN_OR_RAISE(credential.config, + GetJsonValue(json, kConfig)); + ICEBERG_RETURN_UNEXPECTED(credential.Validate()); + return credential; +} + template Result> KeyValueMapFromJson(const nlohmann::json& json, std::string_view key) { @@ -695,6 +714,14 @@ Result ToJson(const LoadTableResult& result) { SetOptionalStringField(json, kMetadataLocation, result.metadata_location); ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*result.metadata)); SetContainerField(json, kConfig, result.config); + if (!result.storage_credentials.empty()) { + nlohmann::json creds = nlohmann::json::array(); + for (const auto& cred : result.storage_credentials) { + ICEBERG_ASSIGN_OR_RAISE(auto entry, StorageCredentialToJson(cred)); + creds.push_back(std::move(entry)); + } + json[kStorageCredentials] = std::move(creds); + } return json; } @@ -707,6 +734,15 @@ Result LoadTableResultFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json)); ICEBERG_ASSIGN_OR_RAISE(result.config, GetJsonValueOrDefault(json, kConfig)); + if (auto it = json.find(kStorageCredentials); it != json.end() && !it->is_null()) { + if (!it->is_array()) { + return JsonParseError("Cannot parse storage credentials from non-array"); + } + for (const auto& entry : *it) { + ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(entry)); + result.storage_credentials.push_back(std::move(cred)); + } + } ICEBERG_RETURN_UNEXPECTED(result.Validate()); return result; } diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 6b479ee03..27de0befa 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -29,7 +29,6 @@ #include #include "iceberg/catalog/rest/auth/auth_managers.h" -#include "iceberg/catalog/rest/auth/auth_properties.h" #include "iceberg/catalog/rest/auth/auth_session.h" #include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/constant.h" @@ -51,6 +50,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/transaction.h" +#include "iceberg/util/formatter_internal.h" #include "iceberg/util/macros.h" namespace iceberg::rest { @@ -122,41 +122,6 @@ Result CaptureNoSuchNamespace(const auto& status) { return CaptureNoSuchObject(status, ErrorKind::kNoSuchNamespace); } -Status ValidateNoFileIOConfig( - const std::unordered_map& table_config) { - static const std::unordered_set kTableConfigHandledByAuthKeys = { - auth::AuthProperties::kAuthType, - auth::AuthProperties::kBasicUsername, - auth::AuthProperties::kBasicPassword, - auth::AuthProperties::kSigV4Enabled, - auth::AuthProperties::kSigV4DelegateAuthType, - auth::AuthProperties::kSigV4SigningRegion, - auth::AuthProperties::kSigV4SigningName, - auth::AuthProperties::kSigV4AccessKeyId, - auth::AuthProperties::kSigV4SecretAccessKey, - auth::AuthProperties::kSigV4SessionToken, - auth::AuthProperties::kToken.key(), - auth::AuthProperties::kCredential.key(), - auth::AuthProperties::kScope.key(), - auth::AuthProperties::kOAuth2ServerUri.key(), - auth::AuthProperties::kKeepRefreshed.key(), - auth::AuthProperties::kExchangeEnabled.key(), - auth::AuthProperties::kAudience.key(), - auth::AuthProperties::kResource.key(), - }; - - for (const auto& [key, _] : table_config) { - if (!kTableConfigHandledByAuthKeys.contains(key)) { - // Fail closed because unknown table config may be FileIO/storage config. - // TODO(gangwu): Build table-specific FileIO when REST storage - // credentials and table-level storage config are supported. - return NotImplemented( - "Table-specific FileIO is not implemented for table config key '{}'", key); - } - } - return {}; -} - Status CheckBoundTable(const TableIdentifier& requested, const TableIdentifier& bound) { if (requested == bound) { return {}; @@ -285,12 +250,14 @@ class RestCatalog::TableScopedCatalog final TableScopedCatalog(std::shared_ptr root, SessionContext context, TableIdentifier identifier, std::unordered_map table_config, - std::shared_ptr table_session) + std::shared_ptr table_session, + std::shared_ptr table_io) : root_(std::move(root)), context_(std::move(context)), identifier_(std::move(identifier)), table_config_(std::move(table_config)), - table_session_(std::move(table_session)) {} + table_session_(std::move(table_session)), + table_io_(std::move(table_io)) {} std::string_view name() const override { return root_->name(); } @@ -348,7 +315,7 @@ class RestCatalog::TableScopedCatalog final auto response, root_->UpdateTableInternal(identifier, requirements, updates, *table_session_)); return root_->MakeTableFromCommitResponse(identifier, std::move(response), context_, - table_config_, table_session_); + table_config_, table_session_, table_io_); } Result> StageCreateTable( @@ -395,6 +362,7 @@ class RestCatalog::TableScopedCatalog final TableIdentifier identifier_; std::unordered_map table_config_; std::shared_ptr table_session_; + std::shared_ptr table_io_; }; RestCatalog::~RestCatalog() { @@ -519,8 +487,12 @@ Result> RestCatalog::TableAuthSession( Result> RestCatalog::TableFileIO( const SessionContext& /*context*/, - const std::unordered_map& table_config) const { - ICEBERG_RETURN_UNEXPECTED(ValidateNoFileIOConfig(table_config)); + const std::unordered_map& table_config, + const std::vector& storage_credentials) const { + if (!table_config.empty() || !storage_credentials.empty()) { + return MakeTableFileIO(config_.configs(), table_config, storage_credentials); + } + return file_io_; } @@ -740,8 +712,9 @@ Result> RestCatalog::UpdateTable( ICEBERG_ASSIGN_OR_RAISE( auto table_session, TableAuthSession(identifier, table_config, std::move(contextual_session))); + // Top-level updates have no loaded table FileIO to reuse. return MakeTableFromCommitResponse(identifier, std::move(response), context, - table_config, std::move(table_session)); + table_config, std::move(table_session), file_io_); } Result> RestCatalog::StageCreateTable( @@ -756,12 +729,15 @@ Result> RestCatalog::StageCreateTable( CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/true, *contextual_session)); auto table_config = std::move(result.config); - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + auto storage_credentials = std::move(result.storage_credentials); + ICEBERG_ASSIGN_OR_RAISE(auto table_io, + TableFileIO(context, table_config, storage_credentials)); ICEBERG_ASSIGN_OR_RAISE( auto table_session, TableAuthSession(identifier, table_config, std::move(contextual_session))); auto table_catalog = std::make_shared( - shared_from_this(), context, identifier, table_config, std::move(table_session)); + shared_from_this(), context, identifier, table_config, std::move(table_session), + table_io); ICEBERG_ASSIGN_OR_RAISE( auto staged_table, StagedTable::Make(identifier, std::move(result.metadata), @@ -869,12 +845,14 @@ Result> RestCatalog::MakeTableFromLoadResult( const SessionContext& context, std::shared_ptr contextual_session) { auto table_config = std::move(result.config); - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + auto storage_credentials = std::move(result.storage_credentials); + ICEBERG_ASSIGN_OR_RAISE(auto table_io, + TableFileIO(context, table_config, storage_credentials)); ICEBERG_ASSIGN_OR_RAISE( auto table_session, TableAuthSession(identifier, table_config, std::move(contextual_session))); auto table_catalog = std::make_shared( - shared_from_this(), context, identifier, table_config, table_session); + shared_from_this(), context, identifier, table_config, table_session, table_io); return Table::Make(identifier, std::move(result.metadata), std::move(result.metadata_location), std::move(table_io), std::move(table_catalog)); @@ -884,13 +862,10 @@ Result> RestCatalog::MakeTableFromCommitResponse( const TableIdentifier& identifier, CommitTableResponse response, const SessionContext& context, const std::unordered_map& table_config, - std::shared_ptr table_session) { - // TODO(gangwu): If the REST commit response grows table config or - // storage credentials, derive a replacement table session/FileIO from that - // response. The current table commit response does not define config. - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + std::shared_ptr table_session, std::shared_ptr table_io) { + // Reuse the bound FileIO because commit responses carry no config or credentials. auto table_catalog = std::make_shared( - shared_from_this(), context, identifier, table_config, table_session); + shared_from_this(), context, identifier, table_config, table_session, table_io); return Table::Make(identifier, std::move(response.metadata), std::move(response.metadata_location), std::move(table_io), std::move(table_catalog)); diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 76d2e54dc..97cf42151 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -31,6 +31,7 @@ #include "iceberg/catalog/session_catalog.h" #include "iceberg/catalog/session_context.h" #include "iceberg/result.h" +#include "iceberg/storage_credential.h" /// \file iceberg/catalog/rest/rest_catalog.h /// RestCatalog implementation for Iceberg REST API. @@ -79,7 +80,8 @@ class ICEBERG_REST_EXPORT RestCatalog final Result> TableFileIO( const SessionContext& context, - const std::unordered_map& table_config) const; + const std::unordered_map& table_config, + const std::vector& storage_credentials) const; Result> ListNamespaces(const Namespace& ns, auth::AuthSession& session) const; @@ -169,7 +171,7 @@ class ICEBERG_REST_EXPORT RestCatalog final const TableIdentifier& identifier, CommitTableResponse response, const SessionContext& context, const std::unordered_map& table_config, - std::shared_ptr table_session); + std::shared_ptr table_session, std::shared_ptr table_io); RestCatalogProperties config_; std::shared_ptr file_io_; diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index 5fadca1ac..fe8a2b155 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -20,7 +20,12 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include +#include +#include +#include +#include "iceberg/catalog/rest/types.h" +#include "iceberg/file_io.h" #include "iceberg/file_io_registry.h" #include "iceberg/util/macros.h" @@ -33,6 +38,16 @@ bool IsBuiltinImpl(std::string_view io_impl) { io_impl == FileIORegistry::kArrowS3FileIO; } +std::unordered_map MergeFileIOProperties( + const std::unordered_map& catalog_config, + const std::unordered_map& table_config) { + auto properties = catalog_config; + for (const auto& [key, value] : table_config) { + properties[key] = value; + } + return properties; +} + } // namespace Result DetectBuiltinFileIO(std::string_view location) { @@ -92,4 +107,33 @@ Result> MakeCatalogFileIO(const RestCatalogProperties& c return FileIORegistry::Load(io_impl, config.configs()); } +Result> MakeTableFileIO( + const std::unordered_map& catalog_config, + const std::unordered_map& table_config, + const std::vector& storage_credentials) { + const auto default_properties = MergeFileIOProperties(catalog_config, table_config); + const auto properties = RestCatalogProperties::FromMap(default_properties); + auto io_impl = properties.Get(RestCatalogProperties::kIOImpl); + if (io_impl.empty()) { + const auto warehouse = properties.Get(RestCatalogProperties::kWarehouse); + if (warehouse.empty()) { + return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)", + RestCatalogProperties::kIOImpl.key(), + RestCatalogProperties::kWarehouse.key()); + } + ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse)); + io_impl = std::string(BuiltinFileIOName(detected_kind)); + } + ICEBERG_ASSIGN_OR_RAISE(auto io, FileIORegistry::Load(io_impl, default_properties)); + + if (storage_credentials.empty()) { + return io; + } else if (auto* credentialed = io->AsSupportsStorageCredentials()) { + ICEBERG_RETURN_UNEXPECTED(credentialed->SetStorageCredentials(storage_credentials)); + } else { + return NotSupported("Configured FileIO does not support vended storage credentials"); + } + return io; +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h index 68482521a..9e5f7a0a9 100644 --- a/src/iceberg/catalog/rest/rest_file_io.h +++ b/src/iceberg/catalog/rest/rest_file_io.h @@ -22,9 +22,12 @@ #include #include #include +#include +#include #include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io.h" #include "iceberg/file_io_registry.h" #include "iceberg/result.h" @@ -44,4 +47,10 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind); ICEBERG_REST_EXPORT Result> MakeCatalogFileIO( const RestCatalogProperties& config); +/// \brief Build the configured table FileIO and apply storage credentials if present. +ICEBERG_REST_EXPORT Result> MakeTableFileIO( + const std::unordered_map& catalog_config, + const std::unordered_map& table_config, + const std::vector& storage_credentials); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 8d96bccb2..84fba9a7c 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -86,7 +86,8 @@ bool CreateTableRequest::operator==(const CreateTableRequest& other) const { } bool LoadTableResult::operator==(const LoadTableResult& other) const { - if (metadata_location != other.metadata_location || config != other.config) { + if (metadata_location != other.metadata_location || config != other.config || + storage_credentials != other.storage_credentials) { return false; } diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 7849b366b..20a59fa59 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -30,6 +30,7 @@ #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/result.h" +#include "iceberg/storage_credential.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" #include "iceberg/util/macros.h" @@ -185,13 +186,17 @@ struct ICEBERG_REST_EXPORT LoadTableResult { std::string metadata_location; std::shared_ptr metadata; // required std::unordered_map config; - // TODO(Li Feiyang): Add std::shared_ptr storage_credential; + /// \brief Vended storage credentials, one per URI prefix; empty if none. + std::vector storage_credentials; /// \brief Validates the LoadTableResult. Status Validate() const { if (!metadata) { return ValidationFailed("Invalid metadata: null"); } + for (const auto& credential : storage_credentials) { + ICEBERG_RETURN_UNEXPECTED(credential.Validate()); + } return {}; } @@ -332,7 +337,7 @@ struct ICEBERG_REST_EXPORT PlanTableScanResponse { PlanStatus plan_status = PlanStatus::kCompleted; std::string plan_id; std::optional error; - // TODO(sandeepg): Add credentials. + // TODO(sandeepg): Add storage credentials and bind scan FileIO to them. Status Validate() const; @@ -347,7 +352,7 @@ struct ICEBERG_REST_EXPORT FetchPlanningResultResponse { std::vector> delete_files; PlanStatus plan_status = PlanStatus::kCompleted; std::optional error; - // TODO(sandeepg): Add credentials. + // TODO(sandeepg): Add storage credentials and bind scan FileIO to them. Status Validate() const; diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index ba6f0129a..7d7b31113 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -30,9 +30,12 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/storage_credential.h" namespace iceberg { +class SupportsStorageCredentials; + /// \brief Seekable byte stream for reading file contents. class ICEBERG_EXPORT SeekableInputStream { public: @@ -171,6 +174,24 @@ class ICEBERG_EXPORT FileIO { /// \param file_locations The locations of the files to delete. /// \return void if all deletes succeed, or an error code if any delete fails. virtual Status DeleteFiles(const std::vector& file_locations); + + /// \brief Return storage-credential support when implemented by this FileIO. + virtual SupportsStorageCredentials* AsSupportsStorageCredentials() { return nullptr; } +}; + +/// \brief Mix-in for FileIO implementations that route object paths to +/// per-prefix file systems built from vended storage credentials, letting the +/// catalog stay decoupled from concrete storage implementations. +class ICEBERG_EXPORT SupportsStorageCredentials { + public: + virtual ~SupportsStorageCredentials() = default; + + /// \brief Install vended storage credentials. + virtual Status SetStorageCredentials( + const std::vector& storage_credentials) = 0; + + /// \brief Return currently installed storage credentials. + virtual const std::vector& credentials() const = 0; }; } // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 71a4498f1..1fa15fa12 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -283,6 +283,7 @@ install_headers( 'sort_field.h', 'sort_order.h', 'statistics_file.h', + 'storage_credential.h', 'table.h', 'table_identifier.h', 'table_metadata.h', diff --git a/src/iceberg/storage_credential.h b/src/iceberg/storage_credential.h new file mode 100644 index 000000000..604e8ddce --- /dev/null +++ b/src/iceberg/storage_credential.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A storage credential vended for a storage location prefix. +struct ICEBERG_EXPORT StorageCredential { + std::string prefix; + std::unordered_map config; + + Status Validate() const { + if (prefix.empty()) { + return ValidationFailed("Invalid storage credential: prefix must be non-empty"); + } + if (config.empty()) { + return ValidationFailed("Invalid storage credential: config must be non-empty"); + } + return {}; + } + + bool operator==(const StorageCredential& other) const = default; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc index b1caff1e8..701a8f0d9 100644 --- a/src/iceberg/test/arrow_s3_file_io_test.cc +++ b/src/iceberg/test/arrow_s3_file_io_test.cc @@ -21,14 +21,25 @@ #include #include #include +#include #include +#include +#include #include #include +#if ICEBERG_S3_ENABLED +# include +#endif + #include "iceberg/arrow/arrow_io_util.h" #include "iceberg/arrow/s3/s3_properties.h" +#include "iceberg/file_io.h" +#include "iceberg/result.h" +#include "iceberg/storage_credential.h" #include "iceberg/test/matchers.h" +#include "iceberg/util/macros.h" namespace { @@ -62,20 +73,38 @@ std::unordered_map PropertiesFromEnv() { properties[std::string(iceberg::arrow::S3Properties::kEndpoint)] = *endpoint; } if (const auto region = GetEnvIfSet("AWS_REGION")) { - properties[std::string(iceberg::arrow::S3Properties::kRegion)] = *region; + properties[std::string(iceberg::arrow::S3Properties::kClientRegion)] = *region; } return properties; } +std::unordered_map BadS3Credentials() { + return { + {std::string(iceberg::arrow::S3Properties::kAccessKeyId), "bad-access-key"}, + {std::string(iceberg::arrow::S3Properties::kSecretAccessKey), "bad-secret-key"}}; +} + } // namespace namespace iceberg::arrow { +#if ICEBERG_S3_ENABLED +Result<::arrow::fs::S3Options> ConfigureS3Options( + const std::unordered_map& properties); +#endif + namespace { class ArrowS3FileIOTest : public ::testing::Test { protected: +#if ICEBERG_S3_ENABLED + static void SetUpTestSuite() { + auto io = MakeS3FileIO({}); + ASSERT_THAT(io, IsOk()); + } +#endif + static void TearDownTestSuite() { auto status = FinalizeS3(); if (!status.has_value()) { @@ -89,27 +118,56 @@ class ArrowS3FileIOTest : public ::testing::Test { return MakeObjectUri(*base_uri_, object_name); } - void RequireIntegrationEnv() const { - if (!base_uri_.has_value()) { - GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test"; - } - } + const std::string& BaseUri() const { return *base_uri_; } + + bool HasIntegrationEnv() const { return base_uri_.has_value(); } private: std::optional base_uri_; }; +Status CheckReadWrite(FileIO& io, const std::string& object_uri, + std::string_view content) { + ICEBERG_RETURN_UNEXPECTED(io.WriteFile(object_uri, content)); + ICEBERG_ASSIGN_OR_RAISE(auto read, io.ReadFile(object_uri, std::nullopt)); + EXPECT_EQ(read, std::string(content)); + return io.DeleteFile(object_uri); +} + } // namespace -TEST_F(ArrowS3FileIOTest, CreateWithDefaultProperties) { +TEST_F(ArrowS3FileIOTest, Create) { auto result = MakeS3FileIO({}); ASSERT_THAT(result, IsOk()); EXPECT_NE(result.value(), nullptr); } -TEST_F(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) { - auto result = MakeS3FileIO(); +TEST_F(ArrowS3FileIOTest, StoresCredentials) { + auto result = MakeS3FileIO({}); + ASSERT_THAT(result, IsOk()); + auto* credentialed = result.value()->AsSupportsStorageCredentials(); + ASSERT_NE(credentialed, nullptr); + + std::vector credentials = { + {.prefix = "s3://bucket/table", + .config = {{std::string(S3Properties::kAccessKeyId), "access-key"}, + {std::string(S3Properties::kSecretAccessKey), "secret"}}}}; + EXPECT_THAT(credentialed->SetStorageCredentials(credentials), IsOk()); + EXPECT_EQ(credentialed->credentials(), credentials); +} + +TEST_F(ArrowS3FileIOTest, RejectsCredentialPrefix) { + auto result = MakeS3FileIO({}); ASSERT_THAT(result, IsOk()); + auto* credentialed = result.value()->AsSupportsStorageCredentials(); + ASSERT_NE(credentialed, nullptr); + + auto status = credentialed->SetStorageCredentials( + {{.prefix = "gs://bucket/table", + .config = {{std::string(S3Properties::kAccessKeyId), "access-key"}, + {std::string(S3Properties::kSecretAccessKey), "secret"}}}}); + EXPECT_THAT(status, IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(status, HasErrorMessage("unsupported by Arrow S3 FileIO")); } TEST_F(ArrowS3FileIOTest, RejectsIncompleteStaticCredentials) { @@ -126,59 +184,122 @@ TEST_F(ArrowS3FileIOTest, RejectsInvalidBooleanProperties) { EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); } -TEST_F(ArrowS3FileIOTest, ReadWriteFile) { - RequireIntegrationEnv(); +TEST_F(ArrowS3FileIOTest, ReadWrite) { + if (!HasIntegrationEnv()) { + GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test"; + } auto io_res = MakeS3FileIO(); ASSERT_THAT(io_res, IsOk()); auto io = std::move(io_res).value(); auto object_uri = ObjectUri("iceberg_s3_io_test.txt"); - auto write_res = io->WriteFile(object_uri, "hello s3"); - ASSERT_THAT(write_res, IsOk()); - - auto read_res = io->ReadFile(object_uri, std::nullopt); - ASSERT_THAT(read_res, IsOk()); - EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3"))); - - auto del_res = io->DeleteFile(object_uri); - EXPECT_THAT(del_res, IsOk()); + EXPECT_THAT(CheckReadWrite(*io, object_uri, "hello s3"), IsOk()); } -TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithProperties) { - RequireIntegrationEnv(); +TEST_F(ArrowS3FileIOTest, ReadWriteWithProperties) { + if (!HasIntegrationEnv()) { + GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test"; + } auto io_res = MakeS3FileIO(PropertiesFromEnv()); ASSERT_THAT(io_res, IsOk()); auto io = std::move(io_res).value(); auto object_uri = ObjectUri("iceberg_s3_io_props_test.txt"); - auto write_res = io->WriteFile(object_uri, "hello s3 with properties"); - ASSERT_THAT(write_res, IsOk()); - - auto read_res = io->ReadFile(object_uri, std::nullopt); - ASSERT_THAT(read_res, IsOk()); - EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3 with properties"))); - - auto del_res = io->DeleteFile(object_uri); - EXPECT_THAT(del_res, IsOk()); + EXPECT_THAT(CheckReadWrite(*io, object_uri, "hello s3 with properties"), IsOk()); } -TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithSslDisabled) { - RequireIntegrationEnv(); - std::unordered_map properties; - properties[std::string(S3Properties::kSslEnabled)] = "false"; +TEST_F(ArrowS3FileIOTest, LongestCredentialPrefix) { + if (!HasIntegrationEnv()) { + GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test"; + } + + auto properties = PropertiesFromEnv(); + if (properties.empty()) { + GTEST_SKIP() << "Set S3 properties to enable credential routing test"; + } auto io_res = MakeS3FileIO(properties); ASSERT_THAT(io_res, IsOk()); + auto io = std::move(io_res).value(); + auto* credentialed = io->AsSupportsStorageCredentials(); + ASSERT_NE(credentialed, nullptr); + + constexpr std::string_view object_name = "iceberg_s3_io_prefix_test.txt"; + auto object_uri = ObjectUri(object_name); + const auto partial_prefix = + object_uri.substr(0, object_uri.size() - object_name.size() + 3); + + auto bad_properties = BadS3Credentials(); + EXPECT_THAT(credentialed->SetStorageCredentials( + {{.prefix = BaseUri(), .config = std::move(bad_properties)}, + {.prefix = partial_prefix, .config = properties}}), + IsOk()); + EXPECT_THAT(CheckReadWrite(*io, object_uri, "hello s3 with vended credentials"), + IsOk()); } -TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) { - RequireIntegrationEnv(); - std::unordered_map properties; - properties[std::string(S3Properties::kConnectTimeoutMs)] = "5000"; - properties[std::string(S3Properties::kSocketTimeoutMs)] = "10000"; +#if ICEBERG_S3_ENABLED +TEST_F(ArrowS3FileIOTest, ClientRegion) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kClientRegion), "us-east-1"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->region, "us-east-1"); +} - auto io_res = MakeS3FileIO(properties); - ASSERT_THAT(io_res, IsOk()); +TEST_F(ArrowS3FileIOTest, EndpointScheme) { + struct Case { + std::string_view endpoint; + std::string_view endpoint_override; + std::string_view scheme; + }; + const std::vector cases = {{"https://oss-cn-hangzhou.aliyuncs.com:443", + "oss-cn-hangzhou.aliyuncs.com:443", "https"}, + {"http://localhost:9000", "localhost:9000", "http"}, + {"localhost:9000", "localhost:9000", "https"}}; + + for (const auto& test_case : cases) { + auto result = ConfigureS3Options( + {{std::string(S3Properties::kEndpoint), std::string(test_case.endpoint)}}); + ASSERT_THAT(result, IsOk()) << test_case.endpoint; + EXPECT_EQ(result->endpoint_override, test_case.endpoint_override); + EXPECT_EQ(result->scheme, test_case.scheme); + } +} + +TEST_F(ArrowS3FileIOTest, SslEnabled) { + auto https = + ConfigureS3Options({{std::string(S3Properties::kEndpoint), "http://localhost:9000"}, + {std::string(S3Properties::kSslEnabled), "true"}}); + ASSERT_THAT(https, IsOk()); + EXPECT_EQ(https->scheme, "https"); + + auto http = ConfigureS3Options( + {{std::string(S3Properties::kEndpoint), "https://localhost:9000"}, + {std::string(S3Properties::kSslEnabled), "false"}}); + ASSERT_THAT(http, IsOk()); + EXPECT_EQ(http->scheme, "http"); +} + +TEST_F(ArrowS3FileIOTest, PathStyleAccess) { + auto virtual_addressing = + ConfigureS3Options({{std::string(S3Properties::kPathStyleAccess), "false"}}); + ASSERT_THAT(virtual_addressing, IsOk()); + EXPECT_TRUE(virtual_addressing->force_virtual_addressing); + + auto path_style = + ConfigureS3Options({{std::string(S3Properties::kPathStyleAccess), "true"}}); + ASSERT_THAT(path_style, IsOk()); + EXPECT_FALSE(path_style->force_virtual_addressing); +} + +TEST_F(ArrowS3FileIOTest, Timeouts) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kConnectTimeoutMs), "5000"}, + {std::string(S3Properties::kSocketTimeoutMs), "10000"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->connect_timeout, 5); + EXPECT_EQ(result->request_timeout, 10); } +#endif } // namespace iceberg::arrow diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index b1193d9f8..7f41b0b46 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -19,9 +19,14 @@ #include "iceberg/catalog/rest/rest_file_io.h" +#include +#include +#include + #include #include +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io_registry.h" #include "iceberg/test/matchers.h" @@ -44,6 +49,24 @@ class MockFileIO : public FileIO { Status DeleteFile(const std::string& /*file_location*/) override { return {}; } }; +std::vector captured_storage_credentials; +std::unordered_map captured_file_io_properties; + +class MockCredentialedFileIO : public MockFileIO, public SupportsStorageCredentials { + public: + Status SetStorageCredentials( + const std::vector& credentials) override { + captured_storage_credentials = credentials; + return {}; + } + + const std::vector& credentials() const override { + return captured_storage_credentials; + } + + SupportsStorageCredentials* AsSupportsStorageCredentials() override { return this; } +}; + } // namespace TEST(RestFileIOTest, DetectBuiltinKindFromScheme) { @@ -147,4 +170,78 @@ TEST(RestFileIOTest, MakeCatalogFileIOSkipsCheckWhenWarehouseAbsent) { ASSERT_THAT(result, IsOk()); } +TEST(RestFileIOTest, TableFileIOMergesConfigAndCredentials) { + const std::string custom_impl = "com.mycompany.CredentialedFileIO"; + captured_file_io_properties.clear(); + captured_storage_credentials.clear(); + FileIORegistry::Register( + custom_impl, + [](const std::unordered_map& properties) + -> Result> { + captured_file_io_properties = properties; + return std::make_unique(); + }); + + auto result = MakeTableFileIO( + {{"warehouse", "s3://catalog/warehouse"}, + {"catalog-only", "catalog"}, + {"shared", "catalog"}}, + {{"io-impl", custom_impl}, {"table-only", "table"}, {"shared", "table"}}, + {{.prefix = "s3://bucket/table", + .config = {{"shared", "credential"}, {"credential-only", "value"}}}}); + ASSERT_THAT(result, IsOk()); + auto* credentialed = result.value()->AsSupportsStorageCredentials(); + ASSERT_NE(credentialed, nullptr); + + EXPECT_THAT( + captured_file_io_properties, + ::testing::UnorderedElementsAre( + ::testing::Pair("warehouse", "s3://catalog/warehouse"), + ::testing::Pair("catalog-only", "catalog"), + ::testing::Pair("io-impl", custom_impl), ::testing::Pair("table-only", "table"), + ::testing::Pair("shared", "table"))); + ASSERT_EQ(captured_storage_credentials.size(), 1); + EXPECT_EQ(captured_storage_credentials[0].prefix, "s3://bucket/table"); + EXPECT_THAT(captured_storage_credentials[0].config, + ::testing::UnorderedElementsAre(::testing::Pair("credential-only", "value"), + ::testing::Pair("shared", "credential"))); + EXPECT_EQ(credentialed->credentials(), captured_storage_credentials); +} + +TEST(RestFileIOTest, TableImplOverridesWarehouseScheme) { + captured_file_io_properties.clear(); + FileIORegistry::Register( + std::string(FileIORegistry::kArrowS3FileIO), + [](const std::unordered_map& properties) + -> Result> { + captured_file_io_properties = properties; + return std::make_unique(); + }); + + auto result = + MakeTableFileIO({{"warehouse", "/tmp/catalog-warehouse"}}, + {{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)}}, + /*storage_credentials=*/{}); + ASSERT_THAT(result, IsOk()); + EXPECT_THAT( + captured_file_io_properties, + ::testing::UnorderedElementsAre( + ::testing::Pair("warehouse", "/tmp/catalog-warehouse"), + ::testing::Pair("io-impl", std::string(FileIORegistry::kArrowS3FileIO)))); +} + +TEST(RestFileIOTest, TableFileIORejectsCredentials) { + const std::string custom_impl = "com.mycompany.PlainFileIO"; + FileIORegistry::Register( + custom_impl, + [](const std::unordered_map& /*properties*/) + -> Result> { return std::make_unique(); }); + + auto result = MakeTableFileIO( + {{"warehouse", "s3://catalog/warehouse"}}, {{"io-impl", custom_impl}}, + {{.prefix = "s3://bucket/table", .config = {{"k", "v"}}}}); + EXPECT_THAT(result, IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(result, HasErrorMessage("does not support vended storage credentials")); +} + } // namespace iceberg::rest diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 50507dd3a..dd9326cb2 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -82,6 +82,11 @@ static std::shared_ptr MakeSimpleTableMetadata() { }); } +std::string LoadTableJsonWithCredentials(std::string_view storage_credentials) { + return std::string(R"({"storage-credentials":)") + std::string(storage_credentials) + + R"(,"metadata":{"format-version":2,"table-uuid":"test","location":"s3://test","last-sequence-number":0,"last-column-id":1,"last-updated-ms":0,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0}})"; +} + // Test parameter structure for roundtrip tests template struct JsonRoundTripParam { @@ -122,6 +127,7 @@ template struct JsonInvalidParam { std::string test_name; std::string invalid_json_str; + ErrorKind expected_error_kind = ErrorKind::kJsonParseError; std::string expected_error_message; }; @@ -135,7 +141,7 @@ class JsonInvalidTest : public ::testing::TestWithParam> const auto& param = Base::GetParam(); auto result = FromJson(nlohmann::json::parse(param.invalid_json_str)); - ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(result, IsError(param.expected_error_kind)); ASSERT_THAT(result, HasErrorMessage(param.expected_error_message)) << result.error().message; } @@ -1116,7 +1122,18 @@ INSTANTIATE_TEST_SUITE_P( .model = {.metadata_location = "s3://bucket/metadata/v1.json", .metadata = MakeSimpleTableMetadata(), .config = {{"warehouse", "s3://bucket/warehouse"}, - {"foo", "bar"}}}}), + {"foo", "bar"}}}}, + LoadTableResultParam{ + .test_name = "WithCredentials", + .expected_json_str = + R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot-log":[],"snapshots":[],"sort-orders":[{"fields":[],"order-id":0}],"statistics":[],"table-uuid":"test-uuid-1234"},"storage-credentials":[{"config":{"client.region":"us-east-1","s3.access-key-id":"AKIAtest","s3.secret-access-key":"secret","s3.session-token":"token"},"prefix":"s3"}]})", + .model = + {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = {{.prefix = "s3", + .config = {{"s3.access-key-id", "AKIAtest"}, + {"s3.secret-access-key", "secret"}, + {"s3.session-token", "token"}, + {"client.region", "us-east-1"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1184,7 +1201,30 @@ INSTANTIATE_TEST_SUITE_P( LoadTableResultInvalidParam{ .test_name = "InvalidMetadataContent", .invalid_json_str = R"({"metadata":{"format-version":"invalid"}})", - .expected_error_message = "type must be number, but is string"}), + .expected_error_message = "type must be number, but is string"}, + LoadTableResultInvalidParam{ + .test_name = "CredentialsNotArray", + .invalid_json_str = LoadTableJsonWithCredentials(R"("oops")"), + .expected_error_message = "Cannot parse storage credentials from non-array"}, + LoadTableResultInvalidParam{ + .test_name = "CredentialMissingPrefix", + .invalid_json_str = LoadTableJsonWithCredentials(R"([{"config":{"k":"v"}}])"), + .expected_error_message = "Missing 'prefix'"}, + LoadTableResultInvalidParam{ + .test_name = "CredentialMissingConfig", + .invalid_json_str = LoadTableJsonWithCredentials(R"([{"prefix":"s3"}])"), + .expected_error_message = "Missing 'config'"}, + LoadTableResultInvalidParam{.test_name = "CredentialEmptyPrefix", + .invalid_json_str = LoadTableJsonWithCredentials( + R"([{"prefix":"","config":{"k":"v"}}])"), + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_message = "prefix must be non-empty"}, + LoadTableResultInvalidParam{ + .test_name = "CredentialEmptyConfig", + .invalid_json_str = + LoadTableJsonWithCredentials(R"([{"prefix":"s3","config":{}}])"), + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_message = "config must be non-empty"}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 784b3e03b..870badba0 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -201,6 +201,7 @@ class PartitionSummary; /// \brief File I/O. struct ReaderOptions; struct WriterOptions; +struct StorageCredential; class FileIO; class Reader; class Writer;