netty学习:UDP服务器与Spring整合(2)

上一篇文章中,介绍了netty实现UDP服务器的栗子,本文将会对UDP服务器与spring boot整合起来,并使用RedisTemplate的操作类访问Redis和使用JPA链接MySQL,其中会使用多线程、异步等知识。

本人使用的编辑器是IntelliJ IDEA 2017.1.exe版本(链接:http://pan.baidu.com/s/1pLODHm7 密码:dlx7);建议使用STS或者是idea编辑器来进行spring的学习。

1)项目目录结构

整个项目的目录结构如下:

2)jar包

其中pom.xml文件的内容如下:

只有netty-all和commons-lang3是手动加入的jar包,其余的都是创建spring boot项目时候选择组件后自动导入的。
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5
 6     <groupId>com.example</groupId>
 7     <artifactId>udplearning</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9     <packaging>jar</packaging>
10
11     <name>udplearning</name>
12     <description>Demo project for Spring Boot</description>
13
14     <parent>
15         <groupId>org.springframework.boot</groupId>
16         <artifactId>spring-boot-starter-parent</artifactId>
17         <version>1.5.6.RELEASE</version>
18         <relativePath/> <!-- lookup parent from repository -->
19     </parent>
20
21     <properties>
22         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
23         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
24         <commons-lang3.version>3.4</commons-lang3.version>
25         <java.version>1.8</java.version>
26     </properties>
27
28     <dependencies>
29
30         <!-- netty  -->
31
32         <dependency>
33             <groupId>io.netty</groupId>
34             <artifactId>netty-all</artifactId>
35             <version>4.0.49.Final</version>
36         </dependency>
37
38
39         <dependency>
40             <groupId>org.apache.commons</groupId>
41             <artifactId>commons-lang3</artifactId>
42             <version>${commons-lang3.version}</version>
43         </dependency>
44
45
46
47         <dependency>
48             <groupId>org.springframework.boot</groupId>
49             <artifactId>spring-boot-starter-data-jpa</artifactId>
50         </dependency>
51         <dependency>
52             <groupId>org.springframework.boot</groupId>
53             <artifactId>spring-boot-starter-data-redis</artifactId>
54         </dependency>
55         <dependency>
56             <groupId>org.springframework.boot</groupId>
57             <artifactId>spring-boot-starter-jdbc</artifactId>
58         </dependency>
59         <dependency>
60             <groupId>org.springframework.boot</groupId>
61             <artifactId>spring-boot-starter-web</artifactId>
62         </dependency>
63         <dependency>
64             <groupId>org.springframework.boot</groupId>
65             <artifactId>spring-boot-starter-web-services</artifactId>
66         </dependency>
67
68         <dependency>
69             <groupId>mysql</groupId>
70             <artifactId>mysql-connector-java</artifactId>
71             <scope>runtime</scope>
72         </dependency>
73         <dependency>
74             <groupId>org.springframework.boot</groupId>
75             <artifactId>spring-boot-starter-test</artifactId>
76             <scope>test</scope>
77         </dependency>
78     </dependencies>
79
80     <build>
81         <plugins>
82             <plugin>
83                 <groupId>org.springframework.boot</groupId>
84                 <artifactId>spring-boot-maven-plugin</artifactId>
85             </plugin>
86         </plugins>
87     </build>
88
89
90 </project>

3)配置文件application.properties

application.properties的内容:

1 spring.profiles.active=test
2
3 spring.messages.encoding=utf-8
4
5 logging.config=classpath:logback.xml
“spring.profiles.active” 针对多种启动环境的spring boot配置方法,此时启动的是test运行环境,即默认是启动application-test.properties里面的配置信息;
“spring.messages.encoding=utf-8”是指编码方式utf-8;“logging.config=classpath:logback.xml”是指日志文件位置。
application-test.properties的内容如下:
 1 context.listener.classes=com.example.demo.init.StartupEvent
 2
 3 #mysql
 4 spring.jpa.show-sql=true
 5 spring.jpa.database=mysql
 6 #spring.jpa.hibernate.ddl-auto=update
 7 spring.datasource.url=jdbc:mysql://127.0.0.1/test
 8 spring.datasource.username=root
 9 spring.datasource.password=123456
