Flink输出到Kafka(两种方式)

方式一:读取文件输出到Kafka

1.代码

import org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

//温度传感器读取样例类case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaSinkTest {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1)

    import org.apache.flink.api.scala._    val inputStream = env.readTextFile("sensor.txt")    val dataStream = inputStream.map(x => {      val arr = x.split(",")      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString   //转成String方便序列化输出    })

    //sink    dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinkTest", new SimpleStringSchema()))    dataStream.print()

    env.execute(" kafka sink test")

  }}

2.启动zookeeper:参考https://www.cnblogs.com/wddqy/p/12156527.html3.启动kafka:参考https://www.cnblogs.com/wddqy/p/12156527.html4.创建kafka消费者观察结果

方式二:Kafka到Kafka

1.代码

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

//温度传感器读取样例类case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaSinkTest1 {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1)

    import org.apache.flink.api.scala._    //从Kafka到Kafka    val properties = new Properties()    properties.setProperty("bootstrap.servers", "localhost:9092")    properties.setProperty("group.id", "consumer-group")    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))    val dataStream = inputStream.map(x => {      val arr = x.split(",")      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString   //转成String方便序列化输出    })

    //sink    dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinkTest", new SimpleStringSchema()))    dataStream.print()

    env.execute(" kafka sink test")

  }}
2.启动zookeeper:参考https://www.cnblogs.com/wddqy/p/12156527.html3.启动kafka:参考https://www.cnblogs.com/wddqy/p/12156527.html4.创建Kafka生产者和消费者,运行代码,观察结果

原文地址:https://www.cnblogs.com/wddqy/p/12172801.html

时间: 2024-08-01 20:08:14

Flink输出到Kafka(两种方式)的相关文章

Python 输出百分比的两种方式

注: 在python3环境下测试. 方式1:直接使用参数格式化:{:.2%} {:.2%}: 显示小数点后2位 显示小数点后2位: >>> print('percent: {:.2%}'.format(42/50)) percent: 84.00% 1 2 不显示小数位:{:.0%},即,将2改为0: >>> print('percent: {:.0%}'.format(42/50)) percent: 84% 1 2 方式2:格式化为float,然后处理成%格式: {

输入一个整数score代表分数,根据分数输出等级(A-E)(用两种方式)

/* A:90~100 B:80~89 C:70~79 D:60~69 E:0~60 */ #include <stdio.h> int main() { // 1.提示输入 printf("请输入分数值:\n"); // 2.接收输入 int score; scanf("%d", &score); // 3.判断等级 (性能最高) if (score>=90 && score<=100) { // [90, 100]

sparkStreaming读取kafka的两种方式

概述 Spark Streaming 支持多种实时输入源数据的读取,其中包括Kafka.flume.socket流等等.除了Kafka以外的实时输入源,由于我们的业务场景没有涉及,在此将不会讨论.本篇文章主要着眼于我们目前的业务场景,只关注Spark Streaming读取Kafka数据的方式. Spark Streaming 官方提供了两种方式读取Kafka数据: 一是Receiver-based Approach.该种读取模式官方最先支持,并在Spark 1.2提供了数据零丢失(zero-d

Spark Streaming 读取 Kafka 数据的两种方式

在Spark1.3之前,默认的Spark接收Kafka数据的方式是基于Receiver的,在这之后的版本里,推出了Direct Approach,现在整理一下两种方式的异同. 1. Receiver-based Approach val kafkaStream = KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 2. Direct Approach (No Receivers) v

创建线程的两种方式

首先我们需要知道什么是线程:是程序执行流的最小单元,包括就绪.阻塞和运行三种基本状态. 举个简单的例子:我们把生活中的两件事吃饭和写作业当作是两个线程,当你正在写作业的时候,爸妈叫你吃饭,你就直接去了,等吃完饭回来后再接着写作业.这就是相当于两个线程其中一个从运行状态转入就绪状态,另一个线程从就绪状态转入运行状态. 创建线程包括继承Thread类和实现Runnable接口两种方式(JDK5.0以后还包括了实现Callable等方式来实现线程,这里不做介绍,感兴趣的小伙伴可以自己查资料),下面介绍

JAVA中Arrays.sort()使用两种方式(Comparable和Comparator接口)对对象或者引用进行排序

一.描述 自定义的类要按照一定的方式进行排序,比如一个Person类要按照年龄进行从小到大排序,比如一个Student类要按照成绩进行由高到低排序. 这里我们采用两种方式,一种是使用Comparable接口:让待排序对象所在的类实现Comparable接口,并重写Comparable接口中的compareTo()方法,缺点是只能按照一种规则排序. 另一种方式是使用Comparator接口:编写多个排序方式类实现Comparator接口,并重写新Comparator接口中的compare()方法,

遍历获得磁盘文件的两种方式

在winform中可能有这种情况,遍历某一个文件夹得到当前文件夹中的所有文件以及子文件夹中的所有文件,以此类推,然后添加到一个TreeView控件中,或者通过控制台输出文件以及文件夹的名称.方法多种多样,下面说的是通过递归和队列的方式来进行.递归其实就是在函数调用的时候进行压栈进行的,所以可以概述为通过栈和队列来实现. 递归方式实现 private void GetAllFile (string strPath,TreeNode parentNode) { //得到当前路径下的所有文件和文件夹

创建线程的两种方式比较Thread VS Runnable

1.首先来说说创建线程的两种方式 一种方式是继承Thread类,并重写run()方法 1 public class MyThread extends Thread{ 2 @Override 3 public void run() { 4 // TODO Auto-generated method stub 5 6 } 7 } 8 //线程使用 9 MyThread mt = new MyThread(); //创建线程 10 mt.start(); //启动线程 另外一种方式是实现Runnabl

Linux 远程桌面的两种方式

在绝多数情况下,Linux 不需要使用到GUI的桌面环境,但是有时在一些特殊的场景如安装Oracle的时候,需要有图形界面进行辅助才可以安装. 如果要使用Linux的图形界面,一般有两种方式: 1.Linux系统安装X Windows图形界面,使用vnc远程. 2.Linux系统启用X协议,配置X Clent,本地有桌面环境的机器(如Windows)配置X Server,获取远程的信息,在本地X server上显示图形界面. 方法一: 本机安装图形界面 这里以CentOS6.5的机器为例,安装图