diff --git a/providers/amazon/docs/operators/glue_catalog.rst b/providers/amazon/docs/operators/glue_catalog.rst index e8777727940f3..8fe4a8224ff31 100644 --- a/providers/amazon/docs/operators/glue_catalog.rst +++ b/providers/amazon/docs/operators/glue_catalog.rst @@ -96,3 +96,17 @@ To create a partition in an AWS Glue Data Catalog table, use :dedent: 4 :start-after: [START howto_operator_glue_catalog_create_partition] :end-before: [END howto_operator_glue_catalog_create_partition] + +.. _howto/operator:GlueCatalogBatchDeletePartitionOperator: + +Batch Delete Partitions +----------------------- + +To delete one or more partitions from an AWS Glue Data Catalog table, use +:class:`~airflow.providers.amazon.aws.operators.glue_catalog.GlueCatalogBatchDeletePartitionOperator`. + +.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_glue_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_glue_catalog_batch_delete_partition] + :end-before: [END howto_operator_glue_catalog_batch_delete_partition] diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py index 76141a4d8a4fd..296f6214e9999 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py @@ -24,6 +24,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields from airflow.utils.helpers import prune_dict if TYPE_CHECKING: @@ -336,3 +337,70 @@ def execute(self, context: Context) -> None: else: raise self.log.info("Partition created.") + + +class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]): + """ + Delete one or more partitions from an AWS Glue Data Catalog table. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator` + + :param database_name: The name of the database. (templated) + :param table_name: The name of the table. (templated) + :param partitions_to_delete: List of partition value dicts to delete. (templated) + :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated) + """ + + aws_hook_class = AwsBaseHook + template_fields: tuple[str, ...] = aws_template_fields( + "database_name", "table_name", "catalog_id", "partitions_to_delete" + ) + + def __init__( + self, + *, + database_name: str, + table_name: str, + partitions_to_delete: list[dict[str, list[str]]], + catalog_id: str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.database_name = database_name + self.table_name = table_name + self.partitions_to_delete = partitions_to_delete + self.catalog_id = catalog_id + + @property + def _hook_parameters(self) -> dict[str, Any]: + return {**super()._hook_parameters, "client_type": "glue"} + + def execute(self, context: Context) -> list[dict[str, Any]]: + self.log.info( + "Deleting %d partitions from %s.%s", + len(self.partitions_to_delete), + self.database_name, + self.table_name, + ) + kwargs: dict[str, Any] = prune_dict( + { + "DatabaseName": self.database_name, + "TableName": self.table_name, + "PartitionsToDelete": self.partitions_to_delete, + "CatalogId": self.catalog_id, + } + ) + response = self.hook.conn.batch_delete_partition(**kwargs) + errors = response.get("Errors", []) + if errors: + # EntityNotFoundException is expected for idempotent deletes + real_errors = [ + e for e in errors if e.get("ErrorDetail", {}).get("ErrorCode") != "EntityNotFoundException" + ] + if real_errors: + raise RuntimeError(f"Failed to delete {len(real_errors)} partition(s): {real_errors}") + self.log.info("Some partitions not found (already deleted), continuing.") + self.log.info("Batch delete partitions complete.") + return errors diff --git a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py index 6eb736e1eedc2..7eed9c8df10f4 100644 --- a/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py +++ b/providers/amazon/tests/system/amazon/aws/example_glue_catalog.py @@ -19,6 +19,7 @@ from datetime import datetime from airflow.providers.amazon.aws.operators.glue_catalog import ( + GlueCatalogBatchDeletePartitionOperator, GlueCatalogCreateDatabaseOperator, GlueCatalogCreatePartitionOperator, GlueCatalogCreateTableOperator, @@ -106,6 +107,16 @@ ) # [END howto_operator_glue_catalog_create_partition] + # [START howto_operator_glue_catalog_batch_delete_partition] + batch_delete_partition = GlueCatalogBatchDeletePartitionOperator( + task_id="batch_delete_partition", + database_name=db_name, + table_name=table_name, + partitions_to_delete=[{"Values": ["2024-01-01"]}], + trigger_rule=TriggerRule.ALL_DONE, + ) + # [END howto_operator_glue_catalog_batch_delete_partition] + # [START howto_operator_glue_catalog_delete_table] delete_table = GlueCatalogDeleteTableOperator( task_id="delete_table", @@ -120,6 +131,7 @@ create_database, create_table, create_partition, + batch_delete_partition, delete_table, delete_database, ) diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py index 6d40e0f36eb84..ef9e4bdf49828 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_catalog.py @@ -24,6 +24,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.operators.glue_catalog import ( + GlueCatalogBatchDeletePartitionOperator, GlueCatalogCreateDatabaseOperator, GlueCatalogCreatePartitionOperator, GlueCatalogCreateTableOperator, @@ -264,3 +265,58 @@ def test_execute_skip_existing(self, mock_conn): def test_template_fields(self): validate_template_fields(self.operator) + + +PARTITIONS_TO_DELETE = [{"Values": ["2024-01-01"]}, {"Values": ["2024-01-02"]}] + + +class TestGlueCatalogBatchDeletePartitionOperator: + def setup_method(self): + self.operator = GlueCatalogBatchDeletePartitionOperator( + task_id="batch_delete_partition", + database_name=DB_NAME, + table_name=TABLE_NAME, + partitions_to_delete=PARTITIONS_TO_DELETE, + ) + + @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock) + def test_execute(self, mock_conn): + mock_client = mock.MagicMock() + mock_client.batch_delete_partition.return_value = {"Errors": []} + mock_conn.return_value = mock_client + + result = self.operator.execute({}) + + mock_client.batch_delete_partition.assert_called_once_with( + DatabaseName=DB_NAME, TableName=TABLE_NAME, PartitionsToDelete=PARTITIONS_TO_DELETE + ) + assert result == [] + + @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock) + def test_execute_with_not_found_errors(self, mock_conn): + """EntityNotFoundException is expected (idempotent delete) and should not raise.""" + mock_client = mock.MagicMock() + errors = [ + {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "EntityNotFoundException"}} + ] + mock_client.batch_delete_partition.return_value = {"Errors": errors} + mock_conn.return_value = mock_client + + result = self.operator.execute({}) + assert result == errors + + @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock) + def test_execute_with_real_errors(self, mock_conn): + """Non-EntityNotFoundException errors should raise RuntimeError.""" + mock_client = mock.MagicMock() + errors = [ + {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "InternalServiceException"}} + ] + mock_client.batch_delete_partition.return_value = {"Errors": errors} + mock_conn.return_value = mock_client + + with pytest.raises(RuntimeError, match="Failed to delete 1 partition"): + self.operator.execute({}) + + def test_template_fields(self): + validate_template_fields(self.operator)