一、需求描述
1. 利用Redis做消息队列,实现一个异步化服务框架;如图:
2. 利用搭建好的框架实现异步化发送点赞信息和登录异常信息 。
二、具体代码实现
首先搭建应用Redis做消息队列的异步化框架
1.准备
JedisAdapter.java
类中加上lpush 和 brpop的代码用来实现消息队列;加上setObject 和 getObject实现序列化与反序列化的过程(将事件存入消息队列的时候要序列化,从队列中取出事件的时候需要反序列化):
/**
 * Pushes {@code value} onto the head of the Redis list stored at {@code key}.
 *
 * @param key   key of the list
 * @param value element to push
 * @return the result of {@code Jedis.lpush}, or 0 if any exception was thrown
 */
public long lpush(String key, String value) {
    Jedis jedis = null;
    try {
        jedis = jedisPool.getResource();
        return jedis.lpush(key, value);
    } catch (Exception e) {
        // Hand the exception to the logger as well so the stack trace is kept.
        logger.error("Jedis lpush 发生异常 " + e.getMessage(), e);
        return 0;
    } finally {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (Exception e) {
                logger.error("Jedis 关闭异常 " + e.getMessage(), e);
            }
        }
    }
}

/**
 * Blocking pop from the tail of the Redis list stored at {@code key}.
 *
 * @param timeout timeout passed straight to {@code Jedis.brpop}
 * @param key     key of the list
 * @return whatever {@code Jedis.brpop} returns, or {@code null} if any exception was
 *         thrown; callers must handle the {@code null} case
 */
public List<String> brpop(int timeout, String key) {
    Jedis jedis = null;
    try {
        jedis = jedisPool.getResource();
        return jedis.brpop(timeout, key);
    } catch (Exception e) {
        // Hand the exception to the logger as well so the stack trace is kept.
        logger.error("Jedis brpop发生异常 " + e.getMessage(), e);
        return null;
    } finally {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (Exception e) {
                logger.error("Jedis 关闭异常" + e.getMessage(), e);
            }
        }
    }
}

/**
 * Serializes {@code object} to a JSON string and stores it under {@code key}.
 *
 * @param key    key to store under
 * @param object object to serialize
 */
public void setObject(String key, Object object) {
    set(key, JSON.toJSONString(object));
}

/**
 * Reads the string stored under {@code key} and parses it as JSON into {@code clazz}.
 *
 * @param key   key to read
 * @param clazz target type
 * @param <T>   target type
 * @return the parsed object, or {@code null} when {@code get(key)} returns {@code null}
 */
public <T> T getObject(String key, Class<T> clazz) {
    String value = get(key);
    if (value != null) {
        return JSON.parseObject(value, clazz);
    }
    return null;
}
RedisKeyUtil.java
类中加上一个生成事件key的方法,以后的事件都存入这个key对应的list列表(消息队列)中。
// Key returned by getEventQueueKey(), i.e. the Redis key under which events are queued.
// NOTE(review): the value "DISLIKE" looks like a copy-paste from a dislike-related key;
// confirm it is really the intended name for the event queue.
private static String BIZ_EVENT = "DISLIKE";

/**
 * Returns the Redis key of the event queue.
 *
 * @return the event queue key
 */
