From 099cc783d5ede15626c357547fa8477dd1cfc18b Mon Sep 17 00:00:00 2001 From: LinZhaoguan <17186784453@163.com> Date: Fri, 18 May 2018 17:37:06 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E7=BF=BB=E8=AF=91=E7=AC=AC=E4=B8=80?= =?UTF-8?q?=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 ++++---- docs/dev/api_concepts.md | 11 ++++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index a58f093..e7c883d 100644 --- a/README.md +++ b/README.md @@ -83,19 +83,19 @@ Apache Flink官网:[http://flink.apache.org](http://flink.apache.org/index.htm - [gaving2016](https://github.com/gaving2016) - [397090770](https://github.com/397090770) - [coder-zjh](https://github.com/coder-zjh) - + - [LinZhaoguan](https://github.com/LinZhaoguan) + *翻译* *审校* *项目经理* - [tuhaihe](https://github.com/tuahihe) - [lviiii](https://github.com/lviiii) - + **项目贡献排名:点击[此处](https://github.com/Apache-Flink-Docs-ZH/Apache-Flink-Docs-ZH-translation/graphs/contributors),可以看到各自贡献排名,加油~** +--- - --- - ## 支持社区 感谢下面技术社区对本翻译项目的支持(排名不分先后): diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index b3618aa..6f4c106 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -1,5 +1,5 @@ --- -title: "Basic API Concepts" +title: "基础的API概念" nav-parent_id: dev nav-pos: 1 nav-show_overview: true @@ -32,6 +32,8 @@ collections). Results are returned via sinks, which may for example write the da Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines. +Flink是实现分布式数据集合转换的通用程序(例如过滤、映射、更新状态、连结、分组、定义窗口、聚合等)。数据集合最先是从源(例如文件、kafka主题或者本地内存数据集合)创建。经过接收器之后得到结果,例如可以把数据写入到(分布式)文件中,或者执行标准输出(例如,命令行终端)。Flink程序支持各种运行方式,可以独立运行,也可以嵌入到其他程序中。可以在本地JVM或设备集群上执行。 + Depending on the type of data sources, i.e. bounded or unbounded sources, you would either write a batch program or a streaming program where the DataSet API is used for batch and the DataStream API is used for streaming. This guide will introduce the basic concepts @@ -40,6 +42,9 @@ that are common to both APIs but please see our [Batch Guide]({{ site.baseurl }}/dev/batch/index.html) for concrete information about writing programs with each API. +根据数据源的类型,即有界或无界的数据源,你可以使用DataSet API编写批处理程序或者DataStream API编写流处理程序。本篇将会介绍这两种API共同拥有的基本概念,关于每一种编写程序的具体信息请参阅我们的[Streaming Guide]({{ site.baseurl }}/dev/datastream_api.html) and +[Batch Guide]({{ site.baseurl }}/dev/batch/index.html)。 + **NOTE:** When showing actual examples of how the APIs can be used we will use `StreamingExecutionEnvironment` and the `DataStream` API. The concepts are exactly the same in the `DataSet` API, just replace by `ExecutionEnvironment` and `DataSet`. 
@@ -707,9 +712,9 @@ public class WordWithCount { public String word; public int count; - + public WordWithCount() {} - + public WordWithCount(String word, int count) { this.word = word; this.count = count; From 03765a74974703f3a6f4ff4917e6cf807a6a0922 Mon Sep 17 00:00:00 2001 From: LinZhaoguan <17186784453@163.com> Date: Fri, 25 May 2018 10:02:26 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E7=BF=BB=E8=AF=91=E5=9F=BA=E6=9C=AC?= =?UTF-8?q?=E6=A6=82=E5=BF=B5=E7=AB=A0=E8=8A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/dev/api_concepts.md | 485 +++++++++++++-------------------------- 1 file changed, 155 insertions(+), 330 deletions(-) diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index 6f4c106..9a3c860 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -24,73 +24,45 @@ specific language governing permissions and limitations under the License. --> -Flink programs are regular programs that implement transformations on distributed collections -(e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). -Collections are initially created from sources (e.g., by reading from files, kafka topics, or from local, in-memory -collections). Results are returned via sinks, which may for example write the data to -(distributed) files, or to standard output (for example, the command line terminal). -Flink programs run in a variety of contexts, standalone, or embedded in other programs. -The execution can happen in a local JVM, or on clusters of many machines. - Flink是实现分布式数据集合转换的通用程序(例如过滤、映射、更新状态、连结、分组、定义窗口、聚合等)。数据集合最先是从源(例如文件、kafka主题或者本地内存数据集合)创建。经过接收器之后得到结果,例如可以把数据写入到(分布式)文件中,或者执行标准输出(例如,命令行终端)。Flink程序支持各种运行方式,可以独立运行,也可以嵌入到其他程序中。可以在本地JVM或设备集群上执行。 -Depending on the type of data sources, i.e. bounded or unbounded sources, you would either -write a batch program or a streaming program where the DataSet API is used for batch -and the DataStream API is used for streaming. This guide will introduce the basic concepts -that are common to both APIs but please see our -[Streaming Guide]({{ site.baseurl }}/dev/datastream_api.html) and -[Batch Guide]({{ site.baseurl }}/dev/batch/index.html) for concrete information about -writing programs with each API. - -根据数据源的类型,即有界或无界的数据源,你可以使用DataSet API编写批处理程序或者DataStream API编写流处理程序。本篇将会介绍这两种API共同拥有的基本概念,关于每一种编写程序的具体信息请参阅我们的[Streaming Guide]({{ site.baseurl }}/dev/datastream_api.html) and -[Batch Guide]({{ site.baseurl }}/dev/batch/index.html)。 +根据数据源的类型,即有界或无界的数据源,你可以使用DataSet API编写批处理程序或者DataStream API编写流处理程序。本篇将会介绍这两种API共有的基本概念,但如果你想获取每种API具体的编写教程,请参阅我们的[流处理教程]({{ site.baseurl }}/dev/datastream_api.html) 和[批处理教程]({{ site.baseurl }}/dev/batch/index.html)。 **NOTE:** When showing actual examples of how the APIs can be used we will use `StreamingExecutionEnvironment` and the `DataStream` API. The concepts are exactly the same in the `DataSet` API, just replace by `ExecutionEnvironment` and `DataSet`. +**注意:**当我们在实际的例子中展示各个API如何使用事,我们将会使用`StreamingExecutionEnvironment` 和`DataStream` API。这和使用`DataSet` API很相似,直接替换成`ExecutionEnvironment` 和`DataSet`即可。 + * This will be replaced by the TOC {:toc} -DataSet and DataStream +批量数据和 流式数据 ---------------------- -Flink has the special classes `DataSet` and `DataStream` to represent data in a program. You -can think of them as immutable collections of data that can contain duplicates. In the case -of `DataSet` the data is finite while for a `DataStream` the number of elements can be unbounded. 
+Flink 用特定的`DataSet`和`DataStream`类来表示程序中的数据。你可以把它们视为可以包含重复项的不可变数据集合。`DataSet`中的数据是有限的，而`DataStream`中元素的数量则可以是无限的。

-These collections differ from regular Java collections in some key ways. First, they
-are immutable, meaning that once they are created you cannot add or remove elements. You can also
-not simply inspect the elements inside.

+这些集合在一些关键方面与常规的Java集合不同。首先，它们是不可变的，也就是说集合一旦创建，你就不能再新增或删除元素；你也不能简单地检查集合内部的元素。

-A collection is initially created by adding a source in a Flink program and new collections are
-derived from these by transforming them using API methods such as `map`, `filter` and so on.

+集合最初是通过在Flink程序中添加一个源（source）创建的，再通过`map`、`filter`等API方法对其进行转换，从而派生出新的集合。

-Anatomy of a Flink Program
+Flink程序剖析
--------------------------

-Flink program programs look like regular programs that transform collections of data.
-Each program consists of the same basic parts:
-
-1. Obtain an `execution environment`,
-2. Load/create the initial data,
-3. Specify transformations on this data,
-4. Specify where to put the results of your computations,
-5. Trigger the program execution

+Flink程序看起来就像转换数据集合的常规程序。每个程序都由相同的基本部分组成（这五个步骤之后附有一个把它们串联起来的示意性代码骨架）：
+1. 获取一个运行环境（`execution environment`），
+2. 加载/创建初始数据，
+3. 指定对这些数据的转换，
+4. 指定计算结果的存放位置，
+5. 触发程序执行
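下面给出一个把这五个步骤串起来的最小示意代码（仅为草图：其中的文件路径、作业名以及`SkeletonJob`这个类名都是为了演示而假设的）：

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonJob {
    public static void main(String[] args) throws Exception {
        // 1. 获取运行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 加载/创建初始数据（这里假设逐行读取一个本地文本文件）
        DataStream<String> text = env.readTextFile("file:///path/to/file");

        // 3. 指定对数据的转换
        DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        // 4. 指定计算结果的存放位置（这里直接打印到标准输出）
        lengths.print();

        // 5. 触发程序执行
        env.execute("Skeleton Job");
    }
}
{% endhighlight %}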
+现在我们将概述其中的每一个步骤，请参阅各个章节以获取更多详细信息。请注意，Java DataSet API的所有核心类可在{% gh_link /flink-java/src/main/java/org/apache/flink/api/java "org.apache.flink.api.java" %}包中找到，而Java DataStream API的类可在{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api "org.apache.flink.streaming.api" %}中找到。

-We will now give an overview of each of those steps, please refer to the respective sections for
-more details. Note that all core classes of the Java DataSet API are found in the package
-{% gh_link /flink-java/src/main/java/org/apache/flink/api/java "org.apache.flink.api.java" %}
-while the classes of the Java DataStream API can be found in
-{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api "org.apache.flink.streaming.api" %}.

-The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
-obtain one using these static methods on `StreamExecutionEnvironment`:

+`StreamExecutionEnvironment`是所有Flink程序的基础。你可以使用它的以下静态方法获得一个`StreamExecutionEnvironment`：

{% highlight java %}
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)
{% endhighlight %}

-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the
-[command line]({{ site.baseurl }}/setup/cli.html), the Flink cluster manager
-will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.

+通常情况下，您只需要使用`getExecutionEnvironment()`方法，因为它会根据上下文做正确的事情：如果您在IDE中执行程序，或者把它作为常规Java程序执行，它将创建一个本地环境，在本地计算机上执行您的程序；如果您把程序打成JAR文件并通过[命令行]({{ site.baseurl }}/setup/cli.html)调用它，则Flink集群管理器将执行您的main方法，此时`getExecutionEnvironment()`会返回一个在集群上执行程序的运行环境。

-For specifying data sources the execution environment has several methods
-to read from files using various methods: you can just read them line by line,
-as CSV files, or using completely custom data input formats. To just read
-a text file as a sequence of lines, you can use:

+为了指定数据源，运行环境提供了多种读取文件的方法：你可以逐行读取文件、把它们当作CSV文件读取，或者使用完全自定义的数据输入格式。如果只是想把文本文件按行读取，可以使用：

{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
{% endhighlight %}

-This will give you a DataStream on which you can then apply transformations to create new
-derived DataStreams.

+这将为您提供一个DataStream，之后您可以在它上面应用转换来创建新的派生DataStream。

-You apply transformations by calling methods on DataStream with a transformation
-functions. For example, a map transformation looks like this:

+您可以通过在DataStream上调用带有转换函数的方法来应用转换。例如，一个map转换如下所示：

{% highlight java %}
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
{% endhighlight %}

-This will create a new DataStream by converting every String in the original
-collection to an Integer.

+这将把原始集合中的每个String转换为Integer，从而创建一个新的DataStream。

-Once you have a DataStream containing your final results, you can write it to an outside system
-by creating a sink.
These are just some example methods for creating a sink: +一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法: {% highlight java %} writeAsText(String path) @@ -152,14 +110,9 @@ print()
-We will now give an overview of each of those steps, please refer to the respective sections for -more details. Note that all core classes of the Scala DataSet API are found in the package -{% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %} -while the classes of the Scala DataStream API can be found in -{% gh_link /flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala "org.apache.flink.streaming.api.scala" %}. +现在我们将概述其中的每一个步骤,请参阅各个章节以获取更多详细信息。请注意,Scala DataSet API的所有核心类都可以在{% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}包中找到, 而Scala DataStream API的类可以在{% gh_link /flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala "org.apache.flink.streaming.api.scala" %}中找到。 -The `StreamExecutionEnvironment` is the basis for all Flink programs. You can -obtain one using these static methods on `StreamExecutionEnvironment`: +`StreamExecutionEnvironment` 是所有Flink程序的基础,您可以使用以下静态方法获得一个`StreamExecutionEnvironment`: {% highlight scala %} getExecutionEnvironment() @@ -169,19 +122,9 @@ createLocalEnvironment() createRemoteEnvironment(host: String, port: Int, jarFiles: String*) {% endhighlight %} -Typically, you only need to use `getExecutionEnvironment()`, since this -will do the right thing depending on the context: if you are executing -your program inside an IDE or as a regular Java program it will create -a local environment that will execute your program on your local machine. If -you created a JAR file from your program, and invoke it through the -[command line]({{ site.baseurl }}/apis/cli.html), the Flink cluster manager -will execute your main method and `getExecutionEnvironment()` will return -an execution environment for executing your program on a cluster. +通常情况下,您只需要使用`getExecutionEnvironment()`方法,因为它将会根据上下文做正确的事情:如果您在IDE内执行程序或作为常规Java程序执行,它将创建一个本地环境,以在本地计算机上执行您的程序。如果您从程序创建JAR文件并通过[命令行]({{ site.baseurl }}/setup/cli.html)调用它 ,则Flink集群管理器将执行您的main方法,`getExecutionEnvironment()`将会返回在集群下执行程序的运行环境。 -For specifying data sources the execution environment has several methods -to read from files using various methods: you can just read them line by line, -as CSV files, or using completely custom data input formats. To just read -a text file as a sequence of lines, you can use: +为了指定数据源,运行环境有多种方法可以使用各种方法来通过不同的方式读取文件:你可以逐行读取它们,以CSV文件的形式读取它们,或使用完全自定义的数据输入格式。要仅将文本文件作为一系列行读取,可以使用: {% highlight scala %} val env = StreamExecutionEnvironment.getExecutionEnvironment() @@ -189,11 +132,9 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val text: DataStream[String] = env.readTextFile("file:///path/to/file") {% endhighlight %} -This will give you a DataStream on which you can then apply transformations to create new -derived DataStreams. +这将为您提供一个DataStream,然后您可以将其转换来创建新的派生的DataStream。 -You apply transformations by calling methods on DataSet with a transformation -functions. For example, a map transformation looks like this: +您可以通过调用DataStream的转换函数来进行转换,例如,一个map转换如下所示: {% highlight scala %} val input: DataSet[String] = ... @@ -201,11 +142,9 @@ val input: DataSet[String] = ... val mapped = input.map { x => x.toInt } {% endhighlight %} -This will create a new DataStream by converting every String in the original -collection to an Integer. +这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。 -Once you have a DataStream containing your final results, you can write it to an outside system -by creating a sink. 
These are just some example methods for creating a sink: +一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法: {% highlight scala %} writeAsText(path: String) @@ -216,48 +155,34 @@ print()
-Once you specified the complete program you need to **trigger the program execution** by calling
-`execute()` on the `StreamExecutionEnvironment`.
-Depending on the type of the `ExecutionEnvironment` the execution will be triggered on your local
-machine or submit your program for execution on a cluster.

+按照上面的步骤写好完整的程序之后，您需要调用`StreamExecutionEnvironment`的`execute()`方法来**触发程序执行**。根据`ExecutionEnvironment`的类型，执行会在您的本地机器上触发，或者将您的程序提交到集群上执行。

-The `execute()` method is returning a `JobExecutionResult`, this contains execution
-times and accumulator results.

+`execute()`方法会返回一个`JobExecutionResult`，其中包含执行时间和累加器的结果。

-Please see the [Streaming Guide]({{ site.baseurl }}/dev/datastream_api.html)
-for information about streaming data sources and sink and for more in-depths information
-about the supported transformations on DataStream.

+请参阅[流数据处理指南]({{ site.baseurl }}/dev/datastream_api.html)，了解流式数据源和接收器的信息，以及DataStream所支持转换的更深入介绍。

-Check out the [Batch Guide]({{ site.baseurl }}/dev/batch/index.html)
-for information about batch data sources and sink and for more in-depths information
-about the supported transformations on DataSet.

+请参阅[批数据处理指南]({{ site.baseurl }}/dev/batch/index.html)，了解批数据源和接收器的信息，以及DataSet所支持转换的更深入介绍。

{% top %}

-Lazy Evaluation
+惰性求值
---------------

-All Flink programs are executed lazily: When the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when the execution is explicitly triggered by
-an `execute()` call on the execution environment. Whether the program is executed locally
-or on a cluster depends on the type of execution environment

+所有Flink程序都是惰性执行的：当程序的main方法执行时，数据的加载和转换并不会直接发生；相反，每个操作只是被创建并添加到程序的执行计划中。只有当运行环境上的`execute()`调用显式地触发执行时，这些操作才会真正执行。程序是在本地执行还是在集群上执行，取决于运行环境的类型。

-The lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.

+惰性求值让您可以构建复杂的程序，而Flink会把它作为一个整体规划的单元来执行。

{% top %}

-Specifying Keys
+指定键
---------------

-Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on
-a collection of elements. Other transformations (Reduce, GroupReduce,
-Aggregate, Windows) allow data being grouped on a key before they are
-applied.

+一些转换（join、coGroup、keyBy、groupBy）要求在元素集合上定义一个键。另一些转换（Reduce、GroupReduce、
+Aggregate、Windows）则允许数据在被应用这些转换之前先按键分组。

+DataSet的分组方式如下所示

-A DataSet is grouped as
{% highlight java %}
DataSet<...> input = // [...]
DataSet<...> reduced = input
@@ -265,7 +190,8 @@ DataSet<...> reduced = input
  .reduceGroup(/*do something*/);
{% endhighlight %}

-while a key can be specified on a DataStream using
+而在DataStream上可以这样指定一个键
+
{% highlight java %}
DataStream<...> input = // [...]
DataStream<...> windowed = input
@@ -273,19 +199,14 @@ DataStream<...> windowed = input
  .window(/*window specification*/);
{% endhighlight %}

-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.

+Flink的数据模型并不基于键值对。因此，您不需要把数据集的类型物理地打包成键和值。这里的键是"虚拟的"：它们被定义为作用在实际数据之上的函数，用来引导分组算子。

-**NOTE:** In the following discussion we will use the `DataStream` API and `keyBy`.
-For the DataSet API you just have to replace by `DataSet` and `groupBy`.
+**注意：**在接下来的讨论中我们将使用`DataStream` API和`keyBy`。如果使用的是DataSet API，只需相应地替换为`DataSet`和`groupBy`即可。

-### Define keys for Tuples
+### 定义元组的键
{:.no_toc}

-The simplest case is grouping Tuples on one or more
-fields of the Tuple:

+最简单的情况是根据元组的一个或多个字段对元组进行分组：
@@ -302,8 +223,7 @@ val keyed = input.keyBy(0)
-The tuples are grouped on the first field (the one of -Integer type). +元组根据第一个字段(Integer类型的一个字段)进行分组。
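得到按键分区的流之后，通常会在其上接着应用聚合或窗口操作。下面是一个只作示意的小片段（假设沿用上面Java示例中的`keyed`变量，以Java API为例）：

{% highlight java %}
// 在按第一个字段分组后的流上，对第三个字段（下标2，Long类型）做滚动求和
DataStream<Tuple3<Integer, String, Long>> summed = keyed.sum(2);
{% endhighlight %}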
@@ -320,28 +240,28 @@ val grouped = input.groupBy(0,1)
-Here, we group the tuples on a composite key consisting of the first and the
-second field.

+在这里，我们使用由元组的第一个和第二个字段组成的组合键对元组进行分组。

-A note on nested Tuples: If you have a DataStream with a nested tuple, such as:

+关于嵌套元组的说明：如果您有一个元素为嵌套元组的DataStream，例如：

{% highlight java %}
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
{% endhighlight %}

-Specifying `keyBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.

+指定`keyBy(0)`会使系统把整个`Tuple2`用作键（即以Integer和Float共同作为键）。如果你想"深入"到嵌套的`Tuple2`内部，就必须使用下面介绍的字段表达式来定义键。

-### Define keys using Field Expressions
+### 使用字段表达式定义键
{:.no_toc}

-You can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping.

+您可以使用基于字符串的字段表达式来引用嵌套字段，并定义用于分组（grouping）、排序（sorting）、连接（joining）或协同分组（coGrouping）的键。

-Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.

+字段表达式使我们可以很容易地选择（嵌套）复合类型中的字段，例如[元组](#tuples-and-case-classes)和[POJO](#pojos)类型。
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `keyBy()` function. +下面的例子中,我们有一个对象`WC`,它有 "word" 和"count"两个字段,如果根据字段`word`进行分组,我们可以直接把它的名字传入`keyBy()`函数中。 + {% highlight java %} // some ordinary POJO (Plain old Java Object) public class WC { @@ -352,17 +272,14 @@ DataStream words = // [...] DataStream wordCounts = words.keyBy("word").window(/*window specification*/); {% endhighlight %} -**Field Expression Syntax**: - -- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type. - -- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively. - -- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"f1.user.zip"` or `"user.f3.1.zip"`. +**字段表达式分析:** -- You can select the full type using the `"*"` wildcard expressions. This does also work for types which are not Tuple or POJO types. +- 按字段名称选择POJO字段。例如,`"user"`指的是POJO类型的“用户”字段。 +- 按字段名称或0偏移量字段索引选择元组字段。例如`"f0"`和`"5"`参考Java Tuple类型的第一个和第六个字段。 +- 您可以选择POJO和元组中的嵌套字段。例如,`"user.zip"`指的是存储在POJO类型的“用户”中的“zip”字段。支持POJO和元组的任意嵌套和混合,例如`"f1.user.zip"`或`"user.f3.1.zip"`。 +- 您可以使用`"*"`通配符表达式来选择完整类型。这也适用于不是Tuple或POJO类型的类型。 -**Field Expression Example**: +**字段表达式示例:** {% highlight java %} public static class WC { @@ -384,21 +301,17 @@ public static class ComplexNestedClass { } {% endhighlight %} -These are valid field expressions for the example code above: - -- `"count"`: The count field in the `WC` class. - -- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`. - -- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`. +这些是以上示例代码的有效字段表达式: -- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type. +- `"count"`:`WC`类中的count 字段。 +- `"complex"`:递归地选择POJO类型ComplexNestedClass里的所有字段。 +- `"complex.word.f2":`选择嵌套的 `Tuple3`类型的最后一个字段。 +- `"complex.hadoopCitizen"`:选择Hadoop `IntWritable`类型。
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `keyBy()` function. -{% highlight java %} +下面的例子中,我们有一个对象`WC`,它有 "word" 和"count"两个字段,如果根据字段`word`进行分组,我们可以直接把它的名字传入`keyBy()`函数中。{% highlight java %} // some ordinary POJO (Plain old Java Object) class WC(var word: String, var count: Int) { def this() { this("", 0L) } @@ -412,17 +325,13 @@ val words: DataStream[WC] = // [...] val wordCounts = words.keyBy("word").window(/*window specification*/) {% endhighlight %} -**Field Expression Syntax**: - -- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type. - -- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively. - -- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"_2.user.zip"` or `"user._4.1.zip"`. +**字段表达式分析:** -- You can select the full type using the `"_"` wildcard expressions. This does also work for types which are not Tuple or POJO types. +- 按字段名称选择POJO字段。例如,`"user"`指的是POJO类型的“用户”字段。 +- 按1开始递增的字段名称或按0开始递增的字段索引来选择元组字段。例如,`"_1"`和`"5"`分别指定的是Scala Tuple类型的第一个和第六个字段。 +- 您可以选择POJO和元组中的嵌套字段。例如,`"user.zip"`指定为一个POJO的"zip" 字段,而这个POJO作为"user"字段存储在另一个POJO。支持POJO和元组的任意嵌套和混合,例如`"_2.user.zip"` o或`"user._4.1.zip"。 -**Field Expression Example**: +**字段表达式示例:** {% highlight scala %} class WC(var complex: ComplexNestedClass, var count: Int) { @@ -438,26 +347,22 @@ class ComplexNestedClass( } {% endhighlight %} -These are valid field expressions for the example code above: - -- `"count"`: The count field in the `WC` class. - -- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`. - -- `"complex.word._3"`: Selects the last field of the nested `Tuple3`. +这些是以上示例代码的有效字段表达式: -- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type. +- `"count"`:`WC`类中的count字段。 +- `"complex"`:递归地选择POJO类型ComplexNestedClass里的所有字段。 +- `"complex.word._3":`选择嵌套的 `Tuple3`类型的最后一个字段。 +- `"complex.hadoopCitizen"`:选择Hadoop `IntWritable`类型。
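无论是Java还是Scala的写法，字段表达式最终都是以字符串的形式传给`keyBy()`等方法的。下面再给出一个简短的、仅作示意的Java片段（其中的`wcStream`变量是假设的，类型沿用上文示例中的`WC`/`ComplexNestedClass`）：

{% highlight java %}
DataStream<WC> wcStream = // [...]

// 按嵌套POJO中的Integer字段分组
wcStream.keyBy("complex.someNumber");

// 按嵌套Tuple3的第一个字段（f0）分组
wcStream.keyBy("complex.word.f0");
{% endhighlight %}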
-### Define keys using Key Selector Functions
+### 使用键选择器函数定义键
{:.no_toc}

-An additional way to define keys are "key selector" functions. A key selector function
-takes a single element as input and returns the key for the element. The key can be of any type and be derived from arbitrary computations.

+另一种定义键的方法是使用"键选择器"（key selector）函数。键选择器函数以单个元素作为输入，并返回该元素的键。键可以是任何类型，并且可以由任意计算派生得到。

-The following example shows a key selector function that simply returns the field of an object:

+下面的例子展示了一个只是简单返回对象某个字段的键选择器函数：
@@ -484,18 +389,17 @@ val keyed = words.keyBy( _.word ) {% top %} -Specifying Transformation Functions +指定转换函数 -------------------------- -Most transformations require user-defined functions. This section lists different ways -of how they can be specified +大多数转换需要用户定义方法。本节列出了如何使用不同的方式来指定它们。
-#### Implementing an interface
+#### 实现一个接口

-The most basic way is to implement one of the provided interfaces:
+最基本的方法是实现Flink提供的接口之一：

{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
{% endhighlight %}

-#### Anonymous classes
+#### 匿名类

-You can pass a function as an anonymous class:
+您可以将一个函数作为匿名类传递：
{% highlight java %}
data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

-#### Java 8 Lambdas
+#### Java 8 Lambda表达式

-Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/dev/java8.html).
+Flink还支持在Java API中使用Java 8 Lambda表达式。请参阅完整的[Java 8指南]({{ site.baseurl }}/dev/java8.html)。

{% highlight java %}
data.filter(s -> s.startsWith("http://"));
{% endhighlight %}

{% highlight java %}
data.reduce((i1,i2) -> i1 + i2);
{% endhighlight %}

#### Rich functions

-All transformations that require a user-defined function can
-instead take as argument a *rich* function. For example, instead of

+所有需要用户自定义函数的转换都可以改为接受一个*rich* function作为参数。例如，你可以不写成

{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

-you can write
+而是写成

{% highlight java %}
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

-and pass the function as usual to a `map` transformation:
+然后像往常一样把这个函数传给`map`转换：

{% highlight java %}
data.map(new MyMapFunction());
{% endhighlight %}

-Rich functions can also be defined as an anonymous class:
+Rich functions也可以定义为匿名类：

{% highlight java %}
data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

#### Lambda Functions

-As already seen in previous examples all operations accept lambda functions for describing
-the operation:

+正如前面的例子中已经看到的那样，所有操作都可以接受lambda函数来描述该操作：

{% highlight scala %}
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
{% endhighlight %}

{% highlight scala %}
val data: DataSet[Int] = // [...]
data.reduce { _ + _ }
{% endhighlight %}

#### Rich functions

-All transformations that take as argument a lambda function can
-instead take as argument a *rich* function. For example, instead of

+所有接受lambda函数作为参数的转换都可以改为接受一个*rich* function作为参数。例如，你可以不写成

{% highlight scala %}
data.map { x => x.toInt }
{% endhighlight %}

-you can write
+而是写成

{% highlight scala %}
class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})
{% endhighlight %}

-and pass the function to a `map` transformation:
+然后将这个函数传给`map`转换：

{% highlight scala %}
data.map(new MyMapFunction())
{% endhighlight %}

-Rich functions can also be defined as an anonymous class:
+Rich functions也可以定义为匿名类：

{% highlight scala %}
data.map (new RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})
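下面给出一个用完整类的形式定义rich function的简单示意（`TokenizerMapper`这个类名只是为了演示而假设的）；其中用到的`open()`、`close()`和`getRuntimeContext()`等方法会在接下来的段落中进一步说明：

{% highlight java %}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TokenizerMapper extends RichMapFunction<String, Integer> {

    private int subtaskIndex;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 在处理第一条记录之前调用一次，适合做初始化工作
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public Integer map(String value) throws Exception {
        return Integer.parseInt(value.trim());
    }

    @Override
    public void close() throws Exception {
        // 在处理完最后一条记录之后调用，适合做清理工作
    }
}
{% endhighlight %}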
-Rich functions provide, in addition to the user-defined function (map,
-reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for parameterizing the function
-(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)),
-creating and finalizing local state, accessing broadcast variables (see
-[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime
-information such as accumulators and counters (see
-[Accumulators and Counters](#accumulators--counters)), and information
-on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).

+除了用户自定义的函数（map、reduce等）之外，rich functions还提供了四个方法：`open`、`close`、`getRuntimeContext`和`setRuntimeContext`。这些方法在以下场景中非常有用：为函数传递参数（请参阅[向函数传递参数]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)）、创建和释放本地状态、访问广播变量（请参阅[广播变量]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)），以及访问运行时信息（如累加器和计数器，请参阅[计数器和累加器](#accumulators--counters)）和迭代信息（请参阅[迭代]({{ site.baseurl }}/dev/batch/iterations.html)）。

{% top %}

-Supported Data Types
+支持的数据类型
--------------------

-Flink places some restrictions on the type of elements that can be in a DataSet or DataStream.
-The reason for this is that the system analyzes the types to determine
-efficient execution strategies.

+Flink对DataSet或DataStream中元素的类型有一些限制，原因是系统需要分析这些类型来确定高效的执行策略。

-There are six different categories of data types:

+数据类型可以分为以下七类：

-1. **Java Tuples** and **Scala Case Classes**
-2. **Java POJOs**
-3. **Primitive Types**
-4. **Regular Classes**
-5. **Values**
-6. **Hadoop Writables**
-7. **Special Types**

+1. **Java元组（Tuples）**和**Scala样本类（Case Classes）**
+2. **Java POJOs**
+3. **原始类型**
+4. **常规类**
+5. **值（Values）**
+6. **Hadoop Writables**
+7. **特殊类型**

#### Tuples and Case Classes
-Tuples are composite types that contain a fixed number of fields with various types.
-The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
-can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
-tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
-`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
-to the Scala tuples, but it is more consistent with Java's general indexing.

+元组是一种复合类型，它包含固定数量、各种类型的字段。Java API提供了从`Tuple1`到`Tuple25`的类。元组的每个字段都可以是任意的Flink类型，包括元组本身，从而形成嵌套元组。元组的字段可以直接通过字段名访问（如`tuple.f4`），也可以使用通用的getter方法`tuple.getField(int position)`访问。字段索引从0开始。请注意，这一点与Scala元组不同，但与Java的通用索引方式更为一致。

{% highlight java %}
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")
-Scala case classes (and Scala tuples which are a special case of case classes), are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.

+Scala样本类（case class，Scala元组是样本类的一种特例）是包含固定数量、各种类型字段的复合类型。元组字段通过以1为起始偏移的名称来访问，例如`_1`表示第一个字段；样本类的字段则按字段名访问。

{% highlight scala %}
case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1
-#### POJOs - -Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements: - -- The class must be public. - -- It must have a public constructor without arguments (default constructor). +#### POJOS -- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`. +如果满足以下要求,Java和Scala类将被Flink视为特殊的POJO数据类型: -- The type of a field must be supported by Flink. At the moment, Flink uses [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`). +- 类必须是公开的。 +- 它必须有一个没有参数的公开的构造函数(默认构造函数)。 +- 所有字段都是公开的,或者必须通过getter和setter函数访问。对于称为`foo`getter和setter方法的字段必须命名`getFoo()`和`setFoo()`。 +- Flink必须支持字段的类型。目前,Flink使用[Avro](http://avro.apache.org)序列化任意对象(如`Date`)。 -Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types. +Flink分析POJO类型的结构,即它了解POJO的字段。因此POJO类型比一般类型更易于使用。而且,Flink可以比普通类型更有效地处理POJO。 -The following example shows a simple POJO with two public fields. +以下示例显示了一个包含两个公用字段的简单POJO。
@@ -747,157 +633,96 @@ input.keyBy("word")// key by field expression "word"
-#### Primitive Types
+#### 原始类型

-Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
+Flink支持所有Java和Scala的原始类型，如`Integer`、`String`和`Double`。

-#### General Class Types
+#### 常规类

-Flink supports most Java and Scala classes (API and custom).
-Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
-resources. Classes that follow the Java Beans conventions work well in general.

+Flink支持大多数Java和Scala类（包括API中的类和自定义类），但对包含无法序列化字段的类（如文件指针、I/O流或其他本地资源）有限制。遵循Java Beans约定的类通常都能正常使用。

-All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
-Flink treats these data types as black boxes and is not able to access their content (i.e., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).

+所有未被识别为POJO类型的类（见上面的POJO要求）都会被Flink当作常规类类型来处理。Flink将这些数据类型视为黑盒，无法访问其内容（例如无法进行高效排序）。常规类类型使用序列化框架[Kryo](https://github.com/EsotericSoftware/kryo)进行序列化和反序列化。

-#### Values
+#### 值

-*Value* types describe their serialization and deserialization manually. Instead of going through a
-general purpose serialization framework, they provide custom code for those operations by means of
-implementing the `org.apache.flinktypes.Value` interface with the methods `read` and `write`. Using
-a Value type is reasonable when general purpose serialization would be highly inefficient. An
-example would be a data type that implements a sparse vector of elements as an array. Knowing that
-the array is mostly zero, one can use a special encoding for the non-zero elements, while the
-general purpose serialization would simply write all array elements.

+*值*（Value）类型手动描述自己的序列化和反序列化过程。它们不经过通用的序列化框架，而是通过实现`org.apache.flinktypes.Value`接口的`read`和`write`方法，为序列化和反序列化提供自定义代码。当通用序列化效率非常低时，使用值类型是合理的。一个例子是用数组实现稀疏向量的数据类型：已知数组中大部分元素为零，就可以只对非零元素使用特殊编码，而通用序列化则会简单地写出所有数组元素。

-The `org.apache.flinktypes.CopyableValue` interface supports manual internal cloning logic in a
-similar way.

+`org.apache.flinktypes.CopyableValue`接口以类似的方式支持手动实现内部克隆逻辑。

-Flink comes with pre-defined Value types that correspond to basic data types. (`ByteValue`,
-`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
-`BooleanValue`). These Value types act as mutable variants of the basic data types: Their value can
-be altered, allowing programmers to reuse objects and take pressure off the garbage collector.

+Flink自带了与基本数据类型对应的预定义值类型（`ByteValue`、`ShortValue`、`IntValue`、`LongValue`、`FloatValue`、`DoubleValue`、`StringValue`、`CharValue`、`BooleanValue`）。这些值类型可以看作基本数据类型的可变变体：它们的值可以被修改，从而允许程序员重用对象，减轻垃圾回收器的压力。

#### Hadoop Writables

-You can use types that implement the `org.apache.hadoop.Writable` interface. The serialization logic
-defined in the `write()`and `readFields()` methods will be used for serialization.

+您可以使用实现了`org.apache.hadoop.Writable`接口的类型。`write()`和`readFields()`方法中定义的序列化逻辑将被用于序列化。

#### Special Types

-You can use special types, including Scala's `Either`, `Option`, and `Try`.
-The Java API has its own custom implementation of `Either`.
-Similarly to Scala's `Either`, it represents a value of one two possible types, *Left* or *Right*.
-`Either` can be useful for error handling or operators that need to output two different types of records.
+您可以使用一些特殊类型，包括Scala的`Either`、`Option`和`Try`。Java API有自己的`Either`实现。与Scala的`Either`类似，它表示两种可能类型之一的值，即*Left*或*Right*。`Either`可用于错误处理，或用于需要输出两种不同类型记录的算子。

-#### Type Erasure & Type Inference
+#### 类型擦除和类型推断

-*Note: This Section is only relevant for Java.*
+*注意：本节仅与Java相关。*

-The Java compiler throws away much of the generic type information after compilation. This is
-known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
-its generic type any more. For example, instances of `DataStream<String>` and `DataStream<Long>` look the
-same to the JVM.

+Java编译器在编译后会丢弃大部分泛型类型信息，这在Java中被称为*类型擦除*。这意味着在运行时，一个对象的实例不再知道自己的泛型类型。例如，`DataStream<String>`和`DataStream<Long>`的实例在JVM看来是一样的。

-Flink requires type information at the time when it prepares the program for execution (when the
-main method of the program is called). The Flink Java API tries to reconstruct the type information
-that was thrown away in various ways and store it explicitly in the data sets and operators. You can
-retrieve the type via `DataStream.getType()`. The method returns an instance of `TypeInformation`,
-which is Flink's internal way of representing types.

+Flink在准备执行程序时（即程序的main方法被调用时）需要类型信息。Flink Java API会尝试以各种方式重建被丢弃的类型信息，并把它显式地存储在数据集和算子中。您可以通过`DataStream.getType()`获取类型，该方法返回一个`TypeInformation`实例，这是Flink内部表示类型的方式。

-The type inference has its limits and needs the "cooperation" of the programmer in some cases.
-Examples for that are methods that create data sets from collections, such as
-`ExecutionEnvironment.fromCollection(),` where you can pass an argument that describes the type. But
-also generic functions like `MapFunction<I, O>` may need extra type information.

+类型推断有其局限性，在某些情况下需要程序员的"配合"。例如从集合创建数据集的方法`ExecutionEnvironment.fromCollection()`，您可以在其中传入一个描述类型的参数。另外，像`MapFunction<I, O>`这样的泛型函数也可能需要额外的类型信息。

-The
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
-interface can be implemented by input formats and functions to tell the API
-explicitly about their return type. The *input types* that the functions are invoked with can
-usually be inferred by the result types of the previous operations.

+输入格式（input format）和函数可以实现{% gh_link /flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}接口，以明确告知API它们的返回类型。而函数被调用时的*输入类型*，通常可以由前面操作的结果类型推断出来。

{% top %}

-Accumulators & Counters
+累加器和计数器
---------------------------

-Accumulators are simple constructs with an **add operation** and a **final accumulated result**,
-which is available after the job ended.

+累加器是一种简单的结构，它提供一个**累加操作**和一个在作业结束后可以获取的**最终累加结果**。

-The most straightforward accumulator is a **counter**: You can increment it using the
-```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial
-results and send the result to the client. Accumulators are useful during debugging or if you
-quickly want to find out more about your data.

+最直接的累加器是**计数器**：您可以使用`Accumulator.add(V value)`方法对它进行累加。在作业结束时，Flink会汇总（合并）所有部分结果并把最终结果发送给客户端。在调试过程中，或者当您想快速了解数据的更多信息时，累加器非常有用。

-Flink currently has the following **built-in accumulators**. Each of them implements the
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
-interface.
+Flink目前拥有以下**内置累加器**。它们中的每一个都实现了 [累加器](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java) 接口。 -- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %}, - {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %} - and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}: - See below for an example using a counter. -- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}: - A histogram implementation for a discrete number of bins. Internally it is just a map from Integer - to Integer. You can use this to compute distributions of values, e.g. the distribution of - words-per-line for a word count program. +- [**IntCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java), [**LongCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java) 和[ **DoubleCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java):请参阅下面的示例使用计数器。 +- [**直方图**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java):离散数量箱的直方图实现。内部它只是一个从Integer到Integer的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数分布。 -__How to use accumulators:__ +**如何使用累加器:** -First you have to create an accumulator object (here a counter) in the user-defined transformation -function where you want to use it. +首先,您必须在要使用它的用户定义的转换函数中创建一个累加器对象(这里是一个计数器)。 {% highlight java %} private IntCounter numLines = new IntCounter(); {% endhighlight %} -Second you have to register the accumulator object, typically in the ```open()``` method of the -*rich* function. Here you also define the name. +其次,您必须注册累加器对象,通常使用*rich* function的`open()`方法 。在这里你也定义了名字。 {% highlight java %} getRuntimeContext().addAccumulator("num-lines", this.numLines); {% endhighlight %} -You can now use the accumulator anywhere in the operator function, including in the ```open()``` and -```close()``` methods. +您现在可以在操作中的任何位置使用累加器,包括在`open()`和 `close()`方法中。 {% highlight java %} this.numLines.add(1); {% endhighlight %} -The overall result will be stored in the ```JobExecutionResult``` object which is -returned from the `execute()` method of the execution environment -(currently this only works if the execution waits for the -completion of the job). +总体结果将存储在`JobExecutionResult,即`从运行环境的`execute()`方法返回的对象中(当前仅当执行等待作业完成时才起作用)。 {% highlight java %} myJobExecutionResult.getAccumulatorResult("num-lines") {% endhighlight %} -All accumulators share a single namespace per job. Thus you can use the same accumulator in -different operator functions of your job. Flink will internally merge all accumulators with the same -name. +所有累加器在每个作业共享一个名称空间。因此,您可以在工作的不同操作功能中使用相同的累加器。Flink将内部合并所有具有相同名称的累加器。 -A note on accumulators and iterations: Currently the result of accumulators is only available after -the overall job has ended. We plan to also make the result of the previous iteration available in the -next iteration. 
You can use
-{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %}
-to compute per-iteration statistics and base the termination of iterations on such statistics.

+关于累加器和迭代的说明：目前累加器的结果只有在整个作业结束之后才可用。我们也计划让上一次迭代的结果可以在下一次迭代中使用。您可以使用[聚合器](https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98)来计算每次迭代的统计信息，并基于这类统计信息来决定迭代的终止。

-__Custom accumulators:__

+__自定义累加器：__

-To implement your own accumulator you simply have to write your implementation of the Accumulator
-interface. Feel free to create a pull request if you think your custom accumulator should be shipped
-with Flink.

+要实现自己的累加器，只需编写Accumulator接口的实现即可。如果您认为您的自定义累加器应该随Flink一起发布，欢迎提交pull request。

-You have the choice to implement either
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
-or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}.

+您可以选择实现[Accumulator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java)或[SimpleAccumulator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java)。

-```Accumulator``` is most flexible: It defines a type ```V``` for the value to add, and a
-result type ```R``` for the final result. E.g. for a histogram, ```V``` is a number and ```R``` is
- a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters.

+`Accumulator`最为灵活：它为要累加的值定义类型`V`，并为最终结果定义结果类型`R`。例如，对于直方图，`V`是一个数字，而`R`是一个直方图。`SimpleAccumulator`适用于两种类型相同的情况，例如计数器。

{% top %}
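作为补充，下面给出一个自定义累加器的简单草图（`MaxIntAccumulator`这个类名是假设的），演示基于`SimpleAccumulator`接口实现一个记录最大整数值的累加器；具体的方法签名请以您所用Flink版本中的接口定义为准：

{% highlight java %}
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;

// 一个示意性的自定义累加器：记录出现过的最大整数值
public class MaxIntAccumulator implements SimpleAccumulator<Integer> {

    private int max = Integer.MIN_VALUE;

    @Override
    public void add(Integer value) {
        max = Math.max(max, value);
    }

    @Override
    public Integer getLocalValue() {
        return max;
    }

    @Override
    public void resetLocal() {
        max = Integer.MIN_VALUE;
    }

    @Override
    public void merge(Accumulator<Integer, Integer> other) {
        max = Math.max(max, other.getLocalValue());
    }

    @Override
    public MaxIntAccumulator clone() {
        MaxIntAccumulator copy = new MaxIntAccumulator();
        copy.max = this.max;
        return copy;
    }
}
{% endhighlight %}

注册和使用方式与内置累加器相同，例如在rich function的`open()`方法中调用`getRuntimeContext().addAccumulator("max-value", new MaxIntAccumulator())`。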