Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
* 默认所有element的isHold条件均为false,即不hold,即执行一次
* 可以根据需求,对任意element类型,添加特定的isHold条件
* */
} while (checkSuspend(), this->isHold() && status.isOK());
doAspect(internal::GAspectType::FINISH_RUN, status);
} while (checkSuspend(), isHold() && status.isOK());
(void)doAspect(internal::GAspectType::FINISH_RUN, status);
}

CGRAPH_THROW_EXCEPTION_BY_STATUS(checkRunResult())
Expand All @@ -264,21 +264,21 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
status = doAspect(internal::GAspectType::BEGIN_INIT);
CGRAPH_FUNCTION_CHECK_STATUS
status = init();
doAspect(internal::GAspectType::FINISH_INIT, status);
(void)doAspect(internal::GAspectType::FINISH_INIT, status);
break;
}
case CFunctionType::DESTROY: {
status = doAspect(internal::GAspectType::BEGIN_DESTROY);
CGRAPH_FUNCTION_CHECK_STATUS
status = destroy();
doAspect(internal::GAspectType::FINISH_DESTROY, status);
(void)doAspect(internal::GAspectType::FINISH_DESTROY, status);
break;
}
default:
CGRAPH_RETURN_ERROR_STATUS("get function type error")
}
} catch (const CException& ex) {
doAspect(internal::GAspectType::ENTER_CRASHED);
(void)doAspect(internal::GAspectType::ENTER_CRASHED);
status = crashed(ex);
}

