diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java index f53798f21038..b3a903157acd 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java @@ -28,7 +28,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.util.TestRunner; import org.junit.jupiter.api.BeforeAll; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; import java.time.Duration; @@ -37,7 +37,7 @@ public abstract class AbstractKafkaBaseIT { - protected static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.6"; // January 2026 + protected static final String IMAGE_NAME = System.getProperty("kafka.docker.image", "apache/kafka:4.2.0"); protected static final Integer MESSAGE_MAX_BYTES = 2097152; @@ -52,18 +52,24 @@ public abstract class AbstractKafkaBaseIT { protected static final Duration DURATION_POLL = Duration.ofSeconds(3); - protected static final ConfluentKafkaContainer kafkaContainer; + protected static final KafkaContainer kafkaContainer; // NIFI-11259 - single testcontainers Kafka instance needed for all module integration tests static { - kafkaContainer = new ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME)) + kafkaContainer = new KafkaContainer(DockerImageName.parse(IMAGE_NAME)) .withEnv(getEnvironmentIntegration()); kafkaContainer.start(); } private static Map getEnvironmentIntegration() { return Map.of( - "KAFKA_MESSAGE_MAX_BYTES", Integer.toString(MESSAGE_MAX_BYTES) + 
"KAFKA_MESSAGE_MAX_BYTES", Integer.toString(MESSAGE_MAX_BYTES), + "KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer,share", + "KAFKA_GROUP_SHARE_ENABLE", "true", + "KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR", "1", + "KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1", + "KAFKA_GROUP_SHARE_MIN_RECORD_LOCK_DURATION_MS", "5000", + "KAFKA_GROUP_SHARE_RECORD_LOCK_DURATION_MS", "5000" ); } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaShareGroupIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaShareGroupIT.java new file mode 100644 index 000000000000..59b99b310ccf --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaShareGroupIT.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; +import org.apache.kafka.clients.admin.ShareGroupDescription; +import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.kafka.processors.consumer.GroupType; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; +import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ConsumeKafkaShareGroupIT extends AbstractConsumeKafkaIT { + + private static final String RECORD_VALUE = "share-group-test-value"; + + private TestRunner runner; + + @BeforeEach + void setRunner() throws InitializationException { + runner = TestRunners.newTestRunner(ConsumeKafka.class); + addKafkaConnectionService(runner); + + runner.setProperty(ConsumeKafka.CONNECTION_SERVICE, CONNECTION_SERVICE_ID); + 
runner.setProperty(ConsumeKafka.GROUP_TYPE, GroupType.SHARE); + } + + private static final long ASSIGNMENT_TIMEOUT_MS = 60_000L; + + private static final long DELIVERY_TIMEOUT_MS = 60_000L; + + @Test + @Timeout(120) + void testShareGroupConsumesProducedRecord() throws ExecutionException, InterruptedException, TimeoutException { + final String topic = "share-group-topic-" + UUID.randomUUID(); + final String shareGroupId = "share-group-" + UUID.randomUUID(); + + runner.setProperty(ConsumeKafka.GROUP_ID, shareGroupId); + runner.setProperty(ConsumeKafka.TOPICS, topic); + runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + // The share-group starting position is fixed at the partition's high watermark when the + // partition is first assigned to the group (auto offset reset defaults to "latest"). Drive + // empty poll iterations until the broker reports a non-empty assignment, then produce, so + // the record's offset is not before the group's starting offset. 
+ runner.run(1, false, true); + waitForShareGroupAssignment(shareGroupId, topic); + + produceOne(topic, null, null, RECORD_VALUE, Collections.emptyList()); + + waitForFlowFile(); + + runner.run(1, true, false); + + final Iterator flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).iterator(); + assertTrue(flowFiles.hasNext()); + + final MockFlowFile flowFile = flowFiles.next(); + flowFile.assertContentEquals(RECORD_VALUE); + flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic); + flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_PARTITION); + flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_OFFSET); + flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP); + } + + private void waitForFlowFile() throws TimeoutException { + final long deadline = System.currentTimeMillis() + DELIVERY_TIMEOUT_MS; + while (System.currentTimeMillis() < deadline) { + runner.run(1, false, false); + if (!runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).isEmpty()) { + return; + } + } + throw new TimeoutException("Did not receive a FlowFile from the share group within %d ms".formatted(DELIVERY_TIMEOUT_MS)); + } + + /** + * Drive {@link TestRunner#run} iterations against the share-group consumer until the broker + * reports the share group is ready to deliver records for {@code topic}. Two conditions must + * hold before producing the test record: + *
<ol> + * <li>The group coordinator has assigned the partition to a share-group member.</li> + * <li>The share-group state machine has recorded a starting offset for the partition, + * which only happens after the consumer's first ShareFetch lands on the leader.</li> + * </ol>
+ * Without the second check there is a race where the test produces before the consumer's + * first fetch, the broker locks the share-group's starting offset to the post-produce + * high-watermark, and the produced record sits below the starting offset and is never + * delivered. The race is rare locally but more likely on slow CI runners. + */ + private void waitForShareGroupAssignment(final String shareGroupId, final String topic) throws TimeoutException { + final Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + try (Admin admin = Admin.create(adminProperties)) { + final long deadline = System.currentTimeMillis() + ASSIGNMENT_TIMEOUT_MS; + boolean assigned = false; + while (System.currentTimeMillis() < deadline) { + runner.run(1, false, false); + if (!assigned) { + assigned = hasShareGroupAssignment(admin, shareGroupId, topic); + } + if (assigned && hasShareGroupStartingOffset(admin, shareGroupId, topic)) { + return; + } + } + } + throw new TimeoutException("Share group %s never became ready to deliver records for topic %s".formatted(shareGroupId, topic)); + } + + private boolean hasShareGroupAssignment(final Admin admin, final String shareGroupId, final String topic) { + try { + final Map descriptions = admin.describeShareGroups(Collections.singletonList(shareGroupId)).all().get(); + final ShareGroupDescription description = descriptions.get(shareGroupId); + if (description == null) { + return false; + } + for (final ShareMemberDescription member : description.members()) { + if (member.assignment().topicPartitions().stream().anyMatch(tp -> topic.equals(tp.topic()))) { + return true; + } + } + return false; + } catch (final Exception ignored) { + return false; + } + } + + private boolean hasShareGroupStartingOffset(final Admin admin, final String shareGroupId, final String topic) { + try { + final Map offsets = admin + 
.listShareGroupOffsets(Collections.singletonMap(shareGroupId, new ListShareGroupOffsetsSpec())) + .partitionsToOffsetInfo(shareGroupId).get(); + return offsets != null && offsets.keySet().stream().anyMatch(tp -> topic.equals(tp.topic())); + } catch (final Exception ignored) { + return false; + } + } + + @Test + @Timeout(120) + void testShareGroupConsumesProducedRecordWithImplicitAcknowledgement() throws ExecutionException, InterruptedException, TimeoutException { + final String topic = "share-group-implicit-topic-" + UUID.randomUUID(); + final String shareGroupId = "share-group-implicit-" + UUID.randomUUID(); + + runner.setProperty(ConsumeKafka.GROUP_ID, shareGroupId); + runner.setProperty(ConsumeKafka.TOPICS, topic); + runner.setProperty(ConsumeKafka.ACKNOWLEDGEMENT_MODE, ShareAcknowledgementMode.IMPLICIT.getValue()); + runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + runner.run(1, false, true); + waitForShareGroupAssignment(shareGroupId, topic); + + produceOne(topic, null, null, RECORD_VALUE, Collections.emptyList()); + + waitForFlowFile(); + runner.run(1, true, false); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).getFirst(); + flowFile.assertContentEquals(RECORD_VALUE); + flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic); + } + + @Test + @Timeout(60) + void testShareGroupVerifySucceeds() throws InitializationException { + final String topic = "share-group-verify-" + UUID.randomUUID(); + final String shareGroupId = "share-group-verify-" + UUID.randomUUID(); + + runner.setProperty(ConsumeKafka.GROUP_ID, shareGroupId); + runner.setProperty(ConsumeKafka.TOPICS, topic); + runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + final ConsumeKafka processor = (ConsumeKafka) runner.getProcessor(); + final List results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + 
assertEquals(1, results.size()); + + final ConfigVerificationResult result = results.getFirst(); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, result.getOutcome()); + assertNotNull(result.getExplanation()); + assertTrue(result.getExplanation().contains(shareGroupId)); + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaAuthSaslPlaintextIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaAuthSaslPlaintextIT.java index de28ed5593a4..e160530b4f77 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaAuthSaslPlaintextIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaAuthSaslPlaintextIT.java @@ -34,11 +34,10 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; import java.util.Collections; @@ -51,7 +50,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -@Disabled("circle back to this") @TestMethodOrder(MethodOrderer.MethodName.class) public class PublishKafkaAuthSaslPlaintextIT { private static final String SERVICE_ID = Kafka3ConnectionService.class.getSimpleName(); @@ -61,28 +59,30 @@ public class PublishKafkaAuthSaslPlaintextIT { private static final String USERNAME = "nifi"; private static final String PASSWORD = UUID.randomUUID().toString(); - private 
static ConfluentKafkaContainer kafka; + private static KafkaContainer kafka; @BeforeAll static void beforeAll() { - kafka = new ConfluentKafkaContainer(DockerImageName.parse(AbstractKafkaBaseIT.IMAGE_NAME)) + kafka = new KafkaContainer(DockerImageName.parse(AbstractKafkaBaseIT.IMAGE_NAME)) .withEnv(getEnvironmentSaslPlaintext()); kafka.start(); } /** * Environment to be provided to docker container to enable SASL authentication. - *

- * Disable this test for now: - *

+ * + *

<p>The testcontainers {@link KafkaContainer} for the apache/kafka image runs Kafka in KRaft + * mode with three listeners: the externally-mapped {@code PLAINTEXT} listener that + * {@link KafkaContainer#getBootstrapServers()} exposes, an internal {@code BROKER} listener for + * intra-cluster traffic, and a {@code CONTROLLER} listener for the KRaft controller quorum. + * The protocol map below remaps the externally-mapped listener to {@code SASL_PLAINTEXT} while + * keeping {@code BROKER} and {@code CONTROLLER} on PLAINTEXT so the KRaft bootstrap succeeds.</p>

*/ private static Map getEnvironmentSaslPlaintext() { final Map environment = new LinkedHashMap<>(); - environment.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); + environment.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); environment.put("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN"); + environment.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); environment.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", String.format( "%s required user_%s=\"%s\";", PlainLoginModule.class.getName(), USERNAME, PASSWORD)); return environment; diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java index 9a50eb6060c9..f74a0244ae7e 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.connector.components.ConnectorMethod; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kafka.processors.common.KafkaUtils; +import org.apache.nifi.kafka.processors.consumer.GroupType; import org.apache.nifi.kafka.processors.consumer.OffsetTracker; import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler; @@ -48,6 +49,10 @@ import org.apache.nifi.kafka.service.api.consumer.PollingContext; import org.apache.nifi.kafka.service.api.consumer.RebalanceCallback; import org.apache.nifi.kafka.service.api.consumer.SessionContext; +import 
org.apache.nifi.kafka.service.api.consumer.share.Acknowledgement; +import org.apache.nifi.kafka.service.api.consumer.share.KafkaShareConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode; +import org.apache.nifi.kafka.service.api.consumer.share.ShareGroupContext; import org.apache.nifi.kafka.service.api.record.ByteRecord; import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; import org.apache.nifi.kafka.shared.property.KeyEncoding; @@ -89,6 +94,9 @@ @CapabilityDescription("Consumes messages from Apache Kafka Consumer API. " + "The complementary NiFi processor for sending messages is PublishKafka. The Processor supports consumption of Kafka messages, optionally interpreted as NiFi records. " + + "By default the Processor uses a classic consumer group, which assigns whole partitions to consumers. " + + "When configured for 'Share Group' the Processor uses Kafka share groups (KIP-932, requires Kafka 4.2+ brokers configured for share-group operation), " + + "which distribute records cooperatively across the consumers of a share group with per-record acknowledgement instead of per-partition offset commits. " + "Please note that, at this time (in read record mode), the Processor assumes that " + "all records that are retrieved from a given partition have the same schema. For this mode, if any of the Kafka messages are pulled but cannot be parsed or written with the " + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " @@ -125,20 +133,50 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess .required(true) .build(); + static final PropertyDescriptor GROUP_TYPE = new PropertyDescriptor.Builder() + .name("Group Type") + .description("Selects the Kafka consumer group model. Choose 'Consumer Group' for the classic offset-committed model used by NiFi historically. 
" + + "Choose 'Share Group' to use Kafka share groups (KIP-932), which require Kafka 4.1+ brokers configured for share-group operation and " + + "deliver records cooperatively with per-record acknowledgement instead of per-partition offset commits. " + + "When 'Share Group' is selected the [Acknowledgement Mode] property controls how records are acknowledged to the broker.") + .required(true) + .allowableValues(GroupType.class) + .defaultValue(GroupType.CONSUMER) + .expressionLanguageSupported(NONE) + .build(); + static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() .name("Group ID") - .description("Kafka Consumer Group Identifier corresponding to Kafka group.id property") + .description("Kafka group identifier. For 'Consumer Group' this corresponds to the Kafka group.id property. " + + "For 'Share Group' this is the share-group identifier used by the broker to track per-record acknowledgements.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(NONE) .build(); + static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder() + .name("Acknowledgement Mode") + .description("Controls how share-group records are acknowledged to the broker. " + + "'Explicit' (the default) acknowledges every delivered record individually so a session rollback can release records back to the share group " + + "for immediate redelivery to another consumer. 
" + + "'Implicit' relies on the broker's default behavior of accepting all delivered records on the next poll or commit; " + + "in this mode a session rollback cannot actively release records, and they only become eligible for redelivery once the broker's " + + "acquisition lock expires (controlled by the broker-level 'group.share.record.lock.duration.ms' configuration).") + .required(true) + .allowableValues(ShareAcknowledgementMode.class) + .defaultValue(ShareAcknowledgementMode.EXPLICIT) + .dependsOn(GROUP_TYPE, GroupType.SHARE) + .expressionLanguageSupported(NONE) + .build(); + static final PropertyDescriptor TOPIC_FORMAT = new PropertyDescriptor.Builder() .name("Topic Format") - .description("Specifies whether the Topics provided are a comma separated list of names or a single regular expression") + .description("Specifies whether the Topics provided are a comma separated list of names or a single regular expression. " + + "Pattern subscription is not available for share groups in Kafka 4.1.") .required(true) .allowableValues(TOPIC_NAME, TOPIC_PATTERN) .defaultValue(TOPIC_NAME) + .dependsOn(GROUP_TYPE, GroupType.CONSUMER) .build(); static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() @@ -152,21 +190,25 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() .name("auto.offset.reset") .displayName("Auto Offset Reset") - .description("Automatic offset configuration applied when no previous consumer offset found corresponding to Kafka auto.offset.reset property") + .description("Automatic offset configuration applied when no previous consumer offset found corresponding to Kafka auto.offset.reset property. 
" + + "Not applicable to share groups, which manage starting positions at the group level via Kafka admin tools.") .required(true) .allowableValues(AutoOffsetReset.class) .defaultValue(AutoOffsetReset.LATEST) .expressionLanguageSupported(NONE) + .dependsOn(GROUP_TYPE, GroupType.CONSUMER) .build(); static final PropertyDescriptor COMMIT_OFFSETS = new PropertyDescriptor.Builder() .name("Commit Offsets") .description("Specifies whether this Processor should commit the offsets to Kafka after receiving messages. Typically, this value should be set to true " + "so that messages that are received are not duplicated. However, in certain scenarios, we may want to avoid committing the offsets, that the data can be " + - "processed and later acknowledged by PublishKafka in order to provide Exactly Once semantics.") + "processed and later acknowledged by PublishKafka in order to provide Exactly Once semantics. " + + "Not applicable to share groups, which acknowledge per record rather than committing offsets.") .required(true) .allowableValues("true", "false") .defaultValue("true") + .dependsOn(GROUP_TYPE, GroupType.CONSUMER) .build(); static final PropertyDescriptor MAX_UNCOMMITTED_SIZE = new PropertyDescriptor.Builder() @@ -179,6 +221,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess ) .required(false) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .dependsOn(GROUP_TYPE, GroupType.CONSUMER) .build(); static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder() @@ -314,7 +357,9 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess private static final List PROPERTY_DESCRIPTORS = List.of( CONNECTION_SERVICE, + GROUP_TYPE, GROUP_ID, + ACKNOWLEDGEMENT_MODE, TOPIC_FORMAT, TOPICS, AUTO_OFFSET_RESET, @@ -348,12 +393,16 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess private volatile boolean commitOffsets; private volatile boolean useReader; 
private volatile String brokerUri; + private volatile GroupType groupType; private volatile PollingContext pollingContext; + private volatile ShareGroupContext shareGroupContext; + private volatile ShareAcknowledgementMode shareAcknowledgementMode; private volatile int maxConsumerCount; private volatile boolean maxUncommittedSizeConfigured; private volatile long maxUncommittedSize; private final Queue consumerServices = new LinkedBlockingQueue<>(); + private final Queue shareConsumerServices = new LinkedBlockingQueue<>(); private final AtomicInteger activeConsumerCount = new AtomicInteger(); @Override @@ -375,7 +424,16 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String @OnScheduled public void onScheduled(final ProcessContext context) { - pollingContext = createPollingContext(context); + groupType = context.getProperty(GROUP_TYPE).asAllowableValue(GroupType.class); + if (groupType == GroupType.SHARE) { + shareAcknowledgementMode = context.getProperty(ACKNOWLEDGEMENT_MODE).asAllowableValue(ShareAcknowledgementMode.class); + shareGroupContext = createShareGroupContext(context, shareAcknowledgementMode); + pollingContext = null; + } else { + pollingContext = createPollingContext(context); + shareGroupContext = null; + shareAcknowledgementMode = null; + } headerEncoding = Charset.forName(context.getProperty(HEADER_ENCODING).getValue()); final String headerNamePatternProperty = context.getProperty(HEADER_NAME_PATTERN).getValue(); @@ -386,7 +444,7 @@ public void onScheduled(final ProcessContext context) { } keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class); - commitOffsets = context.getProperty(COMMIT_OFFSETS).asBoolean(); + commitOffsets = groupType == GroupType.SHARE || context.getProperty(COMMIT_OFFSETS).asBoolean(); processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class); // Only read HEADER_NAME_PREFIX when PROCESSING_STRATEGY is FLOW_FILE 
(property dependency) @@ -401,25 +459,37 @@ public void onScheduled(final ProcessContext context) { maxConsumerCount = context.getMaxConcurrentTasks(); activeConsumerCount.set(0); - final PropertyValue maxUncommittedSizeProperty = context.getProperty(MAX_UNCOMMITTED_SIZE); - maxUncommittedSizeConfigured = maxUncommittedSizeProperty.isSet(); - if (maxUncommittedSizeConfigured) { - maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue(); + if (groupType == GroupType.CONSUMER) { + final PropertyValue maxUncommittedSizeProperty = context.getProperty(MAX_UNCOMMITTED_SIZE); + maxUncommittedSizeConfigured = maxUncommittedSizeProperty.isSet(); + if (maxUncommittedSizeConfigured) { + maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue(); + } + } else { + maxUncommittedSizeConfigured = false; } } @OnStopped public void onStopped() { - // Ensure that we close all Producer services when stopped KafkaConsumerService service; - while ((service = consumerServices.poll()) != null) { close(service, "Processor stopped"); } + + KafkaShareConsumerService shareService; + while ((shareService = shareConsumerServices.poll()) != null) { + closeShareConsumer(shareService, "Processor stopped"); + } } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { + if (groupType == GroupType.SHARE) { + triggerShareGroup(context, session); + return; + } + final KafkaConsumerService consumerService = getConsumerService(context); if (consumerService == null) { getLogger().debug("No Kafka Consumer Service available; will yield and return immediately"); @@ -581,6 +651,36 @@ public List verify(final ProcessContext context, final final List verificationResults = new ArrayList<>(); final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); + final GroupType verifyGroupType = 
context.getProperty(GROUP_TYPE).asAllowableValue(GroupType.class); + + if (verifyGroupType == GroupType.SHARE) { + // Verification always samples-and-RELEASEs records, which requires EXPLICIT acknowledgement + // regardless of the user-selected acknowledgement mode for runtime processing. + final ShareGroupContext verifyShareContext = createShareGroupContext(context, ShareAcknowledgementMode.EXPLICIT); + try (final KafkaShareConsumerService shareConsumerService = connectionService.getShareConsumerService(verifyShareContext)) { + verificationResults.add(verifyShareGroup(shareConsumerService, verifyShareContext)); + } catch (final UnsupportedOperationException e) { + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Verify Share Group Subscription") + .outcome(Outcome.FAILED) + .explanation("Configured Kafka Connection Service does not support share groups: " + e.getMessage()) + .build()); + } catch (final IOException e) { + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Communicate with Kafka Broker") + .outcome(Outcome.FAILED) + .explanation("There was an I/O failure when communicating with Kafka: " + e) + .build()); + } catch (final RuntimeException e) { + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Verify Share Group Subscription") + .outcome(Outcome.FAILED) + .explanation("Failed to verify the share group subscription against the broker. 
The broker may not support Kafka share groups or share-group operation may be disabled: " + e) + .build()); + } + return verificationResults; + } + final PollingContext pollingContext = createPollingContext(context, null, AutoOffsetReset.EARLIEST); try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { final ConfigVerificationResult partitionVerification = verifyPartitions(consumerService, pollingContext); @@ -599,6 +699,37 @@ public List verify(final ProcessContext context, final return verificationResults; } + private ConfigVerificationResult verifyShareGroup(final KafkaShareConsumerService shareConsumerService, final ShareGroupContext shareContext) { + final ConfigVerificationResult.Builder builder = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Share Group Subscription"); + + try { + final Iterable records = shareConsumerService.poll(Duration.ofSeconds(5)); + int sampled = 0; + for (final ByteRecord byteRecord : records) { + sampled++; + shareConsumerService.acknowledge(byteRecord, Acknowledgement.RELEASE); + } + // Always commit pending acknowledgements; if no records, this is a no-op. + shareConsumerService.commit(); + + if (sampled == 0) { + builder.outcome(Outcome.SUCCESSFUL).explanation( + "Successfully subscribed to topics %s for share group [%s]. No records were available within the verification window; " + .formatted(shareContext.getTopics(), shareContext.getGroupId()) + + "ensure the share group's starting offset has been set via Kafka admin tools (kafka-share-groups.sh) if records are expected."); + } else { + builder.outcome(Outcome.SUCCESSFUL).explanation( + "Successfully subscribed to topics %s for share group [%s] and sampled [%d] records (released back to the share group)." 
+ .formatted(shareContext.getTopics(), shareContext.getGroupId(), sampled)); + } + } catch (final Exception e) { + builder.outcome(Outcome.FAILED).explanation("Share group subscription failed: " + e); + } + + return builder.build(); + } + private ConfigVerificationResult verifyPartitions(final KafkaConsumerService consumerService, final PollingContext pollingContext) { final ConfigVerificationResult.Builder partitionVerification = new ConfigVerificationResult.Builder() .verificationStepName("Verify Topic Partitions"); @@ -672,6 +803,28 @@ private ConfigVerificationResult verifyCanParse(final ProcessContext context, fi ) public List sampleTopics(final ProcessContext context) throws IOException { final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); + final GroupType sampleGroupType = context.getProperty(GROUP_TYPE).asAllowableValue(GroupType.class); + + if (sampleGroupType == GroupType.SHARE) { + // Sampling RELEASEs records back to the share group so they remain available to real consumers, + // which requires EXPLICIT acknowledgement regardless of the user-selected acknowledgement mode. 
+ final ShareGroupContext sampleShareContext = createShareGroupContext(context, ShareAcknowledgementMode.EXPLICIT); + try (final KafkaShareConsumerService shareConsumerService = connectionService.getShareConsumerService(sampleShareContext)) { + final Iterable records = shareConsumerService.poll(Duration.ofSeconds(60)); + final List samples = new ArrayList<>(); + for (final ByteRecord record : records) { + samples.add(record.getValue()); + // Release sampled records so they remain available to real consumers + shareConsumerService.acknowledge(record, Acknowledgement.RELEASE); + if (samples.size() >= 10) { + break; + } + } + shareConsumerService.commit(); + return samples; + } + } + final PollingContext pollingContext = createPollingContext(context, "nifi-validation-" + System.currentTimeMillis(), AutoOffsetReset.EARLIEST); try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { final Iterable records = consumerService.poll(Duration.ofSeconds(60)); @@ -812,6 +965,136 @@ private PollingContext createPollingContext(final ProcessContext context) { return createPollingContext(context, groupId, autoOffsetReset); } + private ShareGroupContext createShareGroupContext(final ProcessContext context, final ShareAcknowledgementMode acknowledgementMode) { + final String groupId = context.getProperty(GROUP_ID).getValue(); + final String topics = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue(); + final Collection topicList = KafkaUtils.toTopicList(topics); + return new ShareGroupContext(groupId, topicList, acknowledgementMode); + } + + private void triggerShareGroup(final ProcessContext context, final ProcessSession session) { + final KafkaShareConsumerService shareConsumerService = getShareConsumerService(context); + if (shareConsumerService == null) { + getLogger().debug("No Kafka Share Consumer Service available; will yield and return immediately"); + context.yield(); + return; + } + + final long 
maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final OffsetTracker offsetTracker = new OffsetTracker(); + + // The share consumer requires every in-flight record to be acknowledged (explicitly or implicitly) + // before the next poll. Acknowledgement happens in the session-commit callback below, after + // FlowFiles are durably committed. To respect this contract we perform a single poll per trigger; + // the poll itself blocks up to Max Uncommitted Time waiting for records, so a retry loop is + // unnecessary and would violate the share consumer contract by polling again before the + // previous batch has been acknowledged. + try { + final Iterator consumerRecords = shareConsumerService.poll(Duration.ofMillis(maxUncommittedMillis)).iterator(); + if (!consumerRecords.hasNext()) { + getLogger().trace("No Kafka share-group records consumed for {}; re-queuing share consumer", shareGroupContext); + shareConsumerServices.offer(shareConsumerService); + return; + } + + processConsumerRecords(context, session, offsetTracker, consumerRecords); + + session.commitAsync( + () -> commitShareConsumer(shareConsumerService, offsetTracker, session), + throwable -> { + getLogger().error("Failed to commit session for share group; will roll back any uncommitted records", throwable); + rollbackShareConsumer(shareConsumerService, offsetTracker, session); + context.yield(); + }); + } catch (final Exception e) { + getLogger().error("Failed to consume Kafka share-group records", e); + // Best-effort rollback so EXPLICIT-mode consumers can release records immediately; + // in IMPLICIT mode this is a no-op and we rely on the close+lock-expiry below for redelivery. 
+ if (!shareConsumerService.isClosed()) { + try { + shareConsumerService.rollback(); + } catch (final Exception rollbackException) { + getLogger().warn("Failed to release records back to the share group", rollbackException); + } + } + closeShareConsumer(shareConsumerService, "Encountered Exception while consuming or writing out Kafka share-group records"); + context.yield(); + session.rollback(); + } + } + + private void commitShareConsumer(final KafkaShareConsumerService shareConsumerService, final OffsetTracker offsetTracker, final ProcessSession session) { + try { + shareConsumerService.commit(); + + offsetTracker.getRecordCounts().forEach((topic, count) -> session.adjustCounter("Records Acknowledged for " + topic, count, true)); + + shareConsumerServices.offer(shareConsumerService); + getLogger().debug("Committed acknowledgements for Kafka Share Consumer Service"); + } catch (final Exception e) { + getLogger().error("Failed to commit acknowledgements for Kafka Share Consumer Service; will release records back to the share group", e); + rollbackShareConsumer(shareConsumerService, offsetTracker, session); + } + } + + private void rollbackShareConsumer(final KafkaShareConsumerService shareConsumerService, final OffsetTracker offsetTracker, final ProcessSession session) { + if (shareConsumerService.isClosed()) { + return; + } + + try { + shareConsumerService.rollback(); + if (shareAcknowledgementMode == ShareAcknowledgementMode.EXPLICIT) { + // EXPLICIT-mode rollback actively RELEASEs records to the broker, so the consumer can be re-pooled. + shareConsumerServices.offer(shareConsumerService); + getLogger().debug("Released records back to the share group for Kafka Share Consumer Service"); + } else { + // IMPLICIT-mode rollback cannot actively release records. Close the consumer so the broker's + // acquisition lock expires and the records become eligible for redelivery to another consumer. 
+ closeShareConsumer(shareConsumerService, "IMPLICIT-mode session rollback - awaiting broker acquisition lock expiry"); + } + } catch (final Exception e) { + getLogger().warn("Failed to release records back to the share group", e); + closeShareConsumer(shareConsumerService, "Failed to release records back to the share group"); + } + + offsetTracker.getRecordCounts().forEach((topic, count) -> session.adjustCounter("Records Released for " + topic, count, true)); + } + + private KafkaShareConsumerService getShareConsumerService(final ProcessContext context) { + final KafkaShareConsumerService shareConsumerService = shareConsumerServices.poll(); + if (shareConsumerService != null) { + return shareConsumerService; + } + + final int activeCount = activeConsumerCount.incrementAndGet(); + if (activeCount > getMaxConsumerCount()) { + getLogger().trace("No Kafka Share Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); + activeConsumerCount.decrementAndGet(); + return null; + } + + getLogger().info("No Kafka Share Consumer Service available; creating a new one. 
Active count: {}", activeCount); + final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); + return connectionService.getShareConsumerService(shareGroupContext); + } + + private void closeShareConsumer(final KafkaShareConsumerService shareConsumerService, final String reason) { + if (shareConsumerService.isClosed()) { + getLogger().debug("Asked to close Kafka Share Consumer Service but consumer already closed"); + return; + } + + getLogger().info("Closing Kafka Share Consumer due to: {}", reason); + + try { + shareConsumerService.close(); + activeConsumerCount.decrementAndGet(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Kafka Share Consumer Service", ioe); + } + } + private PollingContext createPollingContext(final ProcessContext context, final String groupId, final AutoOffsetReset autoOffsetReset) { final String topics = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue(); final String topicFormat = context.getProperty(TOPIC_FORMAT).getValue(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/GroupType.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/GroupType.java new file mode 100644 index 000000000000..88426509fe76 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/GroupType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.DescribedValue; + +/** + * Selects the Kafka consumer group model used by ConsumeKafka. + */ +public enum GroupType implements DescribedValue { + CONSUMER( + "Consumer Group", + "Use a classic Kafka consumer group. Records are partition-bound: each partition is assigned to one consumer in the group at a time, " + + "offset positioning is committed per partition, and parallelism is capped by the number of partitions in the subscribed topics." + ), + + SHARE( + "Share Group", + "Use a Kafka share group (KIP-932). Records are distributed cooperatively across the consumers of the share group with per-record acknowledgement, " + + "and parallelism is no longer capped by the number of partitions. Requires Kafka 4.1+ brokers configured for share groups." 
+ ); + + private final String displayName; + private final String description; + + GroupType(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.kafka.processors.ConsumeKafka/additionalDetails.md b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.kafka.processors.ConsumeKafka/additionalDetails.md index e454c53f0276..ac9f54b60069 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.kafka.processors.ConsumeKafka/additionalDetails.md +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.kafka.processors.ConsumeKafka/additionalDetails.md @@ -15,6 +15,72 @@ # ConsumeKafka +### Group Type + +The `Group Type` property selects the Kafka consumer group model. + +- `Consumer Group` (the default) uses Kafka's classic consumer-group protocol. Records are bound to partitions: each partition is assigned to one consumer at a time, position is tracked per partition by committed offset, and parallelism (the number of concurrent tasks that can effectively consume) is capped by the number of partitions on the subscribed topics. This is the historical behavior of `ConsumeKafka` and the only option available against Kafka 3.x or 4.0 brokers. +- `Share Group` uses Kafka share groups (KIP-932). Records are distributed cooperatively across the consumers of a share group with per-record acknowledgement instead of per-partition offset commits. 
Parallelism is no longer capped by the number of partitions, at the cost of weaker per-partition ordering and the loss of the exactly-once handoff pattern with `PublishKafka`. + +#### Upgrade compatibility + +Existing `ConsumeKafka` flows from earlier NiFi releases are unchanged after upgrading. The new `Group Type` property defaults to `Consumer Group`, so any flow definition that omits the property continues to use the classic consumer-group code path with the same semantics it had before. The classic-group properties (`Topic Format`, `Auto Offset Reset`, `Commit Offsets`, `Max Uncommitted Size`) remain at their previously persisted values; they are simply hidden from the UI when `Group Type` is switched to `Share Group`. + +#### Share Group prerequisites + +Share groups require the following on the broker side: + +- All brokers running Apache Kafka **4.2 or later** (recommended). Kafka 4.2 is the first release in which share groups are Generally Available and the version that matches the Kafka client bundled with NiFi. +- Kafka **4.1** brokers are also supported, but share groups are still in preview in that release. +- Kafka 4.0 share-group early-access clients are **not wire-compatible** with 4.1+ brokers; running 4.0 brokers with this processor is not supported. +- The share rebalance protocol enabled in `group.coordinator.rebalance.protocols` (for example `classic,consumer,share`). +- The share coordinator state topic available, with replication and ISR settings appropriate for the cluster (`share.coordinator.state.topic.replication.factor`, `share.coordinator.state.topic.min.isr`). + +The Kafka client used by NiFi exposes the share consumer (`org.apache.kafka.clients.consumer.KafkaShareConsumer`) under the `@Evolving` interface stability annotation. APIs and behavior may change in incompatible ways between minor Kafka releases. + +#### Setting the starting position for a new share group + +Share groups do **not** honor the `auto.offset.reset` consumer property. 
The starting position for a brand-new share group is governed by the group-level `share.auto.offset.reset` configuration and is managed out of band via Kafka administrative tooling. Before starting the processor for the first time, set the share group's starting offset using either: + +- The `kafka-share-groups.sh --reset-offsets` script that ships with Kafka, or +- The Java Admin API (`Admin.alterShareGroupOffsets`). + +If the share group has no recorded starting position when the processor first runs, only records produced after the consumer subscribes are eligible for delivery (the default is `latest`). The processor's verification step surfaces a hint in this case. + +#### Differences from the Consumer Group path + +The following classic-group properties are hidden when `Group Type` is `Share Group`, since they have no analogue: + +- `Topic Format` — the Kafka share consumer accepts only an explicit topic-name list. Pattern subscription is not part of the share-consumer API. +- `Auto Offset Reset` — share groups manage starting position at the group level (see above). +- `Commit Offsets` — share groups acknowledge per record rather than committing offsets, so the exactly-once handoff with `PublishKafka` (which depends on deferring offset commits) is not available. +- `Max Uncommitted Size` — the share consumer issues a single `poll()` per processor trigger and acknowledges the full batch atomically, so a size cap cannot be applied incrementally across polls the way it is in classic mode. The size of each batch is therefore controlled at the broker level by `group.share.max.fetch.bytes` rather than from the processor. + +Other behavioral differences worth noting: + +- A session commit acknowledges all records consumed in the session as `ACCEPT`, including records that were routed to the `parse failure` relationship. 
To handle records that fail parsing, route the `parse failure` relationship to a downstream error-handling subflow rather than relying on broker-side redelivery. +- A session rollback (for example when the downstream FlowFile commit fails) releases all records consumed in the session. The release semantics depend on the configured `Acknowledgement Mode` — see below. +- Verifying or sampling a share-group configuration consumes records briefly and releases them back to the share group (`RELEASE` acknowledgement) so they remain available to real consumers, regardless of the configured `Acknowledgement Mode`. + +#### Acknowledgement Mode + +When `Group Type` is `Share Group`, the `Acknowledgement Mode` property controls how records are acknowledged to the broker: + +- `Explicit` (the default) acknowledges every delivered record individually. On a successful session commit each record is acknowledged as `ACCEPT`. On a session rollback the records are acknowledged as `RELEASE` and become **immediately** eligible for redelivery to any consumer in the share group. This is the recommended mode for typical NiFi workloads because it keeps end-to-end redelivery latency low when downstream processing fails. +- `Implicit` lets the broker treat all delivered records as `ACCEPT` on the next poll or commit. Per-record acknowledgement is not permitted in this mode, so a session rollback cannot actively release records back to the broker. The affected records only become eligible for redelivery once the broker's record-acquisition lock expires (broker-level `group.share.record.lock.duration.ms`, default `30000`). The processor closes the share consumer on rollback to avoid re-using a session that still has unacknowledged records pending; the next NiFi trigger creates a fresh consumer. 
+ +Choose `Explicit` unless you specifically need the broker-side accept-everything behavior of `Implicit` (for example, to trade redelivery latency for slightly lower client overhead in steady-state success cases). + +#### Broker-side delivery and retry + +Share groups apply a broker-side retry envelope that is independent of any NiFi-side retry policy: + +- Each delivery of a record increments a broker-tracked delivery count. +- After `group.share.delivery.attempt.limit` deliveries (broker default `5`, configurable per-group), the broker moves the record to the *archived* state and stops delivering it to any consumer. This is the share-group equivalent of a poison-message dead-letter; archived records are visible via Kafka admin tooling but are no longer delivered. +- Records held by a consumer that does not acknowledge them within `group.share.record.lock.duration.ms` are automatically released by the broker for redelivery, even if the consumer has not explicitly acknowledged them as `RELEASE`. This is also the redelivery mechanism `ConsumeKafka` relies on in `Implicit` mode after a session rollback. + +For workloads where NiFi-side error handling is preferred over broker-side retry, route the `parse failure` relationship to a downstream subflow (for example, a `PublishKafka`-based dead-letter topic). + ### Output Strategies This processor offers multiple output strategies (configured via processor property 'Output Strategy') for converting Kafka records into FlowFiles. 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java index 14c50d45babe..aa0de2e7672a 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java @@ -18,28 +18,45 @@ import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.kafka.processors.consumer.GroupType; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; import org.apache.nifi.kafka.service.api.KafkaConnectionService; import org.apache.nifi.kafka.service.api.common.PartitionState; import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.KafkaShareConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode; +import org.apache.nifi.kafka.service.api.consumer.share.ShareGroupContext; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Duration; import java.util.Collections; import java.util.List; +import static 
org.apache.nifi.kafka.processors.ConsumeKafka.ACKNOWLEDGEMENT_MODE; import static org.apache.nifi.kafka.processors.ConsumeKafka.CONNECTION_SERVICE; import static org.apache.nifi.kafka.processors.ConsumeKafka.GROUP_ID; +import static org.apache.nifi.kafka.processors.ConsumeKafka.GROUP_TYPE; +import static org.apache.nifi.kafka.processors.ConsumeKafka.PROCESSING_STRATEGY; import static org.apache.nifi.kafka.processors.ConsumeKafka.TOPICS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -64,6 +81,9 @@ class ConsumeKafkaTest { @Mock KafkaConsumerService kafkaConsumerService; + @Mock + KafkaShareConsumerService kafkaShareConsumerService; + private TestRunner runner; private ConsumeKafka processor; @@ -141,6 +161,225 @@ public void testDynamicProperties() throws InitializationException { runner.enableControllerService(kafkaConnectionService); } + @Test + public void testShareGroupSelectionIsValidWithoutClassicProperties() throws InitializationException { + setConnectionService(); + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + // Auto Offset Reset, Topic Format, and Commit Offsets depend on GROUP_TYPE = CONSUMER and + // are intentionally not required when SHARE is selected. Configuration must still be valid. 
+ runner.assertValid(); + } + + @Test + public void testShareGroupVerifySuccessful() throws InitializationException { + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + final List results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertEquals(1, results.size()); + + final ConfigVerificationResult result = results.getFirst(); + assertEquals(Outcome.SUCCESSFUL, result.getOutcome()); + assertNotNull(result.getExplanation()); + assertTrue(result.getExplanation().contains(CONSUMER_GROUP_ID)); + } + + @Test + public void testShareGroupVerifySuccessfulSamplesAreReleased() throws InitializationException { + final ByteRecord sample = new ByteRecord(TEST_TOPIC_NAME, FIRST_PARTITION, 0L, 0L, Collections.emptyList(), null, "value".getBytes(), 1); + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(List.of(sample)); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + final List results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + final ConfigVerificationResult result = results.getFirst(); + assertEquals(Outcome.SUCCESSFUL, result.getOutcome()); + + verify(kafkaShareConsumerService).acknowledge(any(ByteRecord.class), any()); + verify(kafkaShareConsumerService).commit(); + } + + @Test + public void testShareGroupVerifyUnsupportedConnectionService() throws 
InitializationException { + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))) + .thenThrow(new UnsupportedOperationException("Share consumer not supported")); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + final List results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertEquals(1, results.size()); + assertEquals(Outcome.FAILED, results.getFirst().getOutcome()); + } + + @Test + public void testShareGroupVerifyPassesShareGroupIdToConnectionService() throws InitializationException { + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(ShareGroupContext.class); + verify(kafkaConnectionService).getShareConsumerService(captor.capture()); + final ShareGroupContext capturedContext = captor.getValue(); + assertEquals(CONSUMER_GROUP_ID, capturedContext.getGroupId()); + assertEquals(List.of(TEST_TOPIC_NAME), List.copyOf(capturedContext.getTopics())); + } + + @Test + public void testShareGroupAcknowledgementModeDefaultsToExplicit() throws InitializationException { + setConnectionService(); + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + + runner.assertValid(); + assertEquals(ShareAcknowledgementMode.EXPLICIT.getValue(), + 
runner.getProcessContext().getProperty(ACKNOWLEDGEMENT_MODE).getValue()); + } + + @Test + public void testShareGroupOnTriggerCommitsAcknowledgements() throws InitializationException { + setConnectionService(); + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + final String recordValue = "share-record"; + final ByteRecord record = new ByteRecord(TEST_TOPIC_NAME, FIRST_PARTITION, 7L, 0L, Collections.emptyList(), null, recordValue.getBytes(), 1); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(List.of(record)); + + runner.run(1, true, true); + + runner.assertTransferCount(ConsumeKafka.SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).getFirst(); + flowFile.assertContentEquals(recordValue); + flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, TEST_TOPIC_NAME); + flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, "7"); + + verify(kafkaShareConsumerService).commit(); + verify(kafkaShareConsumerService, never()).rollback(); + } + + @Test + public void testShareGroupOnTriggerNoRecordsRecyclesConsumer() throws InitializationException { + setConnectionService(); + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + + runner.run(1, false, true); + runner.run(1, false, 
false); + runner.run(1, true, false); + + runner.assertTransferCount(ConsumeKafka.SUCCESS, 0); + // The same consumer must be reused across triggers when polls return no records. + verify(kafkaConnectionService, times(1)).getShareConsumerService(any(ShareGroupContext.class)); + verify(kafkaShareConsumerService, never()).commit(); + verify(kafkaShareConsumerService, never()).rollback(); + } + + @Test + public void testShareGroupOnTriggerPollExceptionClosesConsumerAndRollsBack() throws Exception { + setConnectionService(); + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + when(kafkaShareConsumerService.poll(any(Duration.class))).thenThrow(new RuntimeException("broker unavailable")); + when(kafkaShareConsumerService.isClosed()).thenReturn(false); + + runner.run(1, true, true); + + runner.assertTransferCount(ConsumeKafka.SUCCESS, 0); + verify(kafkaShareConsumerService).rollback(); + verify(kafkaShareConsumerService).close(); + } + + @Test + public void testShareGroupExplicitModePassedToShareGroupContext() throws InitializationException { + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + runner.run(1, true, true); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(ShareGroupContext.class); + 
verify(kafkaConnectionService).getShareConsumerService(captor.capture()); + assertEquals(ShareAcknowledgementMode.EXPLICIT, captor.getValue().getAcknowledgementMode()); + } + + @Test + public void testShareGroupImplicitModePassedToShareGroupContext() throws InitializationException { + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(ACKNOWLEDGEMENT_MODE, ShareAcknowledgementMode.IMPLICIT.getValue()); + runner.setProperty(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); + + runner.run(1, true, true); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(ShareGroupContext.class); + verify(kafkaConnectionService).getShareConsumerService(captor.capture()); + assertEquals(ShareAcknowledgementMode.IMPLICIT, captor.getValue().getAcknowledgementMode()); + } + + @Test + public void testShareGroupVerificationAlwaysUsesExplicitMode() throws InitializationException { + when(kafkaShareConsumerService.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + setConnectionService(); + when(kafkaConnectionService.getShareConsumerService(any(ShareGroupContext.class))).thenReturn(kafkaShareConsumerService); + + runner.setProperty(GROUP_TYPE, GroupType.SHARE); + runner.setProperty(TOPICS, TEST_TOPIC_NAME); + runner.setProperty(GROUP_ID, CONSUMER_GROUP_ID); + runner.setProperty(ACKNOWLEDGEMENT_MODE, ShareAcknowledgementMode.IMPLICIT.getValue()); + + processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(ShareGroupContext.class); + verify(kafkaConnectionService).getShareConsumerService(captor.capture()); + 
// Verification must use EXPLICIT regardless of the user-selected mode so sampled records + // can be released back to the share group. + assertEquals(ShareAcknowledgementMode.EXPLICIT, captor.getValue().getAcknowledgementMode()); + } + private void setConnectionService() throws InitializationException { when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java index e669c008117a..1992128550a2 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java @@ -19,6 +19,8 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.consumer.share.KafkaShareConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.ShareGroupContext; import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; @@ -29,4 +31,18 @@ public interface KafkaConnectionService extends ControllerService { KafkaProducerService getProducerService(ProducerConfiguration producerConfiguration); String getBrokerUri(); + + /** + * Build a Kafka share-group consumer for the given subscription. 
+ * Implementations that do not support share groups (KIP-932) may throw + * {@link UnsupportedOperationException}; the default implementation does so to preserve + * backward compatibility for existing connection services. + * + * @param shareGroupContext Subscription context describing the share group, topics, and + * acknowledgement mode + * @return Share-group consumer service + */ + default KafkaShareConsumerService getShareConsumerService(ShareGroupContext shareGroupContext) { + throw new UnsupportedOperationException("Share consumer not supported by this Kafka Connection Service"); + } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/Acknowledgement.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/Acknowledgement.java new file mode 100644 index 000000000000..1ea5a13daf1c --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/Acknowledgement.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.service.api.consumer.share; + +import org.apache.nifi.components.DescribedValue; + +/** + * Acknowledgement type for a Kafka share-group record. Mirrors the broker-side outcomes + * defined by KIP-932 without leaking the Kafka client API into the NiFi service contract. + */ +public enum Acknowledgement implements DescribedValue { + ACCEPT("Accept", "Acknowledge successful processing of the record. The record will not be redelivered."), + + RELEASE("Release", "Release the record so that another consumer in the share group may receive it."), + + REJECT("Reject", "Reject the record as unprocessable. The record will not be redelivered to any consumer."); + + private final String displayName; + private final String description; + + Acknowledgement(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/KafkaShareConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/KafkaShareConsumerService.java new file mode 100644 index 000000000000..0c8f9bf2b8a9 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/KafkaShareConsumerService.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.api.consumer.share; + +import org.apache.nifi.kafka.service.api.record.ByteRecord; + +import java.io.Closeable; +import java.time.Duration; + +/** + * NiFi service contract for consuming records from a Kafka share group (KIP-932). + * + *

<p>Share groups differ from classic consumer groups in important ways:</p>
+ * <ul>
+ * <li>Records are not bound to per-consumer partition assignments; multiple consumers in a share + * group may receive records from the same partition.</li>
+ * <li>Position is tracked per record by acknowledgement, not by committed offset. Records are + * acknowledged with {@link Acknowledgement#ACCEPT}, {@link Acknowledgement#RELEASE}, or + * {@link Acknowledgement#REJECT}.</li>
+ * <li>There is no rollback by offset seek. The closest equivalent is to release records, which + * returns them to the share-group queue for redelivery to another (or the same) consumer.</li>
+ * </ul>
+ *
+ * <p>This service must be closed to avoid leaking connection resources.</p>

+ */ +public interface KafkaShareConsumerService extends Closeable { + + /** + * Poll the share group for records up to the given duration. + * + * @param maxWaitDuration Maximum amount of time to wait for records + * @return Records delivered by the broker, possibly empty + */ + Iterable poll(Duration maxWaitDuration); + + /** + * Acknowledge a single record returned from the most recent {@link #poll(Duration)}. + * Calling this method is only valid when the underlying consumer is configured for + * {@link ShareAcknowledgementMode#EXPLICIT}. + * + * @param record Record to acknowledge + * @param acknowledgement Outcome of the record's processing + */ + void acknowledge(ByteRecord record, Acknowledgement acknowledgement); + + /** + * Commit pending acknowledgements to the broker. + * + *

<p>In implicit mode this commits acknowledgement of all delivered-but-unacknowledged records + * as {@link Acknowledgement#ACCEPT}. In explicit mode this commits the per-record + * acknowledgements that have been recorded since the previous commit; any record delivered by + * the most recent poll that has not been explicitly acknowledged is implicitly accepted as part + * of the commit so the consumer can move on to the next poll.</p>

+ */ + void commit(); + + /** + * Release all delivered-but-unacknowledged records and commit the release acknowledgements. + * This is the share-group analogue of a rollback: released records become eligible for + * redelivery to other consumers in the same share group. + */ + void rollback(); + + /** + * @return {@code true} if this service has been closed + */ + boolean isClosed(); +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareAcknowledgementMode.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareAcknowledgementMode.java new file mode 100644 index 000000000000..fee64e181d30 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareAcknowledgementMode.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.api.consumer.share; + +import org.apache.nifi.components.DescribedValue; + +/** + * Acknowledgement mode for a Kafka share-group consumer. 
Maps to the underlying Kafka client + * property {@code share.acknowledgement.mode}. + */ +public enum ShareAcknowledgementMode implements DescribedValue { + IMPLICIT( + "implicit", + "Implicit", + "Records delivered by a poll are implicitly accepted when the next poll is called or when commit is invoked." + + " Per-record failure handling is not available in this mode." + ), + + EXPLICIT( + "explicit", + "Explicit", + "Each delivered record must be acknowledged before the next poll. Enables per-record failure handling" + + " and selective release or rejection of records." + ); + + private final String value; + private final String displayName; + private final String description; + + ShareAcknowledgementMode(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareGroupContext.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareGroupContext.java new file mode 100644 index 000000000000..5ad305395686 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/share/ShareGroupContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.api.consumer.share; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +/** + * Subscription context for a Kafka share-group consumer. + * Share groups (KIP-932) accept only a topic-name list; pattern subscription is not part of the + * 4.1 share-consumer API. + */ +public class ShareGroupContext { + + private final String groupId; + private final Collection topics; + private final ShareAcknowledgementMode acknowledgementMode; + + public ShareGroupContext(final String groupId, final Collection topics, final ShareAcknowledgementMode acknowledgementMode) { + this.groupId = Objects.requireNonNull(groupId, "Group ID required"); + this.topics = List.copyOf(Objects.requireNonNull(topics, "Topics required")); + this.acknowledgementMode = Objects.requireNonNull(acknowledgementMode, "Acknowledgement Mode required"); + } + + public String getGroupId() { + return groupId; + } + + public Collection getTopics() { + return topics; + } + + public ShareAcknowledgementMode getAcknowledgementMode() { + return acknowledgementMode; + } + + @Override + public String toString() { + return "ShareGroupContext[groupId=%s, topics=%s, acknowledgementMode=%s]".formatted(groupId, topics, acknowledgementMode); + } +} diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 225a21ca0682..cfd8bc1fcc7e 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -45,11 +46,15 @@ import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.consumer.share.KafkaShareConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode; +import org.apache.nifi.kafka.service.api.consumer.share.ShareGroupContext; import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; import org.apache.nifi.kafka.service.consumer.Kafka3AssignmentService; import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService; import org.apache.nifi.kafka.service.consumer.Subscription; +import org.apache.nifi.kafka.service.consumer.share.Kafka4ShareConsumerService; import org.apache.nifi.kafka.service.producer.Kafka3ProducerService; import 
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler; import org.apache.nifi.kafka.service.security.StandardSslEngineFactory; @@ -260,6 +265,39 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte } } + @Override + public KafkaShareConsumerService getShareConsumerService(final ShareGroupContext shareGroupContext) { + Objects.requireNonNull(shareGroupContext, "Share Group Context required"); + + final Properties properties = new Properties(); + properties.putAll(consumerProperties); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupContext.getGroupId()); + + // Configurations rejected by ShareConsumerConfig.SHARE_GROUP_UNSUPPORTED_CONFIGS in + // Kafka 4.2+. Remove rather than override so KafkaShareConsumer construction does not + // throw ConfigException on properties carried over from the classic-consumer setup. + // Keep this list in sync with org.apache.kafka.clients.consumer.ShareConsumerConfig + // when upgrading the kafka-clients dependency. 
+ properties.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + properties.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + properties.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG); + properties.remove(ConsumerConfig.ISOLATION_LEVEL_CONFIG); + properties.remove(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG); + properties.remove(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG); + properties.remove(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); + properties.remove(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); + properties.remove(ConsumerConfig.GROUP_PROTOCOL_CONFIG); + properties.remove(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); + + final ShareAcknowledgementMode acknowledgementMode = shareGroupContext.getAcknowledgementMode(); + properties.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, acknowledgementMode.getValue()); + + final ByteArrayDeserializer deserializer = new ByteArrayDeserializer(); + final KafkaShareConsumer shareConsumer = new KafkaShareConsumer<>(properties, deserializer, deserializer); + + return new Kafka4ShareConsumerService(getLogger(), shareConsumer, acknowledgementMode, shareGroupContext.getTopics()); + } + private Subscription createSubscription(final PollingContext pollingContext) { final String groupId = pollingContext.getGroupId(); final Optional topicPatternFound = pollingContext.getTopicPattern(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerService.java new file mode 100644 index 000000000000..f573afb2af43 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerService.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.consumer.share; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.nifi.kafka.service.api.consumer.share.Acknowledgement; +import org.apache.nifi.kafka.service.api.consumer.share.KafkaShareConsumerService; +import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode; +import org.apache.nifi.kafka.service.api.header.RecordHeader; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.logging.ComponentLog; + +import java.io.Closeable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Kafka share-group consumer implementation backed by {@link ShareConsumer} (KIP-932). + * Requires Kafka 4.2+ brokers with share-group features enabled. + * + *

<p>The {@link ShareConsumer} API is annotated {@code @Evolving} in the Kafka client and may + * change in incompatible ways between minor Kafka releases.</p>

+ */ +public class Kafka4ShareConsumerService implements KafkaShareConsumerService, Closeable { + + private final ComponentLog componentLog; + private final ShareConsumer consumer; + private final ShareAcknowledgementMode acknowledgementMode; + + /** + * Records delivered by the most recent {@link #poll(Duration)} that have not yet been + * explicitly acknowledged. Tracked by topic-partition-offset coordinates so callers can + * reference records via the {@link ByteRecord} they received from the service without the + * service needing to expose the underlying {@link ConsumerRecord}. Insertion-order tracking + * makes any commit-time auto-ACCEPT iteration deterministic. + */ + private final Map> pendingRecords = new LinkedHashMap<>(); + + private volatile boolean closed = false; + + public Kafka4ShareConsumerService(final ComponentLog componentLog, + final ShareConsumer consumer, + final ShareAcknowledgementMode acknowledgementMode, + final Collection topics) { + this.componentLog = Objects.requireNonNull(componentLog, "Component Log required"); + this.consumer = Objects.requireNonNull(consumer, "Consumer required"); + this.acknowledgementMode = Objects.requireNonNull(acknowledgementMode, "Acknowledgement Mode required"); + Objects.requireNonNull(topics, "Topics required"); + + consumer.subscribe(topics); + } + + @Override + public Iterable poll(final Duration maxWaitDuration) { + final ConsumerRecords consumerRecords = consumer.poll(maxWaitDuration); + if (consumerRecords.isEmpty()) { + return List.of(); + } + + final List byteRecords = new ArrayList<>(consumerRecords.count()); + for (final ConsumerRecord consumerRecord : consumerRecords) { + final RecordCoordinates coordinates = RecordCoordinates.of(consumerRecord); + pendingRecords.put(coordinates, consumerRecord); + byteRecords.add(toByteRecord(consumerRecord)); + } + return byteRecords; + } + + @Override + public void acknowledge(final ByteRecord record, final Acknowledgement acknowledgement) { + 
Objects.requireNonNull(record, "Record required"); + Objects.requireNonNull(acknowledgement, "Acknowledgement required"); + + if (acknowledgementMode != ShareAcknowledgementMode.EXPLICIT) { + componentLog.debug("Per-record acknowledgement called but consumer is in [{}] mode; ignoring", acknowledgementMode); + return; + } + + final RecordCoordinates coordinates = RecordCoordinates.of(record); + final ConsumerRecord consumerRecord = pendingRecords.remove(coordinates); + if (consumerRecord == null) { + componentLog.debug("Record {} not found among pending records; cannot acknowledge", coordinates); + return; + } + + consumer.acknowledge(consumerRecord, toAcknowledgeType(acknowledgement)); + } + + @Override + public void commit() { + if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) { + // Explicit mode requires every delivered record to be acknowledged before the next poll/commit. + // Treat any record left unacknowledged at this point as ACCEPT so the commit can succeed. + for (final ConsumerRecord remaining : pendingRecords.values()) { + consumer.acknowledge(remaining, AcknowledgeType.ACCEPT); + } + } + + final Map> result = consumer.commitSync(); + pendingRecords.clear(); + logPartitionFailures("commit", result); + } + + /** + * {@inheritDoc} + * + *

In {@link ShareAcknowledgementMode#EXPLICIT} this acknowledges all pending records as + * {@link AcknowledgeType#RELEASE} and commits, so the records are immediately eligible for + * redelivery to another consumer in the share group.

+ * + *

In {@link ShareAcknowledgementMode#IMPLICIT} per-record acknowledgement is not permitted, + * and the next {@link #commit()} or {@link #poll(Duration)} would otherwise treat the records as + * implicitly accepted. This implementation drops the local pending tracking, but the only way + * to actually release the records is for the caller to {@link #close()} the consumer and let + * the broker's record-acquisition lock expire (broker-level + * {@code group.share.record.lock.duration.ms}). After that timeout the records become eligible + * for redelivery to another consumer.

+ */ + @Override + public void rollback() { + if (pendingRecords.isEmpty()) { + return; + } + + if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) { + for (final ConsumerRecord consumerRecord : pendingRecords.values()) { + consumer.acknowledge(consumerRecord, AcknowledgeType.RELEASE); + } + try { + final Map> result = consumer.commitSync(); + logPartitionFailures("rollback", result); + } catch (final Exception e) { + componentLog.warn("Failed to commit RELEASE acknowledgements during rollback for {} records", pendingRecords.size(), e); + } + pendingRecords.clear(); + } else { + componentLog.debug("Dropped {} pending records in implicit acknowledgement mode; redelivery requires consumer close and broker lock expiry", + pendingRecords.size()); + pendingRecords.clear(); + } + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + closed = true; + consumer.close(); + } + + private void logPartitionFailures(final String operation, final Map> result) { + for (final Map.Entry> entry : result.entrySet()) { + final Optional failure = entry.getValue(); + if (failure.isPresent()) { + componentLog.warn("Share consumer {} reported failure for partition {}", operation, entry.getKey(), failure.get()); + } + } + } + + private ByteRecord toByteRecord(final ConsumerRecord consumerRecord) { + final List recordHeaders = new ArrayList<>(); + consumerRecord.headers().forEach(header -> recordHeaders.add(new RecordHeader(header.key(), header.value()))); + + byte[] value = consumerRecord.value(); + if (value == null) { + value = new byte[0]; + } + + return new ByteRecord( + consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.timestamp(), + recordHeaders, + consumerRecord.key(), + value, + 1 + ); + } + + private AcknowledgeType toAcknowledgeType(final Acknowledgement acknowledgement) { + return switch (acknowledgement) { + case ACCEPT -> AcknowledgeType.ACCEPT; + case RELEASE -> 
AcknowledgeType.RELEASE; + case REJECT -> AcknowledgeType.REJECT; + }; + } + + /** + * Coordinates that uniquely identify a record within a poll batch. Used to map the {@link ByteRecord} + * the caller receives back to the underlying {@link ConsumerRecord} the share consumer needs in order + * to call {@code acknowledge}. + */ + private record RecordCoordinates(String topic, int partition, long offset) { + + static RecordCoordinates of(final ConsumerRecord consumerRecord) { + return new RecordCoordinates(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()); + } + + static RecordCoordinates of(final ByteRecord byteRecord) { + return new RecordCoordinates(byteRecord.getTopic(), byteRecord.getPartition(), byteRecord.getOffset()); + } + } + + // Visible for tests + int getPendingRecordCount() { + return pendingRecords.size(); + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java index e7cf2d361844..f02466cd3ddf 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java @@ -50,7 +50,7 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.io.TempDir; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; @@ -92,7 +92,7 @@ public class Kafka3ConnectionServiceBaseIT { // This Base class executes its tests with Ssl off and 
Sasl off. // There are subclasses which execute these same tests and enable Ssl or Sasl - public static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.6"; // January 2026 + public static final String IMAGE_NAME = System.getProperty("kafka.docker.image", "apache/kafka:4.2.0"); private static final String DELIVERY_TIMEOUT_MS_KEY = "delivery.timeout.ms"; private static final String DELIVERY_TIMEOUT_MS_VALUE = "60000"; @@ -142,7 +142,7 @@ public class Kafka3ConnectionServiceBaseIT { protected TestRunner runner; - private ConfluentKafkaContainer kafkaContainer; + private KafkaContainer kafkaContainer; private Kafka3ConnectionService service; @@ -169,7 +169,7 @@ void startContainer() throws Exception { trustStore.store(outputStream, KEY_STORE_PASSWORD.toCharArray()); } - kafkaContainer = new ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME)); + kafkaContainer = new KafkaContainer(DockerImageName.parse(IMAGE_NAME)); initializeContainer(); kafkaContainer.start(); } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaConnectivityIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaConnectivityIT.java index 0eb1730c88c2..e1c9a861cfdb 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaConnectivityIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaConnectivityIT.java @@ -32,7 +32,7 @@ import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; import java.time.Duration; @@ -50,11 +50,11 @@ public class KafkaConnectivityIT { private static final String TEST_TOPIC = 
"nifi-" + System.currentTimeMillis(); - private static ConfluentKafkaContainer kafka; + private static KafkaContainer kafka; @BeforeAll static void beforeAll() { - kafka = new ConfluentKafkaContainer(DockerImageName.parse(Kafka3ConnectionServiceBaseIT.IMAGE_NAME)); + kafka = new KafkaContainer(DockerImageName.parse(Kafka3ConnectionServiceBaseIT.IMAGE_NAME)); kafka.start(); } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerServiceTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerServiceTest.java new file mode 100644 index 000000000000..3c3fb7beb7ee --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/share/Kafka4ShareConsumerServiceTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.kafka.service.consumer.share;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.nifi.kafka.service.api.consumer.share.Acknowledgement;
import org.apache.nifi.kafka.service.api.consumer.share.ShareAcknowledgementMode;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link Kafka4ShareConsumerService} using a mocked {@link ShareConsumer}.
 */
@ExtendWith(MockitoExtension.class)
class Kafka4ShareConsumerServiceTest {

    private static final String TOPIC = "share-topic";
    private static final List<String> TOPICS = Collections.singletonList(TOPIC);
    private static final int PARTITION_0 = 0;

    @Mock
    private ShareConsumer<byte[], byte[]> shareConsumer;

    @Mock
    private ComponentLog componentLog;

    @Test
    void testConstructorSubscribesToConfiguredTopics() {
        new Kafka4ShareConsumerService(componentLog, shareConsumer, ShareAcknowledgementMode.EXPLICIT, TOPICS);
        verify(shareConsumer).subscribe(TOPICS);
    }

    @Test
    void testPollReturnsByteRecordsAndTracksPending() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
        final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_0, 6L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(record0, record1));

        final List<ByteRecord> byteRecords = collect(service.poll(Duration.ofMillis(100)));

        assertEquals(2, byteRecords.size());
        assertEquals(2, service.getPendingRecordCount());
        assertEquals(5L, byteRecords.get(0).getOffset());
        assertEquals(6L, byteRecords.get(1).getOffset());
    }

    @Test
    void testPollConvertsTombstoneToEmptyValue() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> tombstone = createTombstoneRecord(TOPIC, PARTITION_0, 7L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(tombstone));

        final List<ByteRecord> byteRecords = collect(service.poll(Duration.ofMillis(100)));

        assertEquals(1, byteRecords.size());
        final ByteRecord byteRecord = byteRecords.get(0);
        assertNotNull(byteRecord.getValue());
        assertEquals(0, byteRecord.getValue().length);
    }

    @Test
    void testExplicitAcknowledgementsForwardedToConsumer() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> recordAccept = createRecord(TOPIC, PARTITION_0, 1L);
        final ConsumerRecord<byte[], byte[]> recordRelease = createRecord(TOPIC, PARTITION_0, 2L);
        final ConsumerRecord<byte[], byte[]> recordReject = createRecord(TOPIC, PARTITION_0, 3L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(recordAccept, recordRelease, recordReject));

        final List<ByteRecord> byteRecords = collect(service.poll(Duration.ofMillis(100)));

        service.acknowledge(byteRecords.get(0), Acknowledgement.ACCEPT);
        service.acknowledge(byteRecords.get(1), Acknowledgement.RELEASE);
        service.acknowledge(byteRecords.get(2), Acknowledgement.REJECT);

        verify(shareConsumer).acknowledge(recordAccept, AcknowledgeType.ACCEPT);
        verify(shareConsumer).acknowledge(recordRelease, AcknowledgeType.RELEASE);
        verify(shareConsumer).acknowledge(recordReject, AcknowledgeType.REJECT);
        assertEquals(0, service.getPendingRecordCount());
    }

    @Test
    void testCommitInExplicitModeAcceptsRemainingPendingRecords() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        final ConsumerRecord<byte[], byte[]> second = createRecord(TOPIC, PARTITION_0, 2L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(first, second));
        when(shareConsumer.commitSync()).thenReturn(Collections.emptyMap());

        final List<ByteRecord> byteRecords = collect(service.poll(Duration.ofMillis(100)));
        service.acknowledge(byteRecords.get(0), Acknowledgement.ACCEPT);
        service.commit();

        verify(shareConsumer).acknowledge(first, AcknowledgeType.ACCEPT);
        verify(shareConsumer).acknowledge(second, AcknowledgeType.ACCEPT);
        verify(shareConsumer).commitSync();
        assertEquals(0, service.getPendingRecordCount());
    }

    @Test
    void testCommitInImplicitModeDoesNotCallAcknowledge() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.IMPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(first));
        when(shareConsumer.commitSync()).thenReturn(Collections.emptyMap());

        collect(service.poll(Duration.ofMillis(100)));
        service.commit();

        verify(shareConsumer, never()).acknowledge(any(ConsumerRecord.class), any(AcknowledgeType.class));
        verify(shareConsumer).commitSync();
    }

    @Test
    void testRollbackInExplicitModeReleasesPendingRecordsAndCommits() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        final ConsumerRecord<byte[], byte[]> second = createRecord(TOPIC, PARTITION_0, 2L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(first, second));
        when(shareConsumer.commitSync()).thenReturn(Collections.emptyMap());

        collect(service.poll(Duration.ofMillis(100)));
        service.rollback();

        verify(shareConsumer).acknowledge(first, AcknowledgeType.RELEASE);
        verify(shareConsumer).acknowledge(second, AcknowledgeType.RELEASE);
        verify(shareConsumer).commitSync();
        assertEquals(0, service.getPendingRecordCount());
    }

    @Test
    void testRollbackInImplicitModeClearsPendingWithoutAcknowledging() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.IMPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(first));

        collect(service.poll(Duration.ofMillis(100)));
        service.rollback();

        verify(shareConsumer, never()).acknowledge(any(ConsumerRecord.class), any(AcknowledgeType.class));
        verify(shareConsumer, never()).commitSync();
        assertEquals(0, service.getPendingRecordCount());
    }

    @Test
    void testRollbackWithNoPendingRecordsIsNoOp() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        service.rollback();

        verify(shareConsumer, never()).acknowledge(any(ConsumerRecord.class), any(AcknowledgeType.class));
        verify(shareConsumer, never()).commitSync();
    }

    @Test
    void testAcknowledgeIgnoredInImplicitMode() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.IMPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        when(shareConsumer.poll(any(Duration.class))).thenReturn(consumerRecords(first));

        final List<ByteRecord> byteRecords = collect(service.poll(Duration.ofMillis(100)));
        service.acknowledge(byteRecords.get(0), Acknowledgement.RELEASE);

        verify(shareConsumer, never()).acknowledge(any(ConsumerRecord.class), any(AcknowledgeType.class));
    }

    @Test
    void testAcknowledgeForUnknownRecordIsIgnored() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ByteRecord stranger = new ByteRecord("other-topic", 0, 99L, 0L, Collections.emptyList(), null, new byte[0], 1);

        service.acknowledge(stranger, Acknowledgement.ACCEPT);

        verify(shareConsumer, never()).acknowledge(any(ConsumerRecord.class), any(AcknowledgeType.class));
    }

    @Test
    void testCloseFlagsServiceClosed() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        assertFalse(service.isClosed());

        service.close();

        assertTrue(service.isClosed());
        verify(shareConsumer).close();
    }

    @Test
    void testCommitClearsPendingEvenIfMultiplePollsBetweenCommits() {
        final Kafka4ShareConsumerService service = newService(ShareAcknowledgementMode.EXPLICIT);
        final ConsumerRecord<byte[], byte[]> first = createRecord(TOPIC, PARTITION_0, 1L);
        final ConsumerRecord<byte[], byte[]> second = createRecord(TOPIC, PARTITION_0, 2L);
        when(shareConsumer.poll(any(Duration.class)))
                .thenReturn(consumerRecords(first))
                .thenReturn(consumerRecords(second));
        when(shareConsumer.commitSync()).thenReturn(Collections.emptyMap());

        collect(service.poll(Duration.ofMillis(100)));
        collect(service.poll(Duration.ofMillis(100)));
        assertEquals(2, service.getPendingRecordCount());

        service.commit();

        verify(shareConsumer).acknowledge(eq(first), eq(AcknowledgeType.ACCEPT));
        verify(shareConsumer).acknowledge(eq(second), eq(AcknowledgeType.ACCEPT));
        verify(shareConsumer, times(1)).commitSync();
        assertEquals(0, service.getPendingRecordCount());
    }

    /** Creates a service around the mocked consumer in the requested acknowledgement mode. */
    private Kafka4ShareConsumerService newService(final ShareAcknowledgementMode mode) {
        return new Kafka4ShareConsumerService(componentLog, shareConsumer, mode, TOPICS);
    }

    /** Groups the given records by topic-partition into a {@link ConsumerRecords} batch. */
    @SafeVarargs
    private ConsumerRecords<byte[], byte[]> consumerRecords(final ConsumerRecord<byte[], byte[]>... records) {
        if (records.length == 0) {
            return ConsumerRecords.empty();
        }
        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            recordsMap.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), k -> new ArrayList<>()).add(record);
        }
        return new ConsumerRecords<>(recordsMap, Collections.emptyMap());
    }

    /** Drains the iterable into a list so tests can index into the polled records. */
    private List<ByteRecord> collect(final Iterable<ByteRecord> iterable) {
        final List<ByteRecord> list = new ArrayList<>();
        final Iterator<ByteRecord> iterator = iterable.iterator();
        while (iterator.hasNext()) {
            list.add(iterator.next());
        }
        return list;
    }

    /**
     * Builds a record with a {@code null} key and the UTF-8 bytes of {@code value-<offset>} as value.
     * The value is encoded once so the serialized size always matches the actual byte length.
     */
    private ConsumerRecord<byte[], byte[]> createRecord(final String topic, final int partition, final long offset) {
        final byte[] value = "value-%d".formatted(offset).getBytes(StandardCharsets.UTF_8);
        return new ConsumerRecord<>(
                topic,
                partition,
                offset,
                System.currentTimeMillis(),
                TimestampType.CREATE_TIME,
                0,
                value.length,
                null,
                value,
                new RecordHeaders(),
                Optional.empty()
        );
    }

    /** Builds a record whose key and value are both {@code null}. */
    private ConsumerRecord<byte[], byte[]> createTombstoneRecord(final String topic, final int partition, final long offset) {
        return new ConsumerRecord<>(
                topic,
                partition,
                offset,
                System.currentTimeMillis(),
                TimestampType.CREATE_TIME,
                0,
                0,
                null,
                null,
                new RecordHeaders(),
                Optional.empty()
        );
    }
}
maven-failsafe-plugin + + + ${kafka.docker.image} + + + + + org.jacoco