Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/GraphCtrl/GraphElement/GElementDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

CGRAPH_NAMESPACE_BEGIN

const static CSize CGRAPH_DEFAULT_LOOP_TIMES = 1; // 默认循环次数信息
const static CLevel CGRAPH_DEFAULT_ELEMENT_LEVEL = 0; // 默认的element级别,用于控制init函数
const static CIndex CGRAPH_DEFAULT_BINDING_INDEX = -1; // 默认绑定线程id,-1表示不绑定
const static CMSec CGRAPH_DEFAULT_ELEMENT_TIMEOUT = 0; // 默认element的超时时间
const static CSize CGRAPH_DEFAULT_LOOP_TIMES = 1; // 默认循环次数信息
const static CLevel CGRAPH_DEFAULT_ELEMENT_LEVEL = 0; // 默认的element级别,用于控制init函数
const static CMSec CGRAPH_DEFAULT_ELEMENT_TIMEOUT = 0; // 默认element的超时时间
const static CIndex CGRAPH_DEFAULT_BINDING_INDEX = CGRAPH_DEFAULT_TASK_STRATEGY; // 默认绑定线程id,-1表示不绑定

enum class GElementType {
ELEMENT = 0x00000000, // 元素
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ CGRAPH_NAMESPACE_BEGIN
template<GMultiConditionType type>
GMultiCondition<type>::GMultiCondition() {
element_type_ = GElementType::MULTI_CONDITION;
if (type == GMultiConditionType::PARALLEL) {
// 多并发的情况下,需要触发线程唤醒
binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
}
session_ = URandom<>::generateSession(CGRAPH_STR_MULTI_CONDITION);
}

Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CGRAPH_NAMESPACE_BEGIN
GMutable::GMutable() {
element_type_ = GElementType::MUTABLE;
session_ = URandom<>::generateSession(CGRAPH_STR_MUTABLE);
binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager)
}

Expand Down
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

CGRAPH_NAMESPACE_BEGIN

