Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放;
5、触发执行。
env.execute()
下面是使用Flink实现WordCount并输出结果的示例:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Minimal Flink batch (DataSet API) word-count example.
 *
 * <p>Splits a few in-memory lines into words, prints the raw {@code (word, 1)} pairs, and then
 * prints the per-word totals.
 */
public class FlinkMain {

    /**
     * Splits a line on single space characters and emits one {@code (token, 1)} pair for every
     * non-empty token.
     */
    @SuppressWarnings("serial")
    public static class LineSplit implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Tokenizes one input line.
         *
         * @param value the original input line
         * @param out collector that receives one {@code (token, 1)} pair per non-empty token
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : value.split(" ")) {
                // String.split never yields null elements, but consecutive or leading spaces
                // yield empty strings, which must not be counted as words.
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    /**
     * Runs the word count over three hard-coded lines and prints both the intermediate pairs
     * and the aggregated totals to the console.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution context.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Create the input data set.
        DataSet<String> text = env.fromElements("to be", "or no to be", "is question");

        // Transform every line into (word, 1) pairs.
        DataSet<Tuple2<String, Integer>> pairs = text.flatMap(new LineSplit());

        // Print the transformed data set; print() itself triggers execution of the job.
        pairs.print();
        System.out.println("-----------------------");

        // Group by tuple field 0 (the word) and sum tuple field 1 (the count).
        DataSet<Tuple2<String, Integer>> totals = pairs.groupBy(0).sum(1);

        // Print the aggregated data set to the console.
        totals.print();
        System.out.println("-----------------------");
    }
}
使用Flink SQL方式转换数据的示例:
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
 * Word-count example that expresses the aggregation as a SQL query through the Flink Table API
 * instead of DataSet transformations.
 */
public class FlinkMain2 {

    /**
     * Builds a data set of {@link WordCount} records from a hard-coded sentence, registers it as
     * the table {@code wordCount}, sums the frequency per word with SQL and prints the result.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution context and the batch table environment bound to it.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Parameterized list: no raw type, so no warning suppression is needed.
        List<WordCount> list = new ArrayList<WordCount>();
        String workStr = "to be or no to be is question";
        for (String token : workStr.split(" ")) {
            // split never yields null elements; only empty tokens have to be skipped.
            if (!token.isEmpty()) {
                list.add(new WordCount(token, 1L));
            }
        }

        // Create the data set.
        DataSet<WordCount> input = env.fromCollection(list);

        // Register the data set as table "wordCount" with the columns "word" and "frequency".
        tEnv.registerDataSet("wordCount", input, "word, frequency");
        Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );
        DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);

        // Print the result to the console.
        res.print();
    }

    /**
     * A word together with its frequency.
     *
     * <p>NOTE(review): the public fields and the public no-argument constructor are kept as-is;
     * presumably Flink needs them to treat this class as a POJO when mapping the table columns
     * {@code word} and {@code frequency} onto it. Confirm before making the fields private or
     * final.
     */
    public static class WordCount {
        /** The word. */
        public String word;
        /** How often the word occurred. */
        public long frequency;

        /** Creates an empty record. */
        public WordCount() {}

        /**
         * Creates a record for the given word.
         *
         * @param word the word
         * @param frequency the number of occurrences
         */
        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        /** Returns the word and its frequency as a human-readable (Chinese-labelled) string. */
        @Override
        public String toString() {
            return "词语:" + word + ",词频:" + frequency;
        }
    }
}
原文地址:https://blog.51cto.com/14538371/2436853