10 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
11 spring.datasource.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)
12
13 spring.session.store-type=none
14
15 # (RedisProperties)
16 spring.redis.database=3
17 spring.redis.host=127.0.0.1
18 spring.redis.port=6379
19 spring.redis.password=123456
20 spring.redis.pool.max-active=8
21 spring.redis.pool.max-wait=-1
22 spring.redis.pool.max-idle=8
23 spring.redis.pool.min-idle=0
24 spring.redis.timeout=0
25
26
27 #UDP消息接收打端口
28 sysfig.udpReceivePort = 7686
29
30 #线程池
31 spring.task.pool.corePoolSize = 5
32 spring.task.pool.maxPoolSize = 100
33 spring.task.pool.keepAliveSeconds = 100
34 spring.task.pool.queueCapacity = 100

其中配置了context.listener.classes=com.example.demo.init.StartupEvent,将StartupEvent类作为Spring boot启动后执行文件。

其中还配置了一些mysql、redis和自定义的属性。可根据项目的实际情况修改。


4)日志文件logback.xml
logback.xml的内容如下:
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <configuration xmlns="http://ch.qos.logback/xml/ns/logback"
 3                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4                xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback
 5                http://ch.qos.logback/xml/ns/logback/logback.xsd
 6                http://ch.qos.logback/xml/ns/logback ">
 7     <property name="APP_Name" value="udplearning" />
 8     <timestamp key="bySecond" datePattern="yyyyMMdd‘T‘HHmmss" />
 9     <contextName>${APP_Name}</contextName>
10
11     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
12         <encoder>
13             <pattern>%d{yyyyMMddHHmmss}|%-5level| %logger{0}.%M | %msg | %thread %n</pattern>
14         </encoder>
15     </appender>
16
17   <appender name="FILELOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
18     <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
19       <fileNamePattern>${catalina.home}/logs/app.%d{yyyyMMdd}.log</fileNamePattern>
20       <maxHistory>30</maxHistory>
21     </rollingPolicy>
22     <encoder>
23       <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
24     </encoder>
25   </appender>
26
27     <appender name="RUNLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
28     <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
29       <fileNamePattern>${catalina.home}/logs/run.%d{yyyyMMdd}.log</fileNamePattern>
30       <maxHistory>7</maxHistory>
31     </rollingPolicy>
32     <encoder>
33       <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
34     </encoder>
35   </appender>
36
37     <logger name="com.example.demo" level="debug" additivity="false">
38         <appender-ref ref="STDOUT" />
39         <appender-ref ref="FILELOG" />
40     </logger>
41
42     <root level="info">
43         <appender-ref ref="STDOUT" />
44     </root>
45 </configuration>

日志的级别是info级别  可以根据自己项目的实际情况进行设置。

5)StartupEvent.java

 1 package com.example.demo.init;
 2
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.context.ApplicationContext;
 6 import org.springframework.context.ApplicationListener;
 7 import org.springframework.context.event.ContextRefreshedEvent;
 8
 9 /**
10  *
11  * Created by wj on 2017/8/28.
12  */
13
14 public class StartupEvent implements ApplicationListener<ContextRefreshedEvent> {
15     private static final Logger log = LoggerFactory.getLogger(StartupEvent.class);
16
17     private static ApplicationContext context;
18
19     @Override
20     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
21
22         try {
23
24             context = contextRefreshedEvent.getApplicationContext();
25
26             SysConfig sysConfig = (SysConfig) context.getBean(SysConfig.class);
27
28             //接收UDP消息并保存至redis中
29             UdpServer udpServer = (UdpServer)StartupEvent.getBean(UdpServer.class);
30             udpServer.run(sysConfig.getUdpReceivePort());
31
32
33 //            这里可以开启多个线程去执行不同的任务
34 //            此处为工作的内容,不便公开!
35
36
37         } catch (Exception e) {
38             log.error("Exception", e);
39         }
40     }
41
42     public static Object getBean(Class beanName) {
43         return context != null ? context.getBean(beanName) : null;
44     }
45 }

6)UdpServer.java

 1 package com.example.demo.init;
 2
 3 import com.example.demo.handle.UdpServerHandler;
 4 import io.netty.bootstrap.Bootstrap;
 5 import io.netty.channel.ChannelOption;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.nio.NioDatagramChannel;
 9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.springframework.scheduling.annotation.Async;