GRegion::GRegion() : GGroup() {
is_init_ = false;
manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager)
GRegion::GRegion() {
element_type_ = GElementType::REGION;
binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
session_ = URandom<>::generateSession(CGRAPH_STR_REGION);
manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager)
}


Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CGRAPH_NAMESPACE_BEGIN
template<CInt TriggerNum>
GSome<TriggerNum>::GSome() {
element_type_ = GElementType::SOME;
binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
session_ = URandom<>::generateSession(CGRAPH_STR_SOME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ CVoid GDynamicEngine::commonRunAll() {
*/
finished_end_size_ = 0;
for (const auto& element : front_element_arr_) {
process(element, element == front_element_arr_.back());
process(element, element == front_element_arr_.back() && element->isDefaultBinding());
}

fatWait();
Expand Down Expand Up @@ -257,7 +257,7 @@ CVoid GDynamicEngine::parallelRunAll() {

if (parallel_element_matrix_.size() < static_cast<CSize>(thread_pool_->getConfig().default_thread_size_)) {
// 确保所有的 pt 都可以被唤醒,从而快速执行
thread_pool_->wakeupAllThread();
(void)thread_pool_->wakeupAllThread();
}

{
Expand Down
5 changes: 2 additions & 3 deletions src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class UAtomicQueue : public UQueueObject {
* 阻塞式等待弹出
* @return
*/
std::unique_ptr<T> popWithTimeout(CMSec ms) {
std::unique_ptr<T> popWithTimeout(const CMSec ms) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (!cv_.wait_for(lk, std::chrono::milliseconds(ms),
[this] { return (!queue_.empty()) || (!ready_flag_); })) {
Expand Down Expand Up @@ -123,9 +123,8 @@ class UAtomicQueue : public UQueueObject {
queue_.push(std::move(task));
mutex_.unlock();
break;
} else {
CGRAPH_YIELD();
}
CGRAPH_YIELD();
}
cv_.notify_one();
}
Expand Down
35 changes: 20 additions & 15 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ class UThreadBase : public UThreadObject {
* 清空所有任务内容
*/
CVoid reset() {
done_ = false;
{
// 这里必须要加 lock,避免退出的时候,cv_.wait_for() 的 pred竞争
CGRAPH_UNIQUE_LOCK lk(mutex_);
done_ = false;
}

cv_.notify_one(); // 防止主线程 wait时间过长,导致的结束缓慢问题
if (thread_.joinable()) {
thread_.join(); // 等待线程结束
Expand Down Expand Up @@ -176,9 +181,9 @@ class UThreadBase : public UThreadObject {
policy = config_->secondary_thread_policy_;
}

auto handle = thread_.native_handle();
sched_param param = { calcPriority(priority) };
int ret = pthread_setschedparam(handle, calcPolicy(policy), &param);
const auto handle = thread_.native_handle();
const sched_param& param = { calcPriority(priority) };
const int ret = pthread_setschedparam(handle, calcPolicy(policy), &param);
if (0 != ret) {
CGRAPH_ECHO("warning : set thread sched param failed, system error code is [%d]", ret);
}
Expand Down Expand Up @@ -214,7 +219,7 @@ class UThreadBase : public UThreadObject {
* @param policy
* @return
*/
static CInt calcPolicy(int policy) {
static CInt calcPolicy(const int policy) {
return (CGRAPH_THREAD_SCHED_OTHER == policy
|| CGRAPH_THREAD_SCHED_RR == policy
|| CGRAPH_THREAD_SCHED_FIFO == policy)
Expand All @@ -228,25 +233,25 @@ class UThreadBase : public UThreadObject {
* @param priority
* @return
*/
static CInt calcPriority(int priority) {
static CInt calcPriority(const int priority) {
return (priority >= CGRAPH_THREAD_MIN_PRIORITY
&& priority <= CGRAPH_THREAD_MAX_PRIORITY)
? priority : CGRAPH_THREAD_MIN_PRIORITY;
}


protected:
CBool done_; // 线程状态标记
CBool is_init_; // 标记初始化状态
CBool is_running_; // 是否正在执行
CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程)
CULong total_task_num_ = 0; // 处理的任务的数字
CBool done_; // 线程状态标记
CBool is_init_; // 标记初始化状态
CBool is_running_; // 是否正在执行
CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程)
CULong total_task_num_ = 0; // 处理的任务的数字

UAtomicQueue<UTask>* pool_task_queue_; // 用于存放线程池中的普通任务
UAtomicPriorityQueue<UTask>* pool_priority_task_queue_; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行
UThreadPoolConfigPtr config_ = nullptr; // 配置参数信息
UAtomicQueue<UTask>* pool_task_queue_ { nullptr }; // 用于存放线程池中的普通任务
UAtomicPriorityQueue<UTask>* pool_priority_task_queue_ { nullptr }; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行
UThreadPoolConfigPtr config_ { nullptr }; // 配置参数信息

std::thread thread_; // 线程类
std::thread thread_; // 线程类
std::mutex mutex_;
std::condition_variable cv_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class UThreadPrimary : public UThreadBase {
if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_),
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !done_; });
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_; });
cur_empty_epoch_ = 0;
}
}
Expand Down
19 changes: 16 additions & 3 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ CIndex UThreadPool::dispatch(const CIndex origIndex) {
CIndex realIndex = 0;
if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) {
realIndex = cur_index_++;
if (realIndex >= 0 && realIndex < config_.default_thread_size_
&& primary_threads_[realIndex]->is_running_) {
// 如果是默认调度,并且被放置到 正在running 的pt中,则切换为 trigger_one 的策略,防止阻塞
realIndex = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
}

if (cur_index_ >= config_.max_thread_size_ || cur_index_ < 0) {
cur_index_ = 0;
}
Expand Down Expand Up @@ -269,14 +275,21 @@ CVoid UThreadPool::monitor() {
}


CVoid UThreadPool::wakeupAllThread() const {
CSize UThreadPool::wakeupAllThread() const {
CSize size = 0;
for (auto& pt : primary_threads_) {
pt->wakeup();
if (pt->wakeup()) {
++size;
}
}

for (auto& st : secondary_threads_) {
st->wakeup();
if (st->wakeup()) {
++size;
}
}

return size;
}

CGRAPH_NAMESPACE_END
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class UThreadPool : public UThreadObject {
* 通知所有thread 开启
* @return
*/
CVoid wakeupAllThread() const;
CSize wakeupAllThread() const;

protected:
/**
Expand Down
4 changes: 3 additions & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "UThreadPool.h"

CGRAPH_NAMESPACE_BEGIN

template<typename FunctionType>
auto UThreadPool::commit(const FunctionType& func, CIndex index)
-> std::future<decltype(std::declval<FunctionType>()())> {
Expand Down Expand Up @@ -62,6 +61,9 @@ CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) {
primary_threads_[realIndex]->pushTask(std::forward<FunctionType>(task));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
priority_task_queue_.push(std::forward<FunctionType>(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
} else if (CGRAPH_TRIGGER_ALL_THREAD_STRATEGY == realIndex) {
task_queue_.push(std::forward<FunctionType>(task));
(void)wakeupAllThread();
} else {
task_queue_.push(std::forward<FunctionType>(task));
}
Expand Down
7 changes: 4 additions & 3 deletions src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ static const CUInt CGRAPH_DEFAULT_RINGBUFFER_SIZE = 64;
static const CIndex CGRAPH_MAIN_THREAD_ID = -1; // 启动线程id标识(非上述主线程)
static const CIndex CGRAPH_SECONDARY_THREAD_COMMON_ID = -2; // 辅助线程统一id标识

static const CInt CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略
static const CInt CGRAPH_POOL_TASK_STRATEGY = -2; // 固定用pool中的队列的调度策略
static const CInt CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略
static const CIndex CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略
static const CIndex CGRAPH_POOL_TASK_STRATEGY = -2; // 固定用pool中的队列的调度策略
static const CIndex CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略
static const CIndex CGRAPH_TRIGGER_ALL_THREAD_STRATEGY = -102; // 触发线程逻辑调度策略

/**
* 以下为线程池配置信息
Expand Down
Loading