Flink1.13架構全集| 一文帶你由淺入深精通Flink方方面面(六)

9.3.1 基本轉換算子

  1. 映射(map)

map是大家非常熟悉的大數據操作算子,主要用於將數據流中的數據進行轉換,形成新的數據流。簡單來說,就是一個「一一映射」,消費一個元素就產出一個元素,如圖所示。

我們只需要基於DataStrema調用map()方法就可以進行轉換處理。方法需要傳入的參數是接口MapFunction的實現;返回值類型還是DataStream,不過泛型(流中的元素類型)可能改變。

下面的代碼用不同的方式,實現了提取Event中的user字段的功能。

publicclass TransMap {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(1);
 
    DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
    );
 
    // 傳入匿名類,實現MapFunction
    stream.map(new MapFunction<Event, String>() {
      @Override
      public String map(Event e) throws Exception {
        return e.user;
      }
    });
 
    // 傳入MapFunction的實現類
    stream.map(new UserExtractor()).print();
 
    env.execute();
  }
  publicstaticclass UserExtractor implements MapFunction<Event, String> {
    @Override
    public String map(Event e) throws Exception {
      return e.user;
    }
  }
}

上面代碼中,MapFunction實現類的泛型類型,與輸入數據類型和輸出數據的類型有關。在實現MapFunction接口的時候,需要指定兩個泛型,分別是輸入事件和輸出事件的類型,還需要重寫一個map()方法,定義從一個輸入事件轉換為另一個輸出事件的具體邏輯。

  1. 過濾(Filterfilter轉換操作,顧名思義是對數據流執行一個過濾,通過一個布爾條件表達式設置過濾條件,對於每一個流內元素進行判斷,若為true則元素正常輸出,若為false則元素被過濾掉,如圖所示。

進行filter轉換之後的新數據流的數據類型與原數據流是相同的。filter轉換需要傳入的參數需要實現FilterFunction接口,而FilterFunction內要實現filter()方法,就相當於一個返回布爾類型的條件表達式。

下面的代碼會將數據流中用戶Mary的瀏覽行為過濾出來 。

publicclass TransFilter {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
 
    DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
    );
 
    // 傳入匿名類實現FilterFunction
    stream.filter(new FilterFunction<Event>() {
      @Override
      public boolean filter(Event e) throws Exception {
        return e.user.equals("Mary");
      }
    });
    // 傳入FilterFunction實現類
    stream.filter(new UserFilter()).print();  
    env.execute();
  }
  publicstaticclass UserFilter implements FilterFunction<Event> {
    @Override
    public boolean filter(Event e) throws Exception {
      return e.user.equals("Mary");
    }
  }
}
  1. 扁平映射(flatmap)

flatMap操作又稱為扁平映射,主要是將數據流中的整體(一般是集合類型)拆分成一個一個的個體使用。消費一個元素,可以產生0到多個元素。flatMap可以認為是「扁平化」(flatten)和「映射」(map)兩步操作的結合,也就是先按照某種規則對數據進行打散拆分,再對拆分後的元素做轉換處理,如圖5-7所示。我們此前WordCount程序的第一步分詞操作,就用到了flatMap。

同map一樣,flatMap也可以使用Lambda表達式或者FlatMapFunction接口實現類的方式來進行傳參,返回值類型取決於所傳參數的具體邏輯,可以與原數據流相同,也可以不同。

flatMap操作會應用在每一個輸入事件上面, FlatMapFunction接口中定義了flatMap方法,用戶可以重寫這個方法,在這個方法中對輸入數據進行處理,並決定是返回0個、1個或多個結果數據。因此flatMap並沒有直接定義返回值類型,而是通過一個「收集器」(Collector)來指定輸出。希望輸出結果時,只要調用收集器的.collect()方法就可以了;這個方法可以多次調用,也可以不調用。所以flatMap方法也可以實現map方法和filter方法的功能,當返回結果是0個的時候,就相當於對數據進行了過濾,當返回結果是1個的時候,相當於對數據進行了簡單的轉換操作。

flatMap的使用非常靈活,可以對結果進行任意輸出,下面就是一個例子:

publicclass TransFlatmap {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
    );
    stream.flatMap(new MyFlatMap()).print();
    env.execute();
  }
  publicstaticclass MyFlatMap implements FlatMapFunction<Event, String> {
    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
      if (value.user.equals("Mary")) {
        out.collect(value.user);
      } elseif (value.user.equals("Bob")) {
        out.collect(value.user);
        out.collect(value.url);
      }
    }
  }
}

9.3.2 聚合算子(Aggregation)

