Flink的DataSet基本算子总结

Flink为了能够处理有边界的数据集和无边界的数据集 , 提供了对应的DataSet API和DataStream API 。 我们可以开发对应的Java程序或者Scala程序来完成相应的功能 。 下面举例了一些DataSet API中的基本的算子 。
Flink的DataSet基本算子总结文章插图
下面我们通过具体的代码来为大家演示每个算子的作用 。
1、Map、FlatMap与MapPartition//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList data = http://kandian.youth.cn/index/new ArrayList();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource text = env.fromCollection(data);DataSet mapData = http://kandian.youth.cn/index/text.map(new MapFunction>() { public List map(String data) throws Exception {String[] words = data.split(" ");//创建一个ListList result = new ArrayList();for(String w:words){result.add(w);}return result; }});mapData.print();System.out.println("*****************************************");DataSet flatMapData = http://kandian.youth.cn/index/text.flatMap(new FlatMapFunction() { public void flatMap(String data, Collector collection) throws Exception {String[] words = data.split(" ");for(String w:words){collection.collect(w);} }});flatMapData.print();System.out.println("*****************************************");/* new MapPartitionFunction 第一个String:表示分区中的数据元素类型 第二个String:表示处理后的数据元素类型*/DataSet mapPartitionData = http://kandian.youth.cn/index/text.mapPartition(new MapPartitionFunction() { public void mapPartition(Iterable values, Collector out) throws Exception {//针对分区进行操作的好处是:比如要进行数据库的操作 , 一个分区只需要创建一个Connection//values中保存了一个分区的数据Iterator it = values.iterator();while (it.hasNext()) {String next = it.next();String[] split = next.split(" ");for (String word : split) {out.collect(word);}}//关闭链接 }});mapPartitionData.print();2、Filter与Distinct//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList data = http://kandian.youth.cn/index/new ArrayList();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource text = env.fromCollection(data);DataSet flatMapData = http://kandian.youth.cn/index/text.flatMap(new FlatMapFunction() { public void flatMap(String data, Collector collection) throws Exception {String[] words = data.split(" ");for(String w:words){collection.collect(w);} }});//去掉重复的单词flatMapData.distinct().print();System.out.println("*********************");//选出长度大于3的单词flatMapData.filter(new FilterFunction() {public boolean filter(String word) throws Exception {int length = word.length();return length>3?true:false; }}).print();3、Join操作//获取运行的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID姓名ArrayList> data1 = new ArrayList>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList> data2 = new ArrayList>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID姓名所在的程序DataSet> table1 = env.fromCollection(data1);DataSet> table2 = env.fromCollection(data2);table1.join(table2).where(0).equalTo(0)/*第一个Tuple2