From 50275e93ee7e452cf51f1c13ddfeec44a3f33b75 Mon Sep 17 00:00:00 2001 From: Chunel Date: Mon, 16 Mar 2026 23:57:17 +0800 Subject: [PATCH 1/2] [feat] add CGRAPH_DETACH_CONN_ID --- src/GraphCtrl/GraphMessage/GMessage.h | 15 +++++- src/GraphCtrl/GraphMessage/GMessageManager.h | 52 +++++++++++++++----- src/GraphCtrl/GraphMessage/GMessageUtils.h | 4 ++ src/UtilsCtrl/UtilsDefine.h | 6 +-- 4 files changed, 60 insertions(+), 17 deletions(-) diff --git a/src/GraphCtrl/GraphMessage/GMessage.h b/src/GraphCtrl/GraphMessage/GMessage.h index b178395b..ecb40d80 100644 --- a/src/GraphCtrl/GraphMessage/GMessage.h +++ b/src/GraphCtrl/GraphMessage/GMessage.h @@ -20,8 +20,10 @@ template::value, int> = 0> class GMessage : public GMessageObject { public: - explicit GMessage(CUInt size = capacity) { + explicit GMessage(CUInt size = capacity, + const CIndex connId = 0) { queue_.setCapacity(size); + conn_id_ = connId; } /** @@ -65,8 +67,17 @@ class GMessage : public GMessageObject { return queue_.getCapacity(); } + /** + * 获取 connId 信息 + * @return + */ + CIndex getConnId() const { + return conn_id_; + } + private: - UAtomicRingBufferQueue queue_; + UAtomicRingBufferQueue queue_ {}; + CIndex conn_id_ {0}; // 用于记录message的唯一标识,用于 pub-sub场景 }; diff --git a/src/GraphCtrl/GraphMessage/GMessageManager.h b/src/GraphCtrl/GraphMessage/GMessageManager.h index a59bd896..733a1dc4 100644 --- a/src/GraphCtrl/GraphMessage/GMessageManager.h +++ b/src/GraphCtrl/GraphMessage/GMessageManager.h @@ -36,7 +36,7 @@ class GMessageManager : public GMessageObject, CStatus createTopic(const std::string& topic, CUInt size) { CGRAPH_FUNCTION_BEGIN - auto innerTopic = internal::SEND_RECV_PREFIX + topic; // 中间做一层映射,用来区分是 PubSub的,还是SendRecv的 + const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic; // 中间做一层映射,用来区分是 PubSub的,还是SendRecv的 CGRAPH_LOCK_GUARD lk(send_recv_mutex_); auto result = send_recv_message_map_.find(innerTopic); if (result != send_recv_message_map_.end()) { @@ -60,7 +60,7 @@ class GMessageManager : public GMessageObject, */ CStatus removeTopic(const std::string& topic) { CGRAPH_FUNCTION_BEGIN - auto innerTopic = internal::SEND_RECV_PREFIX + topic; + const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic; CGRAPH_LOCK_GUARD lk(send_recv_mutex_); auto result = send_recv_message_map_.find(innerTopic); if (result == send_recv_message_map_.end()) { @@ -80,8 +80,6 @@ class GMessageManager : public GMessageObject, * @param value * @param timeout * @return - * @notice 这里的逻辑,跟上面的函数一致。里面调用了底层RingBuffer的同名不同入参的接口。 - * 本人暂时没有能力完成接口的统一。如果有了解这一块内容的朋友,欢迎交流指正。 */ template::value, int> = 0> @@ -89,7 +87,7 @@ class GMessageManager : public GMessageObject, std::shared_ptr& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) { CGRAPH_FUNCTION_BEGIN - auto innerTopic = internal::SEND_RECV_PREFIX + topic; + const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic; auto result = send_recv_message_map_.find(innerTopic); if (result == send_recv_message_map_.end()) { CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic"); @@ -116,7 +114,7 @@ class GMessageManager : public GMessageObject, const std::shared_ptr& value, GMessagePushStrategy strategy) { CGRAPH_FUNCTION_BEGIN - auto innerTopic = internal::SEND_RECV_PREFIX + topic; + const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic; auto result = send_recv_message_map_.find(innerTopic); if (result == send_recv_message_map_.end()) { CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic"); @@ -139,11 +137,11 @@ class GMessageManager : public GMessageObject, template::value, int> = 0> CIndex bindTopic(const std::string& topic, CUInt size) { - auto innerTopic = internal::PUB_SUB_PREFIX + topic; + const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic; CGRAPH_LOCK_GUARD lk(pub_sub_mutex_); - auto message = CAllocator::safeMallocTemplateCObject>(size); CIndex connId = (++cur_conn_id_); + auto message = CAllocator::safeMallocTemplateCObject>(size, connId); auto result = pub_sub_message_map_.find(innerTopic); if (result != pub_sub_message_map_.end()) { // 如果之前有的话,则在后面添加一个 @@ -151,7 +149,7 @@ class GMessageManager : public GMessageObject, messageSet.insert((GMessagePtr)message); } else { // 如果是这个topic第一次被绑定,则创建一个对应的set信息 - std::set> messageSet; + std::set> messageSet {}; messageSet.insert((GMessagePtr)message); pub_sub_message_map_[innerTopic] = messageSet; } @@ -159,6 +157,35 @@ class GMessageManager : public GMessageObject, return connId; } + /** + * 取消特定的订阅,bindTopic的反向函数 + * @param topic + * @param connId + * @return + */ + CStatus detachConnId(const std::string& topic, + const CIndex connId) { + CGRAPH_FUNCTION_BEGIN + const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic; + + CGRAPH_LOCK_GUARD lk(pub_sub_mutex_); + auto result = pub_sub_message_map_.find(innerTopic); + if (result == pub_sub_message_map_.end()) { + CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic"); + } + + auto& messageSet = result->second; + for (auto* msg : messageSet) { + if (connId == msg->getConnId()) { + CGRAPH_DELETE_PTR(msg); + conn_message_map_.erase(connId); + break; + } + } + + CGRAPH_FUNCTION_END + } + /** * 开始发送对应topic的信息 * @tparam TImpl @@ -214,7 +241,7 @@ class GMessageManager : public GMessageObject, */ CStatus dropTopic(const std::string& topic) { CGRAPH_FUNCTION_BEGIN - auto innerTopic = internal::PUB_SUB_PREFIX + topic; + const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic; CGRAPH_LOCK_GUARD lk(pub_sub_mutex_); auto result = pub_sub_message_map_.find(innerTopic); if (result == pub_sub_message_map_.end()) { @@ -222,7 +249,7 @@ class GMessageManager : public GMessageObject, } auto& messageSet = result->second; - for (auto msg : messageSet) { + for (auto* msg : messageSet) { CGRAPH_DELETE_PTR(msg) } pub_sub_message_map_.erase(innerTopic); @@ -246,11 +273,12 @@ class GMessageManager : public GMessageObject, { CGRAPH_LOCK_GUARD lk(pub_sub_mutex_); for (auto& cur : pub_sub_message_map_) { - for (auto iter : cur.second) { + for (auto* iter : cur.second) { CGRAPH_DELETE_PTR(iter); } } pub_sub_message_map_.clear(); + conn_message_map_.clear(); cur_conn_id_ = 0; } diff --git a/src/GraphCtrl/GraphMessage/GMessageUtils.h b/src/GraphCtrl/GraphMessage/GMessageUtils.h index 2f967e22..19eca0ac 100644 --- a/src/GraphCtrl/GraphMessage/GMessageUtils.h +++ b/src/GraphCtrl/GraphMessage/GMessageUtils.h @@ -59,6 +59,10 @@ CGRAPH_NAMESPACE_BEGIN #define CGRAPH_DROP_MESSAGE_TOPIC(topic) \ CGraph::GMessageManagerSingleton::get()->dropTopic(topic); \ +/* 取消connId关联 */ +#define CGRAPH_DETACH_CONN_ID(topic, connId) \ + CGraph::GMessageManagerSingleton::get()->detachConnId(topic, connId); \ + /**************************** * 以下针对所有功能 diff --git a/src/UtilsCtrl/UtilsDefine.h b/src/UtilsCtrl/UtilsDefine.h index f1038b78..c598ad9f 100644 --- a/src/UtilsCtrl/UtilsDefine.h +++ b/src/UtilsCtrl/UtilsDefine.h @@ -141,9 +141,9 @@ inline CVoid CGRAPH_ECHO(const char *cmd, ...) { static std::mutex echo_mtx; std::lock_guard lock{ echo_mtx }; - auto now = std::chrono::system_clock::now(); - auto time = std::chrono::system_clock::to_time_t(now); - auto ms = std::chrono::duration_cast(now.time_since_epoch()).count() % 1000; + const auto now = std::chrono::system_clock::now(); + const auto time = std::chrono::system_clock::to_time_t(now); + const auto ms = std::chrono::duration_cast(now.time_since_epoch()).count() % 1000; std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S.") \ << std::setfill('0') << std::setw(3) << ms << "] "; From 7de9470048a5d0d647b6bc33c546adf73be42f1f Mon Sep 17 00:00:00 2001 From: Chunel Date: Tue, 17 Mar 2026 00:13:04 +0800 Subject: [PATCH 2/2] [chron] add (void) to avoid no get return function --- src/GraphCtrl/GraphElement/GElement.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/GraphCtrl/GraphElement/GElement.cpp b/src/GraphCtrl/GraphElement/GElement.cpp index b4a8061b..cb35007c 100644 --- a/src/GraphCtrl/GraphElement/GElement.cpp +++ b/src/GraphCtrl/GraphElement/GElement.cpp @@ -251,8 +251,8 @@ CStatus GElement::fatProcessor(const CFunctionType& type) { * 默认所有element的isHold条件均为false,即不hold,即执行一次 * 可以根据需求,对任意element类型,添加特定的isHold条件 * */ - } while (checkSuspend(), this->isHold() && status.isOK()); - doAspect(internal::GAspectType::FINISH_RUN, status); + } while (checkSuspend(), isHold() && status.isOK()); + (void)doAspect(internal::GAspectType::FINISH_RUN, status); } CGRAPH_THROW_EXCEPTION_BY_STATUS(checkRunResult()) @@ -264,21 +264,21 @@ CStatus GElement::fatProcessor(const CFunctionType& type) { status = doAspect(internal::GAspectType::BEGIN_INIT); CGRAPH_FUNCTION_CHECK_STATUS status = init(); - doAspect(internal::GAspectType::FINISH_INIT, status); + (void)doAspect(internal::GAspectType::FINISH_INIT, status); break; } case CFunctionType::DESTROY: { status = doAspect(internal::GAspectType::BEGIN_DESTROY); CGRAPH_FUNCTION_CHECK_STATUS status = destroy(); - doAspect(internal::GAspectType::FINISH_DESTROY, status); + (void)doAspect(internal::GAspectType::FINISH_DESTROY, status); break; } default: CGRAPH_RETURN_ERROR_STATUS("get function type error") } } catch (const CException& ex) { - doAspect(internal::GAspectType::ENTER_CRASHED); + (void)doAspect(internal::GAspectType::ENTER_CRASHED); status = crashed(ex); }