在實際應用中,我們往往需要對大量的數據進行統計或整合,從而提煉出更有用的信息。比如之前word count程序中,要對每個詞出現的頻次進行疊加統計。這種操作,計算的結果不僅依賴當前數據,還跟之前的數據有關,相當於要把所有數據聚在一起進行匯總合併——這就是所謂的「聚合」(Aggregation),也對應着MapReduce中的reduce操作。

  1. 按鍵分區(keyBy)

對於Flink而言,DataStream是沒有直接進行聚合的API的。因為我們對海量數據做聚合肯定要進行分區並行處理,這樣才能提高效率。所以在Flink中,要做聚合,需要先進行分區;這個操作就是通過keyBy來完成的。

keyBy是聚合前必須要用到的一個算子。keyBy通過指定鍵(key),可以將一條流從邏輯上劃分成不同的分區(partitions)。這裡所說的分區,其實就是並行處理的子任務,也就對應着任務槽(task slot)。

keyBy是聚合前必須要用到的一個算子。keyBy通過指定鍵(key),可以將一條流從邏輯上劃分成不同的分區(partitions)。這裡所說的分區,其實就是並行處理的子任務,也就對應着任務槽(task slot)。

基於不同的key,流中的數據將被分配到不同的分區中去,如圖所示;這樣一來,所有具有相同的key的數據,都將被發往同一個分區,那麼下一步算子操作就將會在同一個slot中進行處理了。

在內部,是通過計算key的哈希值(hash code),對分區數進行取模運算來實現的。所以這裡key如果是POJO的話,必須要重寫hashCode()方法。

keyBy()方法需要傳入一個參數,這個參數指定了一個或一組key。有很多不同的方法來指定key:比如對於Tuple數據類型,可以指定字段的位置或者多個位置的組合;對於POJO類型,可以指定字段的名稱(String);另外,還可以傳入Lambda表達式或者實現一個鍵選擇器(KeySelector),用於說明從數據中提取key的邏輯。


我們可以以id作為key做一個分區操作,代碼實現如下:

publicclass TransKeyBy {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
 
    DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
    );
 
    // 使用Lambda表達式
    KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
 
    // 使用匿名類實現KeySelector
    KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
      @Override
      public String getKey(Event e) throws Exception {
        return e.user;
      }
    });
    env.execute();
  }
}

需要注意的是,keyBy得到的結果將不再是DataStream,而是會將DataStream轉換為KeyedStream。KeyedStream可以認為是「分區流」或者「鍵控流」,它是對DataStream按照key的一個邏輯分區,所以泛型有兩個類型:除去當前流中的元素類型外,還需要指定key的類型。

KeyedStream也繼承自DataStream,所以基於它的操作也都歸屬於DataStream API。但它跟之前的轉換操作得到的SingleOutputStreamOperator不同,只是一個流的分區操作,並不是一個轉換算子。KeyedStream是一個非常重要的數據結構,只有基於它才可以做後續的聚合操作(比如sum,reduce)。

  1. 簡單聚合

有了按鍵分區的數據流KeyedStream,我們就可以基於它進行聚合操作了。Flink為我們內置實現了一些最基本、最簡單的聚合API,主要有以下幾種:

sum():在輸入流上,對指定的字段做疊加求和的操作。
 min():在輸入流上,對指定的字段求最小值。
 max():在輸入流上,對指定的字段求最大值。
 minBy():與min()類似,在輸入流上針對指定字段求最小值。不同的是,min()只計算指定字段的最小值,其他字段會保留最初第一個數據的值;而minBy()則會返回包含字段最小值的整條數據。
 maxBy():與max()類似,在輸入流上針對指定字段求最大值。兩者區別與min()/minBy()完全一致。

簡單聚合算子使用非常方便,語義也非常明確。這些聚合方法調用時,也需要傳入參數;但並不像基本轉換算子那樣需要實現自定義函數,只要說明聚合指定的字段就可以了。指定字段的方式有兩種:指定位置,和指定名稱。

對於元組類型的數據,可以使用這兩種方式來指定字段。需要注意的是,元組中字段的名稱,是以f0、f1、f2、…來命名的。

如果數據流的類型是POJO類,那麼就只能通過字段名稱來指定,不能通過位置來指定了。

publicclass TransAggregation {
 public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   DataStreamSource<Event> stream = env.fromElements(
       new Event("Mary", "./home", 1000L),
       new Event("Bob", "./cart", 2000L)
   );
   stream.keyBy(e -> e.user).max("timestamp");   // 指定字段名稱
   env.execute();
 }
}

