在Java中如何实现生产者消费者模型_Java经典并发模型实现解析

Java实现生产者消费者模型应优先使用BlockingQueue,因其天然线程安全且避免唤醒丢失、虚假唤醒等问题;需高级控制时才用ReentrantLock+Condition。

Java 中实现生产者消费者模型,核心不是手写 wait/notify,而是优先用 java.util.concurrent 包里现成的线程安全组件——它们已解决唤醒丢失、虚假唤醒、锁粒度、公平性等底层问题。

BlockingQueue 实现最简可靠版本

这是最推荐的起点:避免自己管理锁和条件变量,BlockingQueueput()take() 方法天然阻塞且线程安全。

  • ArrayBlockingQueue:固定容量、基于数组、可选公平策略,适合容量明确、注重吞吐或响应公平性的场景
  • LinkedBlockingQueue:默认无界(实际是 Integer.MAX_VALUE),注意 OOM 风险;有界构造时性能略低于 ArrayBlockingQueue
  • SynchronousQueue:不存储元素,每个 put() 必须等待配对 take(),适合“直传”型任务交接,常用于线程池的 DirectHandoff
class Producer implements Runnable {
    private final BlockingQueue queue;

    Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                String item = "item-" + i;
                System.out.println("Produced: " + item);
                queue.put(item); // 自动阻塞直到有空间
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String item = queue.take(); // 自动阻塞直到有元素
                System.out.println("Consumed: " + item);
                if (item.equals("item-4")) break; // 简单退出条件
                Thread.sleep(150);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

为什么不用 wait()/notify() 手写?

除非教学或特殊调度逻辑,否则手写极易出错:

  • 必须在 synchronized 块内调用,否则抛 IllegalMonitorStateException
  • notify() 可能唤醒错误线程(比如多个生产者+多个消费者共用一把锁),应优先用 notifyAll()
  • 必须用 while 而非 if 检查条件,否则遭遇虚假唤醒(spurious wakeup)会直接跳过判断导致逻辑崩溃
  • 无法控制唤醒顺序,容易造成线程饥饿(如生产者一直抢到锁,消费者永远等不到)

需要自定义逻辑时:用 ReentrantLock + Condition

当需分离「生产就绪」和「消费就绪」两个等待队列,或需尝试获取、超时获取、中断响应等高级行为时,才考虑该组合。

  • 一个 Lock 对应多个 Condition:例如 notFullnotEmpty,避免 notifyAll() 唤醒无关线程
  • awaitNanos(long)awaitUntil(Date) 支持超时,比 wait(long) 更灵活
  • lockInterruptibly() 让阻塞中的线程能响应中断,比 synchronized 更可控
class BoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putIndex, takeIndex, count;

    BoundedBuffer(int capacity) {
        this.items = new Object[capacity];
    }

    public void put(T x) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await(); // 等待非满
            items[putIndex] = x;
            if (++putIndex == items.length) putIndex = 0;
            ++count;
            notEmpty.signal(); // 唤醒一个消费者
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // 等待非空
            Object x

= items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; --count; notFull.signal(); // 唤醒一个生产者 return (T) x; } finally { lock.unlock(); } } }

真正难的从来不是写出能跑的代码,而是让边界条件(如中断、容量耗尽、多线程竞争、JVM 优化重排序)下的行为完全可预期。用好 BlockingQueue 就规避了 90% 的坑;真要手写,ReentrantLock + Condition 是唯一值得投入精力的底层路径。