在Redis的使用过程中,大多数人都是使用现成的客户端,如Jedis,Redisson,Lettuce。因此本文研究用BIO的方式手写Redis客户端尝试,对遇到的问题进行探究及总结。
1、手写BIO调用类
使用BIO的方式进行Redis客户端的编写,首先定义一个Connection,Connection类包含了建立BIO连接的远程地址host,端口port,套接字Socket以及输入输出流。
此类一个Connection的构造方法,一个Connection的初始化方法,以及请求发送方法。
public class Connection { private String host; private int port; private Socket socket; private InputStream inputStream; private OutputStream outputStream; public Connection(String host, int port) { this.host = host; this.port = port; } public boolean isConnection() { if (socket != null && !socket.isClosed() && !socket.isBound() && socket.isConnected()) { return true; } try { socket = new Socket(host, port); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); } catch (IOException e) { e.printStackTrace(); return false; } return true; } public String sendCommand(byte[] command) { if (isConnection()) { try { outputStream.write(command); int length = 0; byte[] response = new byte[1024]; while ((length = inputStream.read(response)) > 0) { return new String(response, 0, length); } } catch (IOException e) { e.printStackTrace(); } } return null; } }
有了连接类后就可以发送BIO请求,然后就构建Connection进行请求发送测试:
public class MainTest { public static void main(String[] args) { String command = "set ant 123456"; Connection connection = new Connection("localhost", 6379); System.out.println(connection.sendCommand(command.getBytes())); } }
发现结果如下图,请求调用后没有返回,但是main方法也没有结束,通过debug可以知道是因为inputStream.read(response))这一句代码是阻塞调用,因为一直没返回结果,因此main方法阻塞,如下图:
实际上的原因是因为任何请求都是基于协议,发送了请求command = "set ant 123456"后,由于并未遵循Redis的任何访问协议,因此Redis无法识别请求并做出返回。Redis采用RESP协议,RESP协议是在Redis 1.2中引入的,但是它成为Redis 2.0中与Redis服务器通信的标准方法。
RESP实际上是支持以下数据类型的序列化协议:简单字符串,错误,整数,大容量字符串和数组。
RESP在Redis中用作请求-响应协议的方式如下:
- 客户端将命令作为大容量字符串的RESP数组发送到Redis服务器。
- 服务器根据命令实现以RESP类型之一进行回复。
在RESP中,某些数据的类型取决于第一个字节:
- 对于简单字符串,答复的第一个字节为“ +”
- 对于错误,回复的第一个字节为“-”
- 对于整数,答复的第一个字节为“:”
- 对于批量字符串,答复的第一个字节为“ $”
- 对于数组,回复的第一个字节为“
*
”
另外,RESP可以使用Bulk Strings或Array的特殊变体来表示Null值,如稍后指定。在RESP中,协议的不同部分始终以“ \ r \ n”(CRLF)终止。详情请查看https://redis.io/topics/protocol
2、手写RESP协议类
定义一个协议类Protocol,本示例未完全实现,仅实现简单的SET、GET请求以及内容的解析。
public class Protocol { public static final String DOLLER="$"; public static final String ALLERSTIC="*"; public static final String CRLF="\r\n"; // 如SET请求 set ant 7777 // *3\r\n 长度为3的数组 // $3\r\n 第一个字符串长度为3 // SET\r\n 第一个字符串为SET // $3\r\n 第二个字符串长度为3 // ant\r\n 第二个字符串为ant // $4\r\n 第三个字符串长度为4 // 7777\r\n 第三个字符串为7777 public static byte[] buildRespByte(Command command, byte[]... bytes){ StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(ALLERSTIC).append(bytes.length+1).append(CRLF); // 封装方法SET、GET stringBuilder.append(DOLLER).append(command.name().length()).append(CRLF); stringBuilder.append(command.name()).append(CRLF); // 封装参数 for(byte[] arg:bytes){ stringBuilder.append(DOLLER).append(arg.length).append(CRLF); stringBuilder.append(new String(arg) ).append(CRLF); } return stringBuilder.toString().getBytes(); } public enum Command{ SET,GET } }
然后创建一个Client封装set和get方法的调用进行调用:
public class SelfRedisClient { private Connection connection; public SelfRedisClient(String host, int ip) { connection = new Connection(host, ip); } public String set(String key, String value) { String result = connection.sendCommand( Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes())); return result; } public String get(String key) { String result = connection.sendCommand( Protocol.buildRespByte(Protocol.Command.GET, key.getBytes())); return result; } }
然后调用Main方法:
public class MainTest { public static void main(String[] args) { SelfRedisClient selfRedisClient = new SelfRedisClient("localhost", 6379); System.out.println(selfRedisClient.set("ant", "123456")); System.out.println(selfRedisClient.get("ant")); } }
可以看出结果正常返回,当然我们未对返回结果使用协议解析:
3、使用多线程对Redis进行请求
上面的示例是在单线程的访问情况下进行的测试,那么在多线程情况下会如何呢。接下来我们构建一个线程池,使用多线程对Redis进行请求尝试,构建一个ClientRunnable方法如下:
public class ClientRunnable implements Runnable { private SelfRedisClient selfRedisClient; private String value; public ClientRunnable(SelfRedisClient selfRedisClient, String value) { this.selfRedisClient = selfRedisClient; this.value = value; } @Override public void run() { selfRedisClient.set("ant", value); } }
main方法如下:
public class MainTest { public static void main(String[] args) { SelfRedisClient selfRedisClient = new SelfRedisClient("localhost", 6379); ExecutorService pool = Executors.newCachedThreadPool(); for(int i=0;i<20;i++){ pool.execute(new ClientRunnable(selfRedisClient,"value"+i)); } } }
并在set方法中增加输出到控制台:
public String set(String key, String value) { String result = connection.sendCommand( Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes())); System.out.println("Thread name: " + Thread.currentThread().getName() + "[result]: " + result.replace("\r\n", "") + " [value]: " + value); return result; }
查看结果如下:
发现不但返回结果一次出现了两个甚至多个Redis服务其返回的OK,而且main方法还未执行结束。为什么呢,因为在多线程下Socket是线程不安全的,当多个线程访问Socket的时候,同时发送了请求,然后请求的返回结果会累积,然后被一个线程完全获取的情况,其余发送了请求的线程将一直阻塞等待返回,可是已经被先来的线程截取了流,因此程序无法继续运行。
因此现在就需要一个线程池来管理Connection,每个线程使用一个单独的Connection,对于没有拿到Connection的线程就在阻塞队列等待,直到有线程完成调用,并将Connection释放回线程池,被阻塞的线程才继续进行调用。如下图:
4、实现Connection的线程池管理
首先实现一个阻塞队列用于管理特定数量的Connection,当有Connection使用时就返回Connection,用完Connection后就进行归还。
public class RedisClientPool { private LinkedBlockingQueue<SelfRedisClient> linkedBlockingQueue; public RedisClientPool(String host,int port ,int connectionCount){ this.linkedBlockingQueue = new LinkedBlockingQueue<SelfRedisClient>(connectionCount); for(int i=0;i<connectionCount;i++){ SelfRedisClient selfRedisClient = new SelfRedisClient(host,port); linkedBlockingQueue.add(selfRedisClient); } } public SelfRedisClient getClient(){ try{ return linkedBlockingQueue.take(); }catch (InterruptedException e){ e.printStackTrace(); } return null; } public void returnClient(SelfRedisClient selfRedisClient) { if(selfRedisClient != null){ linkedBlockingQueue.add(selfRedisClient); } } }
修改ClientRunnable方法,改为从线程池获取Connection进行请求调用:
public class ClientRunnable implements Runnable { private RedisClientPool redisClientPool; private String value; public ClientRunnable(RedisClientPool redisClientPool, String value) { this.redisClientPool = redisClientPool; this.value = value; } @Override public void run() { // 执行前先去管理Connection的阻塞队列中获取封装了Connection的SelfRedisClient SelfRedisClient selfRedisClient = redisClientPool.getClient(); selfRedisClient.set("ant", value); // 使用完后进行归还client redisClientPool.returnClient(selfRedisClient); } }
使用Main方法进行请求调用:
public class MainTest { public static void main(String[] args) { RedisClientPool redisClientPool = new RedisClientPool("localhost",6379,5); ExecutorService executorService = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ executorService.execute(new ClientRunnable(redisClientPool,"value"+i)); } } }
查看执行结果:
可以知道成功返回了所有的请求调用,最后也是线程9成功将value值修改为value8。
因此,可以发现使用一个阻塞队列对Connection资源进行管理不仅近能节省Connection的创建和回收时间,在本例中更核心的功能是实现了线程不安全资源的管理。
原文地址:https://www.cnblogs.com/jing99/p/11854530.html