引言:本文主要介绍线程池,并基于C++11实现。
背景
传统多线程方案是:接受一个任务之后,创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么将处于不停的创建线程,销毁线程的状态。我们将传统方案中的线程执行过程分为三个过程:线程创建时间+线程执行时间+线程销毁时间,如果线程执行时间很短的话,线程本身开销占的比重将会很大,这个开销将不可忽略。另外每个 Thread 都需要有一个内核线程的支持,也就意味着每个Thread都需要消耗一定的内核资源(如内核线程的栈空间),因为能创建的 Thread 是有限的,默认一个线程的线程栈大小是1M,如果每来一个任务就创建一个线程的话,1024个任务就会创建1024个线程,就会占用1个G的内存,很容易就系统崩溃了。
因此,线程池的出现正是着眼于线程本身的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| #pragma once
#include <vector> #include <queue> #include <thread> #include <atomic> #include <future> #include <functional>
class ThreadPool { public: using Ptr = std::shared_ptr<ThreadPool>;
ThreadPool(int idl_thr_num) : run_(true), idl_thr_num_(idl_thr_num) { for (int i = 0; i < idl_thr_num_; ++i) { pool_.emplace_back([this]{ while (run_) { Task task; { std::unique_lock<std::mutex> lock{ mutex_ }; cv_.wait(lock, [this]{ return !run_ || !task_.empty(); }); if (!run_ && task_.empty()) { return; } task = std::move(task_.front()); task_.pop(); } idl_thr_num_--; task(); idl_thr_num_++; } }); } }
~ThreadPool() { run_ = false; cv_.notify_all(); for (std::thread& thread : pool_) { if (thread.joinable()) { thread.join(); } } }
public: int idlCount() { return idl_thr_num_; }
template <class F, class... Args> auto commit(F&& f, Args&&... args)->std::future<decltype(f(args...))> { if (!run_) { throw std::runtime_error("commit on ThreadPool is stopped."); } using RetType = decltype(f(args...)); auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<RetType> future = task->get_future(); { std::lock_guard<std::mutex> lock{mutex_ }; task_.emplace([task](){ (*task)(); }); } cv_.notify_one(); return future; }
private: using Task = std::function<void()>; std::vector<std::thread> pool_; std::queue<Task> task_; std::mutex mutex_; std::condition_variable cv_; std::atomic<bool> run_; std::atomic<int> idl_thr_num_; };
|
简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #include "thread_pool.h" #include <iostream> #include <chrono>
std::mutex g_mutex;
void task(int i) { printf("正在执行第 %d 个任务, 线程id为 %d\n", i, std::this_thread::get_id()); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); printf("执行完成第 %d 个任务, 线程id为 %d\n", i, std::this_thread::get_id()); }
int main(int argc, char** argv) { std::cout << "主线程id为 " << std::this_thread::get_id() << std::endl; ThreadPool::Ptr thread_pool_ptr = std::make_shared<ThreadPool>(3); for (int i = 0; i < 3; ++i) { thread_pool_ptr->commit([i](){task(i);}); }
std::this_thread::sleep_for(std::chrono::milliseconds(10000)); }
|
代码链接:https://github.com/zsh4614/thread_pool