diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 1a4f573e..bc8cd530 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& case Closing: case Closed: case Producer_Fenced: + case Terminated: case Failed: LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore"); break; diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 0e733f00..229404ee 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this { Failed, // Handler is failed, in Java client: HandlerState.State.Failed Producer_Fenced, // The producer has been fenced by the broker // in Java client: HandlerState.State.ProducerFenced + Terminated, // The topic has been terminatedproducer has been fenced by the broker + // in Java client: HandlerState.State.Terminated }; std::atomic state_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d8f2dbac..7632581e 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } } - if (result == ResultProducerFenced) { - state_ = Producer_Fenced; + if (result == ResultProducerFenced || result == ResultTopicTerminated) { + state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated; failPendingMessages(result, false); auto client = client_.lock(); if (client) { @@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const { case HandlerBase::Producer_Fenced: callback(ResultProducerFenced, {}); return false; + case HandlerBase::Terminated: + callback(ResultTopicTerminated, {}); + return false; case HandlerBase::NotStarted: case HandlerBase::Failed: default: diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h index cf4ff1fb..a3d2ec4e 100644 --- a/lib/ResultUtils.h +++ b/lib/ResultUtils.h @@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) { ResultInvalidConfiguration, ResultIncompatibleSchema, ResultTopicNotFound, + ResultTopicTerminated, ResultOperationNotSupported, ResultNotAllowedError, ResultChecksumError, diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 4220a2ed..d0552df2 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -215,6 +215,50 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } +TEST(ProducerTest, testCreateProducerAfterTopicTermination) { + const auto topicName = "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr)); + const auto topic = "persistent://public/default/" + topicName; + + Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build())); + ASSERT_EQ(ResultOk, producer.close()); + + const auto httpCode = + makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", ""); + ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode; + + Producer terminatedProducer; + ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer)); + + client.close(); +} + +TEST(ProducerTest, testSendAfterTopicTerminationReconnect) { + const auto topicName = "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr)); + const auto topic = "persistent://public/default/" + topicName; + + Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build())); + + const auto httpCode = + makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", ""); + ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode; + + PulsarFriend::getProducerImpl(producer).disconnectProducer(); + ASSERT_TRUE( + waitUntil(std::chrono::seconds(3), [&producer] { return PulsarFriend::isTerminated(producer); })); + + ASSERT_EQ(ResultTopicTerminated, producer.send(MessageBuilder().setContent("after-terminate").build())); + + client.close(); +} + class ProducerTest : public ::testing::TestWithParam {}; TEST_P(ProducerTest, testMaxMessageSize) { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 3296953b..8017e0ba 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -257,6 +257,11 @@ class PulsarFriend { return waitUntil(std::chrono::seconds(3), [producerImpl] { return !producerImpl->getCnx().expired(); }); } + + static bool isTerminated(Producer producer) { + auto producerImpl = std::dynamic_pointer_cast(producer.impl_); + return producerImpl && producerImpl->state_ == HandlerBase::Terminated; + } }; } // namespace pulsar