利用生产者消费者模型和MQ模型写一个自己的日志系统-并发设计里一定会用到的手段

一:前言

  写这个程序主要是用来理解生产者消费者模型,以及通过这个Demo来理解Redis的单线程取原子任务是怎么实现的和巩固一下并发相关的知识;这个虽然是个Demo,但是只要稍加改下Appender部分也是可以用于项目中的,假设项目里确实不需要log4j/logback之类的日志组件的时候;

二:实现方式

1.利用LinkedList作为MQ(还可以用jdk自带的LinkedBlockingQueue,不过这个Demo主要是为了更好的理解原理因此写的比较底层);

2.利用一个Daemon线程作为消费者从MQ里实时获取日志对象/日志记录,并将它提交给线程池,由线程池再遍历所有的appender并调用它们的通知方法,这个地方还可以根据场景进行效率优化,如将循环遍历appender改为将每个appender都再此提交到线程池实现异步通知观察者;

3.为生产者提供log方法作为生产日志记录的接口,无论是生产日志对象还是消费日志对象在操作队列时都需要对队列加锁,因为个人用的是非并发包里的;

4.消费者在获取之前会先判断MQ里是否有数据,有则获取并提交给线程池处理,否则wait;

5.生产者生产了日志对象后通过notify通知消费者去取,因为只有一个消费者,而生产者是不会wait的因此只需要notify而不用notifyAll

6.。。剩下的就看代码来说明吧;

三:代码

1.MyLogger类的实现

package me.silentdoer.mqlogger.log;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.DEBUG;
import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.ERROR;

/**
 * @author silentdoer
 * @version 1.0
 * @description 这里只是做一个简单的logger实现,不提供Appender之类的功能,主要是用来学习生产者和消费者及MQ的实现原理
 * @date 4/26/18 6:07 PM
 */
public class MyLogger{
    private LogLevel loggerLevel = DEBUG;
    private String charset = "UTF-8";  // 暂且没用,但是当需要序列化时是可能用到的;
    // TODO 也可以直接用LinkedQueue,然后手动通过ReentrantLock来实现并发时的数据安全(synchronized也可)
    //private BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<LogRecord>();  // 可以理解为支持并发的LinkedList
    // TODO 想了一下既然是要学习原理干脆就实现的更底层一点
    private final Queue<LogRecord> records = new LinkedList<LogRecord>();
    // TODO 用于记录生产了多少条日志,可供外部获取
    private AtomicLong produceCount = new AtomicLong(0);
    // TODO 用于记录消费了多少条日志
    private AtomicLong consumeCount = new AtomicLong(0);
    // TODO 日志记录的Consumer
    private Thread consumer = new LogDaemon();

    public MyLogger(){
        consumer.setDaemon(true);
        consumer.start();
    }

    /**
     * 对外提供的接口,即log方法就是生产者用于生产日志数据的接口
     * @param msg
     * @param level
     */
    public void log(String msg, LogLevel level){
        Date curr = generateCurrDate();
        log(new LogRecord(level, msg, curr));
    }

    /**
     * 对外提供的接口,即log方法就是生产者用于生产日志数据的接口
     * @param msg
     */
    public void log(String msg){
        Date curr = generateCurrDate();
        log(new LogRecord(this.loggerLevel, msg, curr));
    }

    /**
     * 给生产者(即调用log的方法都可以理解为生产者在生产日志对象)提供用于生产日志记录的接口
     * @param record
     */
    public void log(LogRecord record){
        // ReentrantLock可以替代synchronized,不过当前场景下synchronized已经足够
        synchronized (this.records){  // TODO 如果用的是LinkedBlockingQueue是不需要这个的
            this.records.offer(record);
            this.produceCount.incrementAndGet();
            this.records.notify();  // TODO 只有一个线程会records.wait(),因此notify()足够
        }
    }

    // TODO 类似Redis的那个单线程,用于读取命令对象,而这里则是用于读取LogRecord并通过appender将数据写到相应位置
    private class LogDaemon extends Thread{
        private volatile boolean valid = true;
        // 充当appenders的角色
        private List<Writer> appenders = null;
        private ExecutorService threadPool = new ThreadPoolExecutor(1, 3
                , 180000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024));

