首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache :执行环境和多个接收器

Apache :执行环境和多个接收器
EN

Stack Overflow用户
提问于 2018-06-28 02:44:18
回答 1查看 9.5K关注 0票数 2

我的问题可能会引起一些混乱,所以请先看描述。找出我的问题可能会有帮助。我将在问题的末尾添加我的代码(关于我的代码结构/实现的任何建议也是受欢迎的)。,谢谢您的帮助!

我的问题:

  1. 如何在Flink批处理中定义多个接收器,而不让它从一个源重复获取数据?
  2. createCollectionEnvironment()getExecutionEnvironment()有什么区别?我应该在当地环境中使用哪一种?
  3. env.execute()的用途是什么?我的代码将输出没有这个句子的结果。如果我加上这句话,它会弹出一个例外:

-

代码语言:javascript
复制
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) 
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) 
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) 
    at MainClass.main(MainClass.java:114)

Description:编程新手。最近我需要处理一些数据(分组数据、计算标准差等)。使用Flink批量处理。然而,我到了需要输出两个DataSet的地步。这个结构是这样的

从源(数据库) -> DataSet 1(使用zipWithIndex()添加索引) -> DataSet 2(在保持索引的同时做一些计算)-> DataSet 3

首先,我输出DataSet 2,例如,索引从1到10000;然后输出DataSet 3 --索引从10001变为20000,尽管我没有在任何函数中更改值。我的猜测是,当输出DataSet 3而不是使用以前计算过的DataSet 2的结果时,它从再次从数据库获取数据开始,然后执行计算。使用ZipWithIndex()函数不仅给出了错误的索引号,而且增加了与db的连接。

我猜这与执行环境有关,就像我使用

ExecutionEnvironment.createCollectionsEnvironment();= ExecutionEnvironment env

将给出“错误”的指数(10001-20000)和

ExecutionEnvironment.getExecutionEnvironment();= ExecutionEnvironment env

将给出正确的索引号(1-10000),所需时间和数据库连接数不同,打印顺序将颠倒。

OS,DB,其他环境细节和版本: IntelliJ IDEA 2017.3.5 (Community ) Build #IC-173.4674.33,建于2018年3月6日,JRE: 1.8.0_152-release-1024-b15 amd64 JVM: OpenJDK 64位服务器VM by JetBrains s.r.o Windows 1010.0

我的测试代码():

公共静态空主( ExecutionEnvironment.createCollectionsEnvironment();args)抛出异常{ ExecutionEnvironment env =ExecutionEnvironment

代码语言:javascript
复制
    //Table is used to calculate the standard deviation as I figured that there is no such calculation in DataSet.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    //Get Data from a mySql database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add index for assigning group (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace index(long) with group number(int), and convert Row to double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    //Using groupBy() to combine individual data of each group into a list, while calculating the mean and range in each group
    //put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;

            // Using max and min to calculate range, using i and sum to calculate mean
            double max = var1.f1;
            double min = max;
            double sum = 0;
            int i = 1;

            // The list is to store individual value
            List<Double> list = new ArrayList<>();
            list.add(max);

            while (it.hasNext())
            {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }

            //Store group number, mean, range, and 5 individual values within the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    //print because if no sink is created, Flink will not even perform the calculation.
    groupDS.print();


    // Get the max group number and range in each group to calculate average range
    // if group number start with 1 then the maximum of group number equals to the number of group
    // However, because this is the second sink, data will flow from source again, which will double the group number
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print as if no sink is created, Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1/ rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {

        // index 1-5 will be assigned to group 1, index 6-10 will be assigned to group 2, etc.
        int n = new Long(input.f0 / 5).intValue() + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}
EN

回答 1

Stack Overflow用户

发布于 2018-06-28 11:43:29

  1. 可以将一个源连接到多个接收器,源只执行一次,记录被广播到多个接收器。参见这个问题Flink能否将结果写入多个文件(如Hadoop的MultipleOutputFormat)?
  2. 当您想要运行您的工作时,getExecutionEnvironment是获得环境的正确方法。createCollectionEnvironment是一个很好的方式来玩和测试。见文档
  3. 异常错误消息非常清楚:如果您调用、打印或收集您的数据流,则执行。所以你有两个选择:
  • 要么在数据流结束时调用print/collect,然后执行并打印它。这对测试有用。请记住,每个数据流只能调用一次收集/打印,否则会在未完全定义的情况下多次执行
  • 要么在数据流末尾添加一个接收器,然后调用env.execute()。这就是你想要做的,一旦你的流动处于一个更成熟的状态。
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51073946

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档