0%

C++11线程池

引言:本文主要介绍线程池,并基于C++11实现。

背景

传统多线程方案是:接受一个任务之后,创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么将处于不停的创建线程,销毁线程的状态。我们将传统方案中的线程执行过程分为三个过程:线程创建时间+线程执行时间+线程销毁时间,如果线程执行时间很短的话,线程本身开销占的比重将会很大,这个开销将不可忽略。另外每个 Thread 都需要有一个内核线程的支持,也就意味着每个Thread都需要消耗一定的内核资源(如内核线程的栈空间),因为能创建的 Thread 是有限的,默认一个线程的线程栈大小是1M,如果每来一个任务就创建一个线程的话,1024个任务就会创建1024个线程,就会占用1个G的内存,很容易就系统崩溃了。

因此,线程池的出现正是着眼于线程本身的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#pragma once

#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <future>
#include <functional>


class ThreadPool {
public:
using Ptr = std::shared_ptr<ThreadPool>;

ThreadPool(int idl_thr_num) : run_(true), idl_thr_num_(idl_thr_num) {
for (int i = 0; i < idl_thr_num_; ++i) {
pool_.emplace_back([this]{ // 工作线程函数
while (run_) {
Task task;
{
std::unique_lock<std::mutex> lock{ mutex_ };
cv_.wait(lock, [this]{ return !run_ || !task_.empty(); }); // 等待直到任务队列有任务或者线程池停止工作
if (!run_ && task_.empty()) {
return;
}
task = std::move(task_.front()); // 从任务队列首取出一个任务
task_.pop();
}
idl_thr_num_--;
task(); // 执行任务
idl_thr_num_++;
}
});
}
}

~ThreadPool() {
run_ = false;
cv_.notify_all();
for (std::thread& thread : pool_) {
if (thread.joinable()) {
thread.join();
}
}
}

public:
int idlCount() {
return idl_thr_num_;
}

template <class F, class... Args>
auto commit(F&& f, Args&&... args)->std::future<decltype(f(args...))> {
if (!run_) {
throw std::runtime_error("commit on ThreadPool is stopped.");
}
using RetType = decltype(f(args...)); // 函数f的返回值类型
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RetType> future = task->get_future();
{
std::lock_guard<std::mutex> lock{mutex_ };
task_.emplace([task](){ (*task)(); }); // 添加任务到任务队列
}
cv_.notify_one(); // 唤醒一个线程
return future;
}

private:
using Task = std::function<void()>;
std::vector<std::thread> pool_; // 线程池
std::queue<Task> task_; // 任务队列
std::mutex mutex_; // 线程锁
std::condition_variable cv_; // 条件阻塞
std::atomic<bool> run_; // 线程池是否执行标志
std::atomic<int> idl_thr_num_; // 空闲线程数量
};

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include "thread_pool.h"
#include <iostream>
#include <chrono>

std::mutex g_mutex;

void task(int i) {
printf("正在执行第 %d 个任务, 线程id为 %d\n", i, std::this_thread::get_id());
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
printf("执行完成第 %d 个任务, 线程id为 %d\n", i, std::this_thread::get_id());
}

int main(int argc, char** argv) {
std::cout << "主线程id为 " << std::this_thread::get_id() << std::endl;
ThreadPool::Ptr thread_pool_ptr = std::make_shared<ThreadPool>(3);
for (int i = 0; i < 3; ++i) {
thread_pool_ptr->commit([i](){task(i);});
}

std::this_thread::sleep_for(std::chrono::milliseconds(10000));
}

代码链接:https://github.com/zsh4614/thread_pool