        @Override
        public void run() {
            while(this.valid){
                // TODO 根据最少知道原则,在这里不要去想整体里是否存在打断此线程的地方,你就认为此线程是可能被外界打断的即可,因此需要做一定处理
                try {
                    synchronized (MyLogger.this.records) {
                        if (MyLogger.this.records.size() <= 0) {
                            MyLogger.this.records.wait();
                        }
                        final LogRecord firstRecord = MyLogger.this.records.poll();
                        MyLogger.this.consumeCount.incrementAndGet();
                        //threadPool.submit()
                        threadPool.execute(() -> MyLogger.this.notifyAppender(this.appenders, firstRecord));
                    }
                }catch (InterruptedException ex){
                    this.valid = false;
                    ex.printStackTrace();
                }catch (Throwable t){
                    t.printStackTrace();
                }
            }
        }
    }

    private void notifyAppender(final List<Writer> appenders, final LogRecord record) {
        if(appenders == null){
            PrintWriter writer = new PrintWriter(record.level == ERROR ? System.err : System.out);
            writer.append(record.toString());
            writer.flush();
        }else{
            // TODO 这种是同步的方式,如果是异步的方式可以将每个appender的执行都由一个Runnable对象包装,然后submit给线程池(或者中间加个中间件)
            for(Writer writer : appenders){
                try {
                    writer.append(record.toString());
                }catch (IOException ex){
                    ex.printStackTrace();
                }
            }
        }
    }

    /**
     * 用于产生当前时间的模块,防止因为并发而导致LogRecord的timestamp根实际情况不符
     */
    private Lock currDateLock = new ReentrantLock();  // 直接用synchronized亦可
    private Date generateCurrDate(){
        currDateLock.lock();
        Date result = new Date();
        currDateLock.unlock();
        return result;
    }

    // 生产者生产的数据对象
    public static class LogRecord{
        private LogLevel level;
        private String msg;
        private Date timestamp;
        private static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        private SimpleDateFormat dateFormat = DEFAULT_DATE_FORMAT;

        /*public LogRecord(){
            this(INFO, "");
        }*/

        public LogRecord(LogLevel level, String msg){
            this(level, msg, new Date());  // 还是最好由外界设置timestamp,否则高并发下会比较不准
        }

        // TODO 最好用这个,不然高并发下timestamp容易出现顺序不准确的情况。
        public LogRecord(LogLevel level, String msg, Date timestamp){
            this.level = level;
            this.msg = msg;
            this.timestamp = timestamp;
        }

        @Override
        public String toString(){
            return String.format("[Level:%s, Datetime:%s] : %s\n", level, dateFormat.format(timestamp), msg);
        }

        public LogLevel getLevel() {
            return level;
        }

        public String getMsg() {
            return msg;
        }

        public void setDateFormat(SimpleDateFormat dateFormat) {
            this.dateFormat = dateFormat;
        }

        public void setTimestamp(Date timestamp) {
            this.timestamp = timestamp;
        }
    }

    public enum LogLevel{  // TODO 内部enum默认就是static
        INFO,
        DEBUG,
        ERROR
    }

    public LogLevel getLoggerLevel() {
        return loggerLevel;
    }

    public void setLoggerLevel(LogLevel loggerLevel) {
        this.loggerLevel = loggerLevel;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public AtomicLong getProduceCount() {
        return produceCount;
    }

    public AtomicLong getConsumeCount() {
        return consumeCount;
    }
}

2.测试用例1

package me.silentdoer.mqlogger;

import me.silentdoer.mqlogger.log.MyLogger;

import java.util.Scanner;

/**
 * @author silentdoer
 * @version 1.0
 * @description the description
 * @date 4/26/18 10:13 PM
 */
public class Entrance {
    private static MyLogger logger = new MyLogger();

    public static void main(String[] args){
        //logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
        Scanner scanner = new Scanner(System.in);
        String line;
        while(!(line = scanner.nextLine()).equals("exit")){
            if(line.equals(""))
                continue;
            logger.log(line);
            System.out.println(String.format("共生产了%s条日志。", logger.getConsumeCount()));
            try {
                Thread.sleep(500);
            }catch (InterruptedException ex){ }
            System.out.println(String.format("共消费了%s条日志。", logger.getProduceCount()));
        }
    }
}

3.测试用例2

package me.silentdoer.mqlogger;

import me.silentdoer.mqlogger.log.MyLogger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author silentdoer
 * @version 1.0
 * @description the description
 * @date 4/26/18 10:32 PM
 */
public class Entrance2 {
    private static MyLogger logger = new MyLogger();

    public static void main(String[] args){
        logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
        ExecutorService threadPool = Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            final int index = i + 1;
            threadPool.execute(() -> {
                logger.log(String.format("生产的第%s条记录。", index));
                System.out.println(String.format("共生产了%s条记录。", index));
            });
            try {
                Thread.sleep(100);
            }catch (InterruptedException ex){ }
        }
        try {
            Thread.sleep(3000);
            System.out.println(String.format("共%s条记录被消费。", logger.getConsumeCount()));
        }catch (InterruptedException ex){ }
        //threadPool.shutdown();
        //threadPool.shutdownNow();
    }
}

