HDDS-14009. EventNotification: Generate events according to S3 schema/strategy#10318
gardenia wants to merge 3 commits into
…/strategy
- OMEventListenerKafkaPublisher now emits events according to an S3-style event schema.
- Forked the S3EventNotification model from AWS SDK v1 to generate the serialized form (with some simplifications) while preserving original licensing and attribution.
- Implemented an 'ozoneEventData' extension bag using Map<String, Object> for non-standard S3 extensions.
  - Extension keys (isDirectory, isRecursive, etc.) are defined in the OzoneEventDataKey enum.
- Introduced a reflection-based OMEventListenerNotificationStrategy, defaulting to S3 but overridable in tests etc.
- Added NoOpOMEventListenerNotificationStrategy as a safe fallback for configuration or instantiation failures.
- Made the Kafka service interval and timeout fully configurable using standard Hadoop time duration strings.
- Implemented a graceful 10-second timeout for the Kafka producer during plugin shutdown to prevent resource leaks.
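To illustrate the reflection-based loading with a no-op fallback described above, here is a minimal sketch. It is not the PR's code: `NotificationStrategy`, `NoOpStrategy`, `EchoStrategy` and `StrategyLoader` are hypothetical stand-ins, and the real interface works on OM request objects rather than plain strings.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the strategy interface; not the PR's actual type. */
interface NotificationStrategy {
  List<String> determineEvents(String operation);
}

/** Fallback that emits nothing, used when the configured class cannot be loaded. */
final class NoOpStrategy implements NotificationStrategy {
  @Override
  public List<String> determineEvents(String operation) {
    return Collections.emptyList();
  }
}

/** Trivial strategy used only to demonstrate a successful load. */
final class EchoStrategy implements NotificationStrategy {
  @Override
  public List<String> determineEvents(String operation) {
    return Collections.singletonList("event:" + operation);
  }
}

final class StrategyLoader {
  private StrategyLoader() { }

  /** Instantiates the named class reflectively, falling back to NoOpStrategy on any failure. */
  static NotificationStrategy load(String className) {
    if (className == null) {
      return new NoOpStrategy();
    }
    try {
      Class<?> clazz = Class.forName(className);
      // asSubclass throws ClassCastException if the class does not implement the interface.
      return clazz.asSubclass(NotificationStrategy.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException | LinkageError ex) {
      // Misconfiguration should not take the plugin down; emit no events instead.
      return new NoOpStrategy();
    }
  }
}
```

With this shape, a bad class name or a class of the wrong type degrades to "no events" rather than failing plugin startup; a real implementation would presumably also log the failure.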
@ChenSammi this is the next part of the review. It is the last major piece apart from some loose ends that need revisiting.
| /* copy of | ||
| * software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification | ||
| * class taken from AWS SDK (1.x) with minor changes for build issues | ||
| * and removed usage of unnecessary AWS specific extension entiies: |
Is it possible that we just import classes from their sdk?
I explored using the SDK class originally, but ended up going with a fork for a few reasons:
- Classpath bloat: OM doesn't currently use the AWS SDK at runtime. That class has internal dependencies on SDK utility classes such as SdkHttpUtils and their own Jackson wrapper. Using it would force us to pull in the whole SDK core and its transitive dependencies (Joda-Time, older HttpComponents, etc.), which we want to avoid in the core server.
- Extensibility: We need to add Ozone-specific metadata (like isDirectory or renameFromKey). The SDK class is a fixed contract; to add our custom fields, we’d have to use hacky Jackson wrappers or 'Mix-ins' to override its behavior from the outside.
- The SDK v1 class is hard-coded to Joda-Time. By forking, I was able to switch everything to native OffsetDateTime, so we don't have to carry legacy date-library baggage in our pom.xml.
- The SDK v2 version of the same class wasn't any better. It is more rigid, uses immutable builders, and makes it messy to add bespoke fields.
If there is a consensus of opinion that we need to avoid this somehow I am open to revisiting but those are the reasons I went with the fork.
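As a rough illustration of two of the points above (native OffsetDateTime for timestamps, and an extension map keyed by an enum), here is a small sketch using only the JDK. `ExtensionKey` and `EventRecordSketch` are hypothetical names for illustration; the PR's real classes are OzoneEventDataKey and the forked S3EventNotification, whose exact shape is not shown here.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical enum of extension keys; the JSON names are taken from the discussion above. */
enum ExtensionKey {
  IS_DIRECTORY("isDirectory"),
  RENAME_FROM_KEY("renameFromKey");

  private final String jsonName;

  ExtensionKey(String jsonName) {
    this.jsonName = jsonName;
  }

  String jsonName() {
    return jsonName;
  }
}

/** Hypothetical event record carrying a java.time timestamp and an extension bag. */
final class EventRecordSketch {
  private final OffsetDateTime eventTime;
  // Insertion-ordered so the serialized field order is predictable.
  private final Map<String, Object> ozoneEventData = new LinkedHashMap<>();

  EventRecordSketch(OffsetDateTime eventTime) {
    this.eventTime = eventTime;
  }

  /** Adds one non-standard field under its JSON name and returns this for chaining. */
  EventRecordSketch with(ExtensionKey key, Object value) {
    ozoneEventData.put(key.jsonName(), value);
    return this;
  }

  /** Formats the timestamp as an ISO-8601 offset date-time, with no Joda-Time involved. */
  String eventTimeIso() {
    return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(eventTime);
  }

  Map<String, Object> extensions() {
    return Collections.unmodifiableMap(ozoneEventData);
  }
}
```

Keeping the keys in an enum means callers cannot misspell an extension field name, while the map itself stays open for new fields without changing the forked model class.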
Thanks for the thorough explanation. Makes sense! Let's keep the fork.
| * com.amazonaws.services.s3.event.S3EventNotification which is part of | ||
| * AWS SDK 1.x | ||
| * | ||
| * NOTE: the above class is designed for deserialization which is why it |
| * NOTE: the above class is designed for deserialization which is why it | |
| * NOTE: the above class is designed for serialization which is why it |
IIUC, this would be serialized to a JSON string?
My point with that comment is that the original AWS class was designed more for deserializing events from JSON than for "building" them, which is why we added the custom builder. I will update the comment to be clearer about what I meant.
| } | ||
| } | ||
| public static class LifecycleEventDataEntity { |
Maybe we could keep this, as the lifecycle work is going to merge into main.
OK, I will retain LifecycleEventDataEntity for now.
| * This is a notification strategy to generate events according to S3 | ||
| * notification semantics. | ||
| */ | ||
| public class S3EventNotificationStrategy implements OMEventListenerNotificationStrategy { |
| public class S3EventNotificationStrategy implements OMEventListenerNotificationStrategy { | |
| public class S3EventNotificationSerializer implements OMEventListenerNotificationSerializer { |
My thinking with the "strategy" terminology was that the strategy is the class to which you hand an OmCompletedRequestInfo entity, and it emits one or more events based on the strategy. For example, an S3 strategy could mean one thing and a CloudEvents strategy could mean another. To me, "serializer" would imply a narrower purpose and lack the "decision making" connotation.
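To make the "decision making" distinction concrete, here is a hedged sketch in which two strategies turn the same completed operation into different sets of events. Only the method name `determineEventsForOperation` comes from the PR; `CompletedOp`, `EventStrategy`, `OneToOneStrategy`, `SplitRenameStrategy` and the event strings are invented for illustration and do not describe what the S3 strategy actually emits.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for OmCompletedRequestInfo. */
final class CompletedOp {
  final String type;
  final String key;
  final String renameFromKey; // null unless the operation is a rename

  CompletedOp(String type, String key, String renameFromKey) {
    this.type = type;
    this.key = key;
    this.renameFromKey = renameFromKey;
  }
}

/** Hypothetical strategy contract: one operation in, zero or more serialized events out. */
interface EventStrategy {
  List<String> determineEventsForOperation(CompletedOp op);
}

/** Always maps one operation to exactly one event. */
final class OneToOneStrategy implements EventStrategy {
  @Override
  public List<String> determineEventsForOperation(CompletedOp op) {
    return Collections.singletonList(op.type + ":" + op.key);
  }
}

/** Illustrative only: decides to represent a rename as a removal plus a creation. */
final class SplitRenameStrategy implements EventStrategy {
  @Override
  public List<String> determineEventsForOperation(CompletedOp op) {
    if ("RenameKey".equals(op.type) && op.renameFromKey != null) {
      return Arrays.asList("Removed:" + op.renameFromKey, "Created:" + op.key);
    }
    return Collections.singletonList(op.type + ":" + op.key);
  }
}
```

The point of the sketch is that the number and kind of events is a policy choice made per strategy, which goes beyond turning a single object into a string.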
Pull request overview
Adds a pluggable “notification strategy” layer to the OM Kafka event listener and introduces an S3-compatible event generation strategy (plus supporting S3 event model/serialization) so emitted Kafka messages follow the S3 event notification schema.
Changes:
- Introduces OMEventListenerNotificationStrategy and wires OMEventListenerKafkaPublisher to use a configurable strategy (defaulting to S3).
- Implements S3EventNotificationStrategy with S3-style event payload generation and adds supporting S3 event model/builder + date-time serializer.
- Updates Kafka publisher unit tests to assert on structured S3 event fields (including ISO-8601 eventTime).
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java | Updates tests to validate emitted payloads as S3 event notifications (structured JSON + eventTime format). |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java | New strategy that maps OM completed requests to S3-style event JSON strings. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java | Builder to create S3 event notification objects suitable for serialization. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java | Forked/copy-based S3 event notification model for JSON (de)serialization. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/package-info.java | Package docs for S3 event generation classes. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java | Custom serializer to emit ISO-8601 offset timestamps for S3 event fields. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java | New interface defining the strategy contract for event generation. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java | Uses the strategy to generate/send events; adds strategy + service interval/timeout configs; improves producer shutdown. |
| hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListenerNotificationStrategy.java | Fallback strategy that emits no events. |
| hadoop-ozone/ozone-manager-plugins/pom.xml | Adds Jackson dependencies needed by the new S3 event model/serialization. |
| List<String> eventsToSend = notificationStrategy.determineEventsForOperation(completedRequestInfo); | ||
| LOG.debug("Sending {}", event); | ||
| try { | ||
| kafkaClient.send(event); | ||
| } catch (IOException ex) { | ||
| LOG.error("Failure to send event {}", event, ex); | ||
| return; | ||
| // loop over events and send them to our kafka sink | ||
| for (String event : eventsToSend) { | ||
| try { | ||
| kafkaClient.send(event); | ||
| } catch (IOException ex) { | ||
| LOG.error("Failure to send event {}", event, ex); | ||
| return; | ||
| } |
Please describe your PR in detail:
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14009
How was this patch tested?
unit tests