12 import org.springframework.stereotype.Component;
13
14 /**
15  * server服务器
16  * Created by wj on 2017/8/30.
17  */
18 @Component
19 public class UdpServer {
20
21     private static final Logger log= LoggerFactory.getLogger(UdpServer.class);
22
23 //    private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));
24
25     @Async("myTaskAsyncPool")
26     public void run(int udpReceivePort) {
27
28         EventLoopGroup group = new NioEventLoopGroup();
29         log.info("Server start!  Udp Receive msg Port:" + udpReceivePort );
30
31         try {
32             Bootstrap b = new Bootstrap();
33             b.group(group)
34                     .channel(NioDatagramChannel.class)
35                     .option(ChannelOption.SO_BROADCAST, true)
36                     .handler(new UdpServerHandler());
37
38             b.bind(udpReceivePort).sync().channel().closeFuture().await();
39         } catch (InterruptedException e) {
40             e.printStackTrace();
41         } finally {
42             group.shutdownGracefully();
43         }
44     }
45
46 }

此处NioDatagramChannel.class采用的是非阻塞的模式接受UDP消息,若是接受的UDP消息少,可以采用阻塞式的方式接受UDP消息。

UdpServer.run()方法使用@Async将该方法定义成异步的,myTaskAsyncPool是自定义的线程池。

7)UdpServerHandler.java

 1 package com.example.demo.handle;
 2
 3 import com.example.demo.init.StartupEvent;
 4 import com.example.demo.repository.redis.RedisRepository;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import io.netty.channel.SimpleChannelInboundHandler;
 8 import io.netty.channel.socket.DatagramPacket;
 9 import io.netty.util.CharsetUtil;
10 import org.apache.commons.lang3.StringUtils;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13
14 /**
15  * 接受UDP消息,并保存至redis的list链表中
16  * Created by wj on 2017/8/30.
17  *
18  */
19
20 public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
21
22     private static final Logger log= LoggerFactory.getLogger(UdpServerHandler.class);
23
24     //用来计算server接收到多少UDP消息
25     private static int count = 0;
26
27     @Override
28     public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
29
30         String receiveMsg = packet.content().toString(CharsetUtil.UTF_8);
31
32         log.info("Received UDP Msg:" + receiveMsg);
33
34         //判断接受到的UDP消息是否正确(未实现)
35         if (StringUtils.isNotEmpty(receiveMsg) ){
36             //计算接收到的UDP消息的数量
37             count++;
38
39             //获取RedirRepository对象
40             RedisRepository redisRepository = (RedisRepository) StartupEvent.getBean(RedisRepository.class);
41             //将获取到的UDP消息保存至redis的list列表中
42             redisRepository.lpush("udp:msg", receiveMsg);
43             redisRepository.setKey("UDPMsgNumber", String.valueOf(count));
44
45
46 //            在这里可以返回一个UDP消息给对方,告知已接收到UDP消息,但考虑到这是UDP消息,此处可以注释掉
47             ctx.write(new DatagramPacket(
48                     Unpooled.copiedBuffer("QOTM: " + "Got UDP Message!" , CharsetUtil.UTF_8), packet.sender()));
49
50         }else{
51             log.error("Received Error UDP Messsage:" + receiveMsg);
52         }
53     }
54
55     @Override
56     public void channelReadComplete(ChannelHandlerContext ctx) {
57         ctx.flush();
58     }
59
60     @Override
61     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
62         cause.printStackTrace();
63         // We don‘t close the channel because we can keep serving requests.
64     }
65
66 }

此处若不借用ApplicationContext.getBean,是无法获取到RedisRepository对象的。

注:这里是无法使用注解@Autowired来获取到redisTemplate对象的。

8) RedisRepository.java

 1 package com.example.demo.repository.redis;
 2
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.data.redis.core.RedisTemplate;
 7 import org.springframework.stereotype.Service;
 8
 9 /**
10  * 链接redis
11  * 实现list lpush和rpop
12  * Created by wj on 2017/8/30.
13  */
14
15
16 @Service
17 public class RedisRepository {
18     private static final Logger log = LoggerFactory.getLogger(RedisRepository.class);
19
20     @Autowired
21     private RedisTemplate<String, String> redisTemplate;
22
23     //----------------String-----------------------
24     public void setKey(String key,String value){
25         redisTemplate.opsForValue().set(key, value);
26     }
27
28
29     //----------------list----------------------
30     public Long lpush(String key, String val) throws Exception{
31         log.info("UDP Msg保存至redis中,key:" + key + ",val:" + val);
32         return redisTemplate.opsForList().leftPush(key, val);
33     }
34
35     public String rpop(String key) throws Exception {
36         return redisTemplate.opsForList().rightPop(key);
37     }
38
39 }

