From 9d879bb4d10a7c6bab49d6aae3be510ca346b9fc Mon Sep 17 00:00:00 2001 From: David Wang Date: Mon, 20 Oct 2025 16:09:44 -0700 Subject: [PATCH 1/3] Handle TopicTerminated for producers to fail immediately rather than retrying and timing out. --- lib/HandlerBase.cc | 1 + lib/HandlerBase.h | 2 ++ lib/ProducerImpl.cc | 7 +++++-- lib/ResultUtils.h | 1 + 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 1a4f573e..bc8cd530 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& case Closing: case Closed: case Producer_Fenced: + case Terminated: case Failed: LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore"); break; diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 0e733f00..229404ee 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this { Failed, // Handler is failed, in Java client: HandlerState.State.Failed Producer_Fenced, // The producer has been fenced by the broker // in Java client: HandlerState.State.ProducerFenced + Terminated, // The topic has been terminatedproducer has been fenced by the broker + // in Java client: HandlerState.State.Terminated }; std::atomic state_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d8f2dbac..7632581e 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } } - if (result == ResultProducerFenced) { - state_ = Producer_Fenced; + if (result == ResultProducerFenced || result == ResultTopicTerminated) { + state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated; failPendingMessages(result, false); auto client = client_.lock(); if (client) { @@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const { case HandlerBase::Producer_Fenced: callback(ResultProducerFenced, {}); return false; + case HandlerBase::Terminated: + callback(ResultTopicTerminated, {}); + return false; case HandlerBase::NotStarted: case HandlerBase::Failed: default: diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h index cf4ff1fb..a3d2ec4e 100644 --- a/lib/ResultUtils.h +++ b/lib/ResultUtils.h @@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) { ResultInvalidConfiguration, ResultIncompatibleSchema, ResultTopicNotFound, + ResultTopicTerminated, ResultOperationNotSupported, ResultNotAllowedError, ResultChecksumError, From 03661b09a88692f4ab6006da572f5973b2d6d181 Mon Sep 17 00:00:00 2001 From: David Wang Date: Tue, 14 Apr 2026 13:42:43 -0700 Subject: [PATCH 2/3] Add regression tests for topic termination --- tests/ProducerTest.cc | 47 +++++++++++++++++++++++++++++++++++++++++++ tests/PulsarFriend.h | 5 +++++ 2 files changed, 52 insertions(+) diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 4220a2ed..69f99eda 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -215,6 +215,53 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } +TEST(ProducerTest, testCreateProducerAfterTopicTermination) { + const auto topicName = + "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr)); + const auto topic = "persistent://public/default/" + topicName; + + Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build())); + ASSERT_EQ(ResultOk, producer.close()); + + const auto httpCode = + makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", ""); + ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode; + + Producer terminatedProducer; + ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer)); + + client.close(); +} + +TEST(ProducerTest, testSendAfterTopicTerminationReconnect) { + const auto topicName = + "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr)); + const auto topic = "persistent://public/default/" + topicName; + + Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build())); + + const auto httpCode = + makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", ""); + ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode; + + PulsarFriend::getProducerImpl(producer).disconnectProducer(); + ASSERT_TRUE(waitUntil(std::chrono::seconds(3), + [&producer] { return PulsarFriend::isTerminated(producer); })); + + ASSERT_EQ(ResultTopicTerminated, + producer.send(MessageBuilder().setContent("after-terminate").build())); + + client.close(); +} + class ProducerTest : public ::testing::TestWithParam {}; TEST_P(ProducerTest, testMaxMessageSize) { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 3296953b..8017e0ba 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -257,6 +257,11 @@ class PulsarFriend { return waitUntil(std::chrono::seconds(3), [producerImpl] { return !producerImpl->getCnx().expired(); }); } + + static bool isTerminated(Producer producer) { + auto producerImpl = std::dynamic_pointer_cast(producer.impl_); + return producerImpl && producerImpl->state_ == HandlerBase::Terminated; + } }; } // namespace pulsar From f358ce46112f758710441393a2c2cc63fe222483 Mon Sep 17 00:00:00 2001 From: David Wang Date: Wed, 15 Apr 2026 10:21:40 -0700 Subject: [PATCH 3/3] Format producer termination tests --- tests/ProducerTest.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 69f99eda..d0552df2 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -216,8 +216,7 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { } TEST(ProducerTest, testCreateProducerAfterTopicTermination) { - const auto topicName = - "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr)); + const auto topicName = "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr)); const auto topic = "persistent://public/default/" + topicName; Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); @@ -238,8 +237,7 @@ TEST(ProducerTest, testCreateProducerAfterTopicTermination) { } TEST(ProducerTest, testSendAfterTopicTerminationReconnect) { - const auto topicName = - "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr)); + const auto topicName = "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr)); const auto topic = "persistent://public/default/" + topicName; Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); @@ -253,11 +251,10 @@ TEST(ProducerTest, testSendAfterTopicTerminationReconnect) { ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode; PulsarFriend::getProducerImpl(producer).disconnectProducer(); - ASSERT_TRUE(waitUntil(std::chrono::seconds(3), - [&producer] { return PulsarFriend::isTerminated(producer); })); + ASSERT_TRUE( + waitUntil(std::chrono::seconds(3), [&producer] { return PulsarFriend::isTerminated(producer); })); - ASSERT_EQ(ResultTopicTerminated, - producer.send(MessageBuilder().setContent("after-terminate").build())); + ASSERT_EQ(ResultTopicTerminated, producer.send(MessageBuilder().setContent("after-terminate").build())); client.close(); }