Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/amazon/docs/operators/glue_catalog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,17 @@ To create a partition in an AWS Glue Data Catalog table, use
:dedent: 4
:start-after: [START howto_operator_glue_catalog_create_partition]
:end-before: [END howto_operator_glue_catalog_create_partition]

.. _howto/operator:GlueCatalogBatchDeletePartitionOperator:

Batch Delete Partitions
-----------------------

To delete one or more partitions from an AWS Glue Data Catalog table, use
:class:`~airflow.providers.amazon.aws.operators.glue_catalog.GlueCatalogBatchDeletePartitionOperator`.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_glue_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_glue_catalog_batch_delete_partition]
:end-before: [END howto_operator_glue_catalog_batch_delete_partition]
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,67 @@ def execute(self, context: Context) -> None:
else:
raise
self.log.info("Partition created.")


class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Delete one or more partitions from an AWS Glue Data Catalog table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table. (templated)
    :param partitions_to_delete: List of partition value lists to delete. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    """

    aws_hook_class = AwsBaseHook
    # "partitions_to_delete" is documented as templated in the docstring above,
    # so it must be listed here or Jinja rendering will silently skip it.
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
        "table_name",
        "partitions_to_delete",
        "catalog_id",
    )

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        partitions_to_delete: list[dict[str, list[str]]],
        catalog_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.table_name = table_name
        self.partitions_to_delete = partitions_to_delete
        self.catalog_id = catalog_id

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        # The generic AwsBaseHook needs to be told to build a Glue boto3 client.
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> list[dict[str, Any]]:
        """Call Glue ``batch_delete_partition`` and return the per-partition errors (empty list on full success)."""
        self.log.info(
            "Deleting %d partitions from %s.%s",
            len(self.partitions_to_delete),
            self.database_name,
            self.table_name,
        )
        # prune_dict drops CatalogId when it is None so boto3 falls back to
        # the account ID of the caller, as documented.
        kwargs: dict[str, Any] = prune_dict(
            {
                "DatabaseName": self.database_name,
                "TableName": self.table_name,
                "PartitionsToDelete": self.partitions_to_delete,
                "CatalogId": self.catalog_id,
            }
        )
        response = self.hook.conn.batch_delete_partition(**kwargs)
        # BatchDeletePartition is a partial-failure API: the call itself can
        # succeed while individual partitions fail to delete; those failures
        # come back in the "Errors" field instead of being raised by boto3.
        # We log them and return them so callers can decide how to react
        # (e.g. a teardown task may tolerate EntityNotFoundException entries).
        errors = response.get("Errors", [])
        if errors:
            self.log.warning("Errors deleting partitions: %s", errors)
        self.log.info("Batch delete partitions complete.")
        return errors
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had multiple different failures when running the system tests you recently added. See #66758. Did you run the system test locally to be sure it is running successfully?

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from datetime import datetime

from airflow.providers.amazon.aws.operators.glue_catalog import (
GlueCatalogBatchDeletePartitionOperator,
GlueCatalogCreateDatabaseOperator,
GlueCatalogCreatePartitionOperator,
GlueCatalogCreateTableOperator,
Expand Down Expand Up @@ -105,6 +106,16 @@
)
# [END howto_operator_glue_catalog_create_partition]

# [START howto_operator_glue_catalog_batch_delete_partition]
batch_delete_partition = GlueCatalogBatchDeletePartitionOperator(
task_id="batch_delete_partition",
database_name=db_name,
table_name=table_name,
partitions_to_delete=[{"Values": ["2024-01-01"]}],
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_glue_catalog_batch_delete_partition]

# [START howto_operator_glue_catalog_delete_table]
delete_table = GlueCatalogDeleteTableOperator(
task_id="delete_table",
Expand All @@ -119,6 +130,7 @@
create_database,
create_table,
create_partition,
batch_delete_partition,
delete_table,
delete_database,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.glue_catalog import (
GlueCatalogBatchDeletePartitionOperator,
GlueCatalogCreateDatabaseOperator,
GlueCatalogCreatePartitionOperator,
GlueCatalogCreateTableOperator,
Expand Down Expand Up @@ -264,3 +265,44 @@ def test_execute_skip_existing(self, mock_conn):

def test_template_fields(self):
validate_template_fields(self.operator)


PARTITIONS_TO_DELETE = [{"Values": ["2024-01-01"]}, {"Values": ["2024-01-02"]}]


class TestGlueCatalogBatchDeletePartitionOperator:
    """Unit tests for GlueCatalogBatchDeletePartitionOperator against a mocked Glue client."""

    def setup_method(self):
        self.operator = GlueCatalogBatchDeletePartitionOperator(
            task_id="batch_delete_partition",
            database_name=DB_NAME,
            table_name=TABLE_NAME,
            partitions_to_delete=PARTITIONS_TO_DELETE,
        )

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute(self, mock_conn):
        # Fully successful call: Glue reports no per-partition errors.
        glue_client = mock.MagicMock()
        glue_client.batch_delete_partition.return_value = {"Errors": []}
        mock_conn.return_value = glue_client

        outcome = self.operator.execute({})

        # CatalogId was left as None, so it must be pruned from the boto3 call.
        glue_client.batch_delete_partition.assert_called_once_with(
            DatabaseName=DB_NAME, TableName=TABLE_NAME, PartitionsToDelete=PARTITIONS_TO_DELETE
        )
        assert outcome == []

    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_with_errors(self, mock_conn):
        # Partial failure: the API call succeeds but reports per-partition errors,
        # which the operator must pass through as its return value.
        reported_errors = [
            {"PartitionValues": ["2024-01-01"], "ErrorDetail": {"ErrorCode": "EntityNotFoundException"}}
        ]
        glue_client = mock.MagicMock()
        glue_client.batch_delete_partition.return_value = {"Errors": reported_errors}
        mock_conn.return_value = glue_client

        outcome = self.operator.execute({})
        assert outcome == reported_errors

    def test_template_fields(self):
        validate_template_fields(self.operator)
Loading