{% top %}
Specifying Transformation Functions
--------------------------
Most transformations require user-defined functions. This section lists the different ways
in which they can be specified.
#### Implementing an interface
The most basic way is to implement one of the provided interfaces:
{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());
{% endhighlight %}
#### Anonymous classes
You can pass a function as an anonymous class:
{% highlight java %}
data.map(new MapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}
#### Java 8 Lambdas
Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/dev/java8.html).
{% highlight java %}
data.filter(s -> s.startsWith("http://"));
{% endhighlight %}

{% highlight java %}
data.reduce((i1,i2) -> i1 + i2);
{% endhighlight %}

#### Rich functions
All transformations that require a user-defined function can
instead take as argument a *rich* function. For example, instead of
{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
{% endhighlight %}
you can write
{% highlight java %}
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
{% endhighlight %}
and pass the function as usual to a `map` transformation:
{% highlight java %}
data.map(new MyMapFunction());
{% endhighlight %}
Rich functions can also be defined as an anonymous class:
{% highlight java %}
data.map(new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

#### Lambda Functions
As already seen in previous examples all operations accept lambda functions for describing
the operation:
{% highlight scala %}
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
{% endhighlight %}

{% highlight scala %}
data.reduce { _ + _ }
{% endhighlight %}

#### Rich functions
All transformations that take as argument a lambda function can
instead take as argument a *rich* function. For example, instead of
{% highlight scala %}
data.map { x => x.toInt }
{% endhighlight %}
you can write
{% highlight scala %}
class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String): Int = { in.toInt }
}
{% endhighlight %}
and pass the function to a `map` transformation:
{% highlight scala %}
data.map(new MyMapFunction())
{% endhighlight %}
Rich functions can also be defined as an anonymous class:
{% highlight scala %}
data.map (new RichMapFunction[String, Int] {
  def map(in: String): Int = { in.toInt }
})
{% endhighlight %}

Rich functions provide, in addition to the user-defined function (map,
reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
`setRuntimeContext`. These are useful for parameterizing the function
(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)),
creating and finalizing local state, accessing broadcast variables (see
[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime
information such as accumulators and counters (see
[Accumulators and Counters](#accumulators--counters)), and information
on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).
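
For example, a minimal sketch of a rich function that uses `open()` for
one-time setup per parallel task instance (the class name and accumulator
name are illustrative):

{% highlight java %}
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LineLengthMapper extends RichMapFunction<String, Integer> {

    // created in open() so the field is not serialized with the function
    private transient IntCounter numLines;

    @Override
    public void open(Configuration parameters) throws Exception {
        // called once before the first map() call of each parallel instance
        numLines = new IntCounter();
        getRuntimeContext().addAccumulator("num-lines", numLines);
    }

    @Override
    public Integer map(String value) {
        numLines.add(1);
        return value.length();
    }
}
{% endhighlight %}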
{% top %}
Supported Data Types
--------------------
Flink places some restrictions on the type of elements that can be in a DataSet or DataStream.
The reason for this is that the system analyzes the types to determine
efficient execution strategies.
There are seven different categories of data types:
1. **Java Tuples** and **Scala Case Classes**
2. **Java POJOs**
3. **Primitive Types**
4. **Regular Classes**
5. **Values**
6. **Hadoop Writables**
7. **Special Types**
#### Tuples and Case Classes
Tuples are composite types that contain a fixed number of fields with various types.
The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
to the Scala tuples, but it is more consistent with Java's general indexing.
{% highlight java %}
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.keyBy(0); // also valid .keyBy("f0")
{% endhighlight %}

Scala case classes (and Scala tuples which are a special case of case classes), are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
{% highlight scala %}
case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word") // key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1
{% endhighlight %}

#### POJOs

Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

- The class must be public.
- It must have a public constructor without arguments (default constructor).
- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
- The type of a field must be supported by Flink. At the moment, Flink uses [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
The following example shows a simple POJO with two public fields.
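
A minimal sketch (class and field names are illustrative; `env` is assumed to be in scope):

{% highlight java %}
public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"
{% endhighlight %}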
#### Primitive Types

Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
#### General Class Types

Flink supports most Java and Scala classes (API and custom).
Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
resources. Classes that follow the Java Beans conventions work well in general.
All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
Flink treats these data types as black boxes and is not able to access their content (i.e., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
#### Values
*Value* types describe their serialization and deserialization manually. Instead of going through a
general purpose serialization framework, they provide custom code for those operations by means of
implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
a Value type is reasonable when general purpose serialization would be highly inefficient. An
example would be a data type that implements a sparse vector of elements as an array. Knowing that
the array is mostly zero, one can use a special encoding for the non-zero elements, while the
general purpose serialization would simply write all array elements.
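
As a hedged sketch of the sparse-vector idea, the following Value writes only
the non-zero entries (the encoding is illustrative, not a Flink-provided format):

{% highlight java %}
import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

public class SparseDoubleVector implements Value {

    private double[] values = new double[0];

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(values.length);
        // write only (index, value) pairs for the non-zero entries
        for (int i = 0; i < values.length; i++) {
            if (values[i] != 0.0) {
                out.writeInt(i);
                out.writeDouble(values[i]);
            }
        }
        out.writeInt(-1); // end marker
    }

    @Override
    public void read(DataInputView in) throws IOException {
        values = new double[in.readInt()];
        for (int i = in.readInt(); i >= 0; i = in.readInt()) {
            values[i] = in.readDouble();
        }
    }
}
{% endhighlight %}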
The `org.apache.flink.types.CopyableValue` interface supports manual internal cloning logic in a
similar way.
Flink comes with pre-defined Value types that correspond to basic data types. (`ByteValue`,
`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
`BooleanValue`). These Value types act as mutable variants of the basic data types: Their value can
be altered, allowing programmers to reuse objects and take pressure off the garbage collector.
#### Hadoop Writables
You can use types that implement the `org.apache.hadoop.io.Writable` interface. The serialization logic
defined in the `write()` and `readFields()` methods will be used for serialization.
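
For illustration, a minimal Writable type might look like this (a sketch
following the standard Hadoop `Writable` contract; the class itself is not
part of Flink):

{% highlight java %}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class PageVisit implements Writable {

    public String url = "";
    public long visits;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(url);
        out.writeLong(visits);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        url = in.readUTF();
        visits = in.readLong();
    }
}
{% endhighlight %}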
#### Special Types
You can use special types, including Scala's `Either`, `Option`, and `Try`.
The Java API has its own custom implementation of `Either`.
Similarly to Scala's `Either`, it represents a value of one of two possible types, *Left* or *Right*.
`Either` can be useful for error handling or operators that need to output two different types of records.
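
As a sketch, a function that reports parse failures alongside results could
use the Java API's `Either` like this (assuming the `Left`/`Right` factory
methods of `org.apache.flink.types.Either`):

{% highlight java %}
import org.apache.flink.types.Either;

// Emit the parsed number on success, or keep the offending line for error handling.
public static Either<String, Integer> parseLine(String line) {
    try {
        return Either.Right(Integer.parseInt(line.trim()));
    } catch (NumberFormatException e) {
        return Either.Left(line);
    }
}
{% endhighlight %}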
#### Type Erasure & Type Inference
*Note: This Section is only relevant for Java.*
The Java compiler throws away much of the generic type information after compilation. This is
known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
its generic type any more. For example, instances of `DataStream<String>` and `DataStream<Long>` look the
same to the JVM.
Flink requires type information at the time when it prepares the program for execution (when the
main method of the program is called). The Flink Java API tries to reconstruct the type information
that was thrown away in various ways and store it explicitly in the data sets and operators. You can
retrieve the type via `DataStream.getType()`. The method returns an instance of `TypeInformation`,
which is Flink's internal way of representing types.
The type inference has its limits and needs the "cooperation" of the programmer in some cases.
Examples for that are methods that create data sets from collections, such as
`ExecutionEnvironment.fromCollection()`, where you can pass an argument that describes the type. But
also generic functions like `MapFunction<I, O>` may need extra type information.
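
For example, a hedged sketch of describing the element type explicitly when
creating a stream from a collection (`env` is assumed to be a
`StreamExecutionEnvironment`; `TypeHint` is Flink's helper for capturing
generic types):

{% highlight java %}
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// The element type is erased at runtime, so it is described explicitly.
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(
    Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 2)),
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

// The reconstructed type can be inspected via getType().
System.out.println(stream.getType());
{% endhighlight %}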
The
{% gh_link /flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
interface can be implemented by input formats and functions to tell the API
explicitly about their return type. The *input types* that the functions are invoked with can
usually be inferred by the result types of the previous operations.
{% top %}
Accumulators & Counters
---------------------------
Accumulators are simple constructs with an **add operation** and a **final accumulated result**,
which is available after the job ended.
The most straightforward accumulator is a **counter**: You can increment it using the
`Accumulator.add(V value)` method. At the end of the job Flink will sum up (merge) all partial
results and send the result to the client. Accumulators are useful during debugging or if you
quickly want to find out more about your data.
Flink currently has the following **built-in accumulators**. Each of them implements the
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
interface.
- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %},
  {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %}
  and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}:
  See below for an example using a counter.
- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}:
  A histogram implementation for a discrete number of bins. Internally it is just a map from Integer
  to Integer. You can use this to compute distributions of values, e.g. the distribution of
  words-per-line for a word count program.
__How to use accumulators:__
First you have to create an accumulator object (here a counter) in the user-defined transformation
function where you want to use it.
{% highlight java %}
private IntCounter numLines = new IntCounter();
{% endhighlight %}
Second you have to register the accumulator object, typically in the `open()` method of the
*rich* function. Here you also define the name.
{% highlight java %}
getRuntimeContext().addAccumulator("num-lines", this.numLines);
{% endhighlight %}
You can now use the accumulator anywhere in the operator function, including in the `open()` and
`close()` methods.
{% highlight java %}
this.numLines.add(1);
{% endhighlight %}
The overall result will be stored in the `JobExecutionResult` object which is
returned from the `execute()` method of the execution environment
(currently this only works if the execution waits for the
completion of the job).
{% highlight java %}
myJobExecutionResult.getAccumulatorResult("num-lines")
{% endhighlight %}
All accumulators share a single namespace per job. Thus you can use the same accumulator in
different operator functions of your job. Flink will internally merge all accumulators with the same
name.
A note on accumulators and iterations: Currently the result of accumulators is only available after
the overall job has ended. We plan to also make the result of the previous iteration available in the
next iteration. You can use
{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %}
to compute per-iteration statistics and base the termination of iterations on such statistics.
__Custom accumulators:__
To implement your own accumulator you simply have to write your implementation of the Accumulator
interface. Feel free to create a pull request if you think your custom accumulator should be shipped
with Flink.
You have the choice to implement either
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}.
`Accumulator<V, R>` is most flexible: It defines a type `V` for the value to add, and a
result type `R` for the final result. E.g. for a histogram, `V` is a number and `R` is
a histogram. `SimpleAccumulator` is for the cases where both types are the same, e.g. for counters.
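
As a sketch, a custom accumulator that tracks a running maximum might look
like this (the class is illustrative, not shipped with Flink):

{% highlight java %}
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;

public class MaxCounter implements SimpleAccumulator<Long> {

    private long max = Long.MIN_VALUE;

    @Override
    public void add(Long value) { max = Math.max(max, value); }

    @Override
    public Long getLocalValue() { return max; }

    @Override
    public void resetLocal() { max = Long.MIN_VALUE; }

    @Override
    public void merge(Accumulator<Long, Long> other) {
        // called by Flink when combining the partial results of parallel tasks
        max = Math.max(max, other.getLocalValue());
    }

    @Override
    public Accumulator<Long, Long> clone() {
        MaxCounter copy = new MaxCounter();
        copy.max = max;
        return copy;
    }
}
{% endhighlight %}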
{% top %}