架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Redis做消息队列罢了。
?
- 为什么需要消息队列?
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。
比如我们系统中常见的邮件、短信发送,把这些不需要及时响应的功能写入队列,异步处理请求,减少响应时间。
- 如何实现?
成熟的JMS消息队列中间件产品市面上有很多,但是基于目前项目的架构以及部署情况,我们采用Redis做消息队列。
- 为什么用Redis?
Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。
它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础。
- SpringBoot演示
- application.properties
主要用于redis基本连接配置
#配置redis #在RedisProperties.class有redis的默认配置,默认host为localhost,默认端口为6379 spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.maxIdle=3 spring.redis.maxTotal=20
- RedisConfig.java
redis连接配置,序列化配置,订阅发布者配置
package com.niugang; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.niugang.mq.MessageDelegate; import com.niugang.mq.MessageDelegateImpl; import redis.clients.jedis.JedisPoolConfig; @Configuration @ConfigurationProperties(prefix = "spring.redis") public class RedisConfig { public String host; public int port; public int maxIdle; public int maxTotal; @Bean public JedisConnectionFactory jedisConnectionFactory() { JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); jedisConnectionFactory.setHostName(host); jedisConnectionFactory.setPort(port); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxTotal(maxTotal); jedisConnectionFactory.setPoolConfig(jedisPoolConfig); return jedisConnectionFactory; } // 默认用的是用JdkSerializationRedisSerializer进行序列化的 @Bean @SuppressWarnings({ "rawtypes", "unchecked" }) public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); // 注入数据源 redisTemplate.setConnectionFactory(jedisConnectionFactory()); // 使用Jackson2JsonRedisSerialize 替换默认序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public StringRedisTemplate stringRedisTemplate() { return new StringRedisTemplate(jedisConnectionFactory()); } // ################发布订阅配置################################### @Bean public MessageDelegate messageDelegate() { return new MessageDelegateImpl(); } @Bean public MessageListenerAdapter messageListener() { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageDelegate()); return messageListenerAdapter; } /** * RedisMessageListenerContainer充当消息侦听器容器;它用于接收来自Redis通道的消息, * 并驱动注入到其中的MessageListeners。侦听器容器负责消息接收的所有线程,并将消息分派到侦听器中进行处理。 * 消息侦听器容器是MDP(message-driven POJOs)和消息传递提供程序之间的中介 负责注册接收消息,资源获取和发布,异常。 * * 转换等 */ @Bean public RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer con = new RedisMessageListenerContainer(); con.setConnectionFactory(jedisConnectionFactory()); ChannelTopic channelTopic = new ChannelTopic("log_queue"); con.addMessageListener(messageListener(), channelTopic); return con; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public int getMaxIdle() { return maxIdle; } public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; } public int getMaxTotal() { return maxTotal; } public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; } }
- 日志注解
package com.niugang.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 定义记录日志注解 * @author niugang * */ @Retention(RetentionPolicy.RUNTIME) //注解可用在方法和类上 @Target({ElementType.METHOD, ElementType.TYPE}) public @interface LogApi { /** * 日志描述 */ String value() default ""; /** * 日志类型 * @return */ String type() default LogType.OPERATE_LOG; }
- 日志注解解析类
package com.niugang.annotation; /** * 日志解析bean * @author niugang * */ public class LogParseBean { private String value; private String type; public String getValue() { return value; } public void setValue(String value) { this.value = value; } public String getType() { return type; } public void setType(String type) { this.type = type; } }
- 日志类型常量类
package com.niugang.annotation; /** * 日志类型常量类 * @author niugang * */ public class LogType { public static final String OPERATE_LOG="redis:operate:log"; }
- 消息委托接口(根据Spring-data-redis官方文档配置)
package com.niugang.mq; import java.io.Serializable; import java.util.Map; /** * 考虑下面的接口定义。 * 注意,尽管接口没有扩展MessageListener接口,仍然可以通过使用MessageListenerAdapter将其用作MDP * 类(容器中配置)。还要注意各种消息处理方法是如何根据 * * 它们可以接收和处理的各种消息类型的内容 * * @author niugang * */ public interface MessageDelegate { // 默认监听的方法就是handleMessage void handleMessage(String message); /* * void handleMessage(Map message); void handleMessage(byte[] message); void * handleMessage(Serializable message); // pass the channel/pattern as well * void handleMessage(Serializable message, String channel); */ }
- 消息委托接口实现类(根据Spring-data-redis官方文档配置)
以下处理主要是订阅者接收到消息,将消息根据类型存到redis中,只是为了演示
package com.niugang.mq; import java.io.Serializable; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson.JSON; import com.niugang.annotation.LogParseBean; public class MessageDelegateImpl implements MessageDelegate{ @Autowired private RedisTemplate<String, String> redisTemplate; @Override public void handleMessage(String message) { if(StringUtils.isNotBlank(message)){ System.out.println("message:"+message); //这里演示,存储到redis里面 LogParseBean parseObject = JSON.parseObject(message, LogParseBean.class); String type = parseObject.getType(); redisTemplate.opsForList().leftPush(type, parseObject.getValue()); //项目中可以存储到Elasticsearch等非关系型数据库 } } }
- 日志注解AOP
package com.niugang.aop; import java.lang.reflect.Method; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.niugang.annotation.LogApi; @Aspect @Component public class LogAscept { @Autowired private RedisTemplate<String, String> redisTemplate; @Around("@annotation(com.niugang.annotation.LogApi)") public Object aroundAdvice(ProceedingJoinPoint pjd) throws Throwable { Object proceed = pjd.proceed(); MethodSignature signature = (MethodSignature)pjd.getSignature(); Method method = signature.getMethod(); LogApi logApi= method.getAnnotation(LogApi.class); //调用redis发布消息 messageQueue(logApi); return proceed; // 必须得有返回值,否则不能往下执行 } private void messageQueue(LogApi logApi) { String value = logApi.value(); String type = logApi.type(); JSONObject jsonObject = new JSONObject(); jsonObject.put("value", value); jsonObject.put("type", type); if(StringUtils.isNotBlank(value)){ redisTemplate.convertAndSend("log_queue", jsonObject.toJSONString()); } } }
- Controller接口演示
/** * 跳转到登录页面 * * @param map * @return */ @LogApi(value="跳转到登录页面") @RequestMapping(value = "/login", method = RequestMethod.GET) public String toLogin(ModelMap map) { return "login"; } /** * 跳转到index页面 * * @return */ @RequestMapping(value = "/index") @LogApi(value="列表页面") public String index(ModelMap map) { List<User> list = userService.queryList(null); map.put("users", list); return "index"; }
最后日志记录到Redis中
微信公众号:
?
JAVA程序猿成长之路
分享资源,记录程序猿成长点滴。专注于Java,Spring,SpringBoot,SpringCloud,分布式,微服务。
原文地址:https://www.cnblogs.com/niugang0920/p/12192413.html
时间: 2024-10-09 16:37:39