【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的实现分析及使用

首先你需要了解JAVA KEYSTORE

该SSL用于Avro Sink到Avro Source之间的数据传输

该场景主要用于分布式Flume之间的数据传输,从分散的各个flume agent到中心汇集节点的flume agent

下面来看下如何实现的?

Avro Sink SSL

在这个传输过程中,sink其实就相当于socket的client端了

flume源码中有个类NettyAvroRpcClient,该类中还有个内部类SSLCompressionChannelFactory

其中定义了如下属性:

private final boolean enableCompression;

private final int compressionLevel;

private final boolean enableSsl;

private final boolean trustAllCerts;

private final String truststore;

private final String truststorePassword;

private final String truststoreType;

private final List excludeProtocols;

1、要使用SSL进行数据传输,首先要将ssl开关打开,true

2、truststore指定生成的keystore文件

3、truststorepassword指定密码(这里注意生成的keypass和storepass一定相同,否则报错)

KeyStore keystore = null;

            if (truststore != null) {
              if (truststorePassword == null) {
                throw new NullPointerException("truststore password is null");
              }
              InputStream truststoreStream = new FileInputStream(truststore);
              keystore = KeyStore.getInstance(truststoreType);
              keystore.load(truststoreStream, truststorePassword.toCharArray());
            }

            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            // null keystore is OK, with SunX509 it defaults to system CA Certs
            // see http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManager
            tmf.init(keystore);
            managers = tmf.getTrustManagers();

该段代码就去加载了keystore文件

TrustManagerFactory是JDK原生的一个信任管理器工厂,每个新人管理器管理特定类型的由安全套接字使用的信任材料。信任材料是基于keystore或提供者特定源。

init方法通过证书授权源和相关的信任材料初始化此工厂

最后为此信任材料返回一个信任管理器

SSLContext sslContext = SSLContext.getInstance("TLS");
          sslContext.init(null, managers, null);
          SSLEngine sslEngine = sslContext.createSSLEngine();
          sslEngine.setUseClientMode(true);
          List<String> enabledProtocols = new ArrayList<String>();
          for (String protocol : sslEngine.getEnabledProtocols()) {
            if (!excludeProtocols.contains(protocol)) {
              enabledProtocols.add(protocol);
            }
          }
          sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
          logger.info("SSLEngine protocols enabled: " +
              Arrays.asList(sslEngine.getEnabledProtocols()));
          // addFirst() will make SSL handling the first stage of decoding
          // and the last stage of encoding this must be added after
          // adding compression handling above
          pipeline.addFirst("ssl", new SslHandler(sslEngine));

1、返回指定协议的SSLContext对象

TLS安全传输层协议

2、初始化此上下文,初始化参数只有信任管理器

3、初始化SSLEngine,并指定引擎在握手时使用客户端模式

最终这个安全的Socket就建立起来了

Avro Source SSL

source我们可以认为是socket的server端,打开连接后,等待客户端的连接

private static final String PORT_KEY = “port”;

private static final String BIND_KEY = “bind”;

private static final String COMPRESSION_TYPE = “compression-type”;

private static final String SSL_KEY = “ssl”;

private static final String IP_FILTER_KEY = “ipFilter”;

private static final String IP_FILTER_RULES_KEY = “ipFilterRules”;

private static final String KEYSTORE_KEY = “keystore”;

private static final String KEYSTORE_PASSWORD_KEY = “keystore-password”;

private static final String KEYSTORE_TYPE_KEY = “keystore-type”;

private static final String EXCLUDE_PROTOCOLS = “exclude-protocols”;

以上Avro Source的一些配置属性

 try {
        KeyStore ks = KeyStore.getInstance(keystoreType);
        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
      } catch (Exception ex) {
        throw new FlumeException(
            "Avro source configured with invalid keystore: " + keystore, ex);
      }

从上面代码可以看出,source端在configure方法执行的时候就会load该keystore

 if (enableSsl) {
        SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
        sslEngine.setUseClientMode(false);
        List<String> enabledProtocols = new ArrayList<String>();
        for (String protocol : sslEngine.getEnabledProtocols()) {
          if (!excludeProtocols.contains(protocol)) {
            enabledProtocols.add(protocol);
          }
        }
        sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
        logger.info("SSLEngine protocols enabled: " +
            Arrays.asList(sslEngine.getEnabledProtocols()));
        // addFirst() will make SSL handling the first stage of decoding
        // and the last stage of encoding this must be added after
        // adding compression handling above
        pipeline.addFirst("ssl", new SslHandler(sslEngine));
      }

注意这里的SSLEngine就配置了引擎在握手时使用的服务器模式

最终返回对象ChannelPipeline

以上所有内容可能理解起来比较费劲,大家不妨先来看看这篇文章

Channel与Pipeline这里写链接内容

SSL在flume中的使用

首先准备一个keystore文件

Sink配置

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=192.168.11.177

a1.sinks.k1.port=9520