簡單聚合算子返回的,同樣是一個SingleOutputStreamOperator,也就是從KeyedStream又轉換成了常規的DataStream。所以可以這樣理解:keyBy和聚合是成對出現的,先分區、後聚合,得到的依然是一個DataStream。而且經過簡單聚合之後的數據流,元素的數據類型保持不變。

一個聚合算子,會為每一個key保存一個聚合的值,在Flink中我們把它叫作「狀態」(state)。所以每當有一個新的數據輸入,算子就會更新保存的聚合結果,並發送一個帶有更新後聚合值的事件到下游算子。對於無界流來說,這些狀態是永遠不會被清除的,所以我們使用聚合算子,應該只用在含有有限個key的數據流上。

  1. 歸約聚合(reduce)

如果說簡單聚合是對一些特定統計需求的實現,那麼reduce算子就是一個一般化的聚合統計操作了。它可以對已有的數據進行歸約處理,把每一個新輸入的數據和當前已經歸約出來的值,再做一個聚合計算。

與簡單聚合類似,reduce操作也會將KeyedStream轉換為DataStream。它不會改變流的元素數據類型,所以輸出類型和輸入類型是一樣的。

調用KeyedStream的reduce方法時,需要傳入一個參數,實現ReduceFunction接口。接口在源碼中的定義如下:

publicinterface ReduceFunction<T> extends Function, Serializable {
  T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口裡需要實現reduce()方法,這個方法接收兩個輸入事件,經過轉換處理之後輸出一個相同類型的事件;所以,對於一組數據,我們可以先取兩個進行合併,然後再將合併的結果看作一個數據、再跟後面的數據合併,最終會將它「簡化」成唯一的一個數據,這也就是reduce「歸約」的含義。在流處理的底層實現過程中,實際上是將中間「合併的結果」作為任務的一個狀態保存起來的;之後每來一個新的數據,就和之前的聚合狀態進一步做歸約。

我們可以單獨定義一個函數類實現ReduceFunction接口,也可以直接傳入一個匿名類。當然,同樣也可以通過傳入Lambda表達式實現類似的功能。

與簡單聚合類似,reduce操作也會將KeyedStream轉換為DataStrema。它不會改變流的元素數據類型,所以輸出類型和輸入類型是一樣的。

下面我們來看一個稍複雜的例子。

我們將數據流按照用戶id進行分區,然後用一個reduce算子實現sum的功能,統計每個用戶訪問的頻次;進而將所有統計結果分到一組,用另一個reduce算子實現maxBy的功能,記錄所有用戶中訪問頻次最高的那個,也就是當前訪問量最大的用戶是誰。

publicclass TransReduce {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 這裡的ClickSource()使用了之前自定義數據源小節中的ClickSource()
    env.addSource(new ClickSource())
        // 將Event數據類型轉換成元組類型
        .map(new MapFunction<Event, Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> map(Event e) throws Exception {
            return Tuple2.of(e.user, 1L);
          }
        })
        .keyBy(r -> r.f0) // 使用用戶名來進行分流
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
            // 每到一條數據,用戶pv的統計值加1
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
          }
        })
        .keyBy(r -> true) // 為每一條數據分配同一個key,將聚合結果發送到一條流中去
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
           // 將累加器更新為當前最大的pv統計值,然後向下游發送累加器的值
            return value1.f1 > value2.f1 ? value1 : value2;
          }
        })
        .print();
    env.execute();
  }
}

reduce同簡單聚合算子一樣,也要針對每一個key保存狀態。因為狀態不會清空,所以我們需要將reduce算子作用在一個有限key的流上。

9.3.3 用戶自定義函數(UDF)

在前面的介紹我們可以發現,Flink的DataStream API編程風格其實是一致的:基本上都是基於DataStream調用一個方法,表示要做一個轉換操作;方法需要傳入一個參數,這個參數都是需要實現一個接口。很容易發現,這些接口有一個共同特點:全部都以算子操作名稱 + Function命名,例如源算子需要實現SourceFunction接口,map算子需要實現MapFunction接口,reduce算子需要實現ReduceFunction接口。

這些接口又都繼承自Function接口。所以我們不僅可以通過自定義函數類或者匿名類來實現接口,也可以直接傳入Lambda表達式。這就是所謂的用戶自定義函數(user-defined function,UDF)。

接下來我們就對這幾種編程方式做一個梳理總結。

  1. 函數類(Function Classes)

對於大部分操作而言,都需要傳入一個用戶自定義函數(UDF),實現相關操作的接口,來完成處理邏輯的定義。Flink暴露了所有UDF函數的接口,具體實現方式為接口或者抽象類,例如MapFunction、FilterFunction、ReduceFunction等

