Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 130 additions & 4 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


import logging
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -55,6 +56,8 @@
from pyiceberg.serializers import FromInputFile, ToOutputFile
from pyiceberg.table import (
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
)
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -314,6 +317,47 @@ def add_glue_catalog_id(params: dict[str, str], **kwargs: Any) -> None:
event_system.register("provide-client-params.glue", add_glue_catalog_id)


class _S3TablesCreateTableTransaction(CreateTableTransaction):
    """CreateTableTransaction that cleans up the S3 Tables staging table on failure.

    When ``create_table_transaction`` pre-creates a Glue table entry for an S3 Tables
    federated database, that entry must be deleted if the transaction is never committed
    (e.g. an exception inside the ``with`` block) or if the commit itself fails.
    """

    def __init__(self, staged_table: StagedTable, catalog: "GlueCatalog", database_name: str, table_name: str):
        """Wrap *staged_table* so the pre-created Glue entry is removed on failure.

        Args:
            staged_table: The staged table whose metadata this transaction commits.
            catalog: The owning GlueCatalog, used to delete the staging Glue entry.
            database_name: Glue database that holds the staging entry.
            table_name: Name of the staging Glue table entry.
        """
        super().__init__(staged_table)
        self._catalog = catalog
        self._database_name = database_name
        self._table_name = table_name
        # True until a successful commit; guards the cleanup and makes it idempotent.
        self._staging_table_needs_cleanup = True

    def commit_transaction(self) -> Table:
        """Commit the transaction, deleting the staging Glue entry if the commit fails."""
        try:
            result = super().commit_transaction()
            self._staging_table_needs_cleanup = False  # commit succeeded; staging table is now the real table
            return result
        except Exception:
            self._cleanup_staging_table()
            raise

    def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
        """Exit the context manager, always attempting staging-table cleanup.

        The cleanup runs in a ``finally`` block so the staging entry is removed even
        when the base ``__exit__`` itself raises; previously a raising ``__exit__``
        skipped the cleanup call and leaked the staging entry. A cleanup already
        performed by ``commit_transaction`` is not repeated (the flag is consumed).
        """
        try:
            super().__exit__(exctype, excinst, exctb)
        finally:
            self._cleanup_staging_table()

    def _cleanup_staging_table(self) -> None:
        """Best-effort deletion of the staging Glue table entry; never raises."""
        if not self._staging_table_needs_cleanup:
            return
        self._staging_table_needs_cleanup = False
        try:
            self._catalog.glue.delete_table(DatabaseName=self._database_name, Name=self._table_name)
        except Exception:
            # Log (with traceback only at DEBUG) rather than mask the original error.
            logger.warning(
                f"Failed to clean up S3 Tables staging table {self._database_name}.{self._table_name}",
                exc_info=logger.isEnabledFor(logging.DEBUG),
            )


class GlueCatalog(MetastoreCatalog):
glue: "GlueClient"

Expand Down Expand Up @@ -601,6 +645,82 @@ def create_table(
catalog=self,
)

