Kafka: ZK + Kafka + Spark Streaming Cluster Environment Setup (24) Structured Streaming: Encoder

Typically, when calling groupByKey on a Dataset<Row>, you will notice that the last parameter of the method requires an Encoder. So how are these encoders defined?
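
For reference, a minimal Java sketch of such a groupByKey call. The SparkSession-derived Dataset<Row> variable df and the column name "category" are illustrative assumptions, not from this post:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.KeyValueGroupedDataset;
    import org.apache.spark.sql.Row;

    // The second argument is the Encoder for the key type produced by the MapFunction.
    KeyValueGroupedDataset<String, Row> grouped = df.groupByKey(
            (MapFunction<Row, String>) row -> row.<String>getAs("category"),
            Encoders.STRING());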

Basic data types

static Encoder<byte[]>    BINARY()                           An encoder for arrays of bytes.
static Encoder<Boolean>    BOOLEAN()                         An encoder for nullable boolean type.
static Encoder<Byte>    BYTE()                               An encoder for nullable byte type.
static Encoder<java.sql.Date>    DATE()                      An encoder for nullable date type.
static Encoder<java.math.BigDecimal>    DECIMAL()            An encoder for nullable decimal type.
static Encoder<Double>    DOUBLE()                           An encoder for nullable double type.
static Encoder<Float>    FLOAT()                             An encoder for nullable float type.
static Encoder<Integer>    INT()                             An encoder for nullable int type.
static Encoder<Long>    LONG()                               An encoder for nullable long type.
static Encoder<Short>    SHORT()                             An encoder for nullable short type.
static Encoder<String>    STRING()                           An encoder for nullable string type.
static Encoder<java.sql.Timestamp>    TIMESTAMP()            An encoder for nullable timestamp type.
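
These primitive encoders are also what other typed operations, such as map, expect for their result type. A quick hedged sketch (df and the column name "id" are illustrative):

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    // map() needs an Encoder for its result type; here a nullable long
    Dataset<Long> ids = df.map(
            (MapFunction<Row, Long>) row -> row.getLong(row.fieldIndex("id")),
            Encoders.LONG());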

Example:

== Scala ==
Encoders are generally created automatically through implicits from a SparkSession, or can be explicitly created by calling static methods on Encoders.

   import spark.implicits._
   val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)

== Java ==
Encoders are specified by calling static methods on Encoders.

   List<String> data = Arrays.asList("abc", "abc", "xyz");
   Dataset<String> ds = context.createDataset(data, Encoders.STRING()); // "context" is an existing SQLContext; a SparkSession works the same way

Class types:

Or constructed from Java Beans:
   Encoders.bean(MyClass.class); 
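
A minimal sketch of creating a Dataset from a Java Bean encoder. The Person bean, its field values, and the SparkSession variable spark are illustrative assumptions:

    import java.util.Arrays;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;

    // A plain Java Bean: public no-arg constructor plus getters/setters
    public static class Person implements java.io.Serializable {
        private String name;
        private int age;
        public Person() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    Person andy = new Person();
    andy.setName("Andy");
    andy.setAge(32);
    Dataset<Person> ds = spark.createDataset(Arrays.asList(andy), personEncoder);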

Tuple types:

Tuples of basic types

   Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
   List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2(1, "a");
   Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);

Tuples containing a class:

Encoder<Tuple2<String, MyClass>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.bean(MyClass.class));
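
Putting that combined encoder to work, a hedged sketch that reuses the illustrative Person bean from the sketch above in place of MyClass (the key value "users" and the variable spark are also assumptions):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import scala.Tuple2;

    // Pair a string key with a bean-encoded class
    Encoder<Tuple2<String, Person>> pairEncoder =
            Encoders.tuple(Encoders.STRING(), Encoders.bean(Person.class));
    Person andy = new Person();
    andy.setName("Andy");
    andy.setAge(32);
    List<Tuple2<String, Person>> pairs = Arrays.asList(new Tuple2<>("users", andy));
    Dataset<Tuple2<String, Person>> ds = spark.createDataset(pairs, pairEncoder);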

For Encoder, see http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoder.html

For Encoders, see http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoders.html

Original source: https://www.cnblogs.com/yy3b2007com/p/9551644.html
