Skip to content

Commit 4017c57

Browse files
committed
Fix subscription race
1 parent 57cfe47 commit 4017c57

1 file changed

Lines changed: 29 additions & 2 deletions

File tree

bin/mqtt5_socks5_app/main.cpp

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -714,18 +714,28 @@ int main(int argc, char **argv)
714714
subscriptionList.push_back(sub);
715715
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(app_ctx.allocator);
716716
subscribe->WithSubscriptions(subscriptionList);
717+
718+
std::promise<bool> subscribeAckPromise;
717719
bool subscribeSuccess = mqtt5Client->Subscribe(
718720
subscribe,
719-
[](int, std::shared_ptr<Mqtt5::SubAckPacket> packet)
721+
[&subscribeAckPromise](int errorCode, std::shared_ptr<Mqtt5::SubAckPacket> packet)
720722
{
721-
if(packet == nullptr) return;
723+
if (errorCode != AWS_ERROR_SUCCESS || packet == nullptr) {
724+
subscribeAckPromise.set_value(false);
725+
return;
726+
}
722727
std::cout << "**********************************************************" << std::endl;
723728
std::cout << "MQTT5: check suback packet : " << std::endl;
729+
bool allGranted = true;
724730
for (auto code : packet->getReasonCodes())
725731
{
726732
std::cout << "Get suback with codes: " << code << std::endl;
733+
if (code > 2) { // Non-granted reason codes are >= 0x80
734+
allGranted = false;
735+
}
727736
}
728737
std::cout << "**********************************************************" << std::endl;
738+
subscribeAckPromise.set_value(allGranted);
729739
});
730740

731741
if (!subscribeSuccess) {
@@ -736,6 +746,23 @@ int main(int argc, char **argv)
736746
return 2; // Subscription failure
737747
}
738748

749+
// Wait for SUBACK before publishing so we don't race the proxy latency.
750+
std::cout << "**********************************************************" << std::endl;
751+
std::cout << "MQTT5: Waiting for SUBACK confirmation..." << std::endl;
752+
std::cout << "**********************************************************" << std::endl;
753+
bool subAckGranted = false;
754+
try {
755+
subAckGranted = subscribeAckPromise.get_future().get();
756+
} catch (...) {
757+
subAckGranted = false;
758+
}
759+
if (!subAckGranted) {
760+
std::cout << "[ERROR]Subscription was not granted by broker." << std::endl;
761+
mqtt5Client->Stop();
762+
stoppedPromise.get_future().get();
763+
return 2;
764+
}
765+
739766
// Publish to the same topic
740767
ByteCursor payload = Aws::Crt::ByteCursorFromCString("mqtt5 publish test");
741768
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(app_ctx.allocator);

0 commit comments

Comments
 (0)