def create_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
    """Create a CreateTableTransaction.

    S3 Tables federated databases manage their own storage, so the table location
    must be obtained from S3 Tables before any metadata can be written. For such
    databases this override first registers a minimal Glue table entry (which makes
    S3 Tables allocate storage), reads back the managed location, and stages the
    table at that location; ``commit_table`` later finds the pre-created Glue entry
    and updates it with the final metadata pointer.

    Any other database is delegated entirely to the base-class implementation.
    """
    database_name, table_name = self.identifier_to_database_and_table(identifier)

    if not self._is_s3tables_database(database_name):
        # Regular Glue database: nothing special to do here.
        return super().create_table_transaction(
            identifier=identifier,
            schema=schema,
            location=location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )

    if location is not None:
        raise ValueError(
            f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
            "S3 Tables manages the storage location automatically."
        )

    def drop_staging_entry() -> None:
        # Best-effort removal of the pre-created Glue entry; warn instead of
        # masking the exception that triggered the cleanup.
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except Exception:
            logger.warning(
                f"Failed to clean up S3 Tables table {database_name}.{table_name}",
                exc_info=logger.isEnabledFor(logging.DEBUG),
            )

    # Registering a minimal entry prompts S3 Tables to allocate storage.
    self._create_glue_table(
        database_name=database_name,
        table_name=table_name,
        table_input={
            "Name": table_name,
            "Parameters": {"format": "ICEBERG"},
        },
    )

    try:
        # Read back the storage location that S3 Tables assigned.
        table_entry = self._get_glue_table(database_name=database_name, table_name=table_name)
        assigned_location = table_entry.get("StorageDescriptor", {}).get("Location")
        if not assigned_location:
            raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")

        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=assigned_location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
    except Exception:
        drop_staging_entry()
        raise

    return _S3TablesCreateTableTransaction(staged_table, self, database_name, table_name)

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Expand Down Expand Up @@ -649,7 +769,12 @@ def commit_table(
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
# A staging table (pre-created by create_table_transaction for S3 Tables)
# exists in Glue but has no metadata_location yet — skip loading Iceberg metadata.
if current_glue_table.get("Parameters", {}).get(METADATA_LOCATION):
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
else:
current_table = None
except NoSuchTableError:
current_glue_table = None
glue_table_version_id = None
Expand All @@ -669,8 +794,9 @@ def commit_table(
metadata_path=updated_staged_table.metadata_location,
)

if current_table:
# table exists, update the table
if current_glue_table is not None:
# Glue table exists — either a fully committed table or a staging table
# pre-created by create_table_transaction. Update it with the metadata pointer.
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
Expand All @@ -684,7 +810,7 @@ def commit_table(
properties=updated_staged_table.properties,
metadata=updated_staged_table.metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
prev_metadata_location=current_table.metadata_location if current_table else None,
)
self._update_glue_table(
database_name=database_name,
Expand Down
101 changes: 101 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,107 @@ def test_create_duplicated_table(
test_catalog.create_table(identifier, table_schema_nested)


@mock_aws
def test_create_table_transaction_s3tables(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """A create_table_transaction against an S3 Tables database commits on clean exit."""
    _patch_moto_for_s3tables(monkeypatch)

    identifier = (database_name, table_name)
    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)

    # Exiting the with-block without an exception must commit the table.
    with catalog.create_table_transaction(
        identifier,
        table_schema_nested,
        properties={"test_key": "test_value"},
    ):
        pass

    loaded = catalog.load_table(identifier)
    expected_location = f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
    assert loaded.name() == identifier
    assert loaded.location().rstrip("/") == expected_location
    assert loaded.properties["test_key"] == "test_value"


@mock_aws
def test_create_table_transaction_s3tables_with_schema_evolution(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """Schema updates made inside the transaction are visible after commit."""
    _patch_moto_for_s3tables(monkeypatch)

    identifier = (database_name, table_name)
    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)

    with catalog.create_table_transaction(identifier, table_schema_nested) as transaction:
        with transaction.update_schema() as schema_update:
            schema_update.add_column(path="new_col", field_type=IntegerType())

    committed = catalog.load_table(identifier)
    assert committed.schema().find_field("new_col").field_type == IntegerType()


@mock_aws
def test_create_table_transaction_s3tables_rejects_location(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """An explicit location must be rejected — S3 Tables owns the storage layout."""
    _patch_moto_for_s3tables(monkeypatch)

    identifier = (database_name, table_name)
    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)

    with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"):
        catalog.create_table_transaction(identifier, table_schema_nested, location="s3://some-bucket/some-path")


@mock_aws
def test_create_table_transaction_s3tables_cleanup_on_exception(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """Staging table should be cleaned up if the transaction is not committed."""
    _patch_moto_for_s3tables(monkeypatch)

    identifier = (database_name, table_name)
    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)

    with pytest.raises(RuntimeError, match="intentional"):
        with catalog.create_table_transaction(identifier, table_schema_nested):
            raise RuntimeError("intentional")

    # Cleanup succeeded if the same identifier can be created again from scratch.
    recreated = catalog.create_table(identifier, table_schema_nested)  # type: ignore[unreachable]
    assert recreated.name() == identifier


@mock_aws
def test_load_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
Expand Down
Loading