spark源码阅读之network(3)

TransportContext用来创建TransportServer和TransportclientFactory,同时使用TransportChannelHandler用来配置channel的pipelines,TransportClient提供了两种传输协议,一个是数据层(fetch chunk),一个是控制层(rpc)。rpc的处理需要用户提供一个RpcHandler来处理,它负责建立一个用于传输的流, 使用zero-copy以块的形式进行数据传输。TransportServer和TransportClientFactory为每个channel都创建了一个TransportChannelHandler,每个TransportChannelHandler都包含一个TransportClient,这样服务端可以使用该client向客户端发送消息。

该类有两个主要方法一个是创建TransportChannelHandler一个是给channel配置处理器。

  1. private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  2. TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  3. TransportClient client = new TransportClient(channel, responseHandler);
  4. TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
  5. rpcHandler);
  6. return new TransportChannelHandler(client, responseHandler, requestHandler,
  7. conf.connectionTimeoutMs(), closeIdleConnections);
  8. }

这个可以看到TransportResponseHandler需要一个Channel,TransportClient需要channel和TransportResponseHandler,TransportRequestHandler需要channel, TransportClient和RpcHandler. TransportChannelHandler需要client,requestHandler,responseHandler. 这里发送channel,client被使用了多次。transportclient的channel可以从responseHandler中获取。这里挺乱的。

  1. public TransportChannelHandler initializePipeline(
  2. SocketChannel channel,
  3. RpcHandler channelRpcHandler) {
  4. try {
  5. TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
  6. channel.pipeline()
  7. .addLast("encoder", encoder)
  8. .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
  9. .addLast("decoder", decoder)
  10. .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
  11. // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
  12. // would require more logic to guarantee if this were not part of the same event loop.
  13. .addLast("handler", channelHandler);
  14. return channelHandler;
  15. } catch (RuntimeException e) {
  16. logger.error("Error while initializing Netty pipeline", e);
  17. throw e;
  18. }
  19. }

用来给channel配置channelHandler.第一个是处理出通道的处理器,后面是处理进通道的处理器。

下面看看TransportServer。构建一个服务端。

  1. private void init(String hostToBind, int portToBind) {
  2. IOMode ioMode = IOMode.valueOf(conf.ioMode());
  3. EventLoopGroup bossGroup =
  4. NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
  5. EventLoopGroup workerGroup = bossGroup;
  6. PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
  7. conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
  8. bootstrap = new ServerBootstrap()
  9. .group(bossGroup, workerGroup)
  10. .channel(NettyUtils.getServerChannelClass(ioMode))
  11. .option(ChannelOption.ALLOCATOR, allocator)
  12. .childOption(ChannelOption.ALLOCATOR, allocator);
  13. if (conf.backLog() > 0) {
  14. bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  15. }
  16. if (conf.receiveBuf() > 0) {
  17. bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  18. }
  19. if (conf.sendBuf() > 0) {
  20. bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  21. }
  22. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. RpcHandler rpcHandler = appRpcHandler;
  26. for (TransportServerBootstrap bootstrap : bootstraps) {
  27. rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
  28. }
  29. context.initializePipeline(ch, rpcHandler);
  30. }
  31. });
  32. InetSocketAddress address = hostToBind == null ?
  33. new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
  34. channelFuture = bootstrap.bind(address);
  35. channelFuture.syncUninterruptibly();
  36. port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  37. logger.debug("Shuffle server started on port :" + port);
  38. }

这块是netty中构建一个服务器的流程。配置的缓存生成器是内存池分配器。IO使用的是NIO(EPOLL不兼容windows),相关的配置参数看TransportConf

整个spark的network部分的common模块看完了。其余部分有时间在研究。

来自为知笔记(Wiz)

时间: 2024-10-05 12:32:46

spark源码阅读之network(3)的相关文章

spark源码阅读之network(2)

在上节的解读中发现spark的源码中大量使用netty的buffer部分的api,该节将看到netty核心的一些api,比如channel: 在Netty里,Channel是通讯的载体(网络套接字或组件的连接),而ChannelHandler负责Channel中的逻辑处理,channel支持读,写,绑定本地端口,连接远程等,Channel中所有的操作都是异步的,当发生io操作的时候将会返回一个ChannelFutrue的接口,在ChannelFutrue里面可以处理操作成功.失败.取消后的动作.

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

【Spark】配置Spark源码阅读环境

Scala构建工具(SBT)的使用 SBT介绍 SBT是Simple Build Tool的简称,如果读者使用过Maven,那么可以简单将SBT看做是Scala世界的Maven,虽然二者各有优劣,但完成的工作基本是类似的. 虽然Maven同样可以管理Scala项目的依赖并进行构建,但SBT的某些特性却让人如此着迷,比如: 使用Scala作为DSL来定义build文件(one language rules them all); 通过触发执行(trigger execution)特性支持持续的编译与

Spark源码阅读笔记之Broadcast(一)

Spark源码阅读笔记之Broadcast(一) Spark会序列化在各个任务上使用到的变量,然后传递到Executor中,由于Executor中得到的只是变量的拷贝,因此对变量的改变只在该Executor有效.序列化后的任务的大小是有限制的(由spark.akka.frameSize决定,值为其减去200K,默认为10M-200K),spark会进行检查,超出该限制的任务会被抛弃.因此,对于需要共享比较大的数据时,需要使用Broadcast. Spark实现了两种传输Broadcast的机制:

第3课 Scala函数式编程彻底精通及Spark源码阅读笔记

本课内容: 1:scala中函数式编程彻底详解 2:Spark源码中的scala函数式编程 3:案例和作业 函数式编程开始: def fun1(name: String){ println(name) } //将函数名赋值给一个变量,那么这个变量就是一个函数了. val fun1_v = fun1_ 访问 fun1_v("Scala") 结果:Scala 匿名函数:参数名称用 => 指向函数体 val fun2=(content: String) => println(co

Spark修炼之道(高级篇)——Spark源码阅读:第十节 Standalone运行模式解析

Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括: 类:org.apache.spark.deploy.master.Master 说明:负责整个集群的资源调度及Application的管理. 消息类型: 接收Worker发送的消息 1. RegisterWorker 2. ExecutorStateChanged 3. WorkerSchedulerStateResponse 4. Heartbeat 向Worker发送的消息 1. Registered

Spark源码阅读(1): Stage划分

Spark中job由action动作生成,那么stage是如何划分的呢?一般的解答是根据宽窄依赖划分.那么我们深入源码看看吧 一个action 例如count,会在多次runJob中传递,最终会到一个函数 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)dagScheduler是DAGScheduler的一个实例,因此,后面的工作都交给DAGSchedul

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Spark修炼之道(高级篇)——Spark源码阅读:第二节 SparkContext的创建

博文推荐:http://blog.csdn.net/anzhsoft/article/details/39268963,由大神张安站写的Spark架构原理,使用Spark版本为1.2,本文以Spark 1.5.0为蓝本,介绍Spark应用程序的执行流程. 本文及后面的源码分析都以下列代码为样板 import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String])