多线程之模拟自定义消息队列


1.背景

面试官问,假设让你设计一个队列,你的思路是...

2.代码

package com.ldp.demo01;

import com.common.MyThreadUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

/**
 * @author 姿势帝-博客园
 * @address https://www.cnblogs.com/newAndHui/
 * @WeChat 851298348
 * @create 01/31 8:24
 * @description
 */
@Slf4j
public class Test08MessageQueue {
    /**
     * 测试自定义的队列
     *
     * @param args
     */
    public static void main(String[] args) {
        // 创建队列
        MessageQueue queue = new MessageQueue(2);
        // 模拟4个生产者参数数据到队列
        for (int i = 0; i < 4; i++) {
            int n = i + 1;
            new Thread(() -> {
                int m = 1;
                // 不停的循环放入数据
                while (true) {
                    int k = n * 1000 + m;
                    Message message = new Message(k, "消息-" + k);
                    log.info("放入的消息是:{}", message);
                    queue.put(message);
                    m++;
                }
            }, "生产者-" + n).start();
        }
        // 模拟2个消费者到队列中取数据
        for (int i = 0; i < 1; i++) {
            int n = i + 1;
            new Thread(() -> {
                // 每个消费者都是不停的取数据
                while (true) {
                    // 每隔2秒取一次
                    MyThreadUtil.sleep(2);
                    Message message = queue.take();
                    log.info("取到的消息是:{}", message);
                }
            }, "消费者-" + n).start();
        }
    }
}

@Slf4j
class MessageQueue {
    /**
     * 队列的容量
     */
    private Integer capacity;
    /**
     * 装消息的队列
     */
    private LinkedList list = new LinkedList<>();

    public MessageQueue(Integer capacity) {
        this.capacity = capacity;
    }

    /**
     * 获取一条消息
     *
     * @return
     */
    public Message take() {
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    // log.debug("无数据,等待中....");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 取出头部消息,并删除
            Message message = list.removeFirst();
            // 通知放入队列
            list.notifyAll();
            return message;
        }
    }

    /**
     * 放入一条数据
     *
     * @param message
     */
    public void put(Message message) {
        synchronized (list) {
            while (list.size() >= capacity) {
                try {
                    //  log.debug("队列已满,当前队列数:{},最大队列数:{}", list.size(), capacity);
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 放入队列
            list.addLast(message);
            // 通知消费者取数据
            list.notifyAll();
        }
    }
}

/**
 * 消息对象
 * 使用final不让其被继承,避免修改消息
 */
final class Message {
    private Integer id;
    private Object object;

    /**
     * 提供一个构造方法
     *
     * @param id
     * @param object
     */
    public Message(Integer id, Object object) {
        /**
         * 消息id
         */
        this.id = id;
        /**
         * 具体的消息内容
         */
        this.object = object;
    }

    /**
     * 只提供获取方法,让消息不可以修改
     *
     * @return
     */
    public Integer getId() {
        return id;
    }

    public Object getObject() {
        return object;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", object=" + object +
                '}';
    }
}

完美