线程池管理工具(C++)

线程池管理工具(C++)

利用C++开发一个Linux系统下线程池管理类。优点如下:

  1. 非常方便的对多线程的数量进行管理;
  2. 能够执行我们所传入的任意函数;
  3. 执行过程中可以按优先级顺序执行。

解决方案如下:

  1. 对于线程池类,可以设置一个数组,根据传入多线程的数量进行空间的开辟。
  2. 可以将不统一的需要执行的一个函数进行打包,打包成一个统一的某一个类Task的对象。可以将这些对象放入一个队列缓冲区中,线程池可以从中选出函数进行执行。
  3. 可以使用优先队列,将任务按优先级进行排序。(待实现)

Task类

Task.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#ifndef _TASK_H
#define _TASK_H

#include <functional>
#include <iostream>
#include <algorithm>
#include <thread>

namespace myspace {
class Task {
public:
// Func_T -> 函数类型
// ARGS -> 函数参数
template<typename Func_T, typename ...ARGS>
Task(Func_T &&f, ARGS &&...args) {
this->func = std::bind(std::forward<Func_T>(f), std::forward<ARGS>(args)...);
}
void run();
private:
std::function<void()> func;
};
}

#endif

Task.cc

1
2
3
4
5
6
7
#include "Task.h"

using namespace myspace;

void Task::run() {
this->func();
}

thread_pool类

thread_pool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

#include <thread>
#include <vector>
#include <algorithm>
#include <queue>
#include <condition_variable>
#include <mutex>
#include "Task.h"

namespace myspace {
class thread_pool {
public:
thread_pool(int);
void start();
void stop();
template<typename Func_T, typename ...ARGS>
void add_one_task(Func_T &&f, ARGS &&...args) {
__add_one_task(new Task(std::forward<Func_T>(f), std::forward<ARGS>(args)...));
}
~thread_pool();

private:
// thread_size -> 线程数量
// is_started -> 标记是否无限循环
// m_cond -> 当任务队列为空时进行等待
// m_mutex -> 与条件变量对应的互斥锁
// Threads -> 线程池数组,存储线程对象
// Tasks -> 任务
// thread_loop() -> 入口函数
// get_one_task() -> 取得任务
// __add_one_task(Task *) -> 添加任务
int thread_size;
bool is_started;
std::condition_variable m_cond;
std::mutex m_mutex;
std::vector<std::thread *> Threads;
std::queue<Task *> Tasks;
void thread_loop();
Task *get_one_task();
void __add_one_task(Task *);
};
}

#endif

thread_pool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include "thread_pool.h"

using namespace myspace;

thread_pool::thread_pool(int thread_size = 1)
: thread_size(thread_size),
is_started(false),
m_mutex(),
m_cond() {}

void thread_pool::start() {
std::unique_lock<std::mutex> lock(m_mutex);
is_started = true;
for (int i = 0; i < this->thread_size; i ++) {
// 新建出来后已经在执行了
Threads.push_back(new std::thread(&thread_pool::thread_loop, this));
}
return ;
}

void thread_pool::stop() {
{
std::unique_lock<std::mutex> lock(m_mutex);
is_started = false;
m_cond.notify_all();
}
// 若这个时候这个线程不释放锁(也就是没有上面的大括号{}),则处于阻塞状态的线程仍然会处于阻塞状态
// 要解决这个问题则可以将上面三行代码放入一个函数,或者直接放入一个程序块{}中
// 当这个互斥锁脱离这个程序块的作用域后,将会被自动释放

for (int i = 0; i < Threads.size(); i ++) {
Threads[i]->join();
delete Threads[i];
}
Threads.clear();
return ;
}

thread_pool::~thread_pool() {
this->stop();
}