四:补充

  如果想实现像BlockingQueue一样能够控制MQ的元素个数范围,则可以通过ReentrantLock的Confition来实现,即通过lock创建两个Condition对象,一个用来描述是否MQ中元素达到上限的情况,一个用于描述MQ中元素降到下限的情况;

无论是达到上限或降到下限都会通过相应的condition对象来阻塞对应的生产者或消费者的生产/消费过程从而实现MQ元素个数的可控性;

原文地址:https://www.cnblogs.com/silentdoer/p/8955713.html

时间: 2024-10-07 13:28:26

利用生产者消费者模型和MQ模型写一个自己的日志系统-并发设计里一定会用到的手段的相关文章

用java写一个远程视频监控系统,实时监控(类似直播)我想用RPT协议,不知道怎么把RPT协议集成到项目中

我最近在用java写一个远程视频监控系统,实时监控(类似直播)我想用RPT协议,不知道怎么把RPT协议集成到项目中,第一次写项目,写过这类项目的多多提意见,哪方面的意见都行,有代码或者demo的求赏给我,谢谢

Actor模型和CSP模型的区别

Akka/Erlang的actor模型与Go语言的协程Goroutine与通道Channel代表的CSP(Communicating Sequential Processes)模型有什么区别呢? 首先这两者都是并发模型的解决方案,我们看看其定义和实现方式: Actor模型描述了一组为了避免并发编程的常见问题的公理: 1.所有Actor状态是Actor本地的,外部无法访问. 2.Actor必须只有通过消息传递进行通信. 3.一个Actor可以响应消息:推出新Actor,改变其内部状态,或将消息发送

分享:计算机图形学期末作业!!利用WebGL的第三方库three.js写一个简单的网页版“我的世界小游戏”

这几天一直在忙着期末考试,所以一直没有更新我的博客,今天刚把我的期末作业完成了,心情澎湃,所以晚上不管怎么样,我也要写一篇博客纪念一下我上课都没有听,还是通过强大的度娘完成了我的作业的经历.(当然作业不是百度来的,我只是百度了一些示例代码的意思,怎么用!算了,越解释万一越黑呢!哈哈O(∩_∩)O哈哈~) ----------------------------------------------------------------分界线------------------------------

Java小练习之利用面向对象写一个简单的登录系统

import java.util.Scanner; /** * 采用面向对象的方式 写一个登录系统 * @author Administrator * */ //用户信息 class UserInfo{ public static String[] user = new String[10]; public static String[] passwd = new String[10]; public UserInfo() { this.user[0] = "test"; this.p

利用生产者消费者模型实现大文件的拷贝

代码如下,一般10个生产者10个消费者拷贝1个g的文件大概在6s左右,速度还是不错的. 1 #include <semaphore.h> 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <fcntl.h> 5 #include <pthread.h> 6 #include <sys/stat.h> 7 #include <unistd.h> 8 #include

java——利用生产者消费者模式思想实现简易版handler机制

参考教程:http://www.sohu.com/a/237792762_659256 首先介绍每一个类: 1.Message: 这个类的作用是存储一个生产者生产出来的具体的消息,就类似链表队列中的一个节点,自行定义需要存储的内容. code:消息要执行的具体动作代码 msg:消息内容 target:用来关联hadler,根本目的时为了使这几个类共享一个MessageQueue,这个很重要 2.MessageQueue: 这个类就是生产者和消费者线程需要共享的一个存储消息的队列,生产者将消息放入

比较一下Linux下的Epoll模型和select模型的区别

一. select 模型(apache的常用) 1. 最大并发数限制,因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024/2048 ,因此 Select 模型的最大并发数就被相应限制了.自己改改这个 FD_SETSIZE ?想法虽好,可是先看看下面吧 … 2. 效率问题, select 每次调用都会线性扫描全部的 FD 集合,这样效率就会呈现线性下降,把 FD_SETSIZE 改大的后果就是,大家都慢慢来,什么?都超时了. 3. 内核 / 用

NLP中word2vec的CBOW模型和Skip-Gram模型

参考:tensorflow_manual_cn.pdf     Page83 例子(数据集): the quick brown fox jumped over the lazy dog. (1)CBOW模型: (2)Skip-Gram模型:

MVC模型和MVT模型

MVC是众所周知的模式:model(模型).view(视图).controller(控制器),其核心思想是分工.解耦,让不同的代码块之间降低耦合,增强代码的可扩展性和可移植性,实现向后兼容.用户在页面输入url,转交给url控制器,然后根据url匹配相应的视图函数,viwe会去到models取数据,然后models在数据库中取得数据后返回给视图,视图把要展示的数据返回给模版,然后就输出到页面上. MTV,Django也是一个MVC框架,但是在Django中,控制器接受用户输入的部分由框架自行处理