9.3.1 Basic conversion operator
- mapping (map)
map is a very familiar operator in big data processing. It converts each element of a data stream, producing a new data stream. Simply put, it is a "one-to-one mapping": consuming one element produces exactly one element, as shown in the figure.

We only need to call the map() method on a DataStream to perform the conversion. The parameter passed to the method is an implementation of the MapFunction interface; the return type is still DataStream, but its type parameter (the element type of the stream) may change.
The code below extracts the user field of each Event in two different ways.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Event is the POJO (user, url, timestamp) defined in the earlier data-source section.
public class TransMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // Pass in an anonymous class that implements MapFunction
        stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event e) throws Exception {
                return e.user;
            }
        });

        // Pass in a class that implements MapFunction
        stream.map(new UserExtractor()).print();

        env.execute();
    }

    public static class UserExtractor implements MapFunction<Event, String> {
        @Override
        public String map(Event e) throws Exception {
            return e.user;
        }
    }
}
```
In the code above, the type parameters of the MapFunction implementation correspond to the input and output data types: when implementing the MapFunction interface you specify two type parameters, the type of the input events and the type of the output events. You also override the map() method, which defines how one input event is converted into one output event.
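To see the one-to-one contract in isolation, the sketch below models map() in plain Java rather than calling Flink. `MapLike` is a hypothetical stand-in for MapFunction, a `List` stands in for a finite stream, and `Event` is a minimal record mirroring the POJO used above (Java 16+ is assumed for records). It shows that every input yields exactly one output and that the element type may change from `Event` to `String`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of map(): NOT the Flink API, only its one-to-one contract.
class MapModel {
    // Hypothetical stand-in for MapFunction<IN, OUT>: one input in, one output out.
    interface MapLike<IN, OUT> {
        OUT map(IN value);
    }

    // Minimal event type mirroring the Event POJO used in this section.
    record Event(String user, String url, long timestamp) {}

    // Applies the function to every element; output size always equals input size.
    static <IN, OUT> List<OUT> applyMap(List<IN> stream, MapLike<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>(stream.size());
        for (IN element : stream) {
            out.add(fn.map(element));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        // Element type changes from Event to String, like MapFunction<Event, String>.
        List<String> users = applyMap(events, Event::user);
        System.out.println(users); // [Mary, Bob]
    }
}
```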
- filtering (Filter)
The filter conversion operation, as the name suggests, filters the data stream: a Boolean conditional expression sets the filter condition, and every element in the stream is tested against it. If the result is true, the element is output normally; if it is false, the element is filtered out, as shown in the figure.

The new data stream produced by filter has the same data type as the original data stream. The parameter passed to filter must implement the FilterFunction interface, whose filter() method must be implemented; it acts as a conditional expression that returns a boolean.
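The keep-or-drop rule can be modeled the same way. In this plain-Java sketch (not the Flink API), `java.util.function.Predicate` plays the role of FilterFunction and `Event` is a minimal hypothetical record; elements for which the predicate returns true pass through unchanged, and the output element type equals the input type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter(): keep an element when the condition is true.
class FilterModel {
    record Event(String user, String url, long timestamp) {}

    // Predicate<T> stands in for FilterFunction<T>; the element type is unchanged.
    static <T> List<T> applyFilter(List<T> stream, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T element : stream) {
            // true: the element passes through unchanged; false: it is dropped
            if (condition.test(element)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        // List<Event> in, List<Event> out: only Mary's event remains.
        List<Event> maryOnly = applyFilter(events, e -> e.user().equals("Mary"));
        System.out.println(maryOnly);
    }
}
```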
The code below keeps only the browsing events of user Mary in the data stream.
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // Pass in an anonymous class that implements FilterFunction
        stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Mary");
            }
        });

        // Pass in a class that implements FilterFunction
        stream.filter(new UserFilter()).print();

        env.execute();
    }

    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }
}
```
- Flat mapping (flatMap)
The flatMap operation is also called flat mapping. It is mainly used to split a whole element in the data stream (usually of a collection type) into individual elements. Consuming one element can produce zero or more elements. flatMap can be thought of as a combination of two steps, "flatten" and "map": the data is first broken up according to certain rules, and then the split elements are converted, as shown in Figure 5-7. The first step of the word-splitting operation in our earlier WordCount program used flatMap.
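The "zero or more outputs per input" behavior can be sketched in plain Java with a collector-style callback. This is not Flink code: `FlatMapLike` is a hypothetical stand-in for FlatMapFunction, and a `Consumer` plays the role of Flink's Collector. The example splits lines into words, as the first step of WordCount does, and an empty line emits nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of flatMap(): each input may emit 0, 1, or many outputs
// through a collector callback instead of a return value.
class FlatMapModel {
    // Hypothetical stand-in for FlatMapFunction<IN, OUT>; Consumer<OUT> plays Collector<OUT>.
    interface FlatMapLike<IN, OUT> {
        void flatMap(IN value, Consumer<OUT> out);
    }

    static <IN, OUT> List<OUT> applyFlatMap(List<IN> stream, FlatMapLike<IN, OUT> fn) {
        List<OUT> results = new ArrayList<>();
        for (IN element : stream) {
            fn.flatMap(element, results::add); // "collecting" appends to the output stream
        }
        return results;
    }

    public static void main(String[] args) {
        // One line in, any number of words out.
        List<String> words = applyFlatMap(
                List.of("hello flink", "", "hello world"),
                (String line, Consumer<String> out) -> {
                    for (String w : line.split(" ")) {
                        if (!w.isEmpty()) {
                            out.accept(w);
                        }
                    }
                });
        System.out.println(words); // [hello, flink, hello, world]
    }
}
```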

Like map, flatMap accepts either a Lambda expression or an implementation class of the FlatMapFunction interface. The return type depends on the logic of the function passed in and can be the same as or different from that of the original data stream.
The flatMap operation is applied to each input event. The flatMap method is defined in the FlatMapFunction interface; users override this method to process the input data and decide whether to produce 0, 1, or multiple results. For this reason, flatMap does not declare a return value but emits its output through a "Collector". To output a result, call the collector's .collect() method; it can be called any number of times, including not at all. flatMap can therefore also implement the functions of map and filter: producing 0 results for an element is equivalent to filtering it out, and producing exactly 1 result is equivalent to a simple conversion.
flatMap is very flexible: the function decides freely how many results to emit and what they contain. The following is an example:
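```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransFlatmap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        stream.flatMap(new MyFlatMap()).print();
        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")) {
                // One output for Mary: the user name
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                // Two outputs for Bob: the user name and the url
                out.collect(value.user);
                out.collect(value.url);
            }
            // Any other user produces no output
        }
    }
}
```
9.3.2 Aggregation operator (Aggregation)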
In practical applications, we often need to count or combine large amounts of data to extract more useful information. For example, the earlier word count program accumulated the number of occurrences of each word. For this kind of operation, the result depends not only on the current data but also on the data that came before it. It amounts to bringing the data together to summarize and merge it; this is what is called "aggregation", and it corresponds to the reduce operation in MapReduce.
- Key partition (keyBy)
In Flink, DataStream has no API for aggregating directly. This is because aggregating massive amounts of data efficiently requires partitioning the data and processing the partitions in parallel. Therefore, to aggregate in Flink you must first partition the stream, which is done with keyBy.
keyBy is an operator that must be used before aggregation. keyBy can logically divide a stream into different partitions by specifying a key. The partitions mentioned here are actually subtasks of parallel processing, which correspond to task slots.
Based on the key, the data in the stream is allocated to different partitions, as shown in the figure. In this way, all data with the same key is sent to the same partition, and the next operator processes it in the same slot.

Internally, this is implemented by computing the hash code of the key and mapping it onto one of the partitions, conceptually a modulo operation on the number of partitions. So if the key is a POJO, it must override the hashCode() method.
The keyBy() method takes one parameter that specifies one key or a group of keys. There are several ways to specify it: for the Tuple data type, you can specify the position of a field or a combination of positions; for a POJO type, you can specify the field name (a String); you can also pass in a Lambda expression or implement a key selector (KeySelector) that defines how to extract the key from the data.
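The routing rule can be illustrated with a simplified plain-Java model. Here the partition index is just `Math.floorMod(key.hashCode(), parallelism)`; this is an assumption for illustration, since Flink itself first maps the key's hash code into key groups and then assigns key groups to subtasks, so real indices differ. The property illustrated is the same: equal keys, including equal POJO keys that override hashCode(), always land in the same partition. `UserKey` is a hypothetical POJO key.

```java
import java.util.Objects;

// Simplified model of keyBy routing: hash the key, then map the hash onto one
// of N partitions. Not Flink's exact algorithm (which uses key groups).
class KeyByModel {
    static int partitionFor(Object key, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Hypothetical POJO key that overrides equals()/hashCode(),
    // so two distinct but equal instances produce the same hash.
    static final class UserKey {
        final String user;

        UserKey(String user) {
            this.user = user;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof UserKey other && user.equals(other.user);
        }

        @Override
        public int hashCode() {
            return Objects.hash(user);
        }
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String user : new String[] {"Mary", "Bob", "Mary", "Alice"}) {
            System.out.println(user + " -> partition " + partitionFor(user, parallelism));
        }
        // Distinct objects with equal content are routed to the same partition.
        System.out.println(partitionFor(new UserKey("Mary"), parallelism)
                == partitionFor(new UserKey("Mary"), parallelism)); // true
    }
}
```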
We can use the user field as the key to partition the stream. The code is as follows:
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // Use a Lambda expression
        KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);

        // Use an anonymous class that implements KeySelector
        KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) throws Exception {
                return e.user;
            }
        });

        // A sink is needed so that env.execute() has an operator to run
        keyedStream.print();
        env.execute();
    }
}
```
Note that keyBy no longer returns a DataStream; it converts the DataStream into a KeyedStream. A KeyedStream can be thought of as a "partitioned stream" or "keyed stream". It is a logical partitioning of the DataStream by key, so it has two type parameters: besides the element type of the stream, the key type must also be specified.
KeyedStream also extends DataStream, so operations on it still belong to the DataStream API. Unlike the SingleOutputStreamOperator returned by the earlier conversion operations, however, it is only a partitioning of the stream, not a conversion operator. KeyedStream is a very important data structure: aggregation operations such as sum and reduce can only be performed on it.
- simple aggregation
With a KeyedStream partitioned by key, we can perform aggregation on it. Flink provides several built-in basic aggregation APIs, mainly the following:
sum(): sums the specified field over the input stream.
min(): finds the minimum value of the specified field over the input stream.
max(): finds the maximum value of the specified field over the input stream.
minBy(): like min(), finds the minimum value of the specified field over the input stream. The difference is that min() computes only the minimum of the specified field, while the other fields keep the values of the first record; minBy() instead returns the entire record that contains the minimum value.
maxBy(): like max(), finds the maximum value of the specified field over the input stream; it differs from max() in exactly the same way that minBy() differs from min().
The simple aggregation operators are easy to use and have clear semantics. These aggregation methods also take a parameter, but unlike the basic conversion operators they do not require a custom function: you only specify the field to aggregate on. A field can be specified in two ways: by position or by name.
For tuple data, both ways can be used. Note that tuple fields are named f0, f1, f2, and so on.
If the element type of the stream is a POJO class, fields can only be specified by name, not by position.
```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        // Specify the field by name (Event is a POJO, so positions cannot be used)
        stream.keyBy(e -> e.user).max("timestamp").print();
        env.execute();
    }
}
```
A simple aggregation operator also returns a SingleOutputStreamOperator; that is, it converts the KeyedStream back into a regular DataStream. So keyBy and aggregation come in pairs: partition first, then aggregate, and the result is again a DataStream. In addition, simple aggregation does not change the data type of the stream's elements.
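The difference between min() and minBy() is easiest to see on a single key with two records. The sketch below is a plain-Java model, not Flink code; `Click` is a hypothetical record with the same fields as Event. Each function keeps one saved result, updates it on every input, and emits the updated result, as the built-in aggregations do.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model contrasting min() and minBy() on one key's rolling aggregate.
class MinVsMinByModel {
    record Click(String user, String url, long timestamp) {}

    // min("timestamp"): keep the FIRST record's other fields, replace only the timestamp.
    static List<Click> rollingMin(List<Click> input) {
        List<Click> emitted = new ArrayList<>();
        Click state = null;
        for (Click c : input) {
            if (state == null) {
                state = c;
            } else if (c.timestamp() < state.timestamp()) {
                state = new Click(state.user(), state.url(), c.timestamp());
            }
            emitted.add(state); // one updated result per input
        }
        return emitted;
    }

    // minBy("timestamp"): keep the WHOLE record that holds the minimum timestamp.
    static List<Click> rollingMinBy(List<Click> input) {
        List<Click> emitted = new ArrayList<>();
        Click state = null;
        for (Click c : input) {
            if (state == null || c.timestamp() < state.timestamp()) {
                state = c;
            }
            emitted.add(state);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("Mary", "./home", 3000L),
                new Click("Mary", "./cart", 1000L));
        System.out.println(rollingMin(clicks).get(1));   // url stays ./home, timestamp 1000
        System.out.println(rollingMinBy(clicks).get(1)); // the ./cart record itself
    }
}
```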
An aggregation operator saves one aggregated value for each key; in Flink this is called "state". Whenever a new record arrives, the operator updates the saved aggregation result and sends an event carrying the updated value to the downstream operator. For unbounded streams this state is never cleared, so these aggregation operators should only be used on streams with a limited number of keys.
- Reduction aggregation (reduce)
If simple aggregation implements a few specific statistics, the reduce operator is a generalized aggregation. It reduces existing data by combining each newly arrived record with the current reduced value.
Similar to simple aggregation, the reduce operation also converts a KeyedStream into a DataStream. It does not change the element type of the stream, so the output type is the same as the input type.
When calling the reduce method of a KeyedStream, you pass in a parameter that implements the ReduceFunction interface. The interface is defined in the source code as follows:
```java
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}
```
The ReduceFunction interface requires implementing the reduce() method, which receives two input events and outputs one event of the same type after processing. For a set of data, we can therefore first merge two elements, treat the merged result as a single element, and then merge it with the next element, until everything is "reduced" to a single value; this is what "reduce" means. In the stream-processing implementation, the intermediate merged result is saved as task state, and each time new data arrives, it is combined with the previous aggregation state.
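The rolling behavior described above, one saved value per key combined with each new record, can be modeled in a few lines of plain Java. This is a sketch, not Flink's implementation: `ReduceLike` is a hypothetical stand-in for ReduceFunction, and the key selector is an ordinary `java.util.function.Function`. The first record for a key is stored as-is; every later record is reduced into the saved value, and the new value is both stored and emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of KeyedStream.reduce() with one saved value ("state") per key.
class RollingReduceModel {
    // Hypothetical stand-in for ReduceFunction<T>: two values in, one of the same type out.
    interface ReduceLike<T> {
        T reduce(T value1, T value2);
    }

    static <K, T> List<T> rollingReduce(List<T> input, Function<T, K> keySelector, ReduceLike<T> fn) {
        Map<K, T> state = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T value : input) {
            K key = keySelector.apply(value);
            T previous = state.get(key);
            // The first record for a key becomes the initial state unchanged.
            T updated = (previous == null) ? value : fn.reduce(previous, value);
            state.put(key, updated);
            emitted.add(updated); // every input produces one updated result downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Each entry is (user, count); reduce sums the counts per user.
        List<Map.Entry<String, Long>> clicks = List.of(
                Map.entry("Mary", 1L), Map.entry("Bob", 1L), Map.entry("Mary", 1L));
        List<Map.Entry<String, Long>> result = rollingReduce(
                clicks, Map.Entry::getKey,
                (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()));
        System.out.println(result); // [Mary=1, Bob=1, Mary=2]
    }
}
```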
We can define a separate function class to implement the ReduceFunction interface, or we can directly pass in an anonymous class. Of course, similar functions can also be achieved by passing in Lambda expressions.
Let’s look at a slightly more complex example.
We partition the stream by user name and use one reduce operator to implement a sum that counts each user's visits (pv). We then send all the per-user results to a single group and use a second reduce operator with maxBy-like logic to keep the record with the highest count, that is, the user with the most visits so far.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // ClickSource is the custom data source from the earlier custom-source section
        env.addSource(new ClickSource())
                // Convert each Event into a (user, 1L) tuple
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event e) throws Exception {
                        return Tuple2.of(e.user, 1L);
                    }
                })
                .keyBy(r -> r.f0) // Partition by user name
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Each new record adds its count (1) to this user's pv total
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .keyBy(r -> true) // Same key for every record, so all results meet in one group
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Keep the record with the larger pv count and send it downstream
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                })
                .print();
        env.execute();
    }
}
```
Like the simple aggregation operators, reduce keeps state for each key. Because this state is never cleared, reduce should only be applied to streams with a limited number of keys.
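Because ClickSource produces random data, the output of the job above varies from run to run. The sketch below replays the same two-stage logic in plain Java on a fixed, made-up list of user names so that the emitted sequence can be checked. It is a model, not Flink code: `UserCount` is a hypothetical record standing in for `Tuple2<String, Long>`, and the tie-breaking mirrors `value1.f1 > value2.f1 ? value1 : value2`, so on a tie the newer record wins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the pipeline:
//   stage 1: keyBy(user) + reduce(sum) -> running pv count per user
//   stage 2: keyBy(true) + reduce(max) -> one global "most active user so far"
class PvPipelineModel {
    record UserCount(String user, long count) {}

    static List<UserCount> run(List<String> clickUsers) {
        Map<String, UserCount> perUserState = new HashMap<>(); // stage-1 state, one entry per user
        UserCount globalState = null;                          // stage-2 state under the single key
        List<UserCount> printed = new ArrayList<>();
        for (String user : clickUsers) {
            // map to (user, 1) and add it to that user's running count
            UserCount prev = perUserState.get(user);
            UserCount updated = (prev == null)
                    ? new UserCount(user, 1L)
                    : new UserCount(user, prev.count() + 1L);
            perUserState.put(user, updated);
            // keep the saved record only if its count is strictly larger
            globalState = (globalState != null && globalState.count() > updated.count())
                    ? globalState
                    : updated;
            printed.add(globalState); // what print() would show after each click
        }
        return printed;
    }

    public static void main(String[] args) {
        run(List.of("Mary", "Bob", "Bob", "Mary", "Alice")).forEach(System.out::println);
    }
}
```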
9.3.3 User-defined function (UDF)
From the previous sections, we can see that the DataStream API has a consistent programming style: to perform a conversion, you call a method on a DataStream and pass in a parameter that implements an interface. These interfaces share a naming convention: the operator name followed by "Function". For example, the source operator takes a SourceFunction, the map operator a MapFunction, and the reduce operator a ReduceFunction.
These interfaces all extend the Function interface. Because interfaces such as MapFunction, FilterFunction, and ReduceFunction each declare a single abstract method, we can implement them not only with custom function classes or anonymous classes but also by passing in Lambda expressions directly. These are what we call user-defined functions (UDFs).
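The sketch below illustrates, in plain Java rather than Flink, the three ways of supplying such a function. `FilterLike` and `ContainsFilter` are hypothetical names; `FilterLike` mimics the shape of FilterFunction: one abstract method (which is what makes a lambda legal) and `Serializable`, like the ReduceFunction definition shown earlier. The `isSerializable` helper checks that all three styles survive Java serialization, which matters because Flink serializes user functions before shipping them to the cluster.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Plain-Java illustration (not Flink code) of three interchangeable UDF styles.
class UdfStylesModel {
    // Hypothetical stand-in shaped like FilterFunction: single abstract method, Serializable.
    interface FilterLike<T> extends Serializable {
        boolean filter(T value) throws Exception;
    }

    // Style 1: a named function class, configurable through its constructor.
    static class ContainsFilter implements FilterLike<String> {
        private final String keyWord;

        ContainsFilter(String keyWord) {
            this.keyWord = keyWord;
        }

        @Override
        public boolean filter(String value) {
            return value.contains(keyWord);
        }
    }

    // Returns true if the function object can be written with Java serialization.
    static boolean isSerializable(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        FilterLike<String> named = new ContainsFilter("home");
        // Style 2: an anonymous class.
        FilterLike<String> anonymous = new FilterLike<String>() {
            @Override
            public boolean filter(String value) {
                return value.contains("home");
            }
        };
        // Style 3: a lambda, legal because FilterLike has exactly one abstract method.
        FilterLike<String> lambda = value -> value.contains("home");

        for (FilterLike<String> f : List.of(named, anonymous, lambda)) {
            System.out.println(f.filter("./home") + " " + f.filter("./cart") + " " + isSerializable(f));
        }
    }
}
```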
Next, we will make a summary of these programming methods.
- Function classes
For most operations, you pass in a user-defined function (UDF) that implements the interface of the corresponding operation to define the processing logic. Flink exposes an interface for every kind of UDF, in the form of interfaces or abstract classes, such as MapFunction, FilterFunction, and ReduceFunction.
So the simplest and most direct way is to define a function class that implements the corresponding interface. Our previous API exercises were mainly based on this approach.
The following example implements the FilterFunction interface to keep the lines of the user's click data that contain "home":
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFunctionUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> clicks = env.readTextFile("clicks.csv");
        DataStream<String> stream = clicks.filter(new FlinkFilter());
        stream.print();
        env.execute();
    }

    public static class FlinkFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) throws Exception {
            return value.contains("home");
        }
    }
}
```
Of course, the FilterFunction interface can also be implemented with an anonymous class:
```java
DataStream<String> stream = clicks.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("home");
    }
});
```
To make the class more reusable, we can also turn the filter keyword "home" into a field of the class and pass it in through the constructor:
```java
// Inside main():
DataStream<String> clicks = env.readTextFile("clicks.csv");
DataStream<String> stream = clicks.filter(new KeyWordFilter("home"));

// Declared as a static nested class of the program:
public static class KeyWordFilter implements FilterFunction<String> {
    private final String keyWord;

    KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(String value) throws Exception {
        return value.contains(this.keyWord);
    }
}
```
- Anonymous function (Lambda)
Anonymous functions (Lambda expressions) were introduced in Java 8 and let us write code more quickly and clearly. A Lambda expression implements a function concisely and passes it as a parameter without declaring an additional (anonymous) class.
All Flink operators can be written with Lambda expressions. However, when a Lambda expression uses Java generics, we need to declare the type information explicitly.
The following example shows how to use a Lambda expression to implement a simple map() function that extracts the url field of each event. We do not need to declare the input and output data types of the map() function, because the Java compiler infers them.
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFunctionLambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        // The map function is a Lambda expression; no type declarations are needed
        DataStream<String> stream = clicks.map(event -> event.url);
        stream.print();
        env.execute();
    }
}
```
Since OUT here is String rather than a generic type, Flink can automatically extract the result type from the function signature OUT map(IN value).
For a function like flatMap(), however, the signature void flatMap(IN value, Collector<OUT> out) is compiled by the Java compiler into void flatMap(IN value, Collector out), which erases the generic type of the Collector. As a result, Flink cannot infer the output type automatically.
map() runs into the same problem whenever generic erasure is involved, for example when it returns one of Flink's Tuple types; we encountered this in the WordCount program.
In general, this can be solved by calling the .returns() method to specify the return type explicitly:
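```java
// Types is org.apache.flink.api.common.typeinfo.Types
DataStream<Tuple2<String, Long>> stream = clicks
        .map(event -> Tuple2.of(event.user, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));
```
Alternatively, it can be solved by using a function class or an anonymous class instead of the Lambda expression.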
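Why does an anonymous class avoid the problem? The plain-Java sketch below uses reflection to compare what survives compilation: an anonymous class records its generic interface `MapLike<String, Integer>` in its class file, while the class generated for a lambda implements only the raw interface. `MapLike` is a hypothetical interface, and Flink's own type extraction is more involved than this, but this is the kind of generic information it has to work with.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

// Plain-Java demonstration of generic type information on anonymous classes vs lambdas.
class ErasureDemo {
    interface MapLike<IN, OUT> {
        OUT map(IN value);
    }

    // Returns the type arguments recorded on the object's class, or null if none survived.
    static Type[] declaredTypeArguments(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        return (iface instanceof ParameterizedType p) ? p.getActualTypeArguments() : null;
    }

    public static void main(String[] args) {
        MapLike<String, Integer> anonymous = new MapLike<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };
        MapLike<String, Integer> lambda = value -> value.length();

        // Anonymous class: the class file keeps MapLike<String, Integer>.
        System.out.println(Arrays.toString(declaredTypeArguments(anonymous)));
        // Lambda: the generated class implements the raw MapLike, so no arguments are recorded.
        System.out.println(Arrays.toString(declaredTypeArguments(lambda)));
    }
}
```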
- Rich Function Classes
"Rich function classes" are another kind of function class interface provided by the DataStream API. Every Flink function class has a Rich version; rich function classes are generally abstract classes, such as RichMapFunction, RichFilterFunction, and RichReduceFunction.
The main difference from regular function classes is that rich function classes can access the context of the runtime environment and have life-cycle methods, so they can implement more complex functionality.
Rich functions have a life cycle. The typical life-cycle methods are:
open(): the initialization method of a rich function, which starts the life cycle of an operator. Before the operator's actual working method, such as map() or filter(), is called, open() is called first. One-time tasks such as opening files, creating database connections, and reading configuration files are therefore well suited to open().
close(): the last method called in the life cycle, similar to a destructor. It is generally used for cleanup work.
Note that these life-cycle methods are called only once per parallel subtask, whereas the actual working method, such as map() in RichMapFunction, is called once for every arriving record. Let's take a look at an example:
```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(1, 2, 3, 4)
                .map(new RichMapFunction<Integer, Integer>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Called once per parallel subtask, before any map() call
                        System.out.println("The index is: " + getRuntimeContext().getIndexOfThisSubtask()
                                + " The life cycle of the task begins");
                    }

                    @Override
                    public Integer map(Integer integer) throws Exception {
                        // Called once per record
                        return integer + 1;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        // Called once per parallel subtask, after its last record
                        System.out.println("The index is: " + getRuntimeContext().getIndexOfThisSubtask()
                                + " The end of the task's life cycle");
                    }
                })
                .print();
        env.execute();
    }
}
```
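The call pattern of the life-cycle methods can be modeled in plain Java. The sketch below is not Flink code: `RichMapLike` is a hypothetical abstract class, subtasks run one after another instead of concurrently, and records are dealt out round-robin, which is an assumption about distribution. It records a log showing that open() and close() run once per subtask while map() runs once per record, matching the four-element, parallelism-2 example above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the rich-function life cycle per parallel subtask.
class RichLifecycleModel {
    // Hypothetical stand-in for a rich map function with life-cycle hooks.
    static abstract class RichMapLike<IN, OUT> {
        protected int subtaskIndex;

        void open(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        abstract OUT map(IN value);

        void close() {}
    }

    static List<String> runWithParallelism(List<Integer> input, int parallelism) {
        List<String> log = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Each subtask gets its own function instance.
            RichMapLike<Integer, Integer> fn = new RichMapLike<>() {
                @Override
                void open(int idx) {
                    super.open(idx);
                    log.add("open " + idx);
                }

                @Override
                Integer map(Integer v) {
                    log.add("map " + subtaskIndex + " " + v);
                    return v + 1;
                }

                @Override
                void close() {
                    log.add("close " + subtaskIndex);
                }
            };
            fn.open(subtask);                      // once per subtask
            for (int i = subtask; i < input.size(); i += parallelism) {
                fn.map(input.get(i));              // once per record (round-robin assumption)
            }
            fn.close();                            // once per subtask
        }
        return log;
    }

    public static void main(String[] args) {
        runWithParallelism(List.of(1, 2, 3, 4), 2).forEach(System.out::println);
    }
}
```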