// 取出任务执行
void thread_pool::thread_loop() {
// 为 true 的话当前线程池对象没有调用 stop 方法,则一直循环
while (is_started) {
// 当任务队列为空并且线程池开始运行的时候会一直等待
// 否则若队列为空,线程池开始运行且这个时候 t 为 nullptr
// 则会一直循环,占用大量CPU资源,但这是没必要的
Task *t = get_one_task();
if (t != nullptr) {
std::cout << "thread_loop tid : " << std::this_thread::get_id() << std::endl;
t->run();
}
}
return ;
}

Task* thread_pool::get_one_task() {
// 当前线程条件变量所对应的互斥锁
std::unique_lock<std::mutex> lock(m_mutex);
// 当队列为空并且线程池正在运行的时候进行等待
while (Tasks.empty() && is_started) {
// 阻塞线程,将获取到的互斥锁传入,直到条件成立
// 等待有人添加任务
m_cond.wait(lock);
}
// 取得队首任务
Task *t = nullptr;
if (!Tasks.empty() && is_started) {
t = Tasks.front();
Tasks.pop();
}
return t;
}

void thread_pool::__add_one_task(Task *t) {
// 多线程情况下访问这个队列时需要首先获得互斥锁
std::unique_lock<std::mutex> lock(m_mutex);
Tasks.push(t);
// 通知所有正在等待条件执行的线程(get_one_task中),可以继续执行下去了
m_cond.notify_one();
return ;
}

主程序main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <cstdio>
#include <cstring>
#include <algorithm>
#include <cmath>
#include "Task.h"
#include "thread_pool.h"
#include "color.h"

using namespace std;
using namespace myspace;

void test_func(int x, int &y) {
y ++;
cout << "test_func : " << x << " " << y << endl << endl;
}

int main() {
int n = 100;
Task t(test_func, 1, ref(n));
thread_pool tp(7);
tp.start();
cout << GREEN_HL("Before Adding Tasks") << endl;
for (int i = 0; i < 10; i ++) {
//std::this_thread::sleep_for(std::chrono::milliseconds(200));
tp.add_one_task(test_func, i, ref(n));
}
cout << PURPLE_HL("Add task end") << endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
tp.stop();
cout << RED_HL("End of task") << endl;
return 0;
}

其他头文件color.h

仅仅是用于在终端输出有颜色的字符串,比如main.cpp文件中第31行cout << RED_HL("End of task") << endl;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef _COLOR_H
#define _COLOR_H

#define COLOR(a, b) "\033[" #b "m" a "\033[0m"
#define COLOR_HL(a, b) "\033[1;" #b "m" a "\033[0m"

#define GREEN(a) COLOR(a, 32)
#define RED(a) COLOR(a, 31)
#define BLUE(a) COLOR(a, 34)
#define YELLOW(a) COLOR(a, 33)
#define PURPLE(a) COLOR(a, 35)

#define GREEN_HL(a) COLOR_HL(a, 32)
#define RED_HL(a) COLOR_HL(a, 31)
#define BLUE_HL(a) COLOR_HL(a, 34)
#define YELLOW_HL(a) COLOR_HL(a, 33)
#define PURPLE_HL(a) COLOR_HL(a, 35)

#endif

使用方法

将以上头文件.h文件和相应的源文件.cc文件放入同一个文件夹,使用命令:

1
g++ -std=c++11 main.cpp Task.cc thread_pool.cc -o a.out -lpthread && ./a.out && rm a.out

执行结果

main.cpp文件main函数中加上std::this_thread::sleep_for(std::chrono::milliseconds(200));后,执行结果如图所示:

将其注释掉,执行结果如图所示:

可以看出,加与不加延时,线程执行顺序都是不确定的(可由线程tid看出),这是由于在多线程中执行顺序是由操作系统决定的,我们按顺序添加任务,但操作系统不一定按这个顺序执行。

但是加上延时的输出看起来比不加上延时的输出好很多,这是因为对于cout来说,他是线程不安全的,并不是一个原子操作,在多线程时很可能由于多个线程在同时输出,导致输出杂乱,加上延时后即可适当延缓这种情况。但这并没有从根本上解决问题,之后会进行一个线程安全的日志输出程序,会对此进行优化。