Flink 1.9 学习思维导图
2020-03-13 18:11:07 29 举报
AI智能生成
flink1.9 学习思维导图
作者其他创作
大纲/内容
Flink
DataStream API
DataStream 编程模型
DataSource数据接入
内置数据源
文件数据源
readTextFile
直接读取
readFile
指定文件的InputFormat读取
FileProcessingMode
Socket数据源
socketTextStream
集合数据源
将java、scala的集合类(Collection)转换成DataStream数据集
本质:将本地集合中的个数据分发到远端并行执行的节点中
fromElements
Java 数组
Java List
外部数据源
数据源连接器
KAFKA
netty
Twitter
hdfs
...
自定义数据源连接器
DataStream 转换操作
从一个或多个DataStream生成新的DataStream 的过程称为转换
转换类型
Simgle-DataStream
Map[DataStream->DataStream]
常用作对数据集内的数据进行清洗和转换
FlatMap[DataStream->DataStream]
用于一个输入产生一个或多个元素的场景。比如wordCount
Filter[DataStream->DataStream]
筛选符合条件的数据
KeyBy[DataStream->KeyedStream]
将数据根据指定的key从DataStream->KeyedStream
对数据集进行partition操作,将key相同的数据放置在相同的分区中
不能使用KeyBy对数据充分去的情况
使用POJOs类型数据,但是没有复写hashCode()
任何数据类型的数组结构
Aggregations[KeyedStream->DataStream]
根据指定的算子进行聚合操作,滚动的产生一些列数据聚合的结果
本质上是对Reduce中的函数进行了封装
Multi-DataStream
Union[ DataStream-> DataStream]
将两个或多个输入的数据集合并成一个数据集
要保证两个数据集的格式一致
Connect、CoMap、CoFlatMap[DataStream->DataStream]
合并两种或多种不同数据类型的数据集
就是join的各种实现
keyBy()
将key相同的数据路由到同一个Operator中
broadcast()
在计算之前将该数据集广播到所有并行计算的Operator中
案例
将数据集安条件进行拆分,形成一个SplitStream
仅仅是对数据进行了标记,给元素指定了一个标记,并没有真正的拆分
配合select达到Fliter的效果,某些时候也可以看作是Union的逆过程
iterate
迭代相关操作(后续补充)
物理分区操作
根据指定的分区策略将数据重新分配到不同节点的Task实例上执行
常见分区策略
Random Partition-rebalance
将数据随机分配到下游的每个分区中。
分区相对均衡
容易失去原有数据结构
RoundRobin Partition-rebalance
通过循环的方式对数据进行重分区,尽可能保证每个分区数据平衡
此方法比较有效,但是也容易失去原有的数据结构
Rescale Partition-rescale
rescale 和 rebalance 类似,也可以将数据以 round-robin 的方式发送给下游任务
rebalance 会将数据发送发送给所有下游任务
rescale 仅会对上下游继承的算子数据进行重平衡
广播操作-broadcast
复制所有的输入数据,然后将所有数据都复制到下游并行的Task实例中。
下有算子中的Task可以直接从本地内存中获取广播数据集,不再需要依赖于网络传输
适合小数据集,比如大表关联小表
自定义分区- partitionCustom
当所有分区策略都不满意时,可以自定义分区策略。
partitionCustom() 方法接收一个 Partitioner 对象
我们只需要实现一个 Partitioner,定义我们自己的分区策略
DataSink 数据输出
将DataStream数据输出到外部系统的过程被定义为DataSink操作
输出类型
基本数据输出
实现过程中不需要依赖其他第三方库
writeAsCsv
writeAsText
writeToSocket
print
第三方数据输出
通过第三方连接器输出到外部系统中
时间概念与WaterMark
时间的三种概念
事件生成时间(Event Time)
事件时间是每个事件在其生产设备上发生的时间。这个时间通常是在记录进入Flink之前嵌入的,可以从每个记录中提取事件时间戳。
时间的进展取决于数据本身,而不是其他的时钟
事件时间程序必须指定如何生成事件时间水印( Event Time Watermarks),这是事件时间进展的信号机制。
除非知道事件是按顺序到达的(通过时间戳),否则在等待无序事件时,事件时间处理会导致一些延迟。由于只能等待有限的一段时间,这就限制了应用程序的确定性事件时间。
事件接入时间(Ingestion Time)
摄入时间概念上介于事件时间和处理时间之间。
与处理时间相比,它稍微好资源一些,但是提供了更可预测的结果。摄取时间使用稳定的时间戳(在源处分配一次),对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个窗口操作可以将记录分配到不同的窗口。
与事件时间相比,摄取时间程序不能处理任何无序的事件或延迟的数据,但程序不必指定如何生成水印。
事件处理时间(Processing Time)
当流程序在 processing time上运行时,所有基于时间的操作(如windows)将使用运行各个操作符的机器的系统时钟。
处理时间是最简单的时间概念,不需要流和机器之间的协调。
它提供了最好的性能和最低的延迟。
在分布式和异步环境中,处理时间并不能提供确定性,它容易受到记录到达系统的速度(例如,从消息队列)、记录在系统内部操作符之间流动的速度以及停机(计划的或其他方式)的影响。
设定时间特性
Flink DataStream程序的第一部分通常设置基本时间特性。
Event Time and Watermarks
支持事件时间的流处理器需要一种方法来度量事件时间的进度。告诉窗口操作属于该窗口的数据已经全部接收,以便关闭正在运行的窗口。
事件时间可以独立于处理时间
分配时间戳
直接在数据流源中分配
要直接向源中的元素分配时间戳,源必须使用SourceContext上的collectWithTimestamp(…)方法。要生成水印,源程序必须调用emitWatermark(Watermark)函数。
通过 timestamp assigner / watermark generator
时间戳分配程序获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经具有时间戳和/或水印,则timestamp assigner将覆盖它们。
Timestamp assigners程序通常在数据源之后立即指定,但并不严格要求这样做。
watermarks的生成方式
在Flink中测量事件时间进展的机制是水印(watermarks)。
周期性的(With Periodic Watermarks)
周期性的触发watermark的生成和发送,默认是100ms
时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定
每隔N秒自动向流里注入一个WATERMARK
可以定义一个最大允许乱序的时间,这种比较常用
实现AssignerWithPeriodicWatermarks接口
根据某些特殊条件(With Punctuated Watermarks)
基于某些事件触发watermark的生成和发送
基于事件向流里注入一个WATERMARK,每一个元素都有机会判断是否生成一个WATERMARK.
如果得到的WATERMARK 不为空并且比之前的大就注入流中
实现AssignerWithPunctuatedWatermarks接口
Timestamps per Kafka Partition
Windows计算
Windows Assigner
Keyed?
Keyed
根据key 分时统计,某一个段,某用户登陆的网站数量
No-Keyed
全量分时统计,如:某一时段网站所有请求数
窗口类型
基于时间
滚动窗口
Event Time
window
timeWindow
Process Time
滑动窗口
会话窗口
withGap
withDynamicGap
全局窗口
GlobalWindows
基于数量
自定义窗口分配器
根据自己的需要进行数据的的窗口划分
需要实现
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner#WindowAssigner
实现案例
Windows Function
增量聚合
ReduceFunction
AggregateFunction
FoldFunction
全量聚合
ProcessWindowFunction
pv、uv统计demo
Incremental Aggregation + ProcessWindowFunction
求窗口中指标的最大值、窗口终止时间
Trigger 窗口触发器
EventTimeTrigger
通过Watermark和EndTime确定是否触发窗口
ProcessingTimeTrigger
通过ProcessTime和窗口EndTime确定是否触发窗口
ContinuousEventTimeTrigger
基于给定时间间隔 或 EndTime小于Watermark 时触发窗口
ContinuousProcessingTimeTrigger
指定的时间间隔触发窗口
CountTrigger
根据接入的数据量是否超过设定的阀值确定是否触发窗口
DeltaTrigger
提供 delta 函数和历史 datapoint 存储,每个元素消费时触发 delta 函数计算
PurgingTrigger
NeverTrigger
StateCleaningCountTrigger
继承Trigger 自定义触发器
实现
org.apache.flink.streaming.api.windowing.triggers.Trigger#Trigger
按照指定周期时间或数据量进行窗口的触发计算
子主题
数据剔除器 Evictor
CountEvictor
DeltaEvictor
TimeEvictor
实现Evictor自定义剔除器
延迟数据处理
flink 虽然提供了watermark机制,但是但是却只能在一定程度上解决数据乱序的问题。
某些情况下数据的延迟可能会非常严重,watermark 机制也无法保证所有数据都进入了窗口,而这些数据将会被丢弃
flink 使用allowedLateness来决定是否对延迟数据进行处理,传入的时间Time(T) 表示允许延迟的最大时延
allowedLateness与watermark
allowedLateness
flink窗口计算过程中会将窗口的结束时间加上T,座位窗口最后被释放的结束时间P
当接入事件的EventTime不超过P,但Watermark超过窗口的结束时间,直接触发窗口计算
如果接入事件的EventTime大于P,则对数据进行丢弃处理
watermark
Table API & SQL
概述
简介
FLink提供两个关系型API用于统一流处理和批处理,即:Table API & SQL
Table API 是用于 scala 和 java 的语言继承API,允许以一种非常直观地方式组合来自关系的操作符(如选择、筛选、关联等)
SQL是基于Apache Calcite实现的SQL标准,让我们能够以SQL的形式简化开发的难度
无论是批处理输入(DataSet)还是流出里输入(DataStream),在这两个接口中具有相同语义的查询具有相同的结果
构建依赖
Planner
Planner:负责将关系操作符转换为可执行的、优化的Flink作业。
Planner分类
说明
从Flink 1.9开始,Flink提供了两种不同的planner实现,用于评估Table & SQL API程序(Blink planner、Old planner)
这两种计划器都提供了不同的优化规则和运行时类。它们还可能在支持的特性集上有所不同。
Old planner
Blink planner
两种planner的不同
Blink将批处理作业视为流的一种特殊情况。所以不支持 Table and DataSet的转换,batch 作业将不会转换为DateSet程序,而是转换为DataStream程序,与流作业相同。
Blink planner不支持BatchTableSource,使用有界StreamTableSource 代替它。
Blink planner只支持全新的目录,不支持已被弃用的ExternalCatalog。
二者的FilterableTableSource实现是不兼容的:Old planne 把PlannerExpressions下推到FilterableTableSource;而Blink planner 将之下推到 Expressions
仅Blink planner 支持基于key-value的配置操作
PlannerConfig 在两者中的实现是不同的
Blink planne将多个接收器优化为一个DAG(仅在TableEnvironment上支持,而在StreamTableEnvironment上不支持)。Old planner总是优化每个下沉到一个新的DAG,所有DAG是相互独立的。
使用流程
创建执行环境(TableEnvironment)
创建TableEnvironment
注册表
TableEnvironment维护按名称注册的表的目录。
表的类型
输入表
输入表可以在Table API和SQL查询中引用,并提供输入数据
输入表可以来自多种数据源:1. 现有的Table 对象,通常是Table API或SQL查询的结果。2. 来自TableSource,它访问外部数据,如文件、数据库或消息传递系统3. 来自流处理或批处理的 DataStream 或 DataSet 转换而来
输出表
输出表可用于将Table API或SQL查询的结果发送到外部系统。
注册Table
注册TableSource
注册TableSink
注册一个外部目录(可选)
外部目录可以提供关于外部数据库和表的信息:比如它们的名称、模式、统计信息以及如何访问存储在外部数据库、表或文件中的数据的信息。
一旦在TableEnvironment中注册,在ExternalCatalog中定义的所有表都可以通过表API或SQL查询指定它们的完整路径来访问:比如catalog.database.table。
注意: Blink planner 目前不支持外部目录
查询表
Table API
Table API 是用于 scala和java语言的集成查询API,对比SQL,它采用的是一步一步组合的形式,而不是一句sql字符串语句
Table API 文档中描述的所有Table API operations 支持流处理和批处理
注意:Scala Table API 使用Scala符号(')来引用标的属性(字段);Table API 使用了Scala implicits,确保导入了一下依赖:org.apache.flink.api.scala._ org.apache.flink.table.api.scala._以使用scala的隐式转换
SQL
Flink的SQL集成基于Apache Calcite,它实现了SQL标准。SQL查询被指定为常规字符串。
译文:SQL文档描述了Flink对流和批处理表的SQL支持。
这两个API 可以混用,返回的本质都是Table objects
提交(发出)一个表
Table API通过把数据写入TableSink将之提交出。TableSink是一个通用接口:支持各种文件格式(例如CSV、Apache Parquet、Apache Avro)、存储系统(例如JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息传递系统(例如Apache Kafka、RabbitMQ)。
Table.insertInto(String tableName) 将Table 发送到注册的TableSink;该方法会通过名称从目录中查询TableSink 并校验Table 的schema 是否于TableSink完全一致
转换或执行查询的流程
Table API and SQL 查询根据输入类型会被转换成 DataStream 或 DataSet,在内部,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
转换过程
优化查询逻辑计划
将逻辑计划转换成DataStream or DataSet program
Table API or SQL 查询转换的时机
将Table提交到一个TableSink的时候,比如:调用Table.insertInto() 的时候
当制定一个SQL更新查询的时候,比如: 调用TableEnvironment.sqlUpdate() 的时候
将Table 转换成DataStream or DataSet的时候(后续介绍)
转换完成
一旦转换完成, 处理Table API or SQL 查询就跟处理常规DataStream or DataSet 程序一样,当调用StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute()的时候执行该查询
Blink planner
不管输入数据是什么类型,Table API and SQL都会被转换成 DataStream programs,在内部,与Old planner类似,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
将逻辑计划转换成 DataStream (or DataSet) program
TableEnvironment和StreamTableEnvironment,转换查询的行为是不同的。
StreamTableEnvironment
TableEnvironment
当执行TableEnvironment.execute()的时候会进行查询转换,因为TableEnvironment会将多个sink优化为一个dag
一旦转换完成, 处理Table API or SQL 查询就跟处理常规DataStream程序一样,当调用TableEnvironment.execute() or StreamExecutionEnvironment.execute()的时候执行该查询
Table API & SQL 与 DataStream & DataSet API 的集成
Table API and SQL的查询很容易与 DataStream and DataSet 程序集成
通过这种方式,我们可以对一些external table (for example from a RDBMS),做一些预处理,比如过滤、聚合、关联元数据等操作然后使用构建在DataStream or DataSet API上的库 比如(CEP or Gelly)
DataStream or DataSet -> Table
直接将DataStream or DataSet 注册为Table
通过TableEnvironment可以将一个DataStream or DataSet注册为TableTable 的 schema 依赖于DataStream or DataSet的数据类型
将DataStream or DataSet转换成Table
除了通过TableEnvironment进行转换,还可以直接将DataStream or DataSet转换成Table
Table -> DataStream or DataSet
Table 可以转换成DataStream or DataSet,由此,Table的查询结果可以使用DataStream or DataSetAPI
注意,在进行Table -> DataStream or DataSet的转换的时候,我们需要指定DataStream or DataSet的数据类型即:表中行需要转换的数据类型,通常最方便的是类型设为Row
常用可选类型的特性
Row
字段按位置映射,任意数量的字段,支持空值,没有类型安全的访问
POJO
字段按名称(POJO字段必须命名为表字段)、任意数量的字段、支持空值、类型安全的访问进行映射。
Case Class
字段按位置映射,不支持空值,类型安全访问。
Tuple
字段按位置映射,限制为22 (Scala)或25 (Java)字段,不支持空值,类型安全访问。
Atomic Type
表必须有单个字段,不支持空值,类型安全的访问。
Table -> DataStream
将一个Table转成streaming query是一个动态更新的过程,当一个新的记录到达查询的输入流的时候,它会发生变化所以,此类动态查询转而来的DataStream 需要对Table的更新进行编辑
Table -> DataStream的两种模式
Append Mode
只有当 dynamic Table仅通过插入更改进行修改时记录,才能使用此模式
也就是说,它只用于追加,以前发出的结果永远不会更新。
Retract Mode
这个模式可以一直使用。它使用布尔标志对插入和删除更改进行编码。详情后续介绍
Table -> DataSet
数据类型到Table Schema的映射
在我们进行DataStream -> Table转换的时候,这些数据类型就会转转成内部行的形式表示
两种数据类型转换内部行的方式
Position-based
基于位置的映射可以位子段提供更有意义的名称,同时保持字段的顺序。
POJO类型必须使用field names进行映射
注意:当我们使用基于位置的映射关系的时候,指定的名称不能是原本数据类型中存在的名称否则,API将会以基于名称的形式进行映射如果没有指定名称,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用f0。
Name-based
基于名称的方式可以映射任意数据类型,包括POJOs。是定义Table Schema 最灵活的方式,映射的字段可以安名称引用、也可以通过 as 进行重命名,同时调整字段的顺序
注意:如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用f0。
数据类型映射
Atomic Types
atomic type的DataStream or DataSet被转换成具有单个属性的Table
属性的类型是从原子类型推断出来的,也可以指定属性的名称
Tuples (Scala and Java) 、Case Classes (Scala only)
Flink支持Scala的内置元组并未Java提供了自己的元组类,这两种元组的 DataStreams and DataSets都可以转换成Table
默认采用基于位置的映射模式,当默认字段名被引用,API则会采用基于名称的映射方式,此时可以对字段通过as重命名和排列
POJO (Java and Scala)
Flink 以复合数据类型的方式支持POJOs
对POJO DataStream or DataSet转换成Table、并且没有指定字段名称时,将会使用原始POJO字段名。名称映射需要原始名称,不能用基于位置的映射。
可以通过as重命名和重现排列字段顺序
Row数据类型支持任意数量的字段和具有空值的字段
字段名可以通过RowTypeInfo指定也可以在将 Row DataStream or DataSet转换成Table的时候指定
支持按位置和名称映射字段
字段可以通过为所有字段提供名称来重命名(基于位置的映射)
或者单独选择用于投影/排序/重命名的字段(基于名称的映射)
总结
一般来说,只要你映射到Table的字段名不是原本DataStream or DataSet中的名称,都是基于位置映射的,此时需要注意顺序当映射到Table 的字段名是原本DataStream or DataSet中的名称时候,是基于名称映射的,此时可以调整顺序,重命名等。
查询优化
Old planner
Flink利用Apache Calcite来优化和转换
目前Old planner支持的优化包括:投影(此处应该是投影消除)和过滤下推、子查询解关联以及其他类型的查询重写。
Old planner还没有优化joins的顺序(瞄了一下1.11好像也没有)而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。
通过提供CalciteConfig对象,可以调整在不同阶段应用的优化规则集。
Apache Flink利用并扩展了Apache Calcite来执行复杂的查询优化。这包括一系列规则和基于成本的优化
基于 Apache Calcite的子查询解关联
投影裁减(消除)
分区裁减
过滤下推
子计划重复数据删除以避免重复计算
特殊子查询重写,包括两部分
将IN and EXISTS转换成left semi-joins
将 NOT IN and NOT EXISTS转换成left anti-join
可选join重排
通过启用table.optimizer.join-reorder-enabled
Explaining a Table
flink 也提供了explain方法供我们查看表的逻辑计划和优化我们的查询
通过TableEnvironment.explain(table)获得对应表查询计划
注意在1.9中只有Blink planner才有explain
explain的内容
关系查询的抽象语法树,即未优化的逻辑查询计划,
优化的逻辑查询计划
实际执行计划
Dynamic Tables
Dynamic table是Flink’s Table API and SQL支持流数据的核心概念,与表示批数据的静态表不同,动态表是随时间变化的。查询动态表会产生连续的查询,并且产生一个动态表作为结果。查询会不断地更新动态结果表一响应输入表的改变。简单的可以理解为:动态表上的拉in徐查询类似定义一个物化视图的查询。
stream 被转换成 dynamic table
对动态表进行持续查询并产生应新的动态表
生成的动态表被转换回stream
查询的限制
状态的大小
连续查询是在无界流上计算的,通常需要运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新以前发出的结果的查询需要维护所有发出的行,以便能够更新它们。
例如我们又一个网站浏览数据,需要统计每个用户访问url的数量,那么我们就需要记录每个用户url的技术,以便在输入表接收到新行时能够增加计数并发送新的结果。如果只跟踪注册用户(本身注册用户数不多的情况下),则要维护的计数数可能不会太高
计算更新
有些查询需要重新计算和更新大部分发出的结果行,即使只添加或更新了一条输入记录。显然,这样的查询不适合作为连续查询执行。
例如,我们要根据某个用户最后一次操作时间进行排序,一旦click表接收到新行,就会更新用户的lastAction,并且必须计算新的排行。但是,由于两个行不能具有相同的排序,所以所有较低的行也需要更新。
动态表到流的转换
与常规数据库表一样,动态表可以通过插入、更新和删除更改不断修改。它可能是一个具有单行(经常更新)的表,也可能是一个只有插入的表,没有更新和删除修改,或者介于两者之间的任何东西。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。
三种编码方式
Append-only stream
仅通过插入更改修改的动态表可以通过发出插入的行来转换为流。
Retract stream
retract stream包含两中类型的信息
add messages
将INSERT更改编码为add messages
将更新(新)行的UPDATE change编码为add message
retract messages
将DELETE更改编码为retract messages
将更新(旧)行的UPDATE change编码为retract messages
注意
UPDATE change 此处是分为两个消息编码的,与Upsert stream形成对比
Upsert stream
转换成upsert stream需要一个唯一键(也许是复合键)
流消费操作符需要知道唯一的键属性,以便正确应用消息。
包含两种信息
upsert messages
将INSERT and UPDATE更改编码成upsert messages
delete messages
DELETE 更改编码成delete messages
与Retract stream 区别
UPDATE changes通过单个消息编码,所以比retract stream更高效
时间属性
说明
在 Table API and SQL中,基于时间的操作(window)都需要关于时间的概念及其起源的信息
因此,tables 可以提供逻辑时间属性,用于指示时间和访问表程序中相应的时间戳。
定义时间属性的时机
从DataStream创建table的时候
使用TableSource预定义tables的时候
一旦在开始时定义了时间属性,就可以将其引用为字段,并可在基于时间的操作中使用。
只要时间属性没有被修改,只是从查询的一个部分转发到另一个部分,它仍然是一个有效的时间属性。
时间属性的行为类似于常规的时间戳,可以用于计算。如果在计算中使用了时间属性,那么它将被物化并成为一个常规的时间戳。
常规的时间戳与Flink的时间和水印系统不兼容,因此不能再用于基于时间的操作。
三种时间属性
Processing time
Processing time允许table program根据本地机器的时间产生结果。
它是最简单的时间概念,但不提供决定论。它既不需要提取时间戳,也不需要生成水印。
定义方法
DataStream-to-Table转换的时候
处理时间属性是在模式定义期间用.proctime属性定义的。
time属性只能通过附加的逻辑字段扩展物理模式。因此,只能在模式定义的末尾定义它。
使用TableSource
processing time 属性通过实现DefinedProctimeAttribute 接口的TableSource实现
逻辑时间属性附加到由表源的返回类型定义的物理模式中。
Event time
Event time 采用记录中制定的时间,可以保证即使发生无序事件或延迟时间结果也能保持一致。
同时还确保从持久存储读取记录时,table的结果可重放。
此外,事件时间允许在batch和stream 环境中对表程序使用统一的语法。stream环境中的时间属性可以是batch处理环境中的记录的常规字段。
为了处理无序的事件并区分流中的准时事件和延迟事件,Flink需要从事件中提取时间戳,并在时间上进行某种进展(所谓的水印)。
定义的方法
事件时间属性是在模式定义期间使用.rowtime属性定义的。必须在转换的DataStream 中分配了时间戳和水印。
根据是否指定的.rowtime字段名,有两种方式定义时间属性
作为一个新的字段追加到schema
替换已经存在的字段
通过实现DefinedRowtimeAttributes接口定义
getRowtimeAttributeDescriptors()方法返回用于描述时间属性的最终名称的RowtimeAttributeDescriptor列表
确保getDataStream()方法返回的DataStream与所定义的时间属性保持一致
JOIN操作
Regular Joins
常规联接是最通用的联接类型,其中联接输入的任何新记录或更改都是可见的,并且会影响整个联接结果。
但是,这个操作有一个重要的问题:它需要将连接输入的两边永远保持在Flink的状态。
因此,如果一个或两个输入表都在持续增长,那么资源使用也会无限增长。
Time-windowed Joins
带时间窗口的join是由join为此定义的
它会检查输入记录的时间属性是否在一定的时间限制内,即时间窗口。
与常规连接操作相比,这种连接只支持具有时间属性的 append-only table。时间属性是准单次递增的,Flink可以在不影响结果正确性的情况下从其保存的状态中移除旧值(过期的状态值)。
Join with a Temporal Table Function
Flink 状态管理和容错
状态计算
有状态计算
定义
程序在计算过程中,在Flink内部存储计算产生的中间结果,并提供给后续的Function或算子计算使用
状态数据存储
本地存储
Flink堆内内存
Flink堆外内存
第三方存储介质
RocksDB
自定义缓存系统
Flink 状态类型及应用
状态类型
根据数据集是否根据key进行分区,将状态分为Keyed State 和 Operator State(Non-Keyed State)
类型
Keyed State
Keyed State 是 Operator State 的一种特例
Keyed State 实现按照key对数据集进行了分区,每个Keyed State仅针对一个Operator和Key的组合
Keyed State 可以通过Key Groups进行管理
主要用于算子的并行度发生变化时,自动重新分布Keyed State的数据
在系统运行过程中,一个Keyed算子实例可能运行一个或多个Key Groups 的keys
Operator State
Operator State 只和并行算子绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素的一部分状态数据
Operator State 支持当算子实例的并行发生变化是自动重新分配数据状态
Keyed State和Operator State 均有两种形式
托管状态(Managed State)
由Flink控制和管理状态数据,并将状态数据转换成内存Hash Tables 或RocksDB的对性存储
然后将这些状态数据通过内部的接口持久化到CheckPoints中
任务异常时,可以通过这些状态数据恢复任务
原生状态(Raw State)
由算子自己管理数据结构
在触发CheckPoint是,Flink并不知道状态数据的数据结构,只负责将数据转换成bytes数据保存到CheckPoint中
当从CheckPoints中恢复任务的时候反序列化出状态的数据结构
Managed Keyed State
ValueState[T]
与Key对应单个值的状态,比如 每个统计用户的交易次数,都需要在状态值count上在进行更新
value()
返回状态的当前值。
update(T value)
更新状态方法
ListState[T]
与Key对应元素列表的状态,状态中存放的是元素的List列表,如定义ListState[T]存储用户常访问IP
Iterable get()
获取元素
update(List<T> values)
更新元素
add(IN value)/addAll(List<T> values)
添加元素
ReducingState[T]
与key相关的数据元素单个聚合值的状态,存储经过指定ReduceFunction计算之后的指标,需要指定ReduceFunction完成状态数据的聚合
输入和输出类型必须相同
add(IN value)
OUT get()
与key相关的数据元素单个聚合值的状态,存储经过指定AggregateFunction计算之后的指标,需要指定AggregateFunction完成状态数据的聚合
输入类型和输出类型不一定相同
与key对应键值对的状态,用于维护具有key-value结构类型的状态数据
UV get(UK key)
类HaspMap API
remove(UK key)
contains(UK key)
Iterable<UK> keys()
Iterable<UV> values()
Managed Operator State
Flink中可以实现CheckpointedFunction 或 ListCheckpointed<T extends Serializable> 两个接口来定义操作Managed Operator State 的函数
CheckpointedFunction
在每个算子中Managed Operator State都是以List的形式存储
算子和算子之间的状态数据是相互独立的
List 存储适合状态数据的重新分布
需要实现的方法
snapshotState(FunctionSnapshotContext context)
触发checkpoint的时候将调用此方法
这充当了到函数的挂钩,保所有状态都是通过在函数初始化时通过FunctionInitializationContext先前提供的方法公开的,或者现在通过FunctionSnapshotContext本身提供的方法公开的。
initializeState(FunctionInitializationContext context)
此方法在分布式执行期间创建并行函数实例时调用。
通常在此方法中设置其状态存储数据结构。
重分布策略
Even-split Redistribution
每个算子实例含有部分状态元素的List列表
整个状态数据是所有List列表的合计
当触发resyore/redistribution时,通过将状态数据平均分成与算子并行度相同数量的List列表
每个task实例中有一个List,其中包含零个或多个元素
Union Redistribution
每个算子实例含有所有状态元素的List列表
当触发resyore/redistribution时,每个算子都能获得完整的状态元素列表
实现代码,见备注
并行度测试
并行度为1;env.setParallelism(1)
代码案例
并行度为4;env.setParallelism(1)
ListCheckpointed<T extends Serializable>
只支持List类型的状态
数据恢复时只支持Even-split Redistribution策略
获取函数的当前状态。状态必须反映此函数之前所有调用的结果。
restoreState(state: util.List[Long])
将函数或运算符的状态还原为前一个检查点的状态。
测试后续补充,报错了
StateDescriptor
Flink通过创建StateDescriptor来获取相印的Sate操作类
StateDescriptor定义了状态的名称、状态中数据的类型参数、状态自定义函数
各种状态对应的实现
ValueStateDescriptor<T>
ListStateDescriptor<T>
ReducingStateDescriptor<T>
Stateful Function 定义
完成对输入数据最大uid的获取
State生命周期
任何类型的Keyed State都可以设定状态的生命周期(TTL),确保能够在规定的时间内及时的清理状态数据
设置状态的生命周期
TTL更新操作
setUpdateType
Disabled
TTL是禁用的。状态不会过期
OnCreateAndWrite
创建和写入时候更新TTL
OnReadAndWrite
创建、读取和写入都更新TTL
存在问题
所有的数据状态的TTL都是通过读取或者写入的时间进行更新的
如果某个状态指标一直不被使用或更新,则永远不会触发对该状态数据的清理,可能导致状态数据越来越大
解决
cleanupFullSnapshot
通过cleanupFullSnapshot设置触发STATE SNAPSHOT的时候清楚状态数据
不适用于RockDB做增量Checkpointing的操作
cleanupInBackground()
可见性问题
返回条件
过期数据是否被清理
过期数据未被清理是否返回
setStateVisibility
ReturnExpiredIfNotCleanedUp
如果未清除过期的值,则返回该值
NeverReturnExpired
永远不要返回过期的值
Scala DataStream API直接使用状态
mapWithState
实例
filterWithState
flatMapWithState
无状态计算
不会储存计算过程中计算的结果
也不会将结果用于下一步的计算过程
程序只会在当前的计算流中完成计算,计算完后就输出结果,然后接入下一条数据继续处理
使用场景
只要不需要使用到中间计算结果的
GEP(复杂事件处理)
获取符合一定规则的事件,状态计算可以将接入的事件进行存储,然后等待符合规则的事件触发
按照分、时、天统计pv、uv
需要利用状态来维护当前计算过程中产生的结果,比如范围内总次数、总用户数
机器学习的模型训练
在Stream上实现机器学习的模型训练,有状态计算可以帮助用户维护当前版本模型使用的参数
Checkpoints 和 Savepoint
Checkpoints 检查点机制
Flinkz中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制
可以将统一时间点task/operator的状态数据(Keyed State、Operator State)进行全局统一快照处理
当应用出现异常时,operator可以根据上一次快照恢复所有算子之前的状态,保证数据的一致性
快照产生的过程非常轻量,高频创建且对Flink性能影响较小
Checkpoints过程中间状态一般保存在可配置环境中,通常是JobManger结点或HDFS上
配置
env.enableCheckpointing(10000)
每隔10s进行启动一个检查点
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
设置模式为:exactly_one,仅一次语义(默认)
env.getCheckpointConfig.setCheckpointTimeout(60000)
检查点必须在1分钟之内完成,或者被丢弃【checkpoint超时时间】 默认10分钟
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
确保检查点之间有1s的时间间隔【checkpoint最小间隔】
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
同一时间只允许进行一次检查点
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
设置周期性的外部检查点
将状态数据持久化到外部系统中,此时不会在任务正常挺值得时候清理检查点数据
此时可通过外部检查点对任务进行恢复
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
设置可容忍的检查点故障数,默认值为0,表示不容忍任何检查点故障。
env.getCheckpointConfig.setFailOnCheckpointingErrors() 已经弃用
Savepoint 机制
Savepoint 是Checkpoint的一种特殊实现,底层也是Checkpoint实现的
Savepoint 是用户以手工命令的方式触发Checkpoints ,并将结果持久化到指定路径
在升级或维护急群众保存系统中的状态数据,避免数据丢失或失去恰好一次语义保证
Operator ID 配置
如果您不手动指定id,它们将自动生成。
只要这些id不变,就可以从保存点自动恢复。
生成的id取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些id。
Savepoint 状态
Savepoint 操作
可以用命令行客户端来触发Savepoint 、取消具有Savepoint 的作业、从Savepoint 恢复,以及释放Savepoint
触发Savepoint
Triggering Savepoints
当触发一个保存点时,将创建一个新的保存点目录,存储数据和元数据。
可以通过配置一个默认的目标目录或者使用触发器命令指定一个自定义目标目录
目标目录必须是JobManager和TaskManager都可以访问的位置,例如分布式文件系统上的位置。
操作
触发一个Savepoint
bin/flink savepoint :jobId [:targetDirectory]
这将触发ID:jobId作业的保存点,并返回创建的保存点的路径。
用YARN触发Savepoint
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发带触发ID:jobId和YARN应用程序ID:yarnAppId的作业的,并返回创建的Savepoint的路径。
使用Savepoint取消作业
bin/flink cancel -s [:targetDirectory] :jobId
触发ID:jobid作业的Savepoint ,并取消作业
从Savepoint中恢复任务
bin/flink run -s :savepointPath [:runArgs]
从指定Savepoint 中恢复任务
允许Non-Restored状态
某些情况下应用的算子和Savepoint中的算子状态可能不一致,可能出现无法恢复的状况
可以通过--allowNonRestoredState (short: -n) 来忽略无法匹配的问题
释放Savepoints数据
bin/flink savepoint -d :savepointPath
配置一个默认的保存点目标目录
state.savepoints.dir: hdfs:///flink/savepoints
状态管理器
Querable State
外部读写
TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用逻辑,基于此将 Flink 和支持事务的外部系统结合,构建端到端的 Exactly-Once 成为可能。
基于输出到文件的简单示例
beginTransaction
在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
preCommit
在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个 checkpoint 的任何后续文件写入启动一个新的事务。
commit
在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
abort
在中止阶段,我们删除临时文件。
到Mysql的Exactly-Once
思路
checkpoint每10s进行一次,用FlinkKafkaConsumer
数据处理完后进行一次预提交数据库的操作
预提交成功
进行真正的插入数据库操作
插入成功
进行一次checkpoint,flink会自动记录消费的offset,可以将checkpoint保存的数据放到hdfs中
插入失败
数据回滚,恢复到上一个cp点
预提交出错
Flink程序就会进入不断的重启中,重启的策略可以在配置中设置
checkpoint记录的还是上一次成功消费的offset,下一次的checkpoint也不会做
本次消费的数据因为在checkpoint期间,消费成功,但是预提交过程中失败了
此时数据并没有真正的执行插入操作,因为预提交(preCommit)失败,提交(commit)过程也不会发生了。
只有将异常数据处理完后,重新启动这个Flink程序,它会自动从上一次成功的checkpoint中继续消费数据
Flink 介绍
数据架构的衍变
传统数据架构
传统单体数据架构
微服务架构
大数据数据架构
批计算(离线)
实时计算
有状态流计算架构
状态:计算过程中产生的中间结构
每次计算新的数据进入到流系统中都是给予中间状态结果的基础上计算。
Flink的优势
支持高吞吐、低延迟、高性能
支持事件时间概念
支持有状态计算
支持高度灵活的窗口操作
基于轻量级分布式快照实现容错
给予JVM实现独立的内存管理
Save Point保存点
Flink应用场景
实时智能推荐
复杂事件处理
实时欺诈检测
实时数据仓库与ETL
流数据分析
实时报表分析
Flink 基本架构
基本组件栈
API&Libraries层
GEP(复杂事件处理库)
SQL&Table库
DataSetAPI
FlinkML
Gelly
Runtime核心层
负责对上层不同接口提供基础服务,是Flink分布式计算框架的核心实现层
物理部署层
本地
集群
Standalone
Yarn
云
GEC
EC2
Kubenetes
基本架构
遵循Master-Slave架构设计原则,主要包含JobManager、TaskManager两个组件
组件间的通信协议:Akka框架
组成
Flink 客户端
负责提交任务到集群
与JobMananager建立Akka连接
JobMananager
集群任务的调度以及资源的管理
与Flink 客户端 、TaskManager通信,保证任务的执行
任务完成后,反馈结果给客户端并释放资源
TaskManager
负责具体执行任务和对应任务节点上的资源深情与管理
数据类型和序列化
Flink中的类型处理
通过引用字段名(如dataSet.keyBy(\"username\")来使用pojo类型和分组/连接/聚合它们
类型信息允许Flink尽早检查(打印错误和类型兼容性),而不是在运行时失败。
Flink对数据类型的了解越多,序列化和数据布局方案就越好。
最后,它还使用户在大多数情况下不必担心序列化框架和必须注册类型。
最常见的问题
注册子类型(Registering subtypes)
如果函数签名只描述超类型,但是它们实际上在执行期间使用了这些超类型的子类型,那么让Flink意识到这些子类型可能会大大提高性能。
为此,在StreamExecutionEnvironment或ExecutionEnvironment上为每个子类型调用 .registerType(clazz)。
注册自定义序列化器(Registering custom serializers)
对于自己不能透明地处理的类型,Flink会返回到Kryo。
但是并不是所有类型都可以由Kryo(也就是由Flink)无缝地处理。
解决方案是为引起问题的类型注册额外的序列化器。
许多库中都提供了额外的Kryo序列化器。
添加类型提示(Adding Type Hints)
当Flink无法推断出泛型类型时,有时用户必须传递类型提示。
这通常只在Java API中需要。
手动创建类型信息(Manually creating a TypeInformation)
这对于某些API调用可能是必要的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
TypeInformation
TypeInformation 是所有类型描述符的基类。
它揭示了类型的一些基本属性,可以生成序列化器,在专门化中,还可以生成类型的比较器。
Flink 数据类型
基本类型
所有Java原语及其装箱形式
void、String、Date、BigDecimal和BigInteger。
基本数组和对象数组
复合类型
Flink Java Tuples (Flink Java API的一部分)
最多25个字段,不支持空字段
Scala case类(包括Scala元组)
不支持空字段
具有任意数量的字段和支持空字段的元组
POJOs
遵循某种类似于bean的模式的类
辅助类型
泛型
这些不会被Flink本身序列化,而是被Kryo序列化。
POJO类型的规则
类是公共的并且是独立的(没有非静态的内部类)
该类有一个公共的无参数构造函数
类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(和非final的),要么有一个公共的getter和setter方法,该方法遵循getter和setter的Java bean命名约定。
注意,当用户定义的数据类型不能被识别为POJO类型时,必须将其处理为GenericType并使用Kryo进行序列化。
DataSet API 介绍与使用
DataSet 概述
DataSet 最初是从某些来源(例如,通过读取文件,或从本地集合)创建的。
结果通过sink返回,例如将数据写入(分布式)文件、写入标准输出(例如命令行终端)。
DataSet API
DataSet API 用于处理批量数据,Flink将接入的数据转换成DataSet数据集
开发相关依赖包,见注释
一般开发流程
创建ExecutionEnvironment环境
读取数据
利用DataSet API 提供的Transformation进行数据转换
输出结果
Data Sources 数据接入
创建DataSet的机制一般抽象于InputFormat
Flink提供了几种内置格式,用于从常见的文件格式创建数据集,其中许多可以在ExecutionEnvironment找到快捷方法
数据接入的种类
File-based 文件系统
readTextFile(path) / TextInputFormat
按行读取文件并将其作为DataSet[String]返回。
readTextFileWithValue(path) / TextValueInputFormat
按行读取文件并将其作为StringValues返回。StringValues是可变的字符串。
通过StringValue存储文本数据可以有效降低Strin对象创建数量,减少性能开销
readCsvFile(path) / CsvInputFormat
解析逗号(或其他字符)分隔字段的文件。
返回一个tuples或POJOs数据集。支持基本的java类型及其对应的值作为字段类型。
使用给定的分隔符解析新行(或另一个字符序列)分隔的基本数据类型(如字符串或整数)。
Collection-based
fromCollection(Seq)
从一个Seq创建一个数据集
fromCollection(Iterable)
从一个Iterable创建一个数据集。由Iterable返回的所有元素必须是相同类型的。
fromCollection(Iterator)
从一个Iterable创建一个数据集。由Iterator返回的所有元素必须是相同类型的。
fromElements(elements: _*)
根据给定的对象序列创建数据集。所有对象必须具有相同的类型。
fromParallelCollection(SplittableIterator)
从iterator并行地创建数据集。该类指定迭代器返回的元素的数据类型。
并行地生成给定区间内的数字序列。
Generic 通用类型
DataSet API中提供了InputFormat 通用的数据接口,以接入不同的数据源和格式类型的数据
接口类型
基于文件类型
基于通用数据类型接口如:RDBMS、NoSQL
createInput(inputFormat) / InputFormat
接口
自定义文件类型输入源,将指定格式文件读取并转换成DataSet
自定义通用型数据源,将读取的数据转换成DataSet数据集
案例:读取mysql数据做为数据源
一些特殊输入案例
递归遍历输入路径目录
对于基于文件的输入,当输入路径是目录时,默认情况下不会遍历目录文件。
相反,只读取基本目录中的文件,而忽略嵌套文件。方法启用嵌套文件的递归枚举
读取压缩文件
Flink目前支持输入文件的透明解压,如果这些文件用适当的文件扩展名进行了标记。
特别是,这意味着不需要进一步配置输入格式,任何FileInputFormat都支持压缩,包括自定义输入格式。
压缩文件可能无法并行读取,从而影响作业的可伸缩性。
DataSet Transformations 数据集转换
针对数据集的转换操作
转换的实质是将DataSet转换成另一个新的DataSet,然后将各个DataSet的转换连接成有向无环图,并基于DAG完成对批量数据的处理
数据处理
Map
获取一个元素并生成一个元素,数据分区不发生变化
FlatMap
获取一个元素并生成零个、一个或多个元素。包括空值
MapPartition
类似Map,MapPartition是基于DataSet分区对数据进行处理,以“iterator”的形式获取分区数据,并可以生成任意数量的结果值
每个分区中的元素数量取决于并行度和以前的操作
Filter
根据传入的条件对每条数据进行过滤,只有判定为Truede 的数据元素才会传输到下游的DataSet中
聚合操作
Reduce
通过将两个元素重复组合为一个元素,将一组元素组合为单个元素。
Reduce可以应用于完整的数据集,也可以应用于分组的数据集。
ReduceGroup
将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整的数据集,也可以应用于分组的数据集。
Aggregate
聚合可以应用于完整的数据集,也可以应用于分组的数据集。
可以对最小、最大和聚合使用简写语法。
Distinct
返回数据集中不同的元素。它从输入数据集中删除与元素的所有字段或字段子集相关的重复项。
多表关联
Join
根据指定条件关联两个数据集,然后根据所选字段形成一个新数据集
关联的key可以是key的表达式、Key-selector函数、字段位置以及Case Class字段指定
注意,连接转换只对等连接起作用。其他连接类型需要使用OuterJoin或CoGroup来表示。
可以根据Size Hint 标记数据及大小,Flink根据用户给定的线索调整计算策略
joinWithTiny
第二个数据集是小数据集
joinWithHuge
第二个数据集是大数据集
可以通过连接提示指定运行时执行连接的方式。描述了连接是通过分区还是广播进行的,以及它是使用基于排序的算法还是基于散列的算法。
JoinHint
BROADCAST_HASH_FIRST
将第一个数据集广播出去,并转换成HashTable存储,适合第一个数据集较小的情况
BROADCAST_HASH_SECOND
将第二个数据集广播出去,并转换成HashTable存储,适合第二个数据集较小的情况
OPTIMIZER_CHOOSES
与不设定JoinHint相同,优化的工作交给系统
REPARTITION_HASH_FIRST
将两个数据集重分区,将第一个数据集转换成HashTable存储,适用于第一个数据集比第二个数据集小,但是两个数据集都相对较大的情况
REPARTITION_HASH_SECOND
将两个数据集重分区,将第二个数据集转换成HashTable存储,适用于第二个数据集比第一个数据集小,但是两个数据集都相对较大的情况
REPARTITION_SORT_MERGE
将两个数据集重分区,并将每个分区排序,适用于两个数据集已经排好序的情况
OuterJoin
OuterJoin对两个数据集进行外关联
包含的关联方式
left
leftOuterJoin
right
rightOuterJoin
full outer join
fullOuterJoin
类似Join,但是并不是所有都支持,简单的总结就是左右关联外表支持广播、重分区哈希,全关联是不支持广播、重分区哈希
CoGroup
将两个数据集根据key组合在一起,相同的key会放在同一个Group中,如果制定的key尽在一个数据集中有记录,则会将Group 与空的Group 关联
Cross
将两个数据集合并成一个数据集,合并的的及结果是两个数据集的笛卡尔积
集合操作
Union
生成两个数据集的并集,两个数据集的格式必须相同
Rebalance
对数据集中的数据进行平均分布,使每个分区上的数据量相同,以消除数据倾斜
只有Map-like 的转换才可能遵循再平衡转换。
Hash-Partition
按给定键对数据集进行哈希分区。键可以指定为位置键、表达式键和键选择器函数。
Range-Partition
给定键上对数据集进行范围分区
Sort Partition
按指定顺序对指定字段上的数据集的所有分区进行本地排序。
Custom Partitioning
使用自定义分区器函数基于特定分区的键分配记录。
排序操作
First-n
返回数据集的前n个(任意)元素。first -n可以应用于常规数据集、分组数据集或分组排序的数据集。
MinBy / MaxBy
从数据集中返回指定字段或组合对应的最小或最大的记录
Data Sinks
数据接收器使用数据集并用于存储或返回它们。数据接收操作使用OutputFormat进行描述
三中数据输出类型
基于文件输出接口
writeAsText() / TextOutputFormat
将元素按行写入为字符串。通过调用每个元素的toString()方法获得字符串。
writeAsCsv(...) / CsvOutputFormat
将元组写入逗号分隔的值文件。行和字段分隔符是可配置的
write() / FileOutputFormat
方法和自定义文件输出的基类。支持自定义对象到字节的转换。
通用输出接口
output()/ OutputFormat
使用自定义的OutputFormat来实现输出
HadoopOutputFormat
JDBCOutputFormat
. . .
客户端输出
print() / printToErr()
打印标准输出/标准错误流中每个元素的toString()值。
Iteration Operators 迭代计算
全量迭代
过程
Iteration Input
初始化数据,通过DataSource算子读取或从其他转换中接入
Step Function
Step Function将在每次迭代中执行,它是由map、reduce、join等操作符组成的任意数据流,取决于您手头的特定任务。
将结合数据集以及上一次迭代计算的Solution数据集进行本次迭代计算
Next Partial Solution
每次迭代计算的输出结果称为Next Partial Solution,该结果会被作为下一次迭代计算的输入数据
Iteration Result
最后一次迭代的输出被写入DataSink,或用作以下操作符的输入。
迭代终止的两种条件
达到最大迭代次数
指定迭代的最大次数,当计算次数超过该设定阀值,终止迭代
符合自定义聚合器收敛条件
用户自定义的聚合器和收敛条件
增量迭代
Iteration Input
从数据源读取initial workset或从以前的操作符中读取solution set,作为第一次迭代的输入。
Step Function将在每次迭代中执行
Next Workset/Update Solution Set
Iteration Result
0 条评论
下一页
为你推荐
查看更多