redis通过pipeline提升吞吐量

案例目标

简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。

案例背景

应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;
然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现:
一次数据推送会对 redis 产生近30次读写操作!

在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;
而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;

优化过程 主要针对业务代码做的优化,其中redis 操作经过大量合并,最终降低到原来的1/5,而系统吞吐量也提升明显。
其中,redis pipeline(管道机制) 的应用是一个关键手段。

pipeline的解释

Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。
管道技术使用广泛,例如许多POP3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。
Redis很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作Redis)

普通请求模型

[图-pipeline1]

Pipeline请求模型

[图-pipeline2]

从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次IO操作等待;
而Pipeline 化之后所有的请求合并为一次IO,除了时延可以降低之外,还能大幅度提升系统吞吐量。

代码实例

说明
本地开启50个线程,每个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。

相关常量

   // 并发任务
    private static final int taskCount = 50;
    // pipeline大小
    private static final int batchSize = 10;
    // 每个任务处理命令数
    private static final int cmdCount = 1000;

    private static final boolean usePipeline = true;

初始化连接

        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxActive(200);
        poolConfig.setMaxIdle(100);
        poolConfig.setMaxWait(2000);
        poolConfig.setTestOnBorrow(false);
        poolConfig.setTestOnReturn(false);

        jedisPool = new JedisPool(poolConfig, host, port);

并发启动任务,统计执行时间

public static void main(String[] args) throws InterruptedException {
        init();

        flushDB();

        long t1 = System.currentTimeMillis();
        ExecutorService executor = Executors.newCachedThreadPool();

        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.submit(new DemoTask(i, latch));
        }

        latch.await();
        executor.shutdownNow();

        long t2 = System.currentTimeMillis();

        System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0);

    }

DemoTask 封装了执行key写入的细节,区分不同场景

    public void run() {
            logger.info("Task[{}] start.", id);
            try {
                if (usePipeline) {
                    runWithPipeline();
                } else {
                    runWithNonPipeline();
                }
            } finally {
                latch.countDown();
            }

            logger.info("Task[{}] end.", id);
     }

不使用Pipeline的场景比较简单,循环执行set操作

            for (int i = 0; i < cmdCount; i++) {
                Jedis jedis = get();
                try {
                    jedis.set(key(i), UUID.randomUUID().toString());
                } finally {
                    if (jedis != null) {
                        jedisPool.returnResource(jedis);
                    }
                }
                if (i % batchSize == 0) {
                    logger.info("Task[{}] process -- {}", id, i);
                }
            }

使用Pipeline,需要处理分段,如10个作为一批命令执行

         for (int i = 0; i < cmdCount;) {
                Jedis jedis = get();

                try {
                    Pipeline pipeline = jedis.pipelined();
                    int j;
                    for (j = 0; j < batchSize; j++) {
                        if (i + j < cmdCount) {
                            pipeline.set(key(i + j), UUID.randomUUID().toString());
                        } else {
                            break;
                        }
                    }
                    pipeline.sync();
                    logger.info("Task[{}] pipeline -- {}", id, i + j);

                    i += j;

                } finally {
                    if (jedis != null) {
                        jedisPool.returnResource(jedis);
                    }
                }

            }

运行结果

不使用Pipeline,整体执行26s;而使用Pipeline优化后的代码,执行时间仅需要3s!

NoPipeline-stat

[图-nopipeline]

Pipeline-stat

[图-pipeline]

注意事项

  • pipeline机制可以优化吞吐量,但无法提供原子性/事务保障,而这个可以通过Redis-Multi等命令实现。
    参考这里
  • 部分读写操作存在相关依赖,无法使用pipeline实现,可利用Script机制,但需要在可维护性方面做好取舍。

扩展阅读

官方文档-Redis-Pipelining
官方文档-Redis-Transaction

原文地址:https://www.cnblogs.com/littleatp/p/8419796.html

时间: 2024-10-08 13:54:14

redis通过pipeline提升吞吐量的相关文章

redis 使用管道提升写入的性能[pipeline]

看了手册的都知道multi这个命令的作用就好比是mysql的事务的功能,但是大家都知道事务吗,就是在操作的过程中,把整个操作当作一个原子来处理,避免由于中途出错而导致最后产生的数据不一致,而产生BUG ,Mysql的事务功能可以做到这点,但是在redis中的multi,手册中说是把多个命令当作个事务来处理,但是在真正的测试之后发现并没有所说的事务的功能,个人经过测试发现,只有把他watch命令结合起来用,方可显现出其具有事务的功能,所以这点很是迷惑,关键是有一点很诧异,当启用了multi命令之后

redis之pipeline使用

redis之pipeline 我们要完成一个业务,可能会对redis做连续的多个操作,这有很多个步骤是需要依次连续执行的.这样的场景,网络传输的耗时将是限制redis处理量的主要瓶颈. 那么此时就可以引入pipeline了,pipeline管道就是解决执行大量命令时.会产生大量同学次数而导致延迟的技术. 其实原理很简单,pipeline就是把所有的命令一次发过去,避免频繁的发送.接收带来的网络开销,redis在打包接收到一堆命令后,依次执行,然后把结果再打包返回给客户端. 1 public St

Redis利用Pipeline加速查询速度的方法

1. RTT Redis 是一种基于客户端-服务端模型以及请求/响应协议的TCP服务.这意味着通常情况下 Redis 客户端执行一条命令分为如下四个过程: 发送命令 命令排队 命令执行 返回结果 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应.服务端处理命令,并将结果返回给客户端.客户端和服务端通过网络进行连接.这个连接可以很快,也可能很慢.无论网络如何延迟,数据包总是能从客户端到达服务端,服务端返回数据给客户端. 这个时间被称为 RTT (Round

redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

前段时间在做用户画像的时候,遇到了这样的一个问题,记录某一个商品的用户购买群,刚好这种需求就可以用到Redis中的Set,key作为productID,value 就是具体的customerid集合,后续的话,我就可以通过productid来查看该customerid是否买了此商品,如果购买了,就可以有相关的关联推荐,当然这只是系统中 的一个小业务条件,这时候我就可以用到SADD操作方法,代码如下: static void Main(string[] args) { ConnectionMult

redis使用管道pipeline提升批量操作性能(php演示)

Redis是一个TCP服务器,支持请求/响应协议. 在Redis中,请求通过以下步骤完成: 客户端向服务器发送查询,并从套接字读取,通常以阻塞的方式,用于服务器响应. 服务器处理命令并将响应发送回客户端. 如果需要一次执行多个redis命令,以往的方式需要发送多次命令请求,有redis服务器依次执行,并返回结果, 为了解决此类问题,设计者设计出了redis管道命令: 客户端可以向服务器发送多个请求,而不必等待回复,并最终在一个步骤中读取回复,从而大大增加了协议性能 做了测试,使用pipeline

Redis附加功能之Redis流水线pipeline

流水线功能的目的:通过减少客户端与服务器之间的通信次数来提高程序的执行效率. 一.通信 在一般情况下, 用户每执行一个 Redis 命令,客户端与服务器都需要进行一次通信:客户端会将命令请求发送给服务器,而服务器则会将执行命令所得的结果返回给客户端. 当程序执行一些复杂的操作时, 客户端可能需要执行多个命令, 并与服务器进行多次通信. 假设我们正在构建一个为图书打标签(tag)的网站,这个网站上的每本图书都可以被打上任意多个标签.并且为了记录哪些标签的图书是最多人阅览的,我们会为每个标签创建一个

Redis大幅性能提升之Batch批量读写

提示:本文针对的是StackExchange.Redis 一.问题呈现 前段时间在开发的时候,遇到了redis批量读的问题,由于在StackExchange.Redis里面我确实没有找到PipeLine命令,找到的是Batch命令,因此对其用法进行了探究一下. 下面的代码是我之前写的: 1 public List<StudentEntity> Get(List<int> ids) 2 { 3 List<StudentEntity> result = new List&l

php读取文件使用redis的pipeline(管道)导入大批量数据

需求:需要做一个后台上传TXT文件,读取其中的内容,然后导入redis库中.要求速度快,并且支持至少10W以上的数据,而内容也就一个字段存类似openid和QQ 传统做法:我一开始做的时候就老套路,遍历.hset,然后就发现非常的慢,一千条数据就花了30-32秒,当时就觉得不行,于是就请教了一个大佬,然后就得知了方法 我生成了20W的数据用来做测试,文件大小6M多. 话不多说,直接贴代码了 $lines  = file_get_contents($_FILES['file']['tmp_name

U3D:如何使用collision pipeline提升游戏开发优势

如何将事件的执行顺序应用于您的优势 当我在移动设备上工作的时候,我遇到了一个二维的基于网格的动作难题,我遇到了一个复杂的问题,但是使用Unity3D的collision pipeline,我可以用一种可靠的方式来解决它. 好的,想象一下你有一个2D的基于网格的游戏,玩家可以在直线上运行,然后撞到墙上,你需要让玩家在游戏中运行一些基于这样原理的关卡. 我们使用一个带有一个BoxCollider2D的精灵来制作一个"块",并附加一个"块"脚本,当玩家碰到它时,它会将这个