c++如何实现线程池 c++高并发编程实践【实例】

C++轻量线程池基于std::thread、std::queue、std::mutex和std::condition_variable实现,支持任务提交、原子状态控制与优雅关闭,核心包含线程安全队列、工作线程循环、条件变量等待及析构自动清理。

用 C++ 实现一个轻量、实用的线程池,核心是管理一组复用的线程来异步执行任务,避免频繁创建/销毁线程的开销。下面是一个基于 std::threadstd::queuestd::mutexstd::condition_variable 的简洁实现,支持任务提交、自动扩容(可选)、优雅关闭,已在 C++11 及以上环境验证可用。

线程池基础结构设计

线程池需包含:任务队列(线程安全)、工作线程集合、同步原语(互斥锁 + 条件变量)、运行状态控制。所有任务封装为 std::function,支持 lambda、函数指针、绑定对象等。

关键点:

  • 任务队列用 std::queue + std::mutex 保护,避免多线程竞争
  • std::condition_variable 让空闲线程等待新任务,而非忙等
  • 设置 m_stop 原子标志位,控制线程退出循环
  • 析构时调用 stop(),等待所有线程完成当前任务后退出

完整可运行代码示例

以下是一个最小可行线程池(无任务优先级、无动态扩缩容,但稳定高效):

立即学习“C++免费学习笔记(深入)”;

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

class ThreadPool {
public:
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : m_stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            m_workers.emplace_back([this] {
                while (true) {
                    std::function task;
                    {
                        std::unique_lock lock(m_queue_mutex);
                        m_condition.wait(lock, [this] { return m_stop.load() || !m_tasks.empty(); });
                        if (m_stop.load() && m_tasks.empty()) return;
                        task = std::move(m_tasks.front());
                        m_tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template
    auto enqueue(F&& f, Args&&... args)
        -> std::future::type> {
        using return_type = typename std::result_of::type;

        auto task = std::make_shared>(
            std::bind(std::forward(f), std::forward(args)...)
        );

        std::future res = task->get_future();
        {
            std::unique_lock lock(m_queue_mutex);
            if (m_stop.load()) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            m_tasks.emplace([task]() { (*task)(); });
        }
        m_condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        stop();
    }

private:
    void stop() {
        {
            std::unique_lock lock(m_queue_mutex);
            m_stop = true;
        }
        m_condition.notify_all();
        for (std::thread &t : m_workers) {
            if (t.joinable()) t.join();
        }
    }

    std::vector m_workers;
    std::queue> m_tasks;
    std::mutex m_queue_mutex;
    std::condition_variable m_condition;
    std::atomic m_stop;
};

高并发使用技巧与注意事项

在真实项目中提升线程池稳定性与性能,注意以下实践:

  • 避免任务阻塞过久:单个任务若长时间 I/O 或 sleep,会占用线程资源;可拆分为异步 I/O + 回调,或单独配 I/O 线程池
  • 限制队列长度:无界队列可能 OOM,建议添加 if (m_tasks.size() > MAX_QUEUE_SIZE) throw ... 防护
  • 返回 future 时注意生命周期:不要让 std::future 在线程池析构后被访问,否则会 std::terminate
  • 异常处理要在线程内捕获:worker 线程中未捕获异常会导致整个程序终止,可在 task 包装层加 try/catch
  • 考虑使用无锁队列(如 moodycamel::ConcurrentQueue):在极端高吞吐场景下减少锁争用

实际调用示例

提交普通函数、lambda、带返回值任务:

int main() {
    ThreadPool pool(4);

    // 提交无返回值任务
    pool.enqueue([]{ std::cout << "Hello from thread\n"; });

    // 提交带参数的 lambda
    pool.enqueue([](int x, const std::string& s) {
        std::cout << "x=" << x << ", s=" << s << "\n";
    }, 42, "world");

    // 提交有返回值任务,获取 future
    auto fut = pool.enqueue([] -> int {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 123;
    });

    std::cout << "Result: " << fut.get() << "\n"; // 阻塞等待结果

    return 0;
}

不复杂但容易忽略细节。按这个结构写,线程池就既安全又够用。