diff --git a/.github/workflows/cpp-linter.yml b/.github/workflows/cpp-linter.yml index 304d1a58b..45ba9cd07 100644 --- a/.github/workflows/cpp-linter.yml +++ b/.github/workflows/cpp-linter.yml @@ -79,7 +79,8 @@ jobs: -DICEBERG_BUILD_SQL_CATALOG=ON \ -DICEBERG_SQL_SQLITE=ON \ -DICEBERG_SQL_POSTGRESQL=ON \ - -DICEBERG_SQL_MYSQL=ON + -DICEBERG_SQL_MYSQL=ON \ + -DICEBERG_BUILD_HIVE=ON cmake --build . - name: Show sccache stats shell: bash diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 23fcc2d1c..f5ba7732f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -86,6 +86,50 @@ jobs: CC: gcc-14 CXX: g++-14 run: ci/scripts/build_example.sh $(pwd)/example + hive: + if: ${{ github.event_name != 'pull_request' || github.event.pull_request.draft == false }} + name: AMD64 Ubuntu 26.04 Hive + runs-on: ubuntu-26.04 + timeout-minutes: 45 + strategy: + fail-fast: false + env: + SCCACHE_DIR: ${{ github.workspace }}/.sccache + SCCACHE_CACHE_SIZE: "2G" + ICEBERG_EXTRA_CMAKE_ARGS: "-DICEBERG_BUILD_HIVE=ON" + steps: + - name: Checkout iceberg-cpp + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 + with: + persist-credentials: false + - name: Install dependencies + shell: bash + run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev + - name: Restore sccache cache + uses: actions/cache/restore@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ${{ github.workspace }}/.sccache + key: sccache-test-hive-${{ github.run_id }} + restore-keys: | + sccache-test-hive- + sccache-test-ubuntu- + - name: Setup sccache + uses: mozilla-actions/sccache-action@9e7fa8a12102821edf02ca5dbea1acd0f89a2696 # v0.0.10 + - name: Build and test Iceberg with Hive + shell: bash + env: + CC: gcc-14 + CXX: g++-14 + run: ci/scripts/build_iceberg.sh $(pwd) OFF ON + - name: Show sccache stats + shell: bash + run: sccache --show-stats + - name: Save sccache cache + if: github.ref == 'refs/heads/main' + uses: actions/cache/save@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: ${{ github.workspace }}/.sccache + key: sccache-test-hive-${{ github.run_id }} macos: if: ${{ github.event_name != 'pull_request' || github.event.pull_request.draft == false }} name: AArch64 macOS 26 diff --git a/src/iceberg/catalog/hive/CMakeLists.txt b/src/iceberg/catalog/hive/CMakeLists.txt index 30c2a380a..5ec191d38 100644 --- a/src/iceberg/catalog/hive/CMakeLists.txt +++ b/src/iceberg/catalog/hive/CMakeLists.txt @@ -54,7 +54,7 @@ if(NOT TARGET thrift::thrift) "-DICEBERG_BUNDLE_THRIFT=OFF against a system Thrift install.") endif() -set(ICEBERG_HIVE_SOURCES hive_catalog.cc hive_catalog_properties.cc +set(ICEBERG_HIVE_SOURCES hive_catalog.cc hive_catalog_properties.cc hms_client.cc ${ICEBERG_HIVE_THRIFT_GEN_SOURCES}) set(ICEBERG_HIVE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/catalog/hive/hms_client.cc b/src/iceberg/catalog/hive/hms_client.cc new file mode 100644 index 000000000..4bc0b76ad --- /dev/null +++ b/src/iceberg/catalog/hive/hms_client.cc @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/hive/hms_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "ThriftHiveMetastore.h" +#include "iceberg/util/macros.h" + +namespace iceberg::hive { + +namespace { + +constexpr std::string_view kThriftPrefix = "thrift://"; + +std::string_view StripScheme(std::string_view spec) { + if (spec.starts_with(kThriftPrefix)) { + return spec.substr(kThriftPrefix.size()); + } + return spec; +} + +std::string_view Trim(std::string_view spec) { + while (!spec.empty() && (std::isspace(static_cast(spec.front())) != 0)) { + spec.remove_prefix(1); + } + while (!spec.empty() && (std::isspace(static_cast(spec.back())) != 0)) { + spec.remove_suffix(1); + } + return spec; +} + +Result ParseSingleEndpoint(std::string_view spec) { + spec = Trim(spec); + spec = StripScheme(spec); + spec = Trim(spec); + if (spec.empty()) { + return InvalidArgument("Empty HMS endpoint in URI list."); + } + + HmsEndpoint endpoint; + const auto colon = spec.rfind(':'); + if (colon == std::string_view::npos) { + endpoint.host = std::string(spec); + endpoint.port = kDefaultHmsPort; + return endpoint; + } + + endpoint.host = std::string(spec.substr(0, colon)); + if (endpoint.host.empty()) { + return InvalidArgument("HMS endpoint has empty host: '{}'.", spec); + } + + const auto port_str = spec.substr(colon + 1); + if (port_str.empty()) { + endpoint.port = kDefaultHmsPort; + return endpoint; + } + + int port = 0; + const auto* const port_end = port_str.data() + port_str.size(); + const auto [ptr, ec] = std::from_chars(port_str.data(), port_end, port); + if (ec != std::errc() || ptr != port_end || port <= 0 || port > 65535) { + return InvalidArgument("Invalid HMS port in endpoint '{}'.", spec); + } + endpoint.port = port; + return endpoint; +} + +} // namespace + +Result> ParseHmsUris(std::string_view uri) { + std::vector endpoints; + if (Trim(uri).empty()) { + return InvalidArgument("HMS URI is empty."); + } + + std::size_t pos = 0; + while (pos <= uri.size()) { + const auto comma = uri.find(',', pos); + const auto piece = uri.substr( + pos, comma == std::string_view::npos ? std::string_view::npos : comma - pos); + ICEBERG_ASSIGN_OR_RAISE(auto endpoint, ParseSingleEndpoint(piece)); + endpoints.push_back(std::move(endpoint)); + if (comma == std::string_view::npos) { + break; + } + pos = comma + 1; + } + return endpoints; +} + +// Fields are declared in dependency order (socket <- transport <- protocol +// <- client) so the client tears down before the transport it borrows. +class HmsClient::Impl { + public: + std::shared_ptr socket; + std::shared_ptr transport; + std::shared_ptr protocol; + std::unique_ptr client; +}; + +HmsClient::HmsClient(std::unique_ptr impl) : impl_(std::move(impl)) {} + +HmsClient::~HmsClient() { + if (impl_ && impl_->transport && impl_->transport->isOpen()) { + try { + impl_->transport->close(); + } catch (const apache::thrift::TException&) { + // Best-effort close on teardown; ignore exceptions. + } + } +} + +Result> HmsClient::Connect( + const HiveCatalogProperties& config) { + ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri()); + ICEBERG_ASSIGN_OR_RAISE(auto endpoints, ParseHmsUris(uri)); + ICEBERG_ASSIGN_OR_RAISE(auto transport_mode, config.ThriftTransport()); + + // HA failover beyond the first endpoint is left for a later commit. + const HmsEndpoint& endpoint = endpoints.front(); + const int connect_timeout_ms = config.Get(HiveCatalogProperties::kConnectTimeoutMs); + const int socket_timeout_ms = config.Get(HiveCatalogProperties::kSocketTimeoutMs); + + auto socket = + std::make_shared(endpoint.host, endpoint.port); + socket->setConnTimeout(connect_timeout_ms); + socket->setRecvTimeout(socket_timeout_ms); + socket->setSendTimeout(socket_timeout_ms); + + std::shared_ptr transport; + switch (transport_mode) { + case HiveThriftTransport::kBuffered: + transport = std::make_shared(socket); + break; + case HiveThriftTransport::kFramed: + transport = std::make_shared(socket); + break; + } + + auto protocol = std::make_shared(transport); + auto client = + std::make_unique(protocol); + + try { + transport->open(); + } catch (const apache::thrift::transport::TTransportException& e) { + return IOError("Failed to connect to HMS at {}:{} : {}", endpoint.host, endpoint.port, + e.what()); + } catch (const apache::thrift::TException& e) { + return IOError("Thrift error contacting HMS at {}:{} : {}", endpoint.host, + endpoint.port, e.what()); + } + + auto impl = std::make_unique(); + impl->socket = std::move(socket); + impl->transport = std::move(transport); + impl->protocol = std::move(protocol); + impl->client = std::move(client); + return std::unique_ptr(new HmsClient(std::move(impl))); +} + +} // namespace iceberg::hive diff --git a/src/iceberg/catalog/hive/hms_client.h b/src/iceberg/catalog/hive/hms_client.h new file mode 100644 index 000000000..39b739e71 --- /dev/null +++ b/src/iceberg/catalog/hive/hms_client.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/catalog/hive/hive_catalog_properties.h" +#include "iceberg/catalog/hive/iceberg_hive_export.h" +#include "iceberg/result.h" + +/// \file iceberg/catalog/hive/hms_client.h +/// \brief Thin wrapper around the generated Hive Metastore Thrift client. +/// +/// Thrift types are kept out of the public interface via a pImpl. + +namespace iceberg::hive { + +/// \brief Hive's well-known metastore port, used when an HMS URI omits one. +inline constexpr int kDefaultHmsPort = 9083; + +/// \brief A single host:port pair parsed from an HMS URI. +/// +/// HMS URIs commonly take one of the forms: +/// * `thrift://host:port` (Java HiveCatalog convention) +/// * `host:port` (iceberg-rust HmsCatalog convention) +/// * comma-separated list of either form for HA failover +/// +/// `port` defaults to 9083 (Hive's well-known metastore port) when the +/// caller omits an explicit port. +struct ICEBERG_HIVE_EXPORT HmsEndpoint { + std::string host; + int port = kDefaultHmsPort; +}; + +/// \brief Parse an HMS URI string into one or more endpoints. +/// +/// Each comma-separated segment is treated as an independent endpoint; +/// surrounding whitespace and an optional `thrift://` scheme prefix are +/// stripped. Returns an InvalidArgument error if any segment fails to +/// produce a non-empty host and a port within (0, 65535]. +ICEBERG_HIVE_EXPORT Result> ParseHmsUris(std::string_view uri); + +/// \brief A live connection to a Hive Metastore over Thrift. +/// +/// Construction goes through `Connect`, which parses the URI, selects the +/// transport and opens it so that configuration errors surface as +/// `iceberg::Error` rather than C++ exceptions. +class ICEBERG_HIVE_EXPORT HmsClient { + public: + ~HmsClient(); + + HmsClient(const HmsClient&) = delete; + HmsClient& operator=(const HmsClient&) = delete; + HmsClient(HmsClient&&) = delete; + HmsClient& operator=(HmsClient&&) = delete; + + /// \brief Connect to the Hive Metastore described by `config`. + /// + /// The first endpoint listed in `config.Uri()` is used; HA failover + /// to subsequent endpoints is left to a future commit. + static Result> Connect(const HiveCatalogProperties& config); + + private: + class Impl; + std::unique_ptr impl_; + + explicit HmsClient(std::unique_ptr impl); +}; + +} // namespace iceberg::hive diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fcbc22126..868081a67 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -316,3 +316,27 @@ if(ICEBERG_BUILD_REST) util/docker_compose_util.cc) endif() endif() + +if(ICEBERG_BUILD_HIVE) + function(add_hive_iceberg_test test_name) + set(options) + set(oneValueArgs) + set(multiValueArgs SOURCES) + cmake_parse_arguments(ARG + "${options}" + "${oneValueArgs}" + "${multiValueArgs}" + ${ARGN}) + + add_executable(${test_name}) + target_include_directories(${test_name} PRIVATE "${CMAKE_BINARY_DIR}/iceberg/test/") + target_sources(${test_name} PRIVATE ${ARG_SOURCES}) + target_link_libraries(${test_name} PRIVATE GTest::gmock_main iceberg_hive_static) + if(MSVC_TOOLCHAIN) + target_compile_options(${test_name} PRIVATE /bigobj) + endif() + add_test(NAME ${test_name} COMMAND ${test_name}) + endfunction() + + add_hive_iceberg_test(hive_catalog_test SOURCES hms_client_test.cc) +endif() diff --git a/src/iceberg/test/hms_client_test.cc b/src/iceberg/test/hms_client_test.cc new file mode 100644 index 000000000..cc3efe41d --- /dev/null +++ b/src/iceberg/test/hms_client_test.cc @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/hive/hms_client.h" + +#include + +#include "iceberg/catalog/hive/hive_catalog_properties.h" + +namespace iceberg::hive { + +TEST(ParseHmsUrisTest, SingleHostPort) { + auto result = ParseHmsUris("localhost:9083"); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ(result->size(), 1); + EXPECT_EQ((*result)[0].host, "localhost"); + EXPECT_EQ((*result)[0].port, 9083); +} + +TEST(ParseHmsUrisTest, ThriftSchemePrefixStripped) { + auto result = ParseHmsUris("thrift://hms.example.com:9084"); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ(result->size(), 1); + EXPECT_EQ((*result)[0].host, "hms.example.com"); + EXPECT_EQ((*result)[0].port, 9084); +} + +TEST(ParseHmsUrisTest, DefaultsToHmsPortWhenAbsent) { + auto result = ParseHmsUris("hms.example.com"); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ(result->size(), 1); + EXPECT_EQ((*result)[0].host, "hms.example.com"); + EXPECT_EQ((*result)[0].port, 9083); +} + +TEST(ParseHmsUrisTest, DefaultsToHmsPortWhenColonOnly) { + auto result = ParseHmsUris("hms.example.com:"); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ((*result)[0].port, 9083); +} + +TEST(ParseHmsUrisTest, MultipleHaEndpoints) { + auto result = ParseHmsUris("thrift://h1:9083,h2:9084,h3"); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ(result->size(), 3); + EXPECT_EQ((*result)[0].host, "h1"); + EXPECT_EQ((*result)[0].port, 9083); + EXPECT_EQ((*result)[1].host, "h2"); + EXPECT_EQ((*result)[1].port, 9084); + EXPECT_EQ((*result)[2].host, "h3"); + EXPECT_EQ((*result)[2].port, 9083); +} + +TEST(ParseHmsUrisTest, WhitespaceAroundEndpointsTolerated) { + auto result = ParseHmsUris(" thrift://h1:9083 , h2:9084 "); + ASSERT_TRUE(result.has_value()) << result.error().message; + ASSERT_EQ(result->size(), 2); + EXPECT_EQ((*result)[0].host, "h1"); + EXPECT_EQ((*result)[1].host, "h2"); +} + +TEST(ParseHmsUrisTest, EmptyUriFails) { + EXPECT_FALSE(ParseHmsUris("").has_value()); + EXPECT_FALSE(ParseHmsUris(" ").has_value()); +} + +TEST(ParseHmsUrisTest, EmptyHostFails) { + EXPECT_FALSE(ParseHmsUris(":9083").has_value()); + EXPECT_FALSE(ParseHmsUris("thrift://:9083").has_value()); +} + +TEST(ParseHmsUrisTest, NonNumericPortFails) { + EXPECT_FALSE(ParseHmsUris("localhost:abc").has_value()); +} + +TEST(ParseHmsUrisTest, OutOfRangePortFails) { + EXPECT_FALSE(ParseHmsUris("localhost:0").has_value()); + EXPECT_FALSE(ParseHmsUris("localhost:65536").has_value()); + EXPECT_FALSE(ParseHmsUris("localhost:99999").has_value()); +} + +TEST(ParseHmsUrisTest, TrailingCharsAfterPortFails) { + EXPECT_FALSE(ParseHmsUris("localhost:9083abc").has_value()); + EXPECT_FALSE(ParseHmsUris("thrift://host:9083/path").has_value()); +} + +TEST(ParseHmsUrisTest, EmptySegmentInListFails) { + EXPECT_FALSE(ParseHmsUris("h1:9083,,h2:9084").has_value()); + EXPECT_FALSE(ParseHmsUris("h1:9083,").has_value()); +} + +TEST(HmsClientConnectTest, MissingUriIsInvalidArgument) { + auto config = HiveCatalogProperties::default_properties(); + auto client = HmsClient::Connect(config); + ASSERT_FALSE(client.has_value()); + EXPECT_EQ(client.error().kind, ErrorKind::kInvalidArgument); +} + +TEST(HmsClientConnectTest, BadUriIsInvalidArgument) { + auto config = HiveCatalogProperties::FromMap( + {{std::string(HiveCatalogProperties::kUri.key()), "thrift://:9083"}}); + auto client = HmsClient::Connect(config); + ASSERT_FALSE(client.has_value()); + EXPECT_EQ(client.error().kind, ErrorKind::kInvalidArgument); +} + +TEST(HmsClientConnectTest, BadTransportIsInvalidArgument) { + auto config = HiveCatalogProperties::FromMap( + {{std::string(HiveCatalogProperties::kUri.key()), "localhost:9083"}, + {std::string(HiveCatalogProperties::kThriftTransport.key()), "bogus"}}); + auto client = HmsClient::Connect(config); + ASSERT_FALSE(client.has_value()); + EXPECT_EQ(client.error().kind, ErrorKind::kInvalidArgument); +} + +TEST(HmsClientConnectTest, UnreachableHmsIsIoError) { + // Port 1 is privileged; nothing should be listening. We assert that the + // failure surfaces as IOError rather than a Thrift C++ exception. + auto config = HiveCatalogProperties::FromMap( + {{std::string(HiveCatalogProperties::kUri.key()), "127.0.0.1:1"}, + {std::string(HiveCatalogProperties::kConnectTimeoutMs.key()), "200"}}); + auto client = HmsClient::Connect(config); + ASSERT_FALSE(client.has_value()); + EXPECT_EQ(client.error().kind, ErrorKind::kIOError); +} + +} // namespace iceberg::hive