public static String getEventQueueKey(){
    return BIZ_EVENT;
}
2. 异步化框架
EventType.java :事件类型
package com.nowcoder.async; /** * Created by Administrator on 2017/5/7. */ public enum EventType { LIKE(0), COMMENT(1), LOGIN(1), MAIL(3); private int value; public int getValue() { return value; } EventType(int value) { this.value = value; } }
EventModel.java : 发生的事件的数据都打包成一个Model(然后对这个model中数据进行序列化)
package com.nowcoder.async; import java.util.HashMap; import java.util.Map; /** * Created by Administrator on 2017/5/7. */ public class EventModel { private EventType type; //事件触发者 private int actorId; //表示一个触发事件的对象 private int entityId; private int entityType; //事件对象的拥有者 private int entityOwnerId; //存放触发的事件数据 Map<String, String> exts = new HashMap<>(); public EventModel(EventType type){ this.type = type; } public String getExt(String key) { return exts.get(key); } public EventModel setExt(String key, String value) { exts.put(key, value); return this; } public EventModel(){ } public EventType getType() { return type; } public EventModel setType(EventType type) { this.type = type; return this; } public int getActorId() { return actorId; } public EventModel setActorId(int actorId) { this.actorId = actorId; return this; } public int getEntityId() { return entityId; } public EventModel setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityType() { return entityType; } public EventModel setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityOwnerId() { return entityOwnerId; } public EventModel setEntityOwnerId(int entityOwnerId) { this.entityOwnerId = entityOwnerId; return this; } public Map<String, String> getExts() { return exts; } public void setExts(Map<String, String> exts) { this.exts = exts; } }
EventProducer.java : 将发生的事件推送到消息队列。
package com.nowcoder.async; import com.alibaba.fastjson.JSONObject; import com.nowcoder.util.JedisAdapter; import com.nowcoder.util.RedisKeyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * Created by Administrator on 2017/5/7. */ @Service public class EventProducer { private static final Logger logger = LoggerFactory.getLogger(EventProducer.class); @Autowired JedisAdapter jedisAdapter; /** * 将产生的事件model推送到redis的工作队列中 * @param model * @return */ public boolean fireEvent(EventModel model){ try { //序列化 String json = JSONObject.toJSONString(model); //产生key String eventkey = RedisKeyUtil.getEventQueueKey(); //放入工作队列 jedisAdapter.lpush(eventkey, json); return true; }catch (Exception e){ logger.error("EventProducer fireEvent 发生异常 : " + e.getMessage()); return false; } } }
EventConsumer.java : 从消息队列中获取事件交给Handler类进行处理。
package com.nowcoder.async; import com.alibaba.fastjson.JSON; import com.nowcoder.util.JedisAdapter; import com.nowcoder.util.RedisKeyUtil; import jdk.nashorn.api.scripting.JSObject; import org.apache.commons.collections.map.HashedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by Administrator on 2017/5/7. */ @Service public class EventConsumer implements InitializingBean, ApplicationContextAware{ private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); //用来存储各种type事件的Handler private Map<EventType, List<EventHandler>> config = new HashMap<EventType, List<EventHandler>>(); private ApplicationContext applicationContext; @Autowired JedisAdapter jedisAdapter; @Override public void afterPropertiesSet() throws Exception { //获取上下文所有实现EventHandler的类 //使用BeanFatory的getBeansOfType()方法,该方法返回一个Map类型的实例,Map中的key为Bean的名,key对应的内容为Bean的实例。 Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class); if (beans != null){ for (Map.Entry<String, EventHandler> entry : beans.entrySet()){ List<EventType> eventTypes = entry.getValue().getSupportEventType(); for (EventType type : eventTypes){ //初始化的时候,若没有type,就加入 if(!config.containsKey(type)){ config.put(type, new ArrayList<EventHandler>()); } config.get(type).add(entry.getValue()); } } } //启动线程从工作队列中取出事件进行处理 Thread thread = new Thread(new Runnable() { @Override public void run() { while (true){ String key = RedisKeyUtil.getEventQueueKey(); //从Redis数据库的键为key的set集合中获取存储的事件(事件Event为序列化过的,String类型) List<String> events = 
jedisAdapter.brpop(0, key); for (String message : events){ if (message.equals(key)){ continue; } EventModel eventModel = JSON.parseObject(message, EventModel.class); //若事件没有注册过 if (!config.containsKey(eventModel.getType())){ logger.error("不能识别的事件 "); continue; } //获取关注过该事件的handler,一一进行处理事件 for (EventHandler handler : config.get(eventModel.getType())){ handler.doHandle(eventModel); } } } } }); thread.start(); } /** * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量. */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
EventHandler.java: 接口,可以从消费者中获取事件交给对应的Handler实现类去处理:
package com.nowcoder.async; import java.util.List; /** * Created by Administrator on 2017/5/7. */ public interface EventHandler { //对EventConsumer中的event事件进行处理 void doHandle(EventModel model); //获取哪些关注事件类型 List<EventType> getSupportEventType(); }
LikeHandler.java: 实现点赞通知的类
package com.nowcoder.async.handler; import com.nowcoder.async.EventHandler; import com.nowcoder.async.EventModel; import com.nowcoder.async.EventType; import com.nowcoder.model.HostHolder; import com.nowcoder.model.Message; import com.nowcoder.model.User; import com.nowcoder.service.MessageService; import com.nowcoder.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Date; import java.util.List; /** * Created by Administrator on 2017/5/7. */ @Component public class LikeHandler implements EventHandler{ @Autowired MessageService messageService; @Autowired UserService userService; @Autowired HostHolder hostHolder; @Override public void doHandle(EventModel model) { System.out.print("有人点赞了"); Message message = new Message(); //测试方便查看:就是自己发送给自己站内信 //int fromId = model.getActorId(); // int toId = fromId; //正常情况下fromId是当前点赞用户id,toId是点赞的咨询news所在的id // actorId = hostHolder.getUser().getId(); int fromId = model.getActorId(); //entityOwnerId = news.getId() int toId = model.getEntityOwnerId(); message.setHasRead(0);// 0 代表未读 1 代表已读 message.setFromId(fromId); message.setToId(toId); message.setConversationId(fromId < toId ? String.format("%d_$d", fromId, toId) : String.format("%d_%d", toId, fromId)); User user = userService.getUser(model.getActorId()); message.setContent("用户" + user.getName() + "赞了你的资讯,http://127.0.0.1:8080/news/" + model.getEntityId()); message.setCreatedDate(new Date()); messageService.addMessage(message); } @Override public List<EventType> getSupportEventType() { return Arrays.asList(EventType.LIKE); } }
LoginExceptionHandler:登录出现异常时,向对应用户发送站内信,并发送邮件通知
package com.nowcoder.async.handler; import com.nowcoder.async.EventHandler; import com.nowcoder.async.EventModel; import com.nowcoder.async.EventType; import com.nowcoder.model.Message; import com.nowcoder.service.MessageService; import com.nowcoder.util.MailSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; /** * Created by Administrator on 2017/5/7. */ @Service public class LoginExceptionHandler implements EventHandler{ @Autowired MessageService messageService; @Autowired MailSender mailSender; @Override public void doHandle(EventModel model) { // 判断是否有异常登陆 Message message = new Message(); message.setToId(model.getActorId()); message.setContent("你上次的登陆ip异常"); message.setFromId(17); message.setCreatedDate(new Date()); messageService.addMessage(message); //邮件发送 Map<String, Object> map = new HashMap<String, Object>(); map.put("username", model.getExt("username")); mailSender.sendWithHTMLTemplate(model.getExt("email"), "登陆异常", "mails/welcome.html", map); } @Override public List<EventType> getSupportEventType() { return Arrays.asList(EventType.LOGIN); } }
3. 邮件发送
引入jar包:
<dependency> <groupId>com.sun.mail</groupId> <artifactId>javax.mail</artifactId> <version>1.5.5</version> </dependency>
package com.nowcoder.util; import org.apache.velocity.app.VelocityEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.mail.javamail.JavaMailSenderImpl; import org.springframework.mail.javamail.MimeMessageHelper; import org.springframework.stereotype.Service; import javax.mail.internet.MimeUtility; import org.springframework.ui.velocity.VelocityEngineUtils; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import java.util.Map; import java.util.Properties; /** * Created by Administrator on 2017/5/7. */ @Service public class MailSender implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(MailSender.class); private JavaMailSenderImpl mailSender; @Autowired private VelocityEngine velocityEngine; public boolean sendWithHTMLTemplate(String to, String subject, String template, Map<String, Object> model) { try { String nick = MimeUtility.encodeText("阮宏宝"); InternetAddress from = new InternetAddress(nick + "<[email protected]>"); MimeMessage mimeMessage = mailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage); String result = VelocityEngineUtils .mergeTemplateIntoString(velocityEngine, template, "UTF-8", model); mimeMessageHelper.setTo(to); mimeMessageHelper.setFrom(from); mimeMessageHelper.setSubject(subject); mimeMessageHelper.setText(result, true); mailSender.send(mimeMessage); return true; } catch (Exception e) { logger.error("发送邮件失败" + e.getMessage()); return false; } } @Override public void afterPropertiesSet() throws Exception { mailSender = new JavaMailSenderImpl(); mailSender.setUsername("[email protected]"); mailSender.setPassword("***********"); mailSender.setHost("smtp.qq.com"); mailSender.setPort(465); mailSender.setProtocol("smtps"); mailSender.setDefaultEncoding("utf8"); 
Properties javaMailProperties = new Properties(); javaMailProperties.put("mail.smtp.ssl.enable", true); mailSender.setJavaMailProperties(javaMailProperties); } }
4. 测试
LikeController.java:
点赞的时候加入异步点赞通知:
//异步发送 eventProducer.fireEvent(new EventModel(EventType.LIKE) .setActorId(hostHolder.getUser().getId()) .setEntityId(newsId) .setEntityType(EntityType.ENTITY_NEWS) .setEntityOwnerId(news.getUserId()));
LoginController.java
登录时加上登录异常的通知:
// Queue a LOGIN event carrying the username and the notification address as extras.
// The actor id and the recipient are hard-coded values. The recipient literal used to
// contain a space ("1032335358 @qq.com"), which is not a valid e-mail address.
eventProducer.fireEvent(new EventModel(EventType.LOGIN)
        .setActorId(18)
        .setExt("username", username)
        .setExt("email", "1032335358@qq.com"));
5 相关代码
LoginController.java
package com.nowcoder.controller; import com.nowcoder.async.EventModel; import com.nowcoder.async.EventProducer; import com.nowcoder.async.EventType; import com.nowcoder.service.UserService; import com.nowcoder.util.ToutiaoUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.*; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletResponse; import java.util.Map; /** * Created by Administrator on 2017/4/8. */ @Controller public class LoginController { private static final Logger logger = LoggerFactory.getLogger(LoginController.class); @Autowired UserService userService; @Autowired EventProducer eventProducer; @RequestMapping(path = {"/reg/"}, method = {RequestMethod.GET, RequestMethod.POST}) @ResponseBody public String reg(Model model, @RequestParam("username") String username, @RequestParam("password") String password, @RequestParam(value = "rember", defaultValue = "0") int rember, HttpServletResponse response){ try { Map<String, Object> map = userService.register(username, password); if(map.containsKey("ticket")){ Cookie cookie = new Cookie("ticket", map.get("ticket").toString()); cookie.setPath("/"); //有记住我,就设置时间长一点 if(rember > 0){ cookie.setMaxAge(3600 * 24 * 5); } response.addCookie(cookie); return ToutiaoUtil.getJSONString(0, "注册成功"); }else { return ToutiaoUtil.getJSONString(1, map); } }catch (Exception e){ logger.error("注册异常" + e.getMessage()); return ToutiaoUtil.getJSONString(1, "注册异常"); } } @RequestMapping(path = {"/login/"}, method = {RequestMethod.GET, RequestMethod.POST}) @ResponseBody public String login(Model model, @RequestParam("username") String username, @RequestParam("password") String password, @RequestParam(value = "rember", defaultValue = "0") int rememberme, HttpServletResponse response){ try { Map<String, Object> map = 
userService.login(username, password); if (map.containsKey("ticket")) { Cookie cookie = new Cookie("ticket", map.get("ticket").toString()); cookie.setPath("/"); if (rememberme > 0) { cookie.setMaxAge(3600*24*5); } response.addCookie(cookie); eventProducer.fireEvent(new EventModel(EventType.LOGIN) .setActorId(18) .setExt("username", username).setExt("email", "1032335358 @qq.com")); return ToutiaoUtil.getJSONString(0, "登录成功"); } else { return ToutiaoUtil.getJSONString(1, map); } } catch (Exception e) { logger.error("登录异常" + e.getMessage()); return ToutiaoUtil.getJSONString(1, "登录异常"); } } @RequestMapping(path = {"/logout/"}, method = {RequestMethod.POST, RequestMethod.GET}) public String logout(@CookieValue("ticket") String ticket){ userService.logout(ticket); return "redirect:/"; } }
LikeController.java
package com.nowcoder.controller; import com.nowcoder.async.EventModel; import com.nowcoder.async.EventProducer; import com.nowcoder.async.EventType; import com.nowcoder.model.EntityType; import com.nowcoder.model.HostHolder; import com.nowcoder.model.News; import com.nowcoder.service.LikeService; import com.nowcoder.service.NewsService; import com.nowcoder.util.ToutiaoUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; /** * Created by Administrator on 2017/5/1. */ @Controller public class LikeController { @Autowired LikeService likeService; @Autowired NewsService newsService; @Autowired HostHolder hostHolder; @Autowired EventProducer eventProducer; @RequestMapping(path = {"/like"}, method = {RequestMethod.GET, RequestMethod.POST}) @ResponseBody public String like(@RequestParam("newsId") int newsId){ //在likeKey对应的集合中加入当前用户 long likeCount = likeService.like(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId); //资讯上更新点赞数 News news = newsService.getById(newsId); newsService.updateLikeCount(newsId, (int)likeCount); //异步发送 eventProducer.fireEvent(new EventModel(EventType.LIKE) .setActorId(hostHolder.getUser().getId()) .setEntityId(newsId) .setEntityType(EntityType.ENTITY_NEWS) .setEntityOwnerId(news.getUserId())); return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount)); } @RequestMapping(path = {"/dislike"}, method = {RequestMethod.POST, RequestMethod.GET}) @ResponseBody public String disLike(@RequestParam("newsId") int newsId){ //在disLikeKey对应的集合中加入当前用户 long likeCount = likeService.disLike(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId); if(likeCount <= 0){ likeCount = 0; } //资讯上更新喜欢数 newsService.updateLikeCount(newsId, 
(int)likeCount); return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount)); } }
时间: 2024-10-09 08:40:31