C++ 并发编程
概述
C++11引入了标准线程库,提供了跨平台的并发编程支持。并发编程允许程序同时执行多个任务,充分利用多核处理器的能力。本章介绍线程、同步原语、异步编程等并发编程技术。
🧵 线程基础
创建和管理线程
cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
class ThreadBasics {
public:
// 简单的线程函数
static void simpleTask(int id) {
std::cout << "线程 " << id << " 开始执行" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "线程 " << id << " 执行完成" << std::endl;
}
// 带参数的线程函数
static void taskWithParams(const std::string& name, int count) {
for (int i = 0; i < count; ++i) {
std::cout << name << ": " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
static void basicThreadDemo() {
std::cout << "=== 基础线程演示 ===" << std::endl;
// 创建线程
std::thread t1(simpleTask, 1);
std::thread t2(simpleTask, 2);
// 等待线程完成
t1.join();
t2.join();
std::cout << "所有线程完成" << std::endl;
}
static void threadWithLambda() {
std::cout << "\n=== Lambda线程 ===" << std::endl;
int shared_data = 0;
// 使用lambda创建线程
std::thread t1([&shared_data]() {
for (int i = 0; i < 5; ++i) {
shared_data++;
std::cout << "Lambda线程: " << shared_data << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
});
// 使用函数对象创建线程
class ThreadTask {
public:
void operator()(const std::string& msg) {
for (int i = 0; i < 3; ++i) {
std::cout << msg << " " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
}
};
std::thread t2(ThreadTask(), "函数对象");
t1.join();
t2.join();
}
static void threadManagement() {
std::cout << "\n=== 线程管理 ===" << std::endl;
std::thread t(taskWithParams, "任务", 3);
// 获取线程信息
std::cout << "线程ID: " << t.get_id() << std::endl;
std::cout << "硬件并发线程数: " << std::thread::hardware_concurrency() << std::endl;
// 检查线程是否可join
if (t.joinable()) {
std::cout << "线程可以join" << std::endl;
t.join();
}
// detach示例
std::thread detached_thread([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "分离的线程执行" << std::endl;
});
detached_thread.detach(); // 分离线程
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 等待分离线程完成
}
};🔒 同步原语
互斥量和锁
cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include <chrono>
class SynchronizationDemo {
private:
static std::mutex mtx_;
static std::shared_mutex shared_mtx_;
static int shared_counter_;
static std::vector<int> shared_data_;
public:
static void unsafeIncrement() {
// 不安全的操作
for (int i = 0; i < 100000; ++i) {
shared_counter_++;
}
}
static void safeIncrement() {
// 使用lock_guard自动管理锁
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx_);
shared_counter_++;
}
}
static void uniqueLockDemo() {
// unique_lock提供更灵活的锁管理
std::unique_lock<std::mutex> lock(mtx_);
shared_counter_ += 1000;
// 可以提前释放锁
lock.unlock();
// 其他不需要锁的操作
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// 重新获取锁
lock.lock();
shared_counter_ += 1000;
}
static void sharedLockDemo() {
// 读操作使用共享锁
std::shared_lock<std::shared_mutex> lock(shared_mtx_);
std::cout << "读取共享数据大小: " << shared_data_.size() << std::endl;
// 模拟读取操作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
static void exclusiveLockDemo() {
// 写操作使用独占锁
std::unique_lock<std::shared_mutex> lock(shared_mtx_);
shared_data_.push_back(shared_data_.size());
std::cout << "写入数据,新大小: " << shared_data_.size() << std::endl;
// 模拟写入操作
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
static void mutexDemo() {
std::cout << "=== 互斥量演示 ===" << std::endl;
// 不安全的并发
shared_counter_ = 0;
std::vector<std::thread> unsafe_threads;
for (int i = 0; i < 4; ++i) {
unsafe_threads.emplace_back(unsafeIncrement);
}
for (auto& t : unsafe_threads) {
t.join();
}
std::cout << "不安全操作结果: " << shared_counter_ << " (期望: 400000)" << std::endl;
// 安全的并发
shared_counter_ = 0;
std::vector<std::thread> safe_threads;
for (int i = 0; i < 4; ++i) {
safe_threads.emplace_back(safeIncrement);
}
for (auto& t : safe_threads) {
t.join();
}
std::cout << "安全操作结果: " << shared_counter_ << std::endl;
}
static void readerWriterDemo() {
std::cout << "\n=== 读写锁演示 ===" << std::endl;
std::vector<std::thread> threads;
// 创建多个读线程
for (int i = 0; i < 3; ++i) {
threads.emplace_back(sharedLockDemo);
}
// 创建写线程
threads.emplace_back(exclusiveLockDemo);
threads.emplace_back(exclusiveLockDemo);
// 再创建读线程
for (int i = 0; i < 2; ++i) {
threads.emplace_back(sharedLockDemo);
}
for (auto& t : threads) {
t.join();
}
}
};
// 静态成员定义
std::mutex SynchronizationDemo::mtx_;
std::shared_mutex SynchronizationDemo::shared_mtx_;
int SynchronizationDemo::shared_counter_ = 0;
std::vector<int> SynchronizationDemo::shared_data_;条件变量
cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
class ConditionVariableDemo {
private:
static std::queue<int> data_queue_;
static std::mutex queue_mutex_;
static std::condition_variable condition_;
static bool finished_;
public:
static void producer(int id) {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(queue_mutex_);
int value = id * 100 + i;
data_queue_.push(value);
std::cout << "生产者 " << id << " 生产: " << value << std::endl;
}
// 通知等待的消费者
condition_.notify_one();
}
}
static void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(queue_mutex_);
// 等待数据或完成信号
condition_.wait(lock, []() {
return !data_queue_.empty() || finished_;
});
if (!data_queue_.empty()) {
int value = data_queue_.front();
data_queue_.pop();
lock.unlock();
std::cout << "消费者 " << id << " 消费: " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(150));
} else if (finished_) {
break;
}
}
}
static void producerConsumerDemo() {
std::cout << "=== 生产者消费者模式 ===" << std::endl;
finished_ = false;
// 创建生产者和消费者线程
std::vector<std::thread> threads;
// 2个生产者
threads.emplace_back(producer, 1);
threads.emplace_back(producer, 2);
// 2个消费者
threads.emplace_back(consumer, 1);
threads.emplace_back(consumer, 2);
// 等待生产者完成
threads[0].join();
threads[1].join();
// 通知消费者结束
{
std::lock_guard<std::mutex> lock(queue_mutex_);
finished_ = true;
}
condition_.notify_all();
// 等待消费者完成
threads[2].join();
threads[3].join();
std::cout << "生产者消费者演示完成" << std::endl;
}
// 超时等待示例
static void timeoutWaitDemo() {
std::cout << "\n=== 超时等待演示 ===" << std::endl;
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
std::thread worker([&]() {
std::unique_lock<std::mutex> lock(mtx);
// 等待3秒或收到通知
auto now = std::chrono::system_clock::now();
if (cv.wait_until(lock, now + std::chrono::seconds(3), [&]() { return ready; })) {
std::cout << "收到通知" << std::endl;
} else {
std::cout << "等待超时" << std::endl;
}
});
// 主线程等待2秒后发送通知
std::this_thread::sleep_for(std::chrono::seconds(2));
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one();
worker.join();
}
};
// 静态成员定义
std::queue<int> ConditionVariableDemo::data_queue_;
std::mutex ConditionVariableDemo::queue_mutex_;
std::condition_variable ConditionVariableDemo::condition_;
bool ConditionVariableDemo::finished_ = false;🔮 异步编程
future和promise
cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <exception>
class AsyncProgrammingDemo {
public:
static int computeSum(int start, int end) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
int sum = 0;
for (int i = start; i <= end; ++i) {
sum += i;
}
std::cout << "计算 " << start << " 到 " << end << " 的和: " << sum << std::endl;
return sum;
}
static void futurePromiseDemo() {
std::cout << "=== future和promise演示 ===" << std::endl;
// 使用std::async
auto future1 = std::async(std::launch::async, computeSum, 1, 1000);
auto future2 = std::async(std::launch::async, computeSum, 1001, 2000);
// 继续其他工作
std::cout << "异步任务已启动,继续其他工作..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 获取结果
int result1 = future1.get();
int result2 = future2.get();
std::cout << "总和: " << (result1 + result2) << std::endl;
}
static void promiseDemo() {
std::cout << "\n=== promise演示 ===" << std::endl;
std::promise<int> promise;
std::future<int> future = promise.get_future();
// 在另一个线程中设置值
std::thread worker([&promise]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
try {
int result = 42 * 42;
promise.set_value(result);
} catch (...) {
promise.set_exception(std::current_exception());
}
});
std::cout << "等待结果..." << std::endl;
try {
int result = future.get();
std::cout << "从promise获取结果: " << result << std::endl;
} catch (const std::exception& e) {
std::cout << "异常: " << e.what() << std::endl;
}
worker.join();
}
static void sharedFutureDemo() {
std::cout << "\n=== shared_future演示 ===" << std::endl;
std::promise<std::string> promise;
std::shared_future<std::string> shared_future = promise.get_future().share();
auto worker = [](std::shared_future<std::string> sf, int id) {
std::string result = sf.get();
std::cout << "线程 " << id << " 获取结果: " << result << std::endl;
};
// 多个线程等待同一个结果
std::thread t1(worker, shared_future, 1);
std::thread t2(worker, shared_future, 2);
std::thread t3(worker, shared_future, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
promise.set_value("共享的结果");
t1.join();
t2.join();
t3.join();
}
static void packagedTaskDemo() {
std::cout << "\n=== packaged_task演示 ===" << std::endl;
// 创建packaged_task
std::packaged_task<int(int, int)> task(computeSum);
std::future<int> future = task.get_future();
// 在线程中执行任务
std::thread t(std::move(task), 1, 100);
// 获取结果
int result = future.get();
std::cout << "packaged_task结果: " << result << std::endl;
t.join();
}
};⚡ 原子操作
无锁编程
cpp
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
class AtomicDemo {
private:
static std::atomic<int> atomic_counter_;
static std::atomic<bool> ready_;
static std::atomic<std::string*> data_ptr_;
public:
static void atomicBasics() {
std::cout << "=== 原子操作基础 ===" << std::endl;
atomic_counter_ = 0;
auto increment = []() {
for (int i = 0; i < 100000; ++i) {
atomic_counter_++; // 原子递增
}
};
// 多个线程同时递增
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(increment);
}
for (auto& t : threads) {
t.join();
}
std::cout << "原子操作结果: " << atomic_counter_.load() << std::endl;
}
static void memoryOrdering() {
std::cout << "\n=== 内存序演示 ===" << std::endl;
std::atomic<int> x{0};
std::atomic<int> y{0};
std::atomic<int> r1{0};
std::atomic<int> r2{0};
auto thread1 = [&]() {
x.store(1, std::memory_order_relaxed);
r1.store(y.load(std::memory_order_relaxed), std::memory_order_relaxed);
};
auto thread2 = [&]() {
y.store(1, std::memory_order_relaxed);
r2.store(x.load(std::memory_order_relaxed), std::memory_order_relaxed);
};
// 多次测试内存序效果
for (int i = 0; i < 100; ++i) {
x = 0; y = 0; r1 = 0; r2 = 0;
std::thread t1(thread1);
std::thread t2(thread2);
t1.join();
t2.join();
if (r1.load() == 0 && r2.load() == 0) {
std::cout << "检测到重排序 (r1=" << r1.load() << ", r2=" << r2.load() << ")" << std::endl;
break;
}
}
}
static void compareExchangeDemo() {
std::cout << "\n=== compare_exchange演示 ===" << std::endl;
std::atomic<int> value{10};
auto worker = [&](int id, int expected, int desired) {
int exp = expected;
bool success = value.compare_exchange_weak(exp, desired);
std::cout << "线程 " << id << ": ";
if (success) {
std::cout << "成功将 " << expected << " 改为 " << desired << std::endl;
} else {
std::cout << "失败,当前值是 " << exp << std::endl;
}
};
std::vector<std::thread> threads;
threads.emplace_back(worker, 1, 10, 20);
threads.emplace_back(worker, 2, 10, 30);
threads.emplace_back(worker, 3, 20, 40);
for (auto& t : threads) {
t.join();
}
std::cout << "最终值: " << value.load() << std::endl;
}
static void spinLockDemo() {
std::cout << "\n=== 自旋锁演示 ===" << std::endl;
class SpinLock {
private:
std::atomic<bool> locked_{false};
public:
void lock() {
while (locked_.exchange(true, std::memory_order_acquire)) {
// 自旋等待
std::this_thread::yield();
}
}
void unlock() {
locked_.store(false, std::memory_order_release);
}
};
SpinLock spin_lock;
int shared_data = 0;
auto worker = [&](int id) {
for (int i = 0; i < 10000; ++i) {
spin_lock.lock();
shared_data++;
spin_lock.unlock();
}
};
auto start = std::chrono::high_resolution_clock::now();
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(worker, i);
}
for (auto& t : threads) {
t.join();
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "自旋锁结果: " << shared_data << std::endl;
std::cout << "耗时: " << duration.count() << "ms" << std::endl;
}
};
// 静态成员定义
std::atomic<int> AtomicDemo::atomic_counter_{0};
std::atomic<bool> AtomicDemo::ready_{false};
std::atomic<std::string*> AtomicDemo::data_ptr_{nullptr};
int main() {
ThreadBasics::basicThreadDemo();
ThreadBasics::threadWithLambda();
ThreadBasics::threadManagement();
SynchronizationDemo::mutexDemo();
SynchronizationDemo::readerWriterDemo();
ConditionVariableDemo::producerConsumerDemo();
ConditionVariableDemo::timeoutWaitDemo();
AsyncProgrammingDemo::futurePromiseDemo();
AsyncProgrammingDemo::promiseDemo();
AsyncProgrammingDemo::sharedFutureDemo();
AsyncProgrammingDemo::packagedTaskDemo();
AtomicDemo::atomicBasics();
AtomicDemo::memoryOrdering();
AtomicDemo::compareExchangeDemo();
AtomicDemo::spinLockDemo();
return 0;
}总结
C++并发编程提供了丰富的工具来处理多线程和异步任务:
核心组件
- std::thread: 线程创建和管理
- 互斥量: mutex, shared_mutex等同步原语
- 条件变量: condition_variable线程通信
- future/promise: 异步任务结果传递
- 原子操作: atomic无锁编程
同步机制
| 机制 | 用途 | 性能 |
|---|---|---|
| mutex | 互斥访问 | 中等 |
| shared_mutex | 读写锁 | 中等 |
| condition_variable | 线程通信 | 中等 |
| atomic | 无锁操作 | 高 |
最佳实践
- RAII锁管理: 使用lock_guard和unique_lock
- 避免死锁: 固定锁顺序,使用超时机制
- 减少锁竞争: 细粒度锁,读写分离
- 异步优先: 优先使用async而非手动线程管理
- 原子操作: 简单数据使用atomic避免锁开销
设计模式
- 生产者消费者: 使用条件变量协调
- 线程池: 复用线程资源
- Future/Promise: 异步任务结果传递
- 读写锁: 读多写少场景优化
注意事项
- 数据竞争: 确保共享数据的线程安全
- 内存序: 理解不同内存序的作用
- 异常安全: 锁和资源的异常安全释放
- 性能考虑: 平衡同步开销和并发收益
并发编程是现代C++的重要技能,正确使用能显著提升程序性能和响应性。