用BIO手写实现Redis客户端的探究(拒绝Jedis)

  在Redis的使用过程中,大多数人都是使用现成的客户端,如Jedis,Redisson,Lettuce。因此本文研究用BIO的方式手写Redis客户端尝试,对遇到的问题进行探究及总结。

1、手写BIO调用类

  使用BIO的方式进行Redis客户端的编写,首先定义一个Connection,Connection类包含了建立BIO连接的远程地址host,端口port,套接字Socket以及输入输出流。

  此类一个Connection的构造方法,一个Connection的初始化方法,以及请求发送方法。

public class Connection {

    private String host;
    private int port;
    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;

    public Connection(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public boolean isConnection() {

        if (socket != null && !socket.isClosed() && !socket.isBound() && socket.isConnected()) {
            return true;
        }
        try {
            socket = new Socket(host, port);
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public String sendCommand(byte[] command) {
        if (isConnection()) {
            try {
                outputStream.write(command);
                int length = 0;
                byte[] response = new byte[1024];
                while ((length = inputStream.read(response)) > 0) {
                    return new String(response, 0, length);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
}

  有了连接类后就可以发送BIO请求,然后就构建Connection进行请求发送测试:

public class MainTest {
    public static void main(String[] args) {

        String command = "set ant 123456";
        Connection connection = new Connection("localhost", 6379);
        System.out.println(connection.sendCommand(command.getBytes()));
  }
}

  发现结果如下图,请求调用后没有返回,但是main方法也没有结束,通过debug可以知道是因为inputStream.read(response))这一句代码是阻塞调用,因为一直没返回结果,因此main方法阻塞,如下图:

          

  实际上的原因是因为任何请求都是基于协议,发送了请求command = "set ant 123456"后,由于并未遵循Redis的任何访问协议,因此Redis无法识别请求并做出返回。Redis采用RESP协议,RESP协议是在Redis 1.2中引入的,但是它成为Redis 2.0中与Redis服务器通信的标准方法。

  RESP实际上是支持以下数据类型的序列化协议:简单字符串,错误,整数,大容量字符串和数组。

  RESP在Redis中用作请求-响应协议的方式如下:

  • 客户端将命令作为大容量字符串的RESP数组发送到Redis服务器。
  • 服务器根据命令实现以RESP类型之一进行回复。

  在RESP中,某些数据的类型取决于第一个字节:

  • 对于简单字符串,答复的第一个字节为“ +”
  • 对于错误,回复的第一个字节为“-”
  • 对于整数,答复的第一个字节为“:”
  • 对于批量字符串,答复的第一个字节为“ $”
  • 对于数组,回复的第一个字节为“ *

  另外,RESP可以使用Bulk Strings或Array的特殊变体来表示Null值,如稍后指定。在RESP中,协议的不同部分始终以“ \ r \ n”(CRLF)终止。详情请查看https://redis.io/topics/protocol

2、手写RESP协议类

  定义一个协议类Protocol,本示例未完全实现,仅实现简单的SET、GET请求以及内容的解析。

public class Protocol {

    public static final String DOLLER="$";
    public static final String ALLERSTIC="*";
    public static final String CRLF="\r\n";
  
  // 如SET请求 set ant 7777
  // *3\r\n        长度为3的数组
  // $3\r\n        第一个字符串长度为3
  // SET\r\n       第一个字符串为SET
  // $3\r\n        第二个字符串长度为3
  // ant\r\n       第二个字符串为ant
  // $4\r\n        第三个字符串长度为4
  // 7777\r\n      第三个字符串为7777
    public static byte[] buildRespByte(Command command, byte[]... bytes){
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(ALLERSTIC).append(bytes.length+1).append(CRLF);

     // 封装方法SET、GET             stringBuilder.append(DOLLER).append(command.name().length()).append(CRLF);
        stringBuilder.append(command.name()).append(CRLF);
        // 封装参数
        for(byte[] arg:bytes){
            stringBuilder.append(DOLLER).append(arg.length).append(CRLF);
            stringBuilder.append(new String(arg) ).append(CRLF);
        }
        return stringBuilder.toString().getBytes();
    }

    public enum Command{
        SET,GET
    }
}    

  然后创建一个Client封装set和get方法的调用进行调用:

public class SelfRedisClient {

    private Connection connection;

    public SelfRedisClient(String host, int ip) {
        connection = new Connection(host, ip);
    }

    public String set(String key, String value) {
        String result = connection.sendCommand(
                Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes()));
        return result;
    }

    public String get(String key) {
        String result = connection.sendCommand(
                Protocol.buildRespByte(Protocol.Command.GET, key.getBytes()));
        return result;
    }
}

  然后调用Main方法:

public class MainTest {
    public static void main(String[] args) {
        SelfRedisClient selfRedisClient = new SelfRedisClient("localhost", 6379);
        System.out.println(selfRedisClient.set("ant", "123456"));
        System.out.println(selfRedisClient.get("ant"));
    }
}

  可以看出结果正常返回,当然我们未对返回结果使用协议解析:

      

3、使用多线程对Redis进行请求

  上面的示例是在单线程的访问情况下进行的测试,那么在多线程情况下会如何呢。接下来我们构建一个线程池,使用多线程对Redis进行请求尝试,构建一个ClientRunnable方法如下:

public class ClientRunnable implements Runnable {

    private SelfRedisClient selfRedisClient;
    private String value;

    public ClientRunnable(SelfRedisClient selfRedisClient, String value) {
        this.selfRedisClient = selfRedisClient;
        this.value = value;
    }
    @Override
    public void run() {
        selfRedisClient.set("ant", value);
    }
}

  main方法如下:

public class MainTest {
    public static void main(String[] args) {
        SelfRedisClient selfRedisClient = new SelfRedisClient("localhost", 6379);
        ExecutorService pool = Executors.newCachedThreadPool();
        for(int i=0;i<20;i++){
            pool.execute(new ClientRunnable(selfRedisClient,"value"+i));
        }
    }
}

  并在set方法中增加输出到控制台:

public String set(String key, String value) {
    String result = connection.sendCommand(
            Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes()));
    System.out.println("Thread name: " + Thread.currentThread().getName() + "[result]: "
            + result.replace("\r\n", "") + " [value]: " + value);
    return result;
}

  查看结果如下:

        

  发现不但返回结果一次出现了两个甚至多个Redis服务其返回的OK,而且main方法还未执行结束。为什么呢,因为在多线程下Socket是线程不安全的,当多个线程访问Socket的时候,同时发送了请求,然后请求的返回结果会累积,然后被一个线程完全获取的情况,其余发送了请求的线程将一直阻塞等待返回,可是已经被先来的线程截取了流,因此程序无法继续运行。

            

  因此现在就需要一个线程池来管理Connection,每个线程使用一个单独的Connection,对于没有拿到Connection的线程就在阻塞队列等待,直到有线程完成调用,并将Connection释放回线程池,被阻塞的线程才继续进行调用。如下图:

          

4、实现Connection的线程池管理

  首先实现一个阻塞队列用于管理特定数量的Connection,当有Connection使用时就返回Connection,用完Connection后就进行归还。

public class RedisClientPool {

    private LinkedBlockingQueue<SelfRedisClient> linkedBlockingQueue;

    public RedisClientPool(String host,int port ,int connectionCount){
        this.linkedBlockingQueue = new LinkedBlockingQueue<SelfRedisClient>(connectionCount);
        for(int i=0;i<connectionCount;i++){
            SelfRedisClient selfRedisClient = new SelfRedisClient(host,port);
            linkedBlockingQueue.add(selfRedisClient);
        }
    }

    public SelfRedisClient getClient(){
        try{
            return linkedBlockingQueue.take();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return null;
    }

    public void returnClient(SelfRedisClient selfRedisClient) {
        if(selfRedisClient != null){
            linkedBlockingQueue.add(selfRedisClient);
        }
    }
}

  修改ClientRunnable方法,改为从线程池获取Connection进行请求调用:

public class ClientRunnable implements Runnable {

    private RedisClientPool redisClientPool;
    private String value;

    public ClientRunnable(RedisClientPool redisClientPool, String value) {
        this.redisClientPool = redisClientPool;
        this.value = value;
    }

    @Override
    public void run() {
        // 执行前先去管理Connection的阻塞队列中获取封装了Connection的SelfRedisClient
        SelfRedisClient selfRedisClient = redisClientPool.getClient();
        selfRedisClient.set("ant", value);
        // 使用完后进行归还client
        redisClientPool.returnClient(selfRedisClient);
    }
}

  使用Main方法进行请求调用:

public class MainTest {
    public static void main(String[] args) {
        RedisClientPool redisClientPool = new RedisClientPool("localhost",6379,5);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            executorService.execute(new ClientRunnable(redisClientPool,"value"+i));
        }
    }
}

  查看执行结果:    

          

  可以知道成功返回了所有的请求调用,最后也是线程9成功将value值修改为value8。

  因此,可以发现使用一个阻塞队列对Connection资源进行管理不仅近能节省Connection的创建和回收时间,在本例中更核心的功能是实现了线程不安全资源的管理。  

原文地址:https://www.cnblogs.com/jing99/p/11854530.html

时间: 2024-08-28 19:35:48

用BIO手写实现Redis客户端的探究(拒绝Jedis)的相关文章

Redis客户端开发包:Jedis学习-高级应用

事务 Jedis中事务的写法是将redis操作写在事物代码块中,如下所示,multi与exec之间为具体的事务. jedis.watch (key1, key2, ...); Transaction t = jedis.multi(); t.set("foo", "bar"); t.exec(); 另外,在事务内部,是不能通过Jedis对象去获取值的,不过可以通过Transaction对象去获取,如下写法: package cn.edu.hdu.jedisdemo;

Redis客户端之Spring整合Jedis

1.下载相关jar包,并引入工程: jedis-2.4.2.jar commons-pool2-2.0.jar 2.将以下XML配置引入spring <bean id="shardedJedisPool" class="redis.clients.jedis.ShardedJedisPool"> <constructor-arg index="0" ref="jedisPoolConfig"/> <

spring整合redis客户端及缓存接口设计(转)

一.写在前面 缓存作为系统性能优化的一大杀手锏,几乎在每个系统或多或少的用到缓存.有的使用本地内存作为缓存,有的使用本地硬盘作为缓存,有的使用缓存服务器.但是无论使用哪种缓存,接口中的方法都是差不多.笔者最近的项目使用的是memcached作为缓存服务器,由于memcached的一些限制,现在想换redis作为缓存服务器.思路就是把memached的客户端换成redis客户端,接口依然是原来的接口,这样对系统可以无损替换,接口不变,功能不变,只是客户端变了.本文不介绍缓存的用法,不介绍redis

spring整合redis客户端及缓存接口设计

一.写在前面 缓存作为系统性能优化的一大杀手锏,几乎在每个系统或多或少的用到缓存.有的使用本地内存作为缓存,有的使用本地硬盘作为缓存,有的使用缓存服务器.但是无论使用哪种缓存,接口中的方法都是差不多.笔者最近的项目使用的是memcached作为缓存服务器,由于memcached的一些限制,现在想换redis作为缓存服务器.思路就是把memached的客户端换成redis客户端,接口依然是原来的接口,这样对系统可以无损替换,接口不变,功能不变,只是客户端变了.本文不介绍缓存的用法,不介绍redis

手写一个模块化的 TCP 服务端客户端

前面的博客 基于 socket 手写一个 TCP 服务端及客户端 写过一个简单的 TCP 服务端客户端,没有对代码结构进行任何设计,仅仅是实现了相关功能,用于加深对 socket 编程的认识. 这次我们对整个代码结构进行一下优化,使其模块化,易扩展,成为一个简单意义上的“框架”. 对于 Socket 编程这类所需知识偏底层的情况(OS 协议栈的运作机制,TCP 协议的理解,多线程的理解,BIO/NIO 的理解,阻塞函数的运作原理甚至是更底层处理器的中断.网卡等外设与内核的交互.核心态与内核态的切

redis的过期策略都有哪些?内存淘汰机制都有哪些?手写一下LRU代码实现?

redis的过期策略都有哪些? 设置过期时间: set key 的时候,使用expire time,就是过期时间.指定这个key比如说只能存活一个小时?10分钟?指定缓存到期就会失效. redis的过期策略的话 定期删除加惰性删除 定期删除:redis默认是每隔100ms就会随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除, 假设redis里放了10万个key,都设置了过期时间,你每隔几百毫秒,就检查10万个key,那redis基本上就死了,cpu负载会很高的,消耗在你的检查过

4.redis 的过期策略都有哪些?内存淘汰机制都有哪些?手写一下 LRU 代码实现?

作者:中华石杉 面试题 redis 的过期策略都有哪些?内存淘汰机制都有哪些?手写一下 LRU 代码实现? 面试官心理分析 如果你连这个问题都不知道,上来就懵了,回答不出来,那线上你写代码的时候,想当然的认为写进 redis 的数据就一定会存在,后面导致系统各种 bug,谁来负责? 常见的有两个问题: 往 redis 写入的数据怎么没了? 可能有同学会遇到,在生产环境的 redis 经常会丢掉一些数据,写进去了,过一会儿可能就没了.我的天,同学,你问这个问题就说明 redis 你就没用对啊.re

手写Tomcat

学习JavaWeb之后,只知道如何部署项目到Tomcat中,而并不了解其内部如何运行,底层原理为何,因此写下此篇博客初步探究一下.学习之前需要知识铺垫已列出:Tomcat目录结构.HTTP协议.IO.网络编程(未完善) 1. Tomcat(正版) 笔者称自己手写的Tomcat为盗版,反之则为正版.在手写简易版Tomcat之前,我们来看看如何使用正版的Tomcat 1.1 创建JavaWeb工程 这里以Myeclipse为例 1.2 新建Servlet 新建MyServlet类继承HttpServ

简单的神经网络算法-手写数字识别

本文通过BP神经网络实现一个简单的手写识别系统. 一.基础知识 1环境 python2.7 需要numpy等库 可利用sudo apt-get install python-安装 2神经网络原理 http://www.hankcs.com/ml/back-propagation-neural-network.html 讲的特别清楚,本实验过程中涉及矩阵运算都用numpy库的函数 3.js的基础知识 http://www.w3school.com.cn/tags/html_ref_canvas.a