使用springframework框架中的RedisTemplate类去链接redis,此处是将收到的UDP消息左保存(lpush)至list链表中,然后右边弹出(rpop)。

9)线程池的相关信息

TaskExecutePool.java
 1 package com.example.demo.thread;
 2
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.context.annotation.Bean;
 5 import org.springframework.context.annotation.Configuration;
 6 import org.springframework.scheduling.annotation.EnableAsync;
 7 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 8
 9 import java.util.concurrent.Executor;
10 import java.util.concurrent.ThreadPoolExecutor;
11
12 /**
13  * Created by wj on 2017/8/29.
14  *
15  * thread线程池的相关信息
16  */
17 @Configuration
18 @EnableAsync
19 public class TaskExecutePool {
20
21     @Autowired
22     private TaskThreadPoolConfig config;
23
24     @Bean
25     public Executor myTaskAsyncPool() {
26         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
27         executor.setCorePoolSize(config.getCorePoolSize());
28         executor.setMaxPoolSize(config.getMaxPoolSize());
29         executor.setQueueCapacity(config.getQueueCapacity());
30         executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
31         executor.setThreadNamePrefix("MyExecutor-");
32
33         // rejection-policy:当pool已经达到max size的时候,如何处理新任务
34         // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
35         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
36         executor.initialize();
37         return executor;
38     }
39 }
TaskThreadPoolConfig.java
 1 package com.example.demo.thread;
 2
 3 import org.springframework.boot.context.properties.ConfigurationProperties;
 4 import org.springframework.context.annotation.ComponentScan;
 5 import org.springframework.stereotype.Component;
 6
 7 /**
 8  * Created by wj on 2017/8/29.
 9  */
10
11 @Component
12 @ComponentScan("com.example.demo.init")
13 @ConfigurationProperties(prefix = "spring.task.pool") // 该注解的locations已经被启用,现在只要是在环境中,都会优先加载
14 public class TaskThreadPoolConfig {
15     private int corePoolSize;
16
17     private int maxPoolSize;
18
19     private int keepAliveSeconds;
20
21     private int queueCapacity;
22
23     public int getCorePoolSize() {
24         return corePoolSize;
25     }
26
27     public void setCorePoolSize(int corePoolSize) {
28         this.corePoolSize = corePoolSize;
29     }
30
31     public int getMaxPoolSize() {
32         return maxPoolSize;
33     }
34
35     public void setMaxPoolSize(int maxPoolSize) {
36         this.maxPoolSize = maxPoolSize;
37     }
38
39     public int getKeepAliveSeconds() {
40         return keepAliveSeconds;
41     }
42
43     public void setKeepAliveSeconds(int keepAliveSeconds) {
44         this.keepAliveSeconds = keepAliveSeconds;
45     }
46
47     public int getQueueCapacity() {
48         return queueCapacity;
49     }
50
51     public void setQueueCapacity(int queueCapacity) {
52         this.queueCapacity = queueCapacity;
53     }
54 }

10)发送udp消息的测试代码是直接借用官方公布的栗子,上一篇已详细介绍了,在此不再公布

11)小结

其实发送UDP和接收UDP消息的核心代码很简单,只是netty框架将其包装了。

UDP发送消息是

1 byte[] buffer = ...
2 InetAddress address = InetAddress.getByName("localhost");
3
4 DatagramPacket packet = new DatagramPacket(
5     buffer, buffer.length, address, 9999);
6     DatagramSocket datagramSocket = new DatagramSocket();
7     datagramSocket.send(packet);

udp接收消息是

1 DatagramSocket datagramSocket = new DatagramSocket(9999);
2
3 byte[] buffer =....
4 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
5
6 datagramSocket.receive(packet);

看起来是不是很简单???

12)源代码下载地址

https://github.com/wj302763621/udplearning.git

这里只公布了一个框架,其他很多部分由于涉及到了工作内容不便公布。

有需要的同学可以自行下载对其代码进行更改。

时间: 2024-10-11 05:27:01

netty学习:UDP服务器与Spring整合(2)的相关文章

