回顾Java中经典的生产/消费者模型

Posted by Dorck on April 29, 2023

概述

生产者-消费者模型是 Java 并发编程中比较常见的加锁应用场景之一,以下是维基百科的对于该名词的定义:

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

问题分析

java_producer_consumer

结合上图及定义我们可以提取出该模型具有以下特点:

  • 生产者和消费者可以同时并发运作
  • 缓冲区满则阻塞等待消费者消费数据
  • 缓冲区空则阻塞等待生产者生产数据

我们知道,生产者和消费者可以同时运作,并且二者执行效率和规模也很有可能出现严重的不对等性。那么为了保证生产和消费操作的原子性和共享数据的可见性,我们需要借助一种同步机制来保障该模型的正常运行。

实现方式

以下来介绍几种常见的生产-消费模型的实现方式。

1. synchorized + wait/notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class GoodsStorageDispatcher {
    // 仓库容纳上限
    private static final int MAX_SIZE = 100;
    // 仓库容纳的货物集合
    private LinkedList<Good> list = new LinkedList();


    public void produce(Good good) {
        synchronized (list) {
            while (list.size() == MAX_SIZE) {
                System.out.println("仓库已满 >> 生产暂停,等待消费");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(good);
            System.out.println("生产了一个新产品,现库存为:" + list.size());
            list.notifyAll();
        }
    }
    public void consume() {
        synchronized (list) {
            while (list.size() == 0) {
                System.out.println("库存已清仓 >> 消费暂停,等待生产");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove();
            System.out.println("消费了一个产品,现库存为:" + list.size());
            list.notifyAll();
        }
    }

}

可以看到,我们采用对象锁以及等待/唤醒的方式来实现生产和消费的同步机制。这里的 wait 和 notifyAll 在示例中含义如下:

  • wait:在缓冲区满/空的情况下先挂起生产/消费线程并释放锁,进入等待状态,让其他线程执行
  • notify:当生产/消费一个商品时,通知其他等待的线程继续执行并释放锁,进入等待状态

值得注意的是,produce 和 consume 方法中都使用了 while 循环来判断缓冲区的存储状态(空/满),这样处理的原因是什么呢,为何不直接用 if 呢?

其实这里用 while 是为了防止虚假唤醒,我们结合一个例子更好理解一些:

fake_notify

如上图所示,假设我们现在有 1 个生产者,3 个消费者在执行生产和消费任务,而目前缓冲区(Warehouse)仅有一个数据:A。

如果当 Consumer-1 消费了 A 后,缓冲区就为空了,Consumer-2 和 Consumer-3 从缓冲区取数据消费时就会陷入等待状态(因为 Consumer-2 消费时缓冲区为空会执行 wait 方法并释放锁,Consumer-3也会重蹈覆辙)。而此时又有一个生产者 Producer-2 生产了数据 B 并加入缓冲区,通过 notifyAll 去唤醒所有等待的消费者,假设消费者 Consumer-2 优先抢到了使用权将 B 消费后缓冲区又恢复到空空如也的状态。接下来的情况就值得注意了:消费者 Consumer-3 想终于可以消费了,然而由于使用的是 if,所以唤醒后继续执行到 list.remove(),毫无疑问的抛出了 IndexOutOfBoundsException,因为此时 list 为空了,这就是所谓的“虚假唤醒”。其实这里我们不能直接消费数据,而是要继续等待。因此这里使用 while 循环判断,当唤醒继续执行代码时重新进入 while 内判断缓冲区数据是否为空。

2. Lock + Condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class GoodsStorageDispatcher {
    // 仓库容纳上限
    private static final int MAX_SIZE = 100;
    // 仓库容纳的货物集合
    private LinkedList<Good> list = new LinkedList();
    // 锁
    final Lock lock = new ReentrantLock();
    // 用于等待或唤醒线程(仓库满后需要等待被消费,成功消费后需要唤醒线程继续生产)
    final Condition notFull = lock.newCondition();
    // 用于等待或唤醒线程(仓库清仓后需要等待生产,生产成功后需要唤醒线程继续消费)
    final Condition notEmpty = lock.newCondition();


    public void produce(Good good) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == MAX_SIZE)  { //防止虚假唤醒,Condition的await调用一般会放在一个循环判断中
                System.out.println("仓库已满 >> 生产暂停,等待消费");
                notFull.await();
            }
            list.add(good);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    public void consume() throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == 0) {
                System.out.println("库存已清仓 >> 消费暂停,等待生产");
                notEmpty.await();
            }
            list.remove();
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }

}

通过 ReentrantLock + Condition 来替代实现上面的等待/唤醒,但无疑它的功能性更加齐全和灵活(有限等待、公平锁、读写锁分离等)。

3. BlockingQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class GoodsStorageDispatcher {
    // 仓库容纳上限
    private static final int MAX_SIZE = 100;
    // 仓库容纳的货物集合
    private LinkedBlockingQueue<Good> list = new LinkedBlockingQueue<>(MAX_SIZE);
  

    public void produce(Good good) {
        if (list.size() == MAX_SIZE) {
            System.out.println("仓库已满 >> 生产暂停,等待消费");
        }
        try {
            list.put(good);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void consume() {
        if (list.size() == 0) {
            System.out.println("库存已清仓 >> 消费暂停,等待生产");
        }
        try {
            list.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

}

BlockingQueue 其实就是阻塞队列,是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作。我们可以理解它内部其实已经实现了上述的加锁和等待/唤醒这一套流程,只不过帮我们封装好了这一切。

应用场景

我们所熟知的线程池,它的内部其实就是构建了一个生产者-消费者模型用于将任务管理线程管理两部分工作解耦,很大程度上提高了可扩展性。

threadpool_running_process

平时业务中也会有生产消费模型的用武之地,比如多固件更新过程就符合生产-消费模型,用户通过手动操作选中特定设备的固件包加入升级队列,随后由 UpdateDispatcher 根据具体设备类型进行不同策略的分发消费。

相关参考


许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。