out) throws Exception {
                String connection = DBUtils.getConnection();
                System.out.println("connection = [" + connection + "]");
                DBUtils.returnConnection(connection);
            }
        }).print();
    }first   groupBy sortGroup Scala first表示获取前几个,groupBy表示分组,sortGroup表示分组内排序
def firstFunction(env:ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "spark"))
    info.append((1, "flink"))
    info.append((2, "java"))
    info.append((2, "springboot"))
    info.append((3, "linux"))
    info.append((4, "vue"))
    val data = env.fromCollection(info)
    data.first(3).print()
    //输出:(1,hadoop)
    //(1,spark)
    //(1,flink)
    data.groupBy(0).first(2).print()//根据第一个字段分组,每个分组获取前两个数据
    //(3,linux)
    //(1,hadoop)
    //(1,spark)
    //(2,java)
    //(2,springboot)
    //(4,vue)
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() //根据第一个字段分组,然后在分组内根据第二个字段升序排序,并取出前两个数据
    //输出(3,linux)
    //(1,flink)
    //(1,hadoop)
    //(2,java)
    //(2,springboot)
    //(4,vue)
  } Java     public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List> info = new ArrayList<>();
        info.add(new Tuple2<>(1, "hadoop"));
        info.add(new Tuple2<>(1, "spark"));
        info.add(new Tuple2<>(1, "flink"));
        info.add(new Tuple2<>(2, "java"));
        info.add(new Tuple2<>(2, "springboot"));
        info.add(new Tuple2<>(3, "linux"));
        info.add(new Tuple2<>(4, "vue"));
        DataSource> data = env.fromCollection(info);
        data.first(3).print();
        data.groupBy(0).first(2).print();
        data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
    }   FlatMap Function 获取一个元素,然后产生0个、1个或多个元素
Scala   def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(String)]()
    info.append("hadoop,spark");
    info.append("hadoop,flink");
    info.append("flink,flink");
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).print()
  } 输出:
hadoop
spark
hadoop
flink
flink
flink FlatMap将每个元素都用逗号分割,然后变成多个。
经典例子:
data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print() 将每个元素用逗号分割,然后每个元素做map,然后根据第一个字段分组,然后根据第二个字段求和。
输出结果如下:
(hadoop,2)
(flink,3)
(spark,1) Java 同样实现一个经典案例wordcount
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List info = new ArrayList<>();
        info.add("hadoop,spark");
        info.add("hadoop,flink");
        info.add("flink,flink");
        DataSource data = env.fromCollection(info);
        data.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String input, Collector out) throws Exception {
                String[] splits = input.split(",");
                for(String split: splits) {
                    //发送出去
                    out.collect(split);
                }
            }
        }).map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return new Tuple2<>(value,1);
            }
        }).groupBy(0).sum(1).print();
    }       Distinct 去重操作
Scala   def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(String)]()
    info.append("hadoop,spark");
    info.append("hadoop,flink");
    info.append("flink,flink");
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  } 这样就将每一个元素都做了去重操作。输出如下:
hadoop
flink
spark Java     public static void distinctFunction(ExecutionEnvironment env) throws Exception {
        List info = new ArrayList<>();
        info.add("hadoop,spark");
        info.add("hadoop,flink");
        info.add("flink,flink");
        DataSource data = env.fromCollection(info);
        data.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String input, Collector out) throws Exception {
                String[] splits = input.split(",");
                for(String split: splits) {
                    //发送出去
                    out.collect(split);
                }
            }
        }).distinct().print();
    }     Join Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)   表示第一个tuple input1中的第0个字段,与第二个tuple input2中的第一个字段进行join。
  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]() //编号 名字
    info1.append((1, "hadoop"))
    info1.append((2, "spark"))
    info1.append((3, "flink"))
    info1.append((4, "java"))
    val info2 = ListBuffer[(Int, String)]() //编号 城市
    info2.append((1, "北京"))
    info2.append((2, "上海"))
    info2.append((3, "深圳"))
    info2.append((5, "广州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.join(data2).where(0).equalTo(0).apply((first, second)=>{
      (first._1, first._2, second._2)
    }).print()
  } 输出结果如下:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海) Java     public static void joinFunction(ExecutionEnvironment env) throws Exception {
        List> info1 = new ArrayList<>(); //编号 名字
        info1.add(new Tuple2<>(1, "hadoop"));
        info1.add(new Tuple2<>(2, "spark"));
        info1.add(new Tuple2<>(3, "flink"));
        info1.add(new Tuple2<>(4, "java"));
        List> info2 = new ArrayList<>(); //编号 城市
        info2.add(new Tuple2<>(1, "北京"));
        info2.add(new Tuple2<>(2, "上海"));
        info2.add(new Tuple2<>(3, "深圳"));
        info2.add(new Tuple2<>(5, "广州"));
        DataSource> data1 = env.fromCollection(info1);
        DataSource> data2 = env.fromCollection(info2);
        data1.join(data2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {
            @Override
            public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
                return new Tuple3(first.f0, first.f1,second.f1);
            }
        }).print();
    }            Tuple2, Tuple2表示两个输入的集合,Tuple3>表示输出的Tuple3   
