在Java中如何实现生产者消费者模式

Java中实现生产者消费者模式的核心是通过线程间通信协调共享缓冲区的访问,常用方法有三种:1. 使用synchronized配合wait()和notify()实现基础同步;2. 使用BlockingQueue如ArrayBlockingQueue,利用其内置阻塞机制简化开发;3. 使用Lock与Condition提供更灵活的多条件等待控制。其中BlockingQueue最推荐用于实际开发,而wait/notify和Lock/Condition更适合理解底层原理或定制场景。

在Java中实现生产者消费者模式,核心是让生产者线程向共享缓冲区添加数据,消费者线程从中取出数据,同时保证线程安全和避免资源浪费。关键在于线程间的协调与通信,常用方式包括使用 wait()notify()BlockingQueueLockCondition。下面介绍几种典型实现方法。

使用 wait() 和 notify()

这是最基础的实现方式,依赖 synchronized 关键字控制同步,通过 wait() 让线程等待,notify() 唤醒等待线程。

定义一个共享的缓冲区(例如容量为1的队列):

class Buffer {
    private int data;
    private boolean isEmpty = true;

    public synchronized void put(int value) {
        while (!isEmpty) {
            try {
                wait(); // 缓冲区非空,生产者等待
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        data = value;
        isEmpty = false;
        notify(); // 唤醒消费者
    }

    public synchronized int take() {
    

while (isEmpty) { try { wait(); // 缓冲区为空,消费者等待 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } isEmpty = true; notify(); // 唤醒生产者 return data; } }

生产者和消费者分别作为线程运行:

Buffer buffer = new Buffer();

Thread producer = new Thread(() -> {
    for (int i = 0; i < 5; i++) {
        buffer.put(i);
        System.out.println("生产: " + i);
    }
});

Thread consumer = new Thread(() -> {
    for (int i = 0; i < 5; i++) {
        int value = buffer.take();
        System.out.println("消费: " + value);
    }
});

producer.start();
consumer.start();

使用 BlockingQueue

更推荐的方式是使用 java.util.concurrent 包中的 BlockingQueue,它内部已实现线程安全和阻塞操作。

常见实现类有 ArrayBlockingQueue(有界队列)和 LinkedBlockingQueue(可选有界)。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

BlockingQueue queue = new ArrayBlockingQueue<>(10);

Thread producer = new Thread(() -> {
    for (int i = 0; i < 5; i++) {
        try {
            queue.put(i); // 队列满时自动阻塞
            System.out.println("生产: " + i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

Thread consumer = new Thread(() -> {
    for (int i = 0; i < 5; i++) {
        try {
            Integer value = queue.take(); // 队列空时自动阻塞
            System.out.println("消费: " + value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

producer.start();
consumer.start();

使用 Lock 和 Condition

这种方式比 synchronized 更灵活,可以创建多个条件变量,适用于复杂场景。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.*;

class BufferWithLock {
    private final Queue queue = new LinkedList<>();
    private final int MAX_SIZE = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == MAX_SIZE) {
                notFull.await(); // 队列满,等待
            }
            queue.offer(value);
            notEmpty.signal(); // 唤醒消费者
        } finally {
            lock.unlock();
        }
    }

    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 队列空,等待
            }
            int value = queue.poll();
            notFull.signal(); // 唤醒生产者
            return value;
        } finally {
            lock.unlock();
        }
    }
}

使用方式与前面类似,创建线程调用 put 和 take 方法即可。

基本上就这些。BlockingQueue 是最简洁可靠的实现,适合大多数情况;而 wait/notify 和 Lock/Condition 更适合学习原理或定制逻辑。