2021年4月5日星期一

Java 并发编程 生产者消费者模式


本文部分摘自《Java 并发编程的艺术》


模式概述

在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的数据。生产者和消费者彼此之间不直接通信,而是通过阻塞队列进行通信,所以生产者生产完数据后不用等待消费者处理,而是直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列取,阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力


模式实战

假设现有需求:把各部门的邮件收集起来,统一处理归纳。可以使用生产者 - 消费者模式,启动一个线程把所有邮件抽取到队列中,消费者启动多个线程处理邮件。Java 代码如下:

public class QuickCheckEmailExtractor { private final ThreadPoolExecutor threadsPool; private final BlockingQueue<EmailDTO> emailQueue; private final EmailService emailService; public QuickCheckEmailExtractor() {  emailQueue = new LinkedBlockingQueue<>();  int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;  threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 101,    TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000),    Executors.defaultThreadFactory(),    new ThreadPoolExecutor.AbortPolicy());  emailService = new EmailService(); } public void extract() {  // 抽取所有邮件到队列里  new ExtractEmailTask().start();  // 处理队列里的邮件  check(); } private void check() {  try {   while (true) {    // 两秒内取不到就退出    EmailDTO email = emailQueue.poll(2, TimeUnit.SECONDS);    if (email == null) {     break;    }    threadsPool.submit(new CheckEmailTask());   }  } catch (InterruptedException e) {   e.printStackTrace();  } } protected void extractEmail() {  List<EmailDTO> allEmails = emailService.queryAllEmails();  if (allEmails == null) {   return;  }  for (EmailDTO emailDTO : allEmails) {   emailQueue.offer(emailDTO);  } } protected void checkEmail(EmailDTO email) {  System.out.println("邮件" + email.getId() + "已处理"); } public class ExtractEmailTask extends Thread {  @Override  public void run() {   extractEmail();  } } public class CheckEmailTask extends Thread {  private EmailDTO email;  @Override  public void run() {   checkEmail(email);  }  public CheckEmailTask() {   super();  }  public CheckEmailTask(EmailDTO email) {   super();   this.email = email;  } }}

多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,所以可以使用多个线程来生产数据,多个线程来消费数据。更复杂的情况是,消费者消费完的数据,可能还要交给其他消费者继续处理,如图所示:

我们在一个长连接服务器中使用这种模式,生产者 1 负责将所有客户端发送的消息存放在阻塞队列 1 里,消费者 1 从队列里读消息,然后通过消息 ID 进行散列得到 N 个队列中的一个,然后根据编号将消息存放在不同的队列里,每个阻塞队列会分配一个线程来阻塞队列里的数据。如果消费者 2 无法消费消息,就将消息再抛回阻塞队列 1 中,交给其他消费者处理

public class MsgQueueManager { /**  * 消息总队列  */ private final BlockingQueue<Message> messageQueue; /**  * 消息子队列集合  */ private final List<BlockingQueue<Message>> subMsgQueues; private MsgQueueManager() {  messageQueue = new LinkedBlockingQueue<>();  subMsgQueues = new ArrayList<>(); } public static MsgQueueManager getInstance() {  return new MsgQueueManager(); } public void put(Message msg) {  try {   messageQueue.put(msg);  } catch (InterruptedException e) {   Thread.currentThread().interrupt();  } } public Message take() {  try {   return messageQueue.take();  } catch (InterruptedException e) {   Thread.currentThread().interrupt();  }  return null; } /**  * 消费者线程获取子队列  */ public BlockingQueue<Message> addSubMsgQueue() {  BlockingQueue<Message> subMsgQueue = new LinkedBlockingQueue<>();  subMsgQueues.add(subMsgQueue);  return subMsgQueue; } /**  * 消息分发线程,负责把消息从大队列塞到小队列里  */ class DispatchMessageTask implements Runnable {  /**   * 控制消息分发开始与结束   */  private boolean flag = true;  public void setFlag(boolean flag) {   this.flag = flag;  }  @Override  public void run() {   BlockingQueue<Message> subQueue;   while (flag) {    // 如果没有数据,则阻塞在这里    Message msg = take();    // 如果为空,表示没有Session连接,需要等待Session连接上来    while ((subQueue = getSubQueue()) == null) {     try {      Thread.sleep(1000);     } catch (InterruptedException e) {      Thread.currentThread().interrupt();     }    }    // 把消息放到小队列里    try {     subQueue.put(msg);    } catch (InterruptedException e) {     Thread.currentThread().interrupt();    }   }  }  /**   * 均衡获取一个子队列   */  public BlockingQueue<Message> getSubQueue() {   List<BlockingQueue<Message>> subMsgQueues = getInstance().subMsgQueues;   if (subMsgQueues.isEmpty()) {    return null;   }   int index = (int) (System.nanoTime() % subMsgQueues.size());   return subMsgQueues.get(index);  } }}









原文转载:http://www.shaoqun.com/a/664719.html

跨境电商:https://www.ikjzd.com/

白色清关:https://www.ikjzd.com/w/1410

浩方:https://www.ikjzd.com/w/1046


本文部分摘自《Java并发编程的艺术》模式概述在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的数据。生产者和消费者彼此之间不直接通信,而是通过阻塞队列进行通信,所以生产者生产完数据后不用等待消费者处理,而是直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列取,阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力模式实战假设现有需求:把各部门的邮件收集起来,统一处理归纳。
Sunrate:https://www.ikjzd.com/w/2685
netporter:https://www.ikjzd.com/w/2132
mymall:https://www.ikjzd.com/w/1050
震惊 | 又遭同行恶搞!卖家产品图片无故被篡改,这是最快的处理方式 :https://www.ikjzd.com/home/10998
重要 | 2021年1月1日起,英国增值税法规变更!:https://www.ikjzd.com/home/132198
增加曝光率!亚马逊为自有品牌新推出"Our Brand"标签!:https://www.ikjzd.com/home/107501

没有评论:

发表评论