
C++ Concurrent Programming

Overview

C++11 introduced the standard thread library, giving C++ cross-platform support for concurrent programming. Concurrency lets a program execute several tasks at the same time and make full use of multi-core processors. This chapter covers threads, synchronization primitives, asynchronous programming, and related techniques.

🧵 Thread Basics

Creating and Managing Threads

cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <string>

class ThreadBasics {
public:
    // A simple thread function
    static void simpleTask(int id) {
        std::cout << "Thread " << id << " started" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::cout << "Thread " << id << " finished" << std::endl;
    }
    
    // Thread function with parameters
    static void taskWithParams(const std::string& name, int count) {
        for (int i = 0; i < count; ++i) {
            std::cout << name << ": " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }
    
    static void basicThreadDemo() {
        std::cout << "=== 基础线程演示 ===" << std::endl;
        
        // 创建线程
        std::thread t1(simpleTask, 1);
        std::thread t2(simpleTask, 2);
        
        // 等待线程完成
        t1.join();
        t2.join();
        
        std::cout << "所有线程完成" << std::endl;
    }
    
    static void threadWithLambda() {
        std::cout << "\n=== Lambda线程 ===" << std::endl;
        
        int shared_data = 0;
        
        // Create a thread from a lambda
        std::thread t1([&shared_data]() {
            for (int i = 0; i < 5; ++i) {
                shared_data++;
                std::cout << "Lambda thread: " << shared_data << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(300));
            }
        });
        
        // Create a thread from a function object
        class ThreadTask {
        public:
            void operator()(const std::string& msg) {
                for (int i = 0; i < 3; ++i) {
                    std::cout << msg << " " << i << std::endl;
                    std::this_thread::sleep_for(std::chrono::milliseconds(400));
                }
            }
        };
        
        std::thread t2(ThreadTask(), "函数对象");
        
        t1.join();
        t2.join();
    }
    
    static void threadManagement() {
        std::cout << "\n=== 线程管理 ===" << std::endl;
        
        std::thread t(taskWithParams, "任务", 3);
        
        // 获取线程信息
        std::cout << "线程ID: " << t.get_id() << std::endl;
        std::cout << "硬件并发线程数: " << std::thread::hardware_concurrency() << std::endl;
        
        // 检查线程是否可join
        if (t.joinable()) {
            std::cout << "线程可以join" << std::endl;
            t.join();
        }
        
        // detach示例
        std::thread detached_thread([]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            std::cout << "分离的线程执行" << std::endl;
        });
        
        detached_thread.detach();  // 分离线程
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // 等待分离线程完成
    }
};

🔒 Synchronization Primitives

Mutexes and Locks

cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include <chrono>

class SynchronizationDemo {
private:
    static std::mutex mtx_;
    static std::shared_mutex shared_mtx_;
    static int shared_counter_;
    static std::vector<int> shared_data_;
    
public:
    static void unsafeIncrement() {
        // Unsynchronized (racy) increments
        for (int i = 0; i < 100000; ++i) {
            shared_counter_++;
        }
    }
    
    static void safeIncrement() {
        // lock_guard acquires and releases the mutex automatically (RAII)
        for (int i = 0; i < 100000; ++i) {
            std::lock_guard<std::mutex> lock(mtx_);
            shared_counter_++;
        }
    }
    
    static void uniqueLockDemo() {
        // unique_lock allows more flexible lock management
        std::unique_lock<std::mutex> lock(mtx_);
        
        shared_counter_ += 1000;
        
        // The lock can be released early
        lock.unlock();
        
        // Work that does not need the lock
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        
        // Re-acquire the lock
        lock.lock();
        shared_counter_ += 1000;
    }
    
    static void sharedLockDemo() {
        // Readers take a shared lock
        std::shared_lock<std::shared_mutex> lock(shared_mtx_);
        
        std::cout << "Reading shared data, size: " << shared_data_.size() << std::endl;
        
        // Simulate a slow read
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    
    static void exclusiveLockDemo() {
        // Writers take an exclusive lock
        std::unique_lock<std::shared_mutex> lock(shared_mtx_);
        
        shared_data_.push_back(shared_data_.size());
        std::cout << "Wrote data, new size: " << shared_data_.size() << std::endl;
        
        // Simulate a slow write
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    
    static void mutexDemo() {
        std::cout << "=== 互斥量演示 ===" << std::endl;
        
        // 不安全的并发
        shared_counter_ = 0;
        std::vector<std::thread> unsafe_threads;
        
        for (int i = 0; i < 4; ++i) {
            unsafe_threads.emplace_back(unsafeIncrement);
        }
        
        for (auto& t : unsafe_threads) {
            t.join();
        }
        
        std::cout << "不安全操作结果: " << shared_counter_ << " (期望: 400000)" << std::endl;
        
        // 安全的并发
        shared_counter_ = 0;
        std::vector<std::thread> safe_threads;
        
        for (int i = 0; i < 4; ++i) {
            safe_threads.emplace_back(safeIncrement);
        }
        
        for (auto& t : safe_threads) {
            t.join();
        }
        
        std::cout << "安全操作结果: " << shared_counter_ << std::endl;
    }
    
    static void readerWriterDemo() {
        std::cout << "\n=== 读写锁演示 ===" << std::endl;
        
        std::vector<std::thread> threads;
        
        // Launch several reader threads
        for (int i = 0; i < 3; ++i) {
            threads.emplace_back(sharedLockDemo);
        }
        
        // Launch writer threads
        threads.emplace_back(exclusiveLockDemo);
        threads.emplace_back(exclusiveLockDemo);
        
        // Launch more reader threads
        for (int i = 0; i < 2; ++i) {
            threads.emplace_back(sharedLockDemo);
        }
        
        for (auto& t : threads) {
            t.join();
        }
    }
};

// Static member definitions
std::mutex SynchronizationDemo::mtx_;
std::shared_mutex SynchronizationDemo::shared_mtx_;
int SynchronizationDemo::shared_counter_ = 0;
std::vector<int> SynchronizationDemo::shared_data_;

Condition Variables

cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <vector>

class ConditionVariableDemo {
private:
    static std::queue<int> data_queue_;
    static std::mutex queue_mutex_;
    static std::condition_variable condition_;
    static bool finished_;
    
public:
    static void producer(int id) {
        for (int i = 0; i < 5; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            
            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                int value = id * 100 + i;
                data_queue_.push(value);
                std::cout << "生产者 " << id << " 生产: " << value << std::endl;
            }
            
            // 通知等待的消费者
            condition_.notify_one();
        }
    }
    
    static void consumer(int id) {
        while (true) {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            
            // Wait until there is data or the finished flag is set
            condition_.wait(lock, []() {
                return !data_queue_.empty() || finished_;
            });
            
            if (!data_queue_.empty()) {
                int value = data_queue_.front();
                data_queue_.pop();
                lock.unlock();
                
                std::cout << "消费者 " << id << " 消费: " << value << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(150));
            } else if (finished_) {
                break;
            }
        }
    }
    
    static void producerConsumerDemo() {
        std::cout << "=== 生产者消费者模式 ===" << std::endl;
        
        finished_ = false;
        
        // 创建生产者和消费者线程
        std::vector<std::thread> threads;
        
        // 2个生产者
        threads.emplace_back(producer, 1);
        threads.emplace_back(producer, 2);
        
        // 2个消费者
        threads.emplace_back(consumer, 1);
        threads.emplace_back(consumer, 2);
        
        // 等待生产者完成
        threads[0].join();
        threads[1].join();
        
        // 通知消费者结束
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            finished_ = true;
        }
        condition_.notify_all();
        
        // 等待消费者完成
        threads[2].join();
        threads[3].join();
        
        std::cout << "生产者消费者演示完成" << std::endl;
    }
    
    // Timed wait example
    static void timeoutWaitDemo() {
        std::cout << "\n=== Timed wait demo ===" << std::endl;
        
        std::mutex mtx;
        std::condition_variable cv;
        bool ready = false;
        
        std::thread worker([&]() {
            std::unique_lock<std::mutex> lock(mtx);
            
            // Wait up to 3 seconds or until notified
            auto now = std::chrono::system_clock::now();
            if (cv.wait_until(lock, now + std::chrono::seconds(3), [&]() { return ready; })) {
                std::cout << "Notification received" << std::endl;
            } else {
                std::cout << "Wait timed out" << std::endl;
            }
        });
        
        // The main thread sends the notification after 2 seconds
        std::this_thread::sleep_for(std::chrono::seconds(2));
        {
            std::lock_guard<std::mutex> lock(mtx);
            ready = true;
        }
        cv.notify_one();
        
        worker.join();
    }
};

// Static member definitions
std::queue<int> ConditionVariableDemo::data_queue_;
std::mutex ConditionVariableDemo::queue_mutex_;
std::condition_variable ConditionVariableDemo::condition_;
bool ConditionVariableDemo::finished_ = false;

🔮 Asynchronous Programming

future and promise

cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <exception>
#include <string>

class AsyncProgrammingDemo {
public:
    static int computeSum(int start, int end) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        
        int sum = 0;
        for (int i = start; i <= end; ++i) {
            sum += i;
        }
        
        std::cout << "计算 " << start << " 到 " << end << " 的和: " << sum << std::endl;
        return sum;
    }
    
    static void futurePromiseDemo() {
        std::cout << "=== future和promise演示 ===" << std::endl;
        
        // 使用std::async
        auto future1 = std::async(std::launch::async, computeSum, 1, 1000);
        auto future2 = std::async(std::launch::async, computeSum, 1001, 2000);
        
        // 继续其他工作
        std::cout << "异步任务已启动,继续其他工作..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        
        // 获取结果
        int result1 = future1.get();
        int result2 = future2.get();
        
        std::cout << "总和: " << (result1 + result2) << std::endl;
    }
    
    static void promiseDemo() {
        std::cout << "\n=== promise演示 ===" << std::endl;
        
        std::promise<int> promise;
        std::future<int> future = promise.get_future();
        
        // Set the value from another thread
        std::thread worker([&promise]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            
            try {
                int result = 42 * 42;
                promise.set_value(result);
            } catch (...) {
                promise.set_exception(std::current_exception());
            }
        });
        
        std::cout << "等待结果..." << std::endl;
        
        try {
            int result = future.get();
            std::cout << "从promise获取结果: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cout << "异常: " << e.what() << std::endl;
        }
        
        worker.join();
    }
    
    static void sharedFutureDemo() {
        std::cout << "\n=== shared_future演示 ===" << std::endl;
        
        std::promise<std::string> promise;
        std::shared_future<std::string> shared_future = promise.get_future().share();
        
        auto worker = [](std::shared_future<std::string> sf, int id) {
            std::string result = sf.get();
            std::cout << "线程 " << id << " 获取结果: " << result << std::endl;
        };
        
        // 多个线程等待同一个结果
        std::thread t1(worker, shared_future, 1);
        std::thread t2(worker, shared_future, 2);
        std::thread t3(worker, shared_future, 3);
        
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        promise.set_value("共享的结果");
        
        t1.join();
        t2.join();
        t3.join();
    }
    
    static void packagedTaskDemo() {
        std::cout << "\n=== packaged_task演示 ===" << std::endl;
        
        // 创建packaged_task
        std::packaged_task<int(int, int)> task(computeSum);
        std::future<int> future = task.get_future();
        
        // Run the task on a separate thread
        std::thread t(std::move(task), 1, 100);
        
        // Retrieve the result
        int result = future.get();
        std::cout << "packaged_task result: " << result << std::endl;
        
        t.join();
    }
};

⚡ Atomic Operations

Lock-Free Programming

cpp
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <string>

class AtomicDemo {
private:
    static std::atomic<int> atomic_counter_;
    static std::atomic<bool> ready_;
    static std::atomic<std::string*> data_ptr_;
    
public:
    static void atomicBasics() {
        std::cout << "=== 原子操作基础 ===" << std::endl;
        
        atomic_counter_ = 0;
        
        auto increment = []() {
            for (int i = 0; i < 100000; ++i) {
                atomic_counter_++;  // Atomic increment
            }
        };
        
        // Several threads increment concurrently
        std::vector<std::thread> threads;
        for (int i = 0; i < 4; ++i) {
            threads.emplace_back(increment);
        }
        
        for (auto& t : threads) {
            t.join();
        }
        
        std::cout << "原子操作结果: " << atomic_counter_.load() << std::endl;
    }
    
    static void memoryOrdering() {
        std::cout << "\n=== 内存序演示 ===" << std::endl;
        
        std::atomic<int> x{0};
        std::atomic<int> y{0};
        std::atomic<int> r1{0};
        std::atomic<int> r2{0};
        
        auto thread1 = [&]() {
            x.store(1, std::memory_order_relaxed);
            r1.store(y.load(std::memory_order_relaxed), std::memory_order_relaxed);
        };
        
        auto thread2 = [&]() {
            y.store(1, std::memory_order_relaxed);
            r2.store(x.load(std::memory_order_relaxed), std::memory_order_relaxed);
        };
        
        // Repeat the experiment: with relaxed ordering both loads may observe 0,
        // although whether this happens depends on the hardware and compiler
        for (int i = 0; i < 100; ++i) {
            x = 0; y = 0; r1 = 0; r2 = 0;
            
            std::thread t1(thread1);
            std::thread t2(thread2);
            
            t1.join();
            t2.join();
            
            if (r1.load() == 0 && r2.load() == 0) {
                std::cout << "检测到重排序 (r1=" << r1.load() << ", r2=" << r2.load() << ")" << std::endl;
                break;
            }
        }
    }
    
    static void compareExchangeDemo() {
        std::cout << "\n=== compare_exchange演示 ===" << std::endl;
        
        std::atomic<int> value{10};
        
        auto worker = [&](int id, int expected, int desired) {
            int exp = expected;
            // Use the strong variant for a single attempt;
            // compare_exchange_weak may fail spuriously and is meant for retry loops
            bool success = value.compare_exchange_strong(exp, desired);
            
            std::cout << "Thread " << id << ": ";
            if (success) {
                std::cout << "changed " << expected << " to " << desired << std::endl;
            } else {
                std::cout << "failed, current value is " << exp << std::endl;
            }
        };
        
        std::vector<std::thread> threads;
        threads.emplace_back(worker, 1, 10, 20);
        threads.emplace_back(worker, 2, 10, 30);
        threads.emplace_back(worker, 3, 20, 40);
        
        for (auto& t : threads) {
            t.join();
        }
        
        std::cout << "最终值: " << value.load() << std::endl;
    }
    
    static void spinLockDemo() {
        std::cout << "\n=== 自旋锁演示 ===" << std::endl;
        
        class SpinLock {
        private:
            std::atomic<bool> locked_{false};
            
        public:
            void lock() {
                while (locked_.exchange(true, std::memory_order_acquire)) {
                    // Spin until the lock is released
                    std::this_thread::yield();
                }
            }
            
            void unlock() {
                locked_.store(false, std::memory_order_release);
            }
        };
        
        SpinLock spin_lock;
        int shared_data = 0;
        
        auto worker = [&](int id) {
            for (int i = 0; i < 10000; ++i) {
                spin_lock.lock();
                shared_data++;
                spin_lock.unlock();
            }
        };
        
        auto start = std::chrono::high_resolution_clock::now();
        
        std::vector<std::thread> threads;
        for (int i = 0; i < 4; ++i) {
            threads.emplace_back(worker, i);
        }
        
        for (auto& t : threads) {
            t.join();
        }
        
        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        
        std::cout << "自旋锁结果: " << shared_data << std::endl;
        std::cout << "耗时: " << duration.count() << "ms" << std::endl;
    }
};

// Static member definitions
std::atomic<int> AtomicDemo::atomic_counter_{0};
std::atomic<bool> AtomicDemo::ready_{false};
std::atomic<std::string*> AtomicDemo::data_ptr_{nullptr};

int main() {
    ThreadBasics::basicThreadDemo();
    ThreadBasics::threadWithLambda();
    ThreadBasics::threadManagement();
    
    SynchronizationDemo::mutexDemo();
    SynchronizationDemo::readerWriterDemo();
    
    ConditionVariableDemo::producerConsumerDemo();
    ConditionVariableDemo::timeoutWaitDemo();
    
    AsyncProgrammingDemo::futurePromiseDemo();
    AsyncProgrammingDemo::promiseDemo();
    AsyncProgrammingDemo::sharedFutureDemo();
    AsyncProgrammingDemo::packagedTaskDemo();
    
    AtomicDemo::atomicBasics();
    AtomicDemo::memoryOrdering();
    AtomicDemo::compareExchangeDemo();
    AtomicDemo::spinLockDemo();
    
    return 0;
}

Summary

C++ concurrency provides a rich set of tools for multithreaded and asynchronous work:

Core Components

  • std::thread: thread creation and management
  • Mutexes: synchronization primitives such as mutex and shared_mutex
  • Condition variables: condition_variable for inter-thread communication
  • future/promise: passing the results of asynchronous tasks
  • Atomic operations: lock-free programming with atomic

Synchronization Mechanisms

  Mechanism            Purpose                  Performance
  mutex                mutual exclusion         medium
  shared_mutex         reader-writer locking    medium
  condition_variable   thread communication     medium
  atomic               lock-free operations     high

Best Practices

  • RAII lock management: use lock_guard and unique_lock
  • Avoid deadlock: acquire locks in a fixed order and use timeouts (see the sketch after this list)
  • Reduce lock contention: fine-grained locks, separate reads from writes
  • Prefer async: favor std::async over managing threads by hand
  • Atomic operations: use atomic for simple data to avoid locking overhead
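
As a minimal sketch of the deadlock-avoidance advice (assuming C++17; the Account type and transfer function below are illustrative, not part of the chapter's code), std::scoped_lock locks several mutexes as one operation, so callers do not have to agree on a lock order themselves:

cpp
#include <mutex>
#include <thread>

// Illustrative type: two resources that must be locked together.
struct Account {
    std::mutex m;
    int balance = 0;
};

// std::scoped_lock acquires both mutexes with a deadlock-avoidance
// algorithm, so transfers that name the accounts in opposite orders
// cannot deadlock each other.
void transfer(Account& from, Account& to, int amount) {
    std::scoped_lock lock(from.m, to.m);
    from.balance -= amount;
    to.balance += amount;
}

int main() {
    Account x, y;
    std::thread t1(transfer, std::ref(x), std::ref(y), 10);
    std::thread t2(transfer, std::ref(y), std::ref(x), 20);  // opposite order, still safe
    t1.join();
    t2.join();
    return 0;
}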

Design Patterns

  • Producer-consumer: coordinate with condition variables
  • Thread pool: reuse a fixed set of worker threads (a minimal sketch follows this list)
  • Future/Promise: pass asynchronous task results
  • Reader-writer lock: optimization for read-heavy, write-light workloads
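
The thread pool pattern is listed above but not demonstrated in this chapter, so here is a minimal sketch built from the mutex/condition_variable primitives already shown (the ThreadPool class and its submit method are illustrative names, not a standard API):

cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal fixed-size thread pool: workers block on a condition variable
// and pop tasks from a mutex-protected queue.
class ThreadPool {
public:
    explicit ThreadPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) return;  // drain the queue, then exit
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();  // run the task outside the lock
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();
    }

    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

int main() {
    ThreadPool pool(4);
    for (int i = 0; i < 8; ++i) {
        pool.submit([i]() { std::cout << "task " << i << " done\n"; });
    }
    return 0;  // the destructor drains the queue and joins the workers
}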

Caveats

  • Data races: make sure access to shared data is thread-safe
  • Memory ordering: understand what each memory order guarantees (a small release/acquire sketch follows this list)
  • Exception safety: release locks and resources safely when exceptions are thrown
  • Performance: balance synchronization overhead against the gains from concurrency
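
To make the memory-ordering point concrete, here is a small release/acquire message-passing sketch (the variable names are illustrative): the release store on the flag guarantees that the earlier write to data is visible to the thread that observes the flag with an acquire load.

cpp
#include <atomic>
#include <iostream>
#include <thread>

int data = 0;                    // plain data, published through the flag
std::atomic<bool> ready{false};  // synchronization flag

void producer() {
    data = 42;                                     // (1) write the payload
    ready.store(true, std::memory_order_release);  // (2) publish it
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();                 // spin until the flag is set
    }
    // The acquire load synchronizes-with the release store,
    // so this read is guaranteed to see 42.
    std::cout << "data = " << data << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}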

Concurrent programming is an essential part of modern C++; used correctly, it can significantly improve a program's performance and responsiveness.