a1.sinks.k1.channel=c1

a1.sinks.k1.ssl=true

a1.sinks.k1.truststore=/home/flume/keystore/chiwei.keystore

a1.sinks.k1.truststore-type=JKS

a1.sinks.k1.truststore-password=123456

Source端配置

a1.sources.r1.type = avro

a1.sources.r1.channels=c1

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=9520

a1.sources.r1.ssl=true

a1.sources.r1.keystore=/home/flume/keystore/chiwei.keystore

a1.sources.r1.keystore-password=123456

a1.sources.r1.keystore-type=JKS

a1.sources.r1.ipFilter=true

a1.sources.r1.ipFilterRules=allow:ip:192.168.11.176

望各位网友不吝指教!!

时间: 2024-08-02 20:14:16

【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的实现分析及使用的相关文章

Flume Avro Source 远程连接拒绝的解决办法

昨天做了一个Java连接虚拟机,实现Flume Avro Source 的远程连接,确报了一个这样的错,经过了一晚上,终于找到了解决的方案. 我来给大家分享一下! 报错如下: Exception in thread "main" org.apache.flume.FlumeException:NettyAvroRpcClient{ host:xxxx,port:xxxx}:RPC connection error 解决的办法是: 把配置文件中a1.sources.r1.bind必须设置

Flume内置channel,source,sink汇总

由于经常会使用到Flume的一些channel,source,sink,于是为了方便将这些channel,source,sink汇总出来,也共大家访问. Component Interface Type Alias Implementation Class *.Channel memory *.channel.MemoryChannel *.Channel jdbc *.channel.jdbc.JdbcChannel *.Channel file *.channel.file.FileChan

flume简介与监听文件目录并sink至hdfs实战

场景 1. flume是什么 1.1 背景 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 1

flume使用之exec source收集各端数据汇总到另外一台服务器

转载:http://blog.csdn.net/liuxiao723846/article/details/78133375 一.场景一描述: 线上api接口服务通过log4j往本地磁盘上打印日志,在接口服务器上安装flume,通过exec source收集日志,然后通过avro sink发送到汇总服务器上的flume:汇总服务器上的flume通过avro source接收日志,然后通过file_roll sink写到本地磁盘. 假设:api接口服务器两台 10.153.140.250和10.1

WARN conf.FlumeConfiguration: Could not configure sink sink1 due to: No channel configured for sink: sink1 org.apache.flume.conf.ConfigurationException: No channel configured for sink: sink1

1.错误如下所示,启动flume采集文件到hdfs案例的时候,出现如下所示的错误: 大概是说No channel configured for sink,所以应该是sink哪里配置出现了错误,百度了一下,然后检查了一下自己的配置: 1 18/04/24 08:31:02 WARN conf.FlumeConfiguration: Could not configure sink sink1 due to: No channel configured for sink: sink1 2 org.a

[Flume] - flume安装

Apache Flume是一个分布式的.可靠的.高效的系统,可以将不同来源的数据收集.聚合并移动到集中的数据存储中心上.Apache Flume不仅仅只是用到日志收集中.由于数据来源是可以定制的,flume可以使用传输大量的自定义event数据,包括但不限于网站流量信息.社会媒体信息.email信息以及其它可能的数据.Flume是Apache软件基金组织的顶级项目.官网http://flume.apache.org/. 一.安装 flume提供了二进制安装版本,所有我们可以选择直接下载二进制安装

Ubuntu中使用source报错处理办法

最近一段时间在使用Bash on Ubuntu on Windows做shell脚本调试时发现在脚本中使用source时会报错,上网查了下才了解到原来是在Ubuntu中使用的并不是bash,而是使用优化过的dash,而在dash中是没有source,而在bash中才有source,这就有点麻烦了,平时在写脚本时有时会调用系统的function和一些自定义的function,没有source确实有点不太方便,而在此时可以在使用dpkg-reconfigure来修改配置dash,在平时可以用以下方法

linux中fork, source和exec的区别

转:linux中fork, source和exec的区别 shell的命令可以分为内部命令和外部命令. 内部命令是由特殊的文件格式.def实现的,如cd,ls等.而外部命令是通过系统调用或独立程序实现的,如awk,sed. source和exec都是内部命令. fork   使用 fork 方式运行 script 时, 就是让 shell(parent process) 产生一个 child process 去执行该 script, 当 child process 结束后, 会返回 parent

VC中利用多线程技术实现线程之间的通信

文章来源:[url]http://www.programfan.com/article/showarticle.asp?id=2951[/url] 当前流行的Windows操作系统能同时运行几个程序(独立运行的程序又称之为进程),对于同一个程序,它又可以分成若干个独立的执行流,我们称之为线程,线程提供了多任务处理的能力.用进程和线程的观点来研究软件是当今普遍采用的方法,进程和线程的概念的出现,对提高软件的并行性有着重要的意义.现在的大型应用软件无一不是多线程多任务处理,单线程的软件是不可想象的.