From 5e105ad12ca993e285294e23cef3b1cfadcaf775 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Fri, 27 Mar 2026 00:18:12 +0000 Subject: [PATCH] fix(glue): Support create_table_transaction for S3 Tables federated databases For S3 Tables, the warehouse location is managed by S3 Tables and only available after a Glue table entry is created. create_table() already handled this via _create_table_s3tables(), but create_table_transaction() went through the base class path which failed with 'No default path is set'. This adds a create_table_transaction() override on GlueCatalog that, for S3 Tables federated databases: 1. Pre-creates a minimal Glue table entry so S3 Tables allocates storage 2. Retrieves the managed location from the Glue table 3. Builds the staged table targeting that managed location 4. Returns an _S3TablesCreateTableTransaction that cleans up the staging table on abort/failure Also refactors commit_table() to handle the case where a Glue table exists but has no metadata_location yet (the staging table case), branching on current_glue_table instead of current_table for the update-vs-create decision. --- pyiceberg/catalog/glue.py | 134 +++++++++++++++++++++++++++++++++++-- tests/catalog/test_glue.py | 101 ++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 83c06c3438..3738d2ceb1 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -17,6 +17,7 @@ import logging +from types import TracebackType from typing import ( TYPE_CHECKING, Any, @@ -55,6 +56,8 @@ from pyiceberg.serializers import FromInputFile, ToOutputFile from pyiceberg.table import ( CommitTableResponse, + CreateTableTransaction, + StagedTable, Table, ) from pyiceberg.table.metadata import TableMetadata @@ -314,6 +317,47 @@ def add_glue_catalog_id(params: dict[str, str], **kwargs: Any) -> None: event_system.register("provide-client-params.glue", add_glue_catalog_id) +class _S3TablesCreateTableTransaction(CreateTableTransaction): + """CreateTableTransaction that cleans up the S3 Tables staging table on failure. + + When ``create_table_transaction`` pre-creates a Glue table entry for an S3 Tables + federated database, that entry must be deleted if the transaction is never committed + (e.g. an exception inside the ``with`` block) or if the commit itself fails. + """ + + def __init__(self, staged_table: StagedTable, catalog: "GlueCatalog", database_name: str, table_name: str): + super().__init__(staged_table) + self._catalog = catalog + self._database_name = database_name + self._table_name = table_name + self._staging_table_needs_cleanup = True + + def commit_transaction(self) -> Table: + try: + result = super().commit_transaction() + self._staging_table_needs_cleanup = False # commit succeeded; staging table is now the real table + return result + except Exception: + self._cleanup_staging_table() + raise + + def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None: + super().__exit__(exctype, excinst, exctb) + self._cleanup_staging_table() + + def _cleanup_staging_table(self) -> None: + if not self._staging_table_needs_cleanup: + return + self._staging_table_needs_cleanup = False + try: + self._catalog.glue.delete_table(DatabaseName=self._database_name, Name=self._table_name) + except Exception: + logger.warning( + f"Failed to clean up S3 Tables staging table {self._database_name}.{self._table_name}", + exc_info=logger.isEnabledFor(logging.DEBUG), + ) + + class GlueCatalog(MetastoreCatalog): glue: "GlueClient" @@ -601,6 +645,82 @@ def create_table( catalog=self, ) + def create_table_transaction( + self, + identifier: str | Identifier, + schema: Union[Schema, "pa.Schema"], + location: str | None = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> CreateTableTransaction: + """Create a CreateTableTransaction. + + For S3 Tables federated databases, storage must be allocated before the table + metadata can be built, because S3 Tables manages the table location. This + override pre-creates a minimal Glue table entry, retrieves the managed location, + and then builds the staged table targeting that location. The commit path in + ``commit_table`` will find the existing Glue table and update it with the final + metadata pointer. + + For non-S3 Tables databases, this delegates to the base class. + """ + database_name, table_name = self.identifier_to_database_and_table(identifier) + + if not self._is_s3tables_database(database_name): + return super().create_table_transaction( + identifier=identifier, + schema=schema, + location=location, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + ) + + if location is not None: + raise ValueError( + f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. " + "S3 Tables manages the storage location automatically." + ) + + # Create a minimal table in Glue so S3 Tables allocates storage + self._create_glue_table( + database_name=database_name, + table_name=table_name, + table_input={ + "Name": table_name, + "Parameters": {"format": "ICEBERG"}, + }, + ) + + try: + # Retrieve the managed storage location. + glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) + storage_descriptor = glue_table.get("StorageDescriptor", {}) + managed_location = storage_descriptor.get("Location") + if not managed_location: + raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}") + + staged_table = self._create_staged_table( + identifier=identifier, + schema=schema, + location=managed_location, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + ) + except Exception: + try: + self.glue.delete_table(DatabaseName=database_name, Name=table_name) + except Exception: + logger.warning( + f"Failed to clean up S3 Tables table {database_name}.{table_name}", + exc_info=logger.isEnabledFor(logging.DEBUG), + ) + raise + + return _S3TablesCreateTableTransaction(staged_table, self, database_name, table_name) + def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -649,7 +769,12 @@ def commit_table( try: current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) glue_table_version_id = current_glue_table.get("VersionId") - current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + # A staging table (pre-created by create_table_transaction for S3 Tables) + # exists in Glue but has no metadata_location yet — skip loading Iceberg metadata. + if current_glue_table.get("Parameters", {}).get(METADATA_LOCATION): + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + else: + current_table = None except NoSuchTableError: current_glue_table = None glue_table_version_id = None @@ -669,8 +794,9 @@ def commit_table( metadata_path=updated_staged_table.metadata_location, ) - if current_table: - # table exists, update the table + if current_glue_table is not None: + # Glue table exists — either a fully committed table or a staging table + # pre-created by create_table_transaction. Update it with the metadata pointer. if not glue_table_version_id: raise CommitFailedException( f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" @@ -684,7 +810,7 @@ def commit_table( properties=updated_staged_table.properties, metadata=updated_staged_table.metadata, glue_table=current_glue_table, - prev_metadata_location=current_table.metadata_location, + prev_metadata_location=current_table.metadata_location if current_table else None, ) self._update_glue_table( database_name=database_name, diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index c8da49a87e..f2ea3520a3 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -312,6 +312,107 @@ def test_create_duplicated_table( test_catalog.create_table(identifier, table_schema_nested) +@mock_aws +def test_create_table_transaction_s3tables( + monkeypatch: pytest.MonkeyPatch, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, +) -> None: + _patch_moto_for_s3tables(monkeypatch) + + identifier = (database_name, table_name) + test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url}) + _create_s3tables_database(test_catalog, database_name) + + with test_catalog.create_table_transaction( + identifier, + table_schema_nested, + properties={"test_key": "test_value"}, + ): + pass + + table = test_catalog.load_table(identifier) + assert table.name() == identifier + assert table.location().rstrip("/") == f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}" + assert table.properties["test_key"] == "test_value" + + +@mock_aws +def test_create_table_transaction_s3tables_with_schema_evolution( + monkeypatch: pytest.MonkeyPatch, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, +) -> None: + _patch_moto_for_s3tables(monkeypatch) + + identifier = (database_name, table_name) + test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url}) + _create_s3tables_database(test_catalog, database_name) + + with test_catalog.create_table_transaction( + identifier, + table_schema_nested, + ) as txn: + with txn.update_schema() as update_schema: + update_schema.add_column(path="new_col", field_type=IntegerType()) + + table = test_catalog.load_table(identifier) + assert table.schema().find_field("new_col").field_type == IntegerType() + + +@mock_aws +def test_create_table_transaction_s3tables_rejects_location( + monkeypatch: pytest.MonkeyPatch, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, +) -> None: + _patch_moto_for_s3tables(monkeypatch) + + identifier = (database_name, table_name) + test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url}) + _create_s3tables_database(test_catalog, database_name) + + with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"): + test_catalog.create_table_transaction(identifier, table_schema_nested, location="s3://some-bucket/some-path") + + +@mock_aws +def test_create_table_transaction_s3tables_cleanup_on_exception( + monkeypatch: pytest.MonkeyPatch, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, +) -> None: + """Staging table should be cleaned up if the transaction is not committed.""" + _patch_moto_for_s3tables(monkeypatch) + + identifier = (database_name, table_name) + test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url}) + _create_s3tables_database(test_catalog, database_name) + + with pytest.raises(RuntimeError, match="intentional"): + with test_catalog.create_table_transaction( + identifier, + table_schema_nested, + ): + raise RuntimeError("intentional") + + # The staging table should have been cleaned up, so creating the table again should work. + table = test_catalog.create_table(identifier, table_schema_nested) # type: ignore[unreachable] + assert table.name() == identifier + + @mock_aws def test_load_table( _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str