所以最簡單直接的方式,就是自定義一個函數類,實現對應的接口。之前我們對於API的練習,主要就是基於這種方式。

下面例子實現了FilterFunction接口,用來從用戶的點擊數據中篩選包含「home」的內容:

publicclass TransFunctionUDF {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
DataStream<String> clicks = env.readTextFile("clicks.csv");    
DataStream<String> stream = clicks.filter(new FlinkFilter());
 stream.print();
env.execute();
  }
  publicstaticclass FlinkFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
      return value.contains("home");
    }
  }

當然還可以通過匿名類來實現FilterFunction接口:

DataStream<String> stream = clicks.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
  return value.contains("home");
}
});

為了類可以更加通用,我們還可以將用於過濾的關鍵字"home"抽象出來作為類的屬性,調用構造方法時傳進去。

DataStream<String> clicks = env.readTextFile("clicks.csv");
DataStream<String> stream = clicks.filter(new KeyWordFilter("home"));
publicstaticclass KeyWordFilter implements FilterFunction<String> {
  private String keyWord;
  KeyWordFilter(String keyWord) { this.keyWord = keyWord; }
  @Override
  public boolean filter(String value) throws Exception {
    return value.contains(this.keyWord);
  }
}
  1. 匿名函數(Lambda)

匿名函數(Lambda表達式)是Java 8 引入的新特性,方便我們更加快速清晰地寫代碼。Lambda 表達式允許以簡潔的方式實現函數,以及將函數作為參數來進行傳遞,而不必聲明額外的(匿名)類。

Flink 的所有算子都可以使用 Lambda 表達式的方式來進行編碼,但是,當 Lambda 表達式使用 Java 的泛型時,我們需要顯式的聲明類型信息。

下例演示了如何使用Lambda表達式來實現一個簡單的 map() 函數,我們使用 Lambda 表達式來計算輸入的平方。在這裡,我們不需要聲明 map() 函數的輸入 i 和輸出參數的數據類型,因為 Java 編譯器會對它們做出類型推斷。

publicclass TransFunctionLambda {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<Event> clicks = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
    );
    //map函數使用Lambda表達式,不需要進行類型聲明
    DataStream<String> stream = clicks.map(event -> event.url);
    stream.print();
    env.execute();
  }
}

由於 OUT 是 Integer 類型而不是泛型,所以 Flink 可以從函數簽名 OUT map(IN value) 的實現中自動提取出結果的類型信息。

但是對於像 flatMap() 這樣的函數,它的函數簽名 void flatMap(IN value, Collectorout) 被 Java 編譯器編譯成了 void flatMap(IN value, Collector out),也就是說將Collector的泛型信息擦除掉了。這樣 Flink 就無法自動推斷輸出的類型信息了。

同樣地,使用map()時只要涉及到泛型擦除也會有同樣的問題。比如返回Flink自定義的元組類型,在WordCount程序中就遇到了類似的問題。

一般來說,可以顯式地調用.returns()方法,通過明確指定返回類型來解決這個問題:

DataStream<String> stream = clicks
.map(event -> Tuple2.of(event.user, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

當然,也可以用類或者匿名類的方式來解決。

  1. 富函數類(Rich Function Classes)

「富函數類」也是DataStream API提供的一個函數類的接口,所有的Flink函數類都有其Rich版本。富函數類一般是以抽象類的形式出現的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。

與常規函數類的不同主要在於,富函數類可以獲取運行環境的上下文,並擁有一些生命周期方法,所以可以實現更複雜的功能。

Rich Function有生命周期的概念。典型的生命周期方法有:

open()方法,是Rich Function的初始化方法,也就是會開啟一個算子的生命周期。當一個算子的實際工作方法例如map()或者filter()方法被調用之前,open()會首先被調用。所以像文件IO的創建,數據庫連接的創建,配置文件的讀取等等這樣一次性的工作,都適合在open()方法中完成。。
close()方法,是生命周期中的最後一個調用的方法,類似於解構方法。一般用來做一些清理工作。
需要注意的是,這裡的生命周期方法,對於一個並行子任務來說只會調用一次;而對應的,實際工作方法,例如RichMapFunction中的map(),在每條數據到來後都會觸發一次調用。

來看一個例子:

publicclass RichFunctionExample {
public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  env
      .fromElements(1,2,3,4)
      .map(new RichMapFunction<Integer, Integer>() {
        @Override
        public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任務的生命周期開始");
        }
        @Override
        public Integer map(Integer integer) throws Exception {
          return integer + 1;
        }
        @Override
        public void close() throws Exception {
          super.close();
          System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任務的生命周期結束");
        }
      })
      .print();
  env.execute();
}
}