Flink批处理之读写Mysql

1、添加Maven坐标

<dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
       <version>5.1.48</version>
</dependency>

 <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
         <version>1.8.0</version>
 </dependency>

2、建表

CREATE TABLE `temp` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,
  `type` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3、 Show Code

package com.fwmagic.flink.batch;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeUnit;

public class BatchDemoOperatorMysql {
    public static void main(String[] args) throws Exception {

        String driverClass = "com.mysql.jdbc.Driver";
        String dbUrl = "jdbc:mysql://localhost:3306/test";
        String userNmae = "root";
        String passWord = "123456";
        String sql = "insert into test.temp (name,time,type) values (?,?,?)";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        /**
         * 文件内容:
         * 关羽,2019-10-14 00:00:01,1
         * 张飞,2019-10-14 00:00:02,2
         * 赵云,2019-10-14 00:00:03,3
         */

        String filePath = "/Users/temp/data.csv";

        //读csv文件内容,转成Row对象
        DataSet<Row> outputData = env.readCsvFile(filePath).fieldDelimiter(",").types(String.class, String.class, Long.class).map(new MapFunction<Tuple3<String, String, Long>, Row>() {
            @Override
            public Row map(Tuple3<String, String, Long> t) throws Exception {
                Row row = new Row(3);
                row.setField(0, t.f0.getBytes("UTF-8"));
                row.setField(1, t.f1.getBytes("UTF-8"));
                row.setField(2, t.f2.longValue());
                return row;
            }
        });

        //将Row对象写到mysql
        outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userNmae)
                .setPassword(passWord)
                .setQuery(sql)
                .finish());

        //触发执行
        env.execute("insert data to mysql");

        System.out.println("mysql写入成功!");

        TimeUnit.SECONDS.sleep(6);

        //读mysql
        DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userNmae)
                .setPassword(passWord)
                .setQuery("select * from temp")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO))
                .finish());

        //获取数据并打印
        dataSource.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row value) throws Exception {
                System.out.println(value);
                return value.toString();
            }
        }).print();

    }
}

4、注意事项

  • 数据写入mysql的DataSet泛型要求是row,需要转换;
  • 数据读取的结果也是row类型,不能直接print,需要转换;
  • 数据写入后一定要加上env.execute(),触发任务执行;
  • 涉及到中文的,需要转换成UTF-8,不然数据库中会出现乱码。

原文地址:https://blog.51cto.com/simplelife/2443000

时间: 2024-10-10 17:25:08

Flink批处理之读写Mysql的相关文章

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

Flink批处理优化器之成本估算

成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成.在Flink中成本估算依赖于每个不同的运算符所提供的自己的"预算",本篇我们将分析什么是成本.运算符如何提供自己的预算以及如何基于预算估算成本. 什么是成本 Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加.减.乘.除)以及对这些因素未知值的认定与校验. "cost"一词也有译作:开销.代价,将其视为同义即可. Flink当前将成本估算

R语言使用RMySQL连接及读写Mysql数据库 测试通过

R语言使用RMySQL连接及读写Mysql数据库 简单说下安装过程,一般不会有问题,重点是RMySQL的使用方式. 系统环境说明 Redhat系统:Linux 460-42.6.32-431.29.2.el6.x86_64 系统编码:LANG=zh_CN.UTF-8(中文UTF-8格式) MySQL版本:mysql  Ver 14.14 Distrib 5.1.73, forredhat-linux-gnu (x86_64) using readline 5.1   安装mysql 1.    

JDBC读写MySQL的大字段数据

JDBC读写MySQL的大字段数据 不管你是新手还是老手,大字段数据的操作常常令你感到很头痛.因为大字段有些特殊,不同数据库处理的方式不一样,大字段的操作常常是以流的方式 来处理的.而非一般的字段,一次即可读出数据.本人以前用到Spring+iBatis架构来操作大字段,结果以惨烈失败而告终,在网上寻求解决方案,也 没找到答案.最终以JDBC来实现了大字段操作部分. 本文以MySQL为例,通过最基本的JDBC技术来处理大字段的插入.读取操作. 环境: MySQL5.1 JDK1.5 一.认识My

Python多进程爬虫东方财富盘口异动数据+Python读写Mysql与Pandas读写Mysql效率对比

先上个图看下网页版数据.mysql结构化数据 通过Python读写mysql执行时间为:1477s,而通过Pandas读写mysql执行时间为:47s,方法2速度几乎是方法1的30倍.在于IO读写上,Python多线程显得非常鸡肋,具体分析可参考:https://cuiqingcai.com/3325.html 1.Python读写Mysql # -*- coding: utf-8 -*- import pandas as pd import tushare as ts import pymys

浅谈Flink批处理优化器之Join优化

跟传统的关系型数据库类似,Flink提供了优化器"hint"(提示)以告诉优化器选择一些执行策略.目前优化提示主要针对批处理中的连接(join).在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL): outerJoin:外连接,具体细分为left-outer join.right-outer join.full-

Flink批处理优化器之范围分区重写

为最终计划应用范围分区重写 Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下: final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env); ds.p

R语言使用RMySQL连接及读写Mysql数据库

简单说下安装过程,一般不会有问题,重点是RMySQL的使用方式. 系统环境说明 Redhat系统:Linux 460-42.6.32-431.29.2.el6.x86_64 系统编码:LANG=zh_CN.UTF-8(中文UTF-8格式) mysql版本号:mysql  Ver 14.14 Distrib 5.1.73, forredhat-linux-gnu (x86_64) using readline 5.1 安装mysql 1.      查看是否安装 yum list installe

批处理备份WINDOS mysql

先在服务中找到mysql服务 本例 mysqlzt 在D盘新建备份文件夹     本例 D:\BACKUP 找到mysql 数据库文件,  本例 D:\xampp\mysql\data 批处理文件 先停止服务 全盘备份到以日期命名的文件夹中 再启用服务 %date:~0,10%  以时间命令的文件夹 c: cd / net stop mysqlzt xcopy D:\xampp\mysql\data /S D:\BACKUP\%date:~0,10%\ /y net start mysqlzt