Expand Down
15 changes: 13 additions & 2 deletions src/GraphCtrl/GraphMessage/GMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ template<typename T, CUInt capacity = CGRAPH_DEFAULT_RINGBUFFER_SIZE,
c_enable_if_t<std::is_base_of<GMessageParam, T>::value, int> = 0>
class GMessage : public GMessageObject {
public:
explicit GMessage(CUInt size = capacity) {
explicit GMessage(CUInt size = capacity,
const CIndex connId = 0) {
queue_.setCapacity(size);
conn_id_ = connId;
}

/**
Expand Down Expand Up @@ -65,8 +67,17 @@ class GMessage : public GMessageObject {
return queue_.getCapacity();
}

/**
* 获取 connId 信息
* @return
*/
CIndex getConnId() const {
return conn_id_;
}

private:
UAtomicRingBufferQueue<T, capacity> queue_;
UAtomicRingBufferQueue<T, capacity> queue_ {};
CIndex conn_id_ {0}; // 用于记录message的唯一标识,用于 pub-sub场景
};


Expand Down
52 changes: 40 additions & 12 deletions src/GraphCtrl/GraphMessage/GMessageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class GMessageManager : public GMessageObject,
CStatus createTopic(const std::string& topic, CUInt size) {
CGRAPH_FUNCTION_BEGIN

auto innerTopic = internal::SEND_RECV_PREFIX + topic; // 中间做一层映射,用来区分是 PubSub的,还是SendRecv的
const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic; // 中间做一层映射,用来区分是 PubSub的,还是SendRecv的
CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
auto result = send_recv_message_map_.find(innerTopic);
if (result != send_recv_message_map_.end()) {
Expand All @@ -60,7 +60,7 @@ class GMessageManager : public GMessageObject,
*/
CStatus removeTopic(const std::string& topic) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic;
CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
Expand All @@ -80,16 +80,14 @@ class GMessageManager : public GMessageObject,
* @param value
* @param timeout
* @return
* @notice 这里的逻辑,跟上面的函数一致。里面调用了底层RingBuffer的同名不同入参的接口。
* 本人暂时没有能力完成接口的统一。如果有了解这一块内容的朋友,欢迎交流指正。
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus recvTopicValue(const std::string& topic,
std::shared_ptr<TImpl>& value,
CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic;
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
Expand All @@ -116,7 +114,7 @@ class GMessageManager : public GMessageObject,
const std::shared_ptr<TImpl>& value,
GMessagePushStrategy strategy) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
const std::string& innerTopic = internal::SEND_RECV_PREFIX + topic;
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
Expand All @@ -139,26 +137,55 @@ class GMessageManager : public GMessageObject,
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CIndex bindTopic(const std::string& topic, CUInt size) {
auto innerTopic = internal::PUB_SUB_PREFIX + topic;
const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic;

CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
auto message = CAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size);
CIndex connId = (++cur_conn_id_);
auto message = CAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size, connId);
auto result = pub_sub_message_map_.find(innerTopic);
if (result != pub_sub_message_map_.end()) {
// 如果之前有的话,则在后面添加一个
auto& messageSet = result->second;
messageSet.insert((GMessagePtr<T>)message);
} else {
// 如果是这个topic第一次被绑定,则创建一个对应的set信息
std::set<GMessagePtr<T>> messageSet;
std::set<GMessagePtr<T>> messageSet {};
messageSet.insert((GMessagePtr<T>)message);
pub_sub_message_map_[innerTopic] = messageSet;
}
conn_message_map_[connId] = (GMessagePtr<T>)message;
return connId;
}

/**
* 取消特定的订阅,bindTopic的反向函数
* @param topic
* @param connId
* @return
*/
CStatus detachConnId(const std::string& topic,
const CIndex connId) {
CGRAPH_FUNCTION_BEGIN
const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic;

CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
auto result = pub_sub_message_map_.find(innerTopic);
if (result == pub_sub_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
}

auto& messageSet = result->second;
for (auto* msg : messageSet) {
if (connId == msg->getConnId()) {
CGRAPH_DELETE_PTR(msg);
conn_message_map_.erase(connId);
break;
}
}

CGRAPH_FUNCTION_END
}

/**
* 开始发送对应topic的信息
* @tparam TImpl
Expand Down Expand Up @@ -214,15 +241,15 @@ class GMessageManager : public GMessageObject,
*/
CStatus dropTopic(const std::string& topic) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::PUB_SUB_PREFIX + topic;
const std::string& innerTopic = internal::PUB_SUB_PREFIX + topic;
CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
auto result = pub_sub_message_map_.find(innerTopic);
if (result == pub_sub_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
}

auto& messageSet = result->second;
for (auto msg : messageSet) {
for (auto* msg : messageSet) {
CGRAPH_DELETE_PTR(msg)
}
pub_sub_message_map_.erase(innerTopic);
Expand All @@ -246,11 +273,12 @@ class GMessageManager : public GMessageObject,
{
CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
for (auto& cur : pub_sub_message_map_) {
for (auto iter : cur.second) {
for (auto* iter : cur.second) {
CGRAPH_DELETE_PTR(iter);
}
}
pub_sub_message_map_.clear();
conn_message_map_.clear();
cur_conn_id_ = 0;
}

Expand Down
4 changes: 4 additions & 0 deletions src/GraphCtrl/GraphMessage/GMessageUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ CGRAPH_NAMESPACE_BEGIN
#define CGRAPH_DROP_MESSAGE_TOPIC(topic) \
CGraph::GMessageManagerSingleton::get()->dropTopic(topic); \

/* 取消connId关联 */
#define CGRAPH_DETACH_CONN_ID(topic, connId) \
CGraph::GMessageManagerSingleton::get()->detachConnId(topic, connId); \


/****************************
* 以下针对所有功能
Expand Down
6 changes: 3 additions & 3 deletions src/UtilsCtrl/UtilsDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ inline CVoid CGRAPH_ECHO(const char *cmd, ...) {

static std::mutex echo_mtx;
std::lock_guard<std::mutex> lock{ echo_mtx };
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() % 1000;
const auto now = std::chrono::system_clock::now();
const auto time = std::chrono::system_clock::to_time_t(now);
const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() % 1000;
std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S.") \
<< std::setfill('0') << std::setw(3) << ms << "] ";

Expand Down
Loading