MyBatis学习(三)---MyBatis和Spring整合

想要了解MyBatis基础的朋友可以通过传送门: MyBatis学习(一)---配置文件,Mapper接口和动态SQL http://www.cnblogs.com/ghq120/p/8322302.html MyBatis学习(二)---数据表之间关联 http://www.cnblogs.com/ghq120/p/8323918.html 之前两篇文章都是单独介绍了MyBatis的用法,并没有和任何框架进行整合.使用MyBatis完成数据库的操作仍有一些模板化的代码,比如关闭SqlSessi

Spring框架学习(4)spring整合hibernate

内容源自:spring整合hibernate    spring整合注解形式的hibernate 这里和上一部分学习一样用了模板模式, 将hibernate开发流程封装在ORM层提供的模板类HibernateTemplate中,通过在DAO中对模板类的使用,实现对传统hibernate开发流程的代替. 一.先来看看Hibernate的传统开发流程: 1) 配置SessionFactory对象 hibernate.cfg.xml <session-factory> a 数据源 driver_cl

Spring学习(五)spring整合hibernate

上一篇博客中讲到spring dao层对jdbc的封装,用到了模板模式的设计思想 .这篇我们来看看spring中的orm层对hibernate的封装,也就是所谓的spring整合 hibernate.这里同样用了模板模式, 将hibernate开发流程封装在ORM层提供的模板类HibernateTemplate中,通过在DAO中对模板类的使用,实现对传统hibernate开发流程的代替. 一.先来看看Hibernate的传统开发流程: 1) 配置SessionFactory对象 hibernat

Spring学习(六)spring整合注解形式的hibernate

上篇博客中谈到spring中如何整合普通形式的hibernate,这次我们来总结下如何整合注解形式的hibernate. 我们知道在普通hibernate中,表与实体的映射关系是写在映射关系文件当中的,一个实体类对应一个映射关系配置文件.而在注解形式中是没有这个映射关系文件的,关系直接在实体类中通过注解的方式展现,所以写法上略有些不同. 下面我们通过一个例子来看看他们的区别.还是使用上篇博客的例子,先去掉这个hibernate反向生成的City.hbm.xml文件. Dao层里面是不需要修改的,

Spring学习(八)spring整合struts2

一.spring框架对struts等表现层框架的整合原理 : 使用spring的ioc容器管理struts中用于处理请求的Action 将Action配置成ioc容器中的bean 延伸:spring对持久层框架/技术的整合原理 (封装) : 提供模板类封装对应技术/框架的开发流程 通过对模板类的使用,实现对传统开发流程的"代替". 二.整合方式: 插件方式 struts2为了实现对spring框架整合,也提供了一个插件的配置文件struts-plugin.xml struts2-spr

MyBatis学习(二):与Spring整合(非注解方式配置MyBatis)

搭建SpringMVC的-->传送门<-- 一.环境搭建: 目录结构: 引用的JAR包: 如果是Maven搭建的话,pom.xml的配置如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema

Spring学习(七)spring整合mybatis

相对于mybatis的平常写法,spring中在使用mybatis时,不需要mybatis-config.xml配置,以及MybatisFactory工厂,在applicationContext.xml中配置即可. 还是使用上次的案例:mybatis传送门 附上applicationContext.xml: <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w

Spring整合MyBatis完整示例

为了梳理前面学习的内容<Spring整合MyBatis(Maven+MySQL)一>与<Spring整合MyBatis(Maven+MySQL)二>,做一个完整的示例完成一个简单的图书管理功能,主要使用到的技术包含Spring.MyBatis.Maven.MySQL及简单MVC等.最后的运行效果如下所示: 项目结构如下: 一.新建一个基于Maven的Web项目 1.1.创建一个简单的Maven项目,项目信息如下: 1.2.修改层面信息,在项目上右键选择属性,再选择“Project

Netty学习篇--整合springboot

经过前面的netty学习,大概了解了netty各个组件的概念和作用,开始自己瞎鼓捣netty和我们常用的项目的整合(很简单的整合) 项目准备 工具:IDEA2017 jar包导入:maven 项目框架:springboot+netty 项目操作 右键创建一个maven项目,项目名称: hetangyuese-netty-03(项目已上传github) 项目完整结构 ? maven导包 <!-- netty start --> <dependency> <groupId>