flink 从mysql 读取数据 放入kafka中 用于搜索全量

接着上一篇,将mysql的数据导入kafka中

public static void main(String[] arg) throws Exception {

        TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://*:3306/tablename?characterEncoding=utf8")
                .setUsername("*").setPassword("*")
                .setQuery("select LOGIC_CODE from *").setRowTypeInfo(rowTypeInfo).finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Row> s = env.createInput(jdbcInputFormat);
        BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());
        tableEnv.registerDataSet("t2", s);
        Table query = tableEnv.sqlQuery("select * from t2");
        DataSet<String> result = tableEnv.toDataSet(query, Row.class).map(new MapFunction<Row, String>(){
            @Override
              public String map(Row value) throws Exception {
                return value.toString() ;
              }

        });

        logger.info("read db end"); 

        KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat()
                .setBootstrapServers("*:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000")
                .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish();

        result.output(kafkaOutput);

        logger.info("write kafka end");

        env.execute("Flink add data source");

    }

原文地址:https://blog.51cto.com/12597095/2395983

时间: 2024-10-11 05:54:59

flink 从mysql 读取数据 放入kafka中 用于搜索全量的相关文章

PHP MySQL 读取数据

PHP MySQL 读取数据 从 MySQL 数据库读取数据 SELECT 语句用于从数据表中读取数据: SELECT column_name(s) FROM table_name 如需学习更多关于 SQL 的知识,请访问我们的 SQL 教程. 以下实例中我们从表 MyGuests 读取了 id, firstname 和 lastname 列的数据并显示在页面上: 实例 (MySQLi - 面向对象) <?php$servername = "localhost";$usernam

运用html5+css3+jq+js实现添加的数据放入本地存储和界面

<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <title>历史记录</title> <style> *{ margin: 0; padding: 0; } body{ margin-left: 300px; } ul{ list-style: none; } ul li,div{ width: 250p

.Net中把图片等文件放入DLL中,并在程序中引用

[摘要] 有时我们需要隐藏程序中的一些资源,比如游戏,过关后才能看到图片,那么图片就必须隐藏起来,否则不用玩这个游戏就可以看到你的图片了,呵呵. 本文就讲述了如何把文件(比如图片,WORD文档等等) 隐藏到DLL中,然后在程序中可以自己根据需要导出图片进行处理. 注:本站原创,转载请注明本站网址:http://www.beinet.cn/blog/ [全文] 第1步:我们要生成一个资源文件,先把要隐藏的文件放入到这个资源文件中 (资源文件大致可以存放三种数据资源:字节数组.各种对象和字符串) 首

MyBatis 遍历数组放入in中

必须要遍历出数组的值放入in中 如果直接将"'2','3','4','5','6','7','8'" 字符串放入in中,只会查出 inv_operate_type的值为2的数据,因为myBatis将这个判断成了字符串,逗号失效了.

maven项目放入tomcat中找不到jar包

maven项目放入tomcat中时,总是报错,而且这些jar都是真实存在的,错误如下: maven eclipse tomcat java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener 解决办法:

OC--有这么一个 整数 123456789,如何将这个整数的每一位数,从末位开始依次放入数组中,并遍历 倒序输出字符串

有这么一个 整数 123456789,如何将这个整数的每一位数,从末位开始依次放入数组中,并遍历 1 NSInteger num=123456789; 2 NSString *str=[[NSString alloc]initWithFormat:@"%ld",num ]; 3 NSMutableArray *arr=[NSMutableArray array]; 4 for(NSInteger i=str.length-1;i>=0;i--){ 5 //从末位开始截取字符串,每

pyqt字符串分离开,放入列表中

string1 = ''''' the stirng Has many line In THE fIle ''' list_of_string = string1.split() print list_of_string #将字符串分离开,放入列表中 print '*'*50 def case_insensitive_sort(liststring): listtemp = [(x.lower(),x) for x in liststring] listtemp.sort() return [x

【算法与数据结构实战】线性表操作-实现A并B,结果放入A中

//数据结构与算法基础题1:线性表操作,实现A并B,结果放入A中 #include "stdafx.h" #include <iostream> #include <string> #include <vector> #include <algorithm> #pragma warning(disable:4996) using namespace std; int main() { vector<int> set_A, se

python:将一个数逆序列放入列表中,例如1234 => [4,3,2,1]

今天有小伙伴问题了一个题:将一个数逆序列放入列表中,例如1234 => [4,3,2,1].要求用递归实现.粗看这个题的话,很容易实现的:int 转成str ,然后倒序,再把列表里面的 str 转成 int.再来看递归:一般递归里面都是要有个结束条件,这个题的结束条件也很好确定,它是这个列表,列表有长度,每次pop 一个 元素,直到列表的长度 等于 0 的时候,那就可以结束了.先来看个普通的递归方法: def reverse_order_list1(lst:list, tmp=[]): if l