OuterJoin 上面讲的join是内连接,这个OuterJoin是外连接,包括左外连接,右外连接,全连接在两个数据集上。
def outJoinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]() //编号 名字
    info1.append((1, "hadoop"))
    info1.append((2, "spark"))
    info1.append((3, "flink"))
    info1.append((4, "java"))
    val info2 = ListBuffer[(Int, String)]() //编号 城市
    info2.append((1, "北京"))
    info2.append((2, "上海"))
    info2.append((3, "深圳"))
    info2.append((5, "广州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      }else {
        (first._1, first._2, second._2)
      }
    }).print() //左外连接 把左边的所有数据展示出来
  } 左外连接,当左边的数据在右边没有对应的数据时,需要进行处理,否则会出现空指针异常。输出如下:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-) 右外连接:
    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "-", second._2)
      }else {
        (first._1, first._2, second._2)
      }
    }).print() 右外连接,输出:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海) 全连接:
    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "-", second._2)
      }else if (second == null){
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print() (3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-) Java 左外连接:
    public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
        List> info1 = new ArrayList<>(); //编号 名字
        info1.add(new Tuple2<>(1, "hadoop"));
        info1.add(new Tuple2<>(2, "spark"));
        info1.add(new Tuple2<>(3, "flink"));
        info1.add(new Tuple2<>(4, "java"));
        List> info2 = new ArrayList<>(); //编号 城市
        info2.add(new Tuple2<>(1, "北京"));
        info2.add(new Tuple2<>(2, "上海"));
        info2.add(new Tuple2<>(3, "深圳"));
        info2.add(new Tuple2<>(5, "广州"));
        DataSource> data1 = env.fromCollection(info1);
        DataSource> data2 = env.fromCollection(info2);
        data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {
            @Override
            public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
                if(second == null) {
                    return new Tuple3(first.f0, first.f1, "-");
                }
                return new Tuple3(first.f0, first.f1,second.f1);
            }
        }).print();
    }             右外连接:
        data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {
            @Override
            public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
                if (first == null) {
                    return new Tuple3(second.f0, "-", second.f1);
                }
                return new Tuple3(first.f0, first.f1, second.f1);
            }
        }).print();         全连接:
        data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {
            @Override
            public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
                if (first == null) {
                    return new Tuple3(second.f0, "-", second.f1);
                } else if (second == null) {
                    return new Tuple3(first.f0, first.f1, "-");
                }
                return new Tuple3(first.f0, first.f1, second.f1);
            }
        }).print();          cross function Scala 笛卡尔积,左边与右边交叉处理
  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("乔峰", "慕容复")
    val info2 = List(3,1,0)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.cross(data2).print()
  } 输出:
(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0) Java public static void crossFunction(ExecutionEnvironment env) throws Exception {
        List info1 = new ArrayList<>();
        info1.add("乔峰");
        info1.add("慕容复");
        List info2 = new ArrayList<>();
        info2.add("3");
        info2.add("1");
        info2.add("0");
        DataSource data1 = env.fromCollection(info1);
        DataSource data2 = env.fromCollection(info2);
        data1.cross(data2).print();
    }     到此,关于“Apache下Flink transformation的用法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
            
            
                         
            网页名称:Apache下Flinktransformation的用法             
            路径分享:http://www.wtcwzsj.com/article/pshoog.html 
        
    
    
    
    
        
            
                关于我们 
                
                    
                 
             
            
                我们的服务 
                
                    
                 
             
            
                网站建设案例 
                
                    
                 
             
            
                新闻动态 
                
                    
                 
             
            
                联系我们 
                
                    
                    135-1821-9792 
                    公司服务热线 
                 
             
         
        
     
    
        
            Copyright © 2009-2022 www.wtcwzsj.com 青羊区广皓图文设计工作室(个体工商户) 版权所有  蜀ICP备19037934号