一: 前言
写这个程序主要是用来理解生产者消费者模型, 以及通过这个 Demo 来理解 Redis 的单线程取原子任务是怎么实现的和巩固一下并发相关的知识; 这个虽然是个 Demo, 但是只要稍加改下 Appender 部分也是可以用于项目中的, 假设项目里确实不需要 log4j/logback 之类的日志组件的时候;
二: 实现方式
1. 利用 LinkedList 作为 MQ(还可以用 jdk 自带的 LinkedBlockingQueue, 不过这个 Demo 主要是为了更好的理解原理因此写的比较底层);
2. 利用一个 Daemon 线程作为消费者从 MQ 里实时获取日志对象 / 日志记录, 并将它提交给线程池, 由线程池再遍历所有的 appender 并调用它们的通知方法, 这个地方还可以根据场景进行效率优化, 如将循环遍历 appender 改为将每个 appender 都再此提交到线程池实现异步通知观察者;
3. 为生产者提供 log 方法作为生产日志记录的接口, 无论是生产日志对象还是消费日志对象在操作队列时都需要对队列加锁, 因为个人用的是非并发包里的;
4. 消费者在获取之前会先判断 MQ 里是否有数据, 有则获取并提交给线程池处理, 否则 wait;
5. 生产者生产了日志对象后通过 notify 通知消费者去取, 因为只有一个消费者, 而生产者是不会 wait 的因此只需要 notify 而不用 notifyAll
6... 剩下的就看代码来说明吧;
三: 代码
1.MyLogger 类的实现
- package me.silentdoer.mqlogger.log;
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.io.Writer;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.DEBUG;
- import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.ERROR;
/**
* @author silentdoer
* @version 1.0
* @description 这里只是做一个简单的 logger 实现, 不提供 Appender 之类的功能, 主要是用来学习生产者和消费者及 MQ 的实现原理
* @date 4/26/18 6:07 PM
*/
- public class MyLogger{
- private LogLevel loggerLevel = DEBUG;
- private String charset = "UTF-8"; // 暂且没用, 但是当需要序列化时是可能用到的;
- // TODO 也可以直接用 LinkedQueue, 然后手动通过 ReentrantLock 来实现并发时的数据安全(synchronized 也可)
- //private BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<LogRecord>(); // 可以理解为支持并发的 LinkedList
- // TODO 想了一下既然是要学习原理干脆就实现的更底层一点
- private final Queue<LogRecord> records = new LinkedList<LogRecord>();
- // TODO 用于记录生产了多少条日志, 可供外部获取
- private AtomicLong produceCount = new AtomicLong(0);
- // TODO 用于记录消费了多少条日志
- private AtomicLong consumeCount = new AtomicLong(0);
- // TODO 日志记录的 Consumer
- private Thread consumer = new LogDaemon();
- public MyLogger(){
- consumer.setDaemon(true);
- consumer.start();
- }
- /**
- * 对外提供的接口, 即 log 方法就是生产者用于生产日志数据的接口
- * @param msg
- * @param level
- */
- public void log(String msg, LogLevel level){
- Date curr = generateCurrDate();
- log(new LogRecord(level, msg, curr));
- }
- /**
- * 对外提供的接口, 即 log 方法就是生产者用于生产日志数据的接口
- * @param msg
- */
- public void log(String msg){
- Date curr = generateCurrDate();
- log(new LogRecord(this.loggerLevel, msg, curr));
- }
- /**
- * 给生产者 (即调用 log 的方法都可以理解为生产者在生产日志对象) 提供用于生产日志记录的接口
- * @param record
- */
- public void log(LogRecord record){
- // ReentrantLock 可以替代 synchronized, 不过当前场景下 synchronized 已经足够
- synchronized (this.records){ // TODO 如果用的是 LinkedBlockingQueue 是不需要这个的
- this.records.offer(record);
- this.produceCount.incrementAndGet();
- this.records.notify(); // TODO 只有一个线程会 records.wait(), 因此 notify()足够
- }
- }
- // TODO 类似 Redis 的那个单线程, 用于读取命令对象, 而这里则是用于读取 LogRecord 并通过 appender 将数据写到相应位置
- private class LogDaemon extends Thread{
- private volatile boolean valid = true;
- // 充当 appenders 的角色
- private List<Writer> appenders = null;
- private ExecutorService threadPool = new ThreadPoolExecutor(1, 3
- , 180000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024));
- @Override
- public void run() {
- while(this.valid){
- // TODO 根据最少知道原则, 在这里不要去想整体里是否存在打断此线程的地方, 你就认为此线程是可能被外界打断的即可, 因此需要做一定处理
- try {
- synchronized (MyLogger.this.records) {
- if (MyLogger.this.records.size() <= 0) {
- MyLogger.this.records.wait();
- }
- final LogRecord firstRecord = MyLogger.this.records.poll();
- MyLogger.this.consumeCount.incrementAndGet();
- //threadPool.submit()
- threadPool.execute(() -> MyLogger.this.notifyAppender(this.appenders, firstRecord));
- }
- }catch (InterruptedException ex){
- this.valid = false;
- ex.printStackTrace();
- }catch (Throwable t){
- t.printStackTrace();
- }
- }
- }
- }
- private void notifyAppender(final List<Writer> appenders, final LogRecord record) {
- if(appenders == null){
- PrintWriter writer = new PrintWriter(record.level == ERROR ? System.err : System.out);
- writer.append(record.toString());
- writer.flush();
- }else{
- // TODO 这种是同步的方式, 如果是异步的方式可以将每个 appender 的执行都由一个 Runnable 对象包装, 然后 submit 给线程池(或者中间加个中间件)
- for(Writer writer : appenders){
- try {
- writer.append(record.toString());
- }catch (IOException ex){
- ex.printStackTrace();
- }
- }
- }
- }
- /**
- * 用于产生当前时间的模块, 防止因为并发而导致 LogRecord 的 timestamp 根实际情况不符
- */
- private Lock currDateLock = new ReentrantLock(); // 直接用 synchronized 亦可
- private Date generateCurrDate(){
- currDateLock.lock();
- Date result = new Date();
- currDateLock.unlock();
- return result;
- }
- // 生产者生产的数据对象
- public static class LogRecord{
- private LogLevel level;
- private String msg;
- private Date timestamp;
- private static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
- private SimpleDateFormat dateFormat = DEFAULT_DATE_FORMAT;
- /*public LogRecord(){
- this(INFO, "");
- }*/
- public LogRecord(LogLevel level, String msg){
- this(level, msg, new Date()); // 还是最好由外界设置 timestamp, 否则高并发下会比较不准
- }
- // TODO 最好用这个, 不然高并发下 timestamp 容易出现顺序不准确的情况.
- public LogRecord(LogLevel level, String msg, Date timestamp){
- this.level = level;
- this.msg = msg;
- this.timestamp = timestamp;
- }
- @Override
- public String toString(){
- return String.format("[Level:%s, Datetime:%s] : %s\n", level, dateFormat.format(timestamp), msg);
- }
- public LogLevel getLevel() {
- return level;
- }
- public String getMsg() {
- return msg;
- }
- public void setDateFormat(SimpleDateFormat dateFormat) {
- this.dateFormat = dateFormat;
- }
- public void setTimestamp(Date timestamp) {
- this.timestamp = timestamp;
- }
- }
- public enum LogLevel{ // TODO 内部 enum 默认就是 static
- INFO,
- DEBUG,
- ERROR
- }
- public LogLevel getLoggerLevel() {
- return loggerLevel;
- }
- public void setLoggerLevel(LogLevel loggerLevel) {
- this.loggerLevel = loggerLevel;
- }
- public String getCharset() {
- return charset;
- }
- public void setCharset(String charset) {
- this.charset = charset;
- }
- public AtomicLong getProduceCount() {
- return produceCount;
- }
- public AtomicLong getConsumeCount() {
- return consumeCount;
- }
- }
2. 测试用例 1
- package me.silentdoer.mqlogger;
- import me.silentdoer.mqlogger.log.MyLogger;
- import java.util.Scanner;
- /**
- * @author silentdoer
- * @version 1.0
- * @description the description
- * @date 4/26/18 10:13 PM
- */
- public class Entrance {
- private static MyLogger logger = new MyLogger();
- public static void main(String[] args){
- //logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
- Scanner scanner = new Scanner(System.in);
- String line;
- while(!(line = scanner.nextLine()).equals("exit")){
- if(line.equals(""))
- continue;
- logger.log(line);
- System.out.println(String.format("共生产了 %s 条日志.", logger.getConsumeCount()));
- try {
- Thread.sleep(500);
- }catch (InterruptedException ex){ }
- System.out.println(String.format("共消费了 %s 条日志.", logger.getProduceCount()));
- }
- }
- }
3. 测试用例 2
- package me.silentdoer.mqlogger;
- import me.silentdoer.mqlogger.log.MyLogger;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- * @author silentdoer
- * @version 1.0
- * @description the description
- * @date 4/26/18 10:32 PM
- */
- public class Entrance2 {
- private static MyLogger logger = new MyLogger();
- public static void main(String[] args){
- logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
- ExecutorService threadPool = Executors.newCachedThreadPool();
- for(int i=0;i<10;i++){
- final int index = i + 1;
- threadPool.execute(() -> {
- logger.log(String.format("生产的第 %s 条记录.", index));
- System.out.println(String.format("共生产了 %s 条记录.", index));
- });
- try {
- Thread.sleep(100);
- }catch (InterruptedException ex){ }
- }
- try {
- Thread.sleep(3000);
- System.out.println(String.format("共 %s 条记录被消费.", logger.getConsumeCount()));
- }catch (InterruptedException ex){ }
- //threadPool.shutdown();
- //threadPool.shutdownNow();
- }
- }
四: 补充
如果想实现像 BlockingQueue 一样能够控制 MQ 的元素个数范围, 则可以通过 ReentrantLock 的 Confition 来实现, 即通过 lock 创建两个 Condition 对象, 一个用来描述是否 MQ 中元素达到上限的情况, 一个用于描述 MQ 中元素降到下限的情况;
无论是达到上限或降到下限都会通过相应的 condition 对象来阻塞对应的生产者或消费者的生产 / 消费过程从而实现 MQ 元素个数的可控性;
来源: https://www.cnblogs.com/silentdoer/p/8955713.html