Skip to content

Commit 7d1002a

Browse files
authored
Fix consumer reconnect state after subscribe failure (#577)
1 parent 0666b5c commit 7d1002a

3 files changed

Lines changed: 43 additions & 1 deletion

File tree

lib/ConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,9 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
371371
}
372372

373373
if (consumerCreatedPromise_.isComplete()) {
374-
// Consumer had already been initially created, we need to retry connecting in any case
374+
// Clear the connection set before SUBSCRIBE so the next reconnect is not skipped.
375375
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
376+
resetCnx();
376377
handleResult = ResultRetryable;
377378
} else {
378379
// Consumer was not yet created, retry to connect to broker if it's possible

tests/ConsumerTest.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,4 +1596,40 @@ TEST(ConsumerTest, testCloseAfterSeek) {
15961596
anotherClient.close();
15971597
}
15981598

1599+
TEST(ConsumerTest, testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect) {
1600+
// A reconnect SUBSCRIBE failure happens after the initial subscribe has already completed.
1601+
const std::string topic =
1602+
"persistent://public/default/test-false-positive-" + std::to_string(time(nullptr));
1603+
Client client(lookupUrl);
1604+
Consumer consumer;
1605+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
1606+
ASSERT_TRUE(consumer.isConnected()) << "Precondition: consumer should be connected";
1607+
1608+
auto& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
1609+
1610+
// Capture the current live connection.
1611+
auto cnx = consumerImpl.getCnx().lock();
1612+
ASSERT_TRUE(cnx != nullptr) << "Precondition: cnx should be non-null";
1613+
LOG_INFO("Step 1 passed: consumer subscribed, cnx=" << cnx);
1614+
1615+
// Simulate the broker rejecting the SUBSCRIBE command during reconnect.
1616+
Result handleResult = PulsarFriend::consumerHandleCreateConsumer(consumer, cnx, ResultAuthorizationError);
1617+
LOG_INFO("Step 2: handleCreateConsumer returned " << handleResult);
1618+
EXPECT_EQ(ResultRetryable, handleResult)
1619+
<< "handleCreateConsumer should return ResultRetryable for an already-created consumer";
1620+
1621+
// The failed SUBSCRIBE must clear the connection set before SUBSCRIBE.
1622+
auto cnxAfter = consumerImpl.getCnx().lock();
1623+
LOG_INFO("Step 3: cnx after handleCreateConsumer failure = " << cnxAfter);
1624+
LOG_INFO("Step 3: isConnected() = " << consumer.isConnected());
1625+
1626+
EXPECT_EQ(nullptr, cnxAfter)
1627+
<< "After fix: connection_ must be cleared by resetCnx() so grabCnx() can retry";
1628+
EXPECT_FALSE(consumer.isConnected())
1629+
<< "After fix: isConnected() must return false after SUBSCRIBE rejection";
1630+
1631+
consumer.close();
1632+
client.close();
1633+
}
1634+
15991635
} // namespace pulsar

tests/PulsarFriend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ class PulsarFriend {
102102
return *consumerImpl;
103103
}
104104

105+
static Result consumerHandleCreateConsumer(Consumer consumer, const ClientConnectionPtr& cnx,
106+
Result result) {
107+
return getConsumerImpl(consumer).handleCreateConsumer(cnx, result);
108+
}
109+
105110
static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
106111
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
107112
}

0 commit comments

Comments
 (0)