宜人贷蜂巢API网关技术解密之Netty使用实践

一、背景

宜人贷蜂巢团队,由Michael创立于2013年,通过使用互联网科技手段助力金融生态和谐健康发展。自成立起一直致力于多维度数据闭环平台建设。目前团队规模超过百人,涵盖征信、电商、金融、社交、五险一金和保险等用户授信数据的抓取解析业务,辅以先进的数据分析、挖掘和机器学习等技术对用户信用级别、欺诈风险进行预测评定,全面对外输出金融反欺诈、社交图谱、自动化模型定制等服务或产品。

目前宜人贷蜂巢基于用户授权数据实时抓取解析技术,并结合顶尖大数据技术,快速迭代和自主的创新,已形成了强大而领先的聚合和输出能力。

为了适应完成宜人贷蜂巢强大的服务输出能力,蜂巢设计开发了自己的API网关系统,集中实现了鉴权、加解密、路由、限流等功能,使各业务抓取团队关注其核心抓取和分析工作,而API网关系统更专注于安全、流量、路由等问题,从而更好的保障蜂巢服务系统的质量。今天带着大家解密API网关的Netty线程池技术实践细节。

API网关作为宜人贷蜂巢数据开放平台的统一入口,所有的客户端及消费端通过统一的API来使用各类抓取服务。从面向对象设计的角度看,它与外观模式类似,包装各类不同的实现细节,对外表现出统一的调用形式。

本文首先简要地介绍API网关的项目框架,其次对比BIO和NIO的特点,再引入Netty作为项目的基础框架,然后介绍Netty线程池的原理,最后深入Netty线程池的初始化、ServerBootstrap的初始化与启动及channel与线程池的绑定过程,让读者了解Netty在承载高并发访问的设计路思。

二、项目框架

图1 - API网关项目框架

图中描绘了API网关系统的处理流程,以及与服务注册发现、日志分析、报警系统、各类爬虫的关系。其中API网关系统接收请求,对请求进行编解码、鉴权、限流、加解密,再基于Eureka服务注册发现模块,将请求发送到有效的服务节点上;网关及抓取系统的日志,会被收集到elk平台中,做业务分析及报警处理。

三、BIO vs NIO

API网关承载数倍于爬虫的流量,提升服务器的并发处理能力、缩短系统的响应时间,通信模型的选择是至关重要的,是选择BIO,还是NIO?

Streamvs Buffer & 阻塞 vs 非阻塞

BIO是面向流的,io的读写,每次只能处理一个或者多个bytes,如果数据没有读写完成,线程将一直等待于此,而不能暂时跳过io或者等待io读写完成异步通知,线程滞留在io读写上,不能充分利用机器有限的线程资源,造成server的吞吐量较低,见图2。而NIO与此不同,面向Buffer,线程不需要滞留在io读写上,采用操作系统的epoll模式,在io数据准备好了,才由线程来处理,见图3。

图2 – BIO 从流中读取数据

图3 – NIO 从Buffer中读取数据

Selectors

NIO的selector使一个线程可以监控多个channel的读写,多个channel注册到一个selector上,这个selector可以监测到各个channel的数据准备情况,从而使用有限的线程资源处理更多的连接,见图4。所以可以这样说,NIO极大的提升了服务器接受并发请求的能力,而服务器性能还是要取决于业务处理时间和业务线程池模型。

图4 – NIO 单一线程管理多个连接

而BIO采用的是request-per-thread模式,用一个线程负责接收TCP连接请求,并建立链路,然后将请求dispatch给负责业务逻辑处理的线程,见图5。一旦访问量过多,就会造成机器的线程资源紧张,造成请求延迟,甚至服务宕机。

图5 – BIO 一连接一线程

对比JDK NIO与诸多NIO框架后,鉴于Netty优雅的设计、易用的API、优越的性能、安全性支持、API网关使用Netty作为通信模型,实现了基础框架的搭建。

四、Netty线程池

考虑到API网关的高并发访问需求,线程池设计,见图6。

图6 – API网关线程池设计

Netty的线程池理念有点像ForkJoinPool,不是一个线程大池子并发等待一条任务队列,而是每条线程都有一个任务队列。而且Netty的线程,并不只是简单的阻塞地拉取任务,而是在每个循环中做三件事情:

  • 先SelectKeys()处理NIO的事件
  • 然后获取本线程的定时任务,放到本线程的任务队列里
  • 最后执行其他线程提交给本线程的任务

每个循环里处理NIO事件与其他任务的时间消耗比例,还能通过ioRatio变量来控制,默认是各占50%。可见,Netty的线程根本没有阻塞等待任务的清闲日子,所以也不使用有锁的BlockingQueue来做任务队列了,而是使用无锁的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的缩写)

五、NioEventLoopGroup初始化

下面分析下Netty线程池NioEventLoopGroup的设计与实现细节,NioEventLoopGroup的类层次关系见图7

图7 –NioEvenrLoopGroup类层次关系

其创建过程——方法调用,见下图

图8 –NioEvenrLoopGroup创建调用关系

NioEvenrLoopGroup的创建,具体执行过程是执行类MultithreadEventExecutorGroup的构造方法

/**

 * Create a new instance.

 *

 * @param nThreads          the number of threads that will be used by this instance.

 * @param executor          the Executor to use, or {@code null} if the default should be used.

 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.

 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call

 */

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

                                        EventExecutorChooserFactory chooserFactory, Object... args) {

    if (nThreads <= 0) {

        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

    }

    if (executor == null) {

        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {

        boolean success = false;

        try {

            children[i] = newChild(executor, args);

            success = true;

        } catch (Exception e) { 

            throw new IllegalStateException("failed to create a child event loop", e);

        } finally {

            if (!success) {

                for (int j = 0; j < i; j ++) {

                    children[j].shutdownGracefully();

                }

                for (int j = 0; j < i; j ++) {

                    EventExecutor e = children[j];

                    try {

                        while (!e.isTerminated()) {

                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

                        }

                    } catch (InterruptedException interrupted) {

                        // Let the caller handle the interruption.

                        Thread.currentThread().interrupt();

                        break;

                    }

                }

            }

        }

    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {

        @Override

        public void operationComplete(Future<Object> future) throws Exception {

            if (terminatedChildren.incrementAndGet() == children.length) {

                terminationFuture.setSuccess(null);

            }

        }

    };

    for (EventExecutor e: children) {

        e.terminationFuture().addListener(terminationListener);

    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);

    Collections.addAll(childrenSet, children);

    readonlyChildren = Collections.unmodifiableSet(childrenSet);

}

其中,创建细节如下:

  • 线程池中的线程数nThreads必须大于0;
  • 如果executor为null,创建默认executor,executor用于创建线程(newChild方法使用executor对象);
  • 依次创建线程池中的每一个线程即NioEventLoop,如果其中有一个创建失败,将关闭之前创建的所有线程;
  • chooser为线程池选择器,用来选择下一个EventExecutor,可以理解为,用来选择一个线程来执行task;

chooser的创建细节,如下:

DefaultEventExecutorChooserFactory根据线程数创建具体的EventExecutorChooser,线程数如果等于2^n,可使用按位与替代取模运算,节省cpu的计算资源,见源码:

@SuppressWarnings("unchecked")

@Override

public EventExecutorChooser newChooser(EventExecutor[] executors) {

    if (isPowerOfTwo(executors.length)) {

        return new PowerOfTowEventExecutorChooser(executors);

    } else {

        return new GenericEventExecutorChooser(executors);

    }

} 

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {

        private final AtomicInteger idx = new AtomicInteger();

        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {

            this.executors = executors;

        }

        @Override

        public EventExecutor next() {

            return executors[idx.getAndIncrement() & executors.length - 1];

        }

    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {

        private final AtomicInteger idx = new AtomicInteger();

        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {

            this.executors = executors;

        }

        @Override

        public EventExecutor next() {

            return executors[Math.abs(idx.getAndIncrement() % executors.length)];

        }

    }

newChild(executor, args)的创建细节,如下

MultithreadEventExecutorGroup的newChild方法是一个抽象方法,故使用NioEventLoopGroup的newChild方法,即调用NioEventLoop的构造函数。

    @Override

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {

        return new NioEventLoop(this, executor, (SelectorProvider) args[0],

            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

    }

在这里先看下NioEventLoop的类层次关系

NioEventLoop的继承关系比较复杂,在AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

通常来说, NioEventLoop 肩负着两种任务, 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括调用 select 等待就绪的 IO 事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

具体的构造过程,如下

创建任务队列tailTasks(内部为有界的LinkedBlockingQueue)

创建线程的任务队列taskQueue(内部为有界的LinkedBlockingQueue),以及任务过多防止系统宕机的拒绝策略rejectedHandler

其中tailTasks和taskQueue均是任务队列,而优先级不同,taskQueue的优先级高于tailTasks,定时任务的优先级高于taskQueue。

六、ServerBootstrap初始化及启动

了解了Netty线程池NioEvenrLoopGroup的创建过程后,下面看下API网关服务ServerBootstrap的是如何使用线程池引入服务中,为高并发访问服务的。

API网关ServerBootstrap初始化及启动代码,如下:

serverBootstrap = new ServerBootstrap();

bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());

workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

        .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())

        .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())

        .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())

        // Memory pooled

        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

        .childHandler(channelInitializer);

ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();

log.info("API-gateway started on port: {}", config.getPort());

future.channel().closeFuture().sync();

API网关系统使用netty自带的线程池,共有三组线程池,分别为bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暂不作介绍)。其中,bossGroup用于接收客户端的TCP连接,workerGroup用于处理I/O、执行系统task和定时任务,executorGroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。

七、Channel与线程池的绑定

ServerBootstrap初始化后,通过调用bind(port)方法启动Server,bind的调用链如下:

AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化过程指定channel为NioServerSocketChannel.class,至此将NioServerSocketChannel与bossGroup绑定到一起,bossGroup负责客户端连接的建立。那么NioSocketChannel是如何与workerGroup绑定到一起的?

调用链AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {

    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {

        try {

            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

                logger.warn("Unknown channel option: " + e);

            }

        } catch (Throwable t) {

            logger.warn("Failed to set a channel option: " + child, t);

        }

    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {

        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

    }

    try {

        childGroup.register(child).addListener(new ChannelFutureListener() {

            @Override

            public void operationComplete(ChannelFuture future) throws Exception {

                if (!future.isSuccess()) {

                    forceClose(child, future.cause());

                }

            }

        });

    } catch (Throwable t) {

        forceClose(child, t);

    }

}

其中,childGroup.register(child)就是将NioSocketChannel与workderGroup绑定到一起,那又是什么触发了ServerBootstrapAcceptor的channelRead方法?

其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages方法

@Override

protected int doReadMessages(List<Object> buf) throws Exception {

    SocketChannel ch = javaChannel().accept();

    try {

        if (ch != null) {

            buf.add(new NioSocketChannel(this, ch));

            return 1;

        }

    } catch (Throwable t) {

        …

    }

    return 0;

}

javaChannel().accept() 会获取到客户端新连接的SocketChannel,实例化为一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .

接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦。

至此,分析了Netty线程池的初始化、ServerBootstrap的启动及channel与线程池的绑定过程,能够看出Netty中线程池的优雅设计,使用不同的线程池负责连接的建立、IO读写等,为API网关项目的高并发访问提供了技术基础。

八、总结

至此,对API网关技术的Netty实践分享就到这里,各位如果对中间的各个环节有什么疑问和建议,欢迎大家指正,我们一起讨论,共同学习提高。

参考

http://tutorials.jenkov.com/java-nio/nio-vs-io.html

http://netty.io/wiki/user-guide-for-4.x.html

http://netty.io/

http://www.tuicool.com/articles/mUFnqeM

https://segmentfault.com/a/1190000007403873

https://segmentfault.com/a/1190000007283053

作者:蜂巢团队

来源:宜信技术学院

原文地址:https://blog.51cto.com/14159827/2405694

时间: 2024-10-11 13:25:19

宜人贷蜂巢API网关技术解密之Netty使用实践的相关文章

浅析阿里云API网关的产品架构和常见应用场景

自上世纪60年代计算机网络发展开始,API(Application Programming Interface )随之诞生,API即应用程序接口,是实现系统间衔接的桥梁.时至今日,API市场已经形成了一个庞大的生态体系,在拥抱API经济的过程当中,API网关这一个组件起到了至关重要的作用. 什么是API网关 API 网关提供完整的 API 托管服务,辅助用户将能力.服务.数据以 API 的形式开放给合作伙伴,也可以发布到 API 市场供更多的开发者采购使用. 1.提供防攻击.防重放.请求加密.身

开源API网关,你选对了么?

开源API网关,你选对了么? api网关的本质 不用扯那么多,也不用画图,一句话说清楚 api网关:流量总入口,得以集中控制! 就这么简单 api网关协议上最基本要支持HTTP 和 WebSocket,功能强大点的更会支持tcp/udp的负载均衡接入 正因为支持的是http协议,所以api网关不仅仅可以作为 RESTful API 接入,接入带页面的web都可以的,完全可以当成一个web负载均衡器使用 api网关的作用 解决:认证.鉴权.安全.流量管控.缓存.服务路由,协议转换.服务编排.熔断.

宜人贷PaaS数据服务平台Genie:技术架构及功能

上篇:架构及组件 一.数据平台的发展 1.1 背景介绍 随着数据时代的到来,数据量和数据复杂度的增加推动了数据工程领域的快速发展.为了满足各类数据获取/计算等需求,业内涌现出了诸多解决方案.但大部分方案都遵循以下原则: 降低数据处理成本 合理提高数据使用/计算效率 提供统一的编程范式 宜人贷的数据服务平台也是遵循这三个原则.本人有幸亲身经历了宜人贷数据平台Genie的整个发展过程,纵观宜人贷和业内,可以说Genie的发展是工业界数据平台发展的缩影. Google 的三大论文和Apache Had

京东手机商品详情页技术解密

京东手机商品详情页技术解密 作者:陈保安,2011年加入京东,目前主要负责手机京东核心业务(搜索.商品.购物车.结算.收银台.我的京东)的后端研发工作.带领团队在一线奋战多年,积累了非常丰富的大促备战经验,也见证了核心系统从一分钟几千单到几十万单的质的蜕变. 京东手机单品页在每次大促时承载所有流量的入口,它被天然赋予的一个标签就是抗压,对系统的稳定性.性能方面要求极其苛刻,另外单品页本身业务复杂度较高,单品页有几十种垂直流程业务,并且展示上都要求个性化的单品页,加上依赖有50+的基础服务,稍有抖

API网关之业界漫谈

参考链接: http://www.infoq.com/cn/news/2016/07/API-background-architecture-floo APIGW除了要保证数据的交换外,还要实现对接入客户端的身份认证.防报文重放与防数据篡改.功能调用的业务鉴权.响应数据的脱敏.流量与并发控制,甚至基于API调用的计量或者计费. 一.普元大咖 1.应用场景分类 1.1.面向Web App 这类场景,在物理形态上类似前后端分离,此时的Web App已经不是全功能的Web App,而是根据场景定制.场

Net分布式系统之六:微服务之API网关

本人建立了个人技术.工作经验的分享微信号,计划后续公众号同步更新分享,比在此更多具体.欢迎有兴趣的同学一起加入相互学习.基于上篇微服务架构分享,今天分享其中一个重要的基础组件“API网关”. 一.引言 随着互联网的快速发展,当前以步入移动互联.物联网时代.用户访问系统入口也变得多种方式,由原来单一的PC客户端,变化到PC客户端.各种浏览器.手机移动端及智能终端等.同时系统之间大部分都不是单独运行,经常会涉及与其他系统对接.共享数据的需求.所以系统需要升级框架满足日新月异需求变化,支持业务发展,并

怎么用API网关构建微服务

选择将应用程序构建为微服务时,需要确定应用程序客户端如何与微服务交互.在单体应用程序中,只有一组端点.而在微服务架构中,每个微服务都会暴露一组通常是细粒度的端点.在本文中,我们将讨论一下这对客户端与应用程序之间的通信有什么影响,并提出一种使用API网关的方法. 当选择将应用程序构建为一组微服务时,需要确定应用程序客户端如何与微服务交互.在单体应用程序中,只有一组(通常是重复的.负载均衡的)端点.然而,在微服务架构中,每个微服务都会暴露一组通常是细粒度的端点.在本文中,我们将讨论一下这对客户端与应

十年京东,十年技术发展—畅读《京东技术解密》

<京东技术解密>试读章节共71页,我花了两天时间仔细读完,读了过后感到意犹未尽,非常想一口气把整本读完,然而只能将试读章节反复读了好几遍,收获颇多,遂有此文,借此总结京东十年来的技术变迁和迅速发展. 之所以对这本书感兴趣基于两个原因:一是自己最近刚好在读一本书<不战斗不成功:刘强东和京东商城的"野蛮"奋斗史>,见识到了刘强东本人丰富的创业经历,与当当网拼图书.与淘宝网拼百货.与苏宁易购拼家电,京东真是什么都卖,这份处处竞争的心也值得佩服.二是自己一直对京东印象不

API网关

微服务之API网关 一.引言 随着互联网的快速发展,当前以步入移动互联.物联网时代.用户访问系统入口也变得多种方式,由原来单一的PC客户端,变化到PC客户端.各种浏览器.手机移动端及智能终端等.同时系统之间大部分都不是单独运行,经常会涉及与其他系统对接.共享数据的需求.所以系统需要升级框架满足日新月异需求变化,支持业务发展,并将框架升级为微服务架构."API网关"核心组件是架构用于满足此些需求. 很多互联网平台已基于网关的设计思路,构建自身平台的API网关,国内主要有京东.携程.唯品会