JavaWeb项目架构之Redis分布式日志队列-SpringBoot实例

架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Redis做消息队列罢了。

?

  • 为什么需要消息队列?

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。
比如我们系统中常见的邮件、短信发送,把这些不需要及时响应的功能写入队列,异步处理请求,减少响应时间。

  • 如何实现?

成熟的JMS消息队列中间件产品市面上有很多,但是基于目前项目的架构以及部署情况,我们采用Redis做消息队列。

  • 为什么用Redis?

Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。

它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础。

  • SpringBoot演示
  • application.properties

主要用于redis基本连接配置

#配置redis
#在RedisProperties.class有redis的默认配置,默认host为localhost,默认端口为6379
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.maxIdle=3
spring.redis.maxTotal=20
  • RedisConfig.java

redis连接配置,序列化配置,订阅发布者配置

package com.niugang;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.niugang.mq.MessageDelegate;
import com.niugang.mq.MessageDelegateImpl;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfig {
        public String host;
public int port;
public int maxIdle;
public int maxTotal;
@Bean
public JedisConnectionFactory jedisConnectionFactory() {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setHostName(host);
jedisConnectionFactory.setPort(port);
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxTotal(maxTotal);
jedisConnectionFactory.setPoolConfig(jedisPoolConfig);
return jedisConnectionFactory;
}
  // 默认用的是用JdkSerializationRedisSerializer进行序列化的
@Bean
@SuppressWarnings({ "rawtypes", "unchecked" })
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
// 注入数据源
redisTemplate.setConnectionFactory(jedisConnectionFactory());
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
               // 设置value的序列化规则和 key的序列化规则
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
     @Bean
public StringRedisTemplate stringRedisTemplate() {
return new StringRedisTemplate(jedisConnectionFactory());
}

// ################发布订阅配置###################################
@Bean
public MessageDelegate messageDelegate() {
return new MessageDelegateImpl();
}
      @Bean
public MessageListenerAdapter messageListener() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageDelegate());
return messageListenerAdapter;

}
/**
* RedisMessageListenerContainer充当消息侦听器容器;它用于接收来自Redis通道的消息,
* 并驱动注入到其中的MessageListeners。侦听器容器负责消息接收的所有线程,并将消息分派到侦听器中进行处理。
* 消息侦听器容器是MDP(message-driven POJOs)和消息传递提供程序之间的中介 负责注册接收消息,资源获取和发布,异常。
*
* 转换等
*/
      @Bean
public RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer con = new RedisMessageListenerContainer();
con.setConnectionFactory(jedisConnectionFactory());
ChannelTopic channelTopic = new ChannelTopic("log_queue");
con.addMessageListener(messageListener(), channelTopic);
return con;
}
public String getHost() {
return host;
}
        public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public int getMaxTotal() {
return maxTotal;
}
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
}
  • 日志注解
package com.niugang.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * 定义记录日志注解
 * @author niugang
 *
 */
@Retention(RetentionPolicy.RUNTIME)
//注解可用在方法和类上
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface LogApi {
   /**
    * 日志描述
    */
   String value() default "";
   /**
    * 日志类型
    * @return
    */
   String type() default LogType.OPERATE_LOG;

}

  • 日志注解解析类
package com.niugang.annotation;
/**
 * 日志解析bean
 * @author niugang
 *
 */
public class LogParseBean {
private String value;
private String type;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
  • 日志类型常量类
package com.niugang.annotation;
/**
 * 日志类型常量类
 * @author niugang
 *
 */
public class LogType {
   public static final String OPERATE_LOG="redis:operate:log";
}
  • 消息委托接口(根据Spring-data-redis官方文档配置)
package com.niugang.mq;
import java.io.Serializable;
import java.util.Map;

/**
 * 考虑下面的接口定义。
 * 注意,尽管接口没有扩展MessageListener接口,仍然可以通过使用MessageListenerAdapter将其用作MDP
 *  类(容器中配置)。还要注意各种消息处理方法是如何根据
 *
 * 它们可以接收和处理的各种消息类型的内容
 *
 * @author niugang
 *
 */
public interface MessageDelegate {
// 默认监听的方法就是handleMessage
void handleMessage(String message);
/*
* void handleMessage(Map message); void handleMessage(byte[] message); void
* handleMessage(Serializable message); // pass the channel/pattern as well
* void handleMessage(Serializable message, String channel);
*/

}
 

  • 消息委托接口实现类(根据Spring-data-redis官方文档配置)

以下处理主要是订阅者接收到消息,将消息根据类型存到redis中,只是为了演示

package com.niugang.mq;
import java.io.Serializable;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import com.alibaba.fastjson.JSON;
import com.niugang.annotation.LogParseBean;
public class MessageDelegateImpl implements MessageDelegate{
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void handleMessage(String message) {
if(StringUtils.isNotBlank(message)){
System.out.println("message:"+message);
//这里演示,存储到redis里面
LogParseBean parseObject = JSON.parseObject(message, LogParseBean.class);
String type = parseObject.getType();
redisTemplate.opsForList().leftPush(type, parseObject.getValue());
//项目中可以存储到Elasticsearch等非关系型数据库
}

}

}
 

  • 日志注解AOP
package com.niugang.aop;
import java.lang.reflect.Method;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.niugang.annotation.LogApi;

@Aspect
@Component
public class LogAscept {

      @Autowired
private RedisTemplate<String, String> redisTemplate;

@Around("@annotation(com.niugang.annotation.LogApi)")
public Object aroundAdvice(ProceedingJoinPoint pjd) throws Throwable {
Object proceed = pjd.proceed();
MethodSignature signature = (MethodSignature)pjd.getSignature();
Method method = signature.getMethod();
LogApi logApi= method.getAnnotation(LogApi.class);
//调用redis发布消息
messageQueue(logApi);
return proceed; // 必须得有返回值,否则不能往下执行
}

private void messageQueue(LogApi logApi) {
String value = logApi.value();
String type = logApi.type();
JSONObject jsonObject = new JSONObject();
jsonObject.put("value", value);
jsonObject.put("type", type);
if(StringUtils.isNotBlank(value)){
redisTemplate.convertAndSend("log_queue", jsonObject.toJSONString());
}

}
}
  • Controller接口演示
/**
* 跳转到登录页面
*
* @param map
* @return
*/
@LogApi(value="跳转到登录页面")
@RequestMapping(value = "/login", method = RequestMethod.GET)
public String toLogin(ModelMap map) {
return "login";
}

     /**
* 跳转到index页面
*
* @return
*/
@RequestMapping(value = "/index")
@LogApi(value="列表页面")
public String index(ModelMap map) {
List<User> list = userService.queryList(null);
map.put("users", list);
return "index";
}
 

最后日志记录到Redis中

       微信公众号: 

?

JAVA程序猿成长之路

分享资源,记录程序猿成长点滴。专注于Java,Spring,SpringBoot,SpringCloud,分布式,微服务。

原文地址:https://www.cnblogs.com/niugang0920/p/12192413.html

时间: 2024-10-09 16:37:39

JavaWeb项目架构之Redis分布式日志队列-SpringBoot实例的相关文章

JavaWeb项目架构之Redis分布式日志队列

架构.分布式.日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Redis做消息队列罢了. 前言 为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 比如我们系统中常见的邮件.短信发送,把这些不需要及时响应的功能写入队列,异步处理请求,减少响应时间. 如何实现? 成熟的JMS消息队列中间件产品市面上有很多,但是基于目前项目的架构以及部署情况,我们采用Redis做

JavaWeb项目架构之Kafka分布式日志队列

架构.分布式.日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了. kafka介绍 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 特性 Kafka是一种高吞

JavaWeb项目架构之FastDFS分布式文件系统

概述 分布式文件系统:Distributed file system, DFS,又叫做网络文件系统:Network File System.一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间. FastDFS是用c语言编写的一款开源的分布式文件系统,充分考虑了冗余备份.负载均衡.线性扩容等机制,并注重高可用.高性能等指标,功能包括:文件存储.文件同步.文件访问(文件上传.文件下载)等,解决了大容量存储和负载均衡的问题.特别适合中小文件(建议范围:4KB < f

大型网站架构系列:分布式消息队列(一) (转)

以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务(见第二篇:大型网站架构系列:分布式消息队列(二)) 常用消息队列(见第二篇:大型网站架构系列:分布式消息队列(二)) 参考(推荐)资料(见第二篇:大型网站架构系列:分布式消息队列(二)) 本次分享总结(见第二篇:大型网站架构系列:分布式消息队列(二)) 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要

大型网站架构系列:分布式消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

大型网站架构系列:分布式消息队列(一)(转)

大型网站架构系列:分布式消息队列(一) 以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务(见第二篇:大型网站架构系列:分布式消息队列(二)) 常用消息队列(见第二篇:大型网站架构系列:分布式消息队列(二)) 参考(推荐)资料(见第二篇:大型网站架构系列:分布式消息队列(二)) 本次分享总结(见第二篇:大型网站架构系列:分布式消息队列(二)) 一.消息队列概述 消息

大型网站架构系列:分布式消息队列(一)

以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务 常用消息队列 参考(推荐)资料 本次分享总结 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,K

利用开源架构ELK构建分布式日志系统

本文介绍了如何使用成熟的经典架构ELK(即Elastic search,Logstash和Kibana)构建分布式日志监控系统,很多公司采用该架构构建分布式日志系统,包括新浪微博,freewheel,畅捷通等. 背景日志,对每个系统来说,都是很重要,又很容易被忽视的部分.日志里记录了程序执行的关键信息,ERROR和WARNING信息等等.我们可以根据日志做很多事情,做数据分析,系统监控,排查问题等等 .但是,任何一个中大型系统都不可能是单台Server,日志文件散落在几十台甚至成千上万台Serv

redis分布式锁&amp;队列应用

分布式锁 setnx(set if not exists) 如果设值成功则证明上锁成功,然后再调用del指令释放. // 这里的冒号:就是一个普通的字符,没特别含义,它可以是任意其它字符,不要误解 > setnx lock:codehole true OK ... do something critical ... > del lock:codehole (integer) 1 但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放.