Flink学习笔记
2022-07-12 10:10:37 104 举报
AI智能生成
Flink学习笔记
作者其他创作
大纲/内容
Flink介绍
Flink简介
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。
Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
Flink提供分层API:ProcessFunction、DataStream API、Table API和Flink SQL,以满足不同层次的开发需求
Flink能够保证计算的正确性,以低延迟和高吞吐量著称。
Flink框架处理流程
Flink应用场景
事件驱动型应用(Event-driven Applications)
传统事务处理
在传统架构中,应用需要读写远程事务型数据库,然后触发事件计算或者是外部操作等。
在这种设计中,计算和存储是分离。将数据持久化到远程的事务型数据库中,同时读写数据库。
事件驱动应用程序
一种有状态的应用程序,它从一个或多个事件流中获取事件,并通过触发计算、状态更新或外部操作对传入事件做出反应。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。
相反,事件驱动型应用是基于状态化流处理来完成。
在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。
无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。
系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。
传统事务处理和事件驱动应用程序的区别
计算和存储是否分离。
传统事务型是无状态的,可以水平扩展。事件驱动应用程序是有状态的,水平扩展有限制。
架构区别
应用场景
欺诈检测(Fraud detection)
异常检测(Anomaly detection)
基于规则的告警(Rule-based alerting)
业务流程监控(Business process monitoring)
Web应用程序(社交网络)
数据分析应用(Data Analytics Applications)
什么是数据分析应用
数据分析任务需要从原始数据中提取有价值的信息和指标。
传统数据分析应用
通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。
为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告
流式数据分析应用
流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。
这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。
仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。
由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低
传统数据分析应用和流式数据分析应用
电信网络质量监控
移动应用中的产品更新及实验评估分析
消费者技术中的实时数据即席分析
实时推荐系统
数据管道应用(Data Pipeline Applications)
电子商务中的实时查询索引构建
电子商务中的持续 ETL
Flink在行业中的典型应用
电商和市场营销
实时数据报表
广告投放
实时推荐
物联网(IOT)
传感器实时数据采集和显示
实时报警
物流配送和服务业
订单状态实时更新
通知信息推送
银行和金融业
实时结算
通知推送
实时检测异常行为
Flink在阿里的应用场景
实时监控
用户行为预警
对用户行为或者相关事件进行实时监测和分析,基于风控规则进行预警
app crash 预警
服务器攻击预警
实时报表
双11、双12等活动直播大屏
流数据分析
实时计算相关指标反馈及时调整决策
内容投放、无线智能推送、实时个性化推荐等
实时仓库
数据实时清洗、归并、结构化
数仓的补充和优化
流式数据处理的发展和演变
流处理和批处理
传统事务处理
有状态的流处理
Lambda架构
新一代流处理器
Flink的特点和优势
Flink核心特性
分层API
Flink vs Spark
数据处理架构
数据模型和运行架构
Spark还是Flink
部署模式和资源提供者
简介
Flink 是一个多功能框架,以混合搭配的方式支持许多不同的部署场景
对于资源消耗的不同,有多种部署模式,同时也支持多种资源提供者
部署模式
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。
Flink 为各种场景提供了三种的部署模式
会话模式(Session Mode)
预先启动一个Flink集群,保持一个会话,在这个会话中通过客户端提交作业。
集群所以所有提交的作业会竞争集群中的资源。集群的生命周期是超越于作业之上的。
会话模式比较适合于单个规模小、执行时间短的大量作业。
多个作业共享一个集群,集群生命周期高于作业,作业通过客户端提交
单作业模式(Per-Job Mode)(1.15版本已经废弃)
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个
提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。
提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。
同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。
作业作业完成后,集群就会关闭,所有资源也会释放。
Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管
理框架来启动集群,比如 YARN、Kubernetes
理框架来启动集群,比如 YARN、Kubernetes
一个作业就是一个集群,在客户端提交程序。
应用模式(Application Mode)
不管是会话模式还是单作业模式,应用程序都是在客户端执行,然后由客户端提交给JobMananger
这使得客户端成为一个沉重的资源消耗者,因为它可能需要大量的网络带宽来下载依赖项并将二进制文件发送到集群,并且需要 CPU 周期来执行 main()
所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。
而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。
这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式
直接启动JobManager执行应用代码,创建一个集群。一个作业对应一个集群,作业和集群生命周期一致。
三种模式的主要区别
集群的生命周期以及资源的分配方式
作业和集群的对应关系以及生命周期
以及应用的 main 方法到底在哪里执行——客户端(Client)还是 JobManager
资源提供者
Standalone模式
Yarn模式
Kubernetes模式
运行架构
Flink运行时的组件
任务提交流程
任务调度原理
DataStream API(基础篇)
概述
一个Flink程序,其实就是对DataStream的各种转换操作,具体来说,代码基本上都由以下几
部分构成
部分构成
获取执行环境(execution environment)
读取数据源(source)
定义基于数据的转换操作(transformations)
定义计算结果的输出位置(sink)
触发程序执行(execute)
Flink程序执行流程
执行环境(StreamExecutionEnvironment)
简介
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交
到远程集群上运行
到远程集群上运行
Env在Flink程序中的位置
创建执行环境
StreamExecutionEnvironment是执行环境的核心类。在代码中创建执行环境的方式,
就是调用这个类的静态方法。一般而言直接调用getExecutionEnvironment即可
就是调用这个类的静态方法。一般而言直接调用getExecutionEnvironment即可
getExecutionEnvironment
它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;
如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境
如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度
val localEnv = StreamExecutionEnvironment.createLocalEnvironment()
createLocalEnvironmentWithWebUI
该方法返回一个本地执行环境。同时启动了一个web服务端,有ui界面展示程序运行信息
需要添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
val webUIEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,
并指定要在集群中运行的 Jar 包
并指定要在集群中运行的 Jar 包
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(host = "", port = 0, "")
执行模式
Flink是流、批统一的框架,既支持流处理也支持批处理。从1.12.0版本开始,Flink实现了API上的流批统一。
DataStream API新增了一个重要特性:可以支持不同的"执行模式"(execution mode),
通过简单的设置就可以让一段Flink程序在流处理和批处理之间切换
通过简单的设置就可以让一段Flink程序在流处理和批处理之间切换
DataStream支持三种模式,流执行模式、批执行模式、自动模式
流执行模式(STREAMING)
Flink默认的执行模式,用于处理无界数据流
批执行模式(BATCH)
用于处理有界数据流,需要通过env或者命令行参数进行显示设定
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
bin/flink run -Dexecution.runtime-mode=BATCH ...
自动模式(AUTOMATIC)
Flink框架根据数据流是否有界自行决定使用哪种执行模式
如何选择
Flink虽然可以把批量数据看成有界的流,在STREAMING模式下执行,
但是在STREAMING模式下,数据来一条就会计算一次。
在BATCH模式下只有数据全部处理完之后,才会一次性输出结果。
虽然二者最终结果相同,但是STREAMING模式输出了更多的中间结果,显然BATCH模式更加高效
最终结论:无界数据流选择流执行模式,有界数据流选择批处理
触发程序执行
Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)
需要显式地调用执行环境的execute()方法,来触发程序执行。
execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)
execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)
env.execute()
源算子(Source)
概述
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理
一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。
所以Source就是我们整个处理程序的输入端
Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法
val stream = env.addSource(...)
方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的 source function,
通常情况下足以应对我们的实际需求
通常情况下足以应对我们的实际需求
Source在Flink程序中的位置
从集合中读取数据
直接调用env的fromCollection方法即可,一般用于本地测试。
同类型的方法还有fromElements、fromSequences等
同类型的方法还有fromElements、fromSequences等
代码示例
import com.yanggu.flink.lowlevelapi.pojo.SensorReading
import org.apache.flink.streaming.api.scala._
/**
* 从集合中创建Source
* fromCollection、fromElements、fromSequences
*/
object SourceFromList {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(List(
SensorReading("sensor_1", System.currentTimeMillis(), 35.8),
SensorReading("sensor_2", System.currentTimeMillis(), 45.8),
SensorReading("sensor_3", System.currentTimeMillis(), 65.8)
))
stream.print("stream1: ").setParallelism(1)
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
/**
* 从集合中创建Source
* fromCollection、fromElements、fromSequences
*/
object SourceFromList {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(List(
SensorReading("sensor_1", System.currentTimeMillis(), 35.8),
SensorReading("sensor_2", System.currentTimeMillis(), 45.8),
SensorReading("sensor_3", System.currentTimeMillis(), 65.8)
))
stream.print("stream1: ").setParallelism(1)
env.execute()
}
}
从文件中读取数据
从存储介质中获取数据,一个比较常见的方式就是读取日志文件
调用env的readTextFile方法即可。这里需要传入一个文件路径
参数可以是目录,也可以是文件
路径可以是相对路径,也可以是绝对路径
相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式下是集群节点根目录
也可以从 hdfs 目录下读取, 使用路径 hdfs://...
代码示例
import org.apache.flink.streaming.api.scala._
/**
* 从文件读取数据
*/
object SourceFromFile {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//
val stream = env.readTextFile(getClass.getResource("/hello.txt").getPath)
val value = stream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
value.print("stream1: ")
.setParallelism(1)
env.execute()
}
}
/**
* 从文件读取数据
*/
object SourceFromFile {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//
val stream = env.readTextFile(getClass.getResource("/hello.txt").getPath)
val value = stream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
value.print("stream1: ")
.setParallelism(1)
env.execute()
}
}
从Socket中读取数据
不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的
使用socket 文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试
代码示例
import org.apache.flink.streaming.api.scala._
/**
* 从socket中读取数据, 一般用于测试。且不能设置并行度, 只能为1
*/
object SourceFromSocket {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//socketTextStream返回的是String类型
env.socketTextStream("localhost", 8888)
.print("socket source test").setParallelism(1)
env.execute()
}
}
/**
* 从socket中读取数据, 一般用于测试。且不能设置并行度, 只能为1
*/
object SourceFromSocket {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//socketTextStream返回的是String类型
env.socketTextStream("localhost", 8888)
.print("socket source test").setParallelism(1)
env.execute()
}
}
window系统,netcat工具下载地址,使用nc64 -Lp 端口号 命令即可
从Kafka中读取数据(重点)
Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。
而消息队列的传输方式,恰恰和流处理是完全一致的。
所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星
在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算
Kafka和Flink流处理架构图
Flink 提供了一个Apache Kafka连接器,用于从 Kafka 主题读取数据和将数据写入到 Kafka 主题中,并保证一次性保证
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
使用建造者模式构建Kafka Source,核心参数Kafka Server地址、topics、consumer group id、offset策略
后面需要调用env的fromSource方法,需要设置Watermark策略和Source的名称
代码示例
import cn.hutool.core.convert.Convert
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import java.util.Properties
/**
* 从kafka中读取数据
* flink官网关于kafka的source
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
*/
object SourceFromKafka {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//broker集群地址
val brokerList = "localhost:9092"
//通过","拼接的多个topic
val topics = "test"
//消费者组id
val groupId = "test-groupId"
//配置kafka的properties
val prop = new Properties
//使用建造者模式构建KafkaSource
val kafkaSource = KafkaSource.builder[String]()
//broker地址
.setBootstrapServers(brokerList)
//读取的topic
.setTopics(Convert.toList(classOf[String], topics))
//消费者组id
.setGroupId(groupId)
//value的序列化策略, 一般使用String即可
.setValueOnlyDeserializer(new SimpleStringSchema)
//设置kafka consumer的其他属性, 例如autocommit等
.setProperties(prop)
//从消费者组提交的偏移量开始,如果提交的偏移量不存在,读取最早的偏移量的数据
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build()
val kafkaDataStream = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), s"Kafka $topics Source")
//抽取出共用方法
//val stream2 = KafkaUtil.getKafkaDataStream(environment, brokerList, topics, groupId, prop)
kafkaDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print()
//开启任务
environment.execute()
}
}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import java.util.Properties
/**
* 从kafka中读取数据
* flink官网关于kafka的source
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
*/
object SourceFromKafka {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//broker集群地址
val brokerList = "localhost:9092"
//通过","拼接的多个topic
val topics = "test"
//消费者组id
val groupId = "test-groupId"
//配置kafka的properties
val prop = new Properties
//使用建造者模式构建KafkaSource
val kafkaSource = KafkaSource.builder[String]()
//broker地址
.setBootstrapServers(brokerList)
//读取的topic
.setTopics(Convert.toList(classOf[String], topics))
//消费者组id
.setGroupId(groupId)
//value的序列化策略, 一般使用String即可
.setValueOnlyDeserializer(new SimpleStringSchema)
//设置kafka consumer的其他属性, 例如autocommit等
.setProperties(prop)
//从消费者组提交的偏移量开始,如果提交的偏移量不存在,读取最早的偏移量的数据
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build()
val kafkaDataStream = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), s"Kafka $topics Source")
//抽取出共用方法
//val stream2 = KafkaUtil.getKafkaDataStream(environment, brokerList, topics, groupId, prop)
kafkaDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print()
//开启任务
environment.execute()
}
}
从RabbitMQ中读取数据
Flink提供了从RabbitMQ中读取数据的Connector,需要引入对应的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
使用建造者模式构建RabbitMQ的连接配置对象(host、port、username、password等信息)。基于配置对象创建RMQSource对象
代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
/**
* 从rabbitmq中获取数据
* flink官方网站
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/rabbitmq/
*/
object SourceFromRabbitMQ {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/rabbitmq.properties"))
//配置rabbitmq的连接信息
val rabbitmqConnectConfig = new RMQConnectionConfig.Builder()
.setHost(tool.get("rabbitmq.host"))
.setPort(tool.getInt("rabbitmq.port"))
.setVirtualHost(tool.get("rabbitmq.virtualHost"))
.setUserName(tool.get("rabbitmq.username"))
.setPassword(tool.get("rabbitmq.password"))
.build()
environment
.addSource(new RMQSource[String](rabbitmqConnectConfig, tool.get("rabbitmq.source.queue"), new SimpleStringSchema))
.print()
environment.execute("SourceFromRabbitMQ Job")
}
}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
/**
* 从rabbitmq中获取数据
* flink官方网站
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/rabbitmq/
*/
object SourceFromRabbitMQ {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/rabbitmq.properties"))
//配置rabbitmq的连接信息
val rabbitmqConnectConfig = new RMQConnectionConfig.Builder()
.setHost(tool.get("rabbitmq.host"))
.setPort(tool.getInt("rabbitmq.port"))
.setVirtualHost(tool.get("rabbitmq.virtualHost"))
.setUserName(tool.get("rabbitmq.username"))
.setPassword(tool.get("rabbitmq.password"))
.build()
environment
.addSource(new RMQSource[String](rabbitmqConnectConfig, tool.get("rabbitmq.source.queue"), new SimpleStringSchema))
.print()
environment.execute("SourceFromRabbitMQ Job")
}
}
自定义Source
如果Flink没有提供对应的Connector。这时候需要自定义类实现SourceFunction接口
自定义类实现SourceFunction接口,需要重写两个核心方法run方法和cancel方法。
需要传入一个泛型,表示自定义的Source输出的数据类型
run方法通过上下文对象(SourceContext)向下游发送数据,cancel方法在Flink程序被取消时调用,一般用于控制循环的标志位
SourceFunction的并行度只能设置为1, 如果要自定义并行的Source需要实现ParallelSourceFunction
代码示例
import com.yanggu.flink.lowlevelapi.pojo.SensorReading
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import java.util.concurrent.TimeUnit
import scala.util.Random
/**
* 自定义Source, 实现SourceFunction接口, 重写run()方法和cancel()方法即可
* SourceFunction的并行度只能设置为1, 如果要自定义并行的Source需要实现ParallelSourceFunction
*/
object SourceFromMySource {
def main(args: Array[String]): Unit = {
//创建环境对象
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//添加自定义的source
//设置成多并行度会直接报错
val streaming = environment.addSource(new MySensorSource)/*.setParallelism(2)*/
//打印, 并且设置并行度为1
streaming.print().setParallelism(1)
//开启任务
environment.execute()
}
}
class MySensorSource extends SourceFunction[SensorReading] {
// flag: 表示数据源是否还在运行
var running = true
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
val random = new Random
while (running) {
val curTime = System.currentTimeMillis()
1.to(5).foreach(
//使用ctx对象发送数据
i => ctx.collect(SensorReading("sensor_" + i, curTime, 65 + random.nextGaussian() * 20))
)
TimeUnit.MILLISECONDS.sleep(3000L)
}
}
override def cancel(): Unit = running = false
}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import java.util.concurrent.TimeUnit
import scala.util.Random
/**
* 自定义Source, 实现SourceFunction接口, 重写run()方法和cancel()方法即可
* SourceFunction的并行度只能设置为1, 如果要自定义并行的Source需要实现ParallelSourceFunction
*/
object SourceFromMySource {
def main(args: Array[String]): Unit = {
//创建环境对象
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//添加自定义的source
//设置成多并行度会直接报错
val streaming = environment.addSource(new MySensorSource)/*.setParallelism(2)*/
//打印, 并且设置并行度为1
streaming.print().setParallelism(1)
//开启任务
environment.execute()
}
}
class MySensorSource extends SourceFunction[SensorReading] {
// flag: 表示数据源是否还在运行
var running = true
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
val random = new Random
while (running) {
val curTime = System.currentTimeMillis()
1.to(5).foreach(
//使用ctx对象发送数据
i => ctx.collect(SensorReading("sensor_" + i, curTime, 65 + random.nextGaussian() * 20))
)
TimeUnit.MILLISECONDS.sleep(3000L)
}
}
override def cancel(): Unit = running = false
}
Flink支持的数据类型
Flink的TypeInformation
Flink作为分布式处理框架,免不了需要将数据进行序列化和反序列化,
因此Flink为了统一管理和处理数据类型,定义了一套"类型信息"(TypeInformation)来统一表示数据类型
因此Flink为了统一管理和处理数据类型,定义了一套"类型信息"(TypeInformation)来统一表示数据类型
TypeInformation 类是 Flink 中所有类型描述符的基类。
它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器
支持的数据类型
基本数据类型
所有Java基本数据类型和其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger
数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)
复合数据类型
Java 元组类型(TUPLE)
Scala 样例类及 Scala 元组
行类型(ROW)
POJO
类似于自定义Java Bean的方式
辅助类型
Option、Either、List、Map
泛型类型(GENERIC)
在DataStream API中处理TypeInformation
不论是Java API还是Scala API,在调用DataStream API中需要传入输出数据类型的TypeInformation信息。
一般情况下TypeInformation是不需要用户手动传入的,Flink可以根据函数的入参和出参自动提取。
Flink有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器
Scala
Scala 程序不会像 Java 程序那样遭受类型擦除。对于Scala而言,类型和方法可以访问其泛型参数的类型。
Scala的DataStream API定义了隐式参数
map方法的声明:def map[R: TypeInformation](fun: T => R): DataStream[R]
map的方法声明上定义了隐式形参(上下文界定),需要存在一个隐式值,将R => Typeinformation[R]
调用Scala 的DataStream API时,使用隐式转换函数,自动生成TypeInformation数据
//导入隐式转换函数createTypeInformation即可
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala._
createTypeInformation隐式转换函数定义在org.apache.flink.streaming.api.scala包对象中
如果不能隐式转换不能自动生成TypeInformation数据,需要在形参上手动设置泛型
例如flatMap类型,泛型可以在Collector[O]上设置
代码示例
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* flatMap, 传入一个数据, 输出0到多个数据, 可以实现filter + map的操作
*
*/
object FlatMapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//对于匿名函数, 如果有泛型的, 需要在形参上显示指定
.flatMap((data, out: Collector[String]) => {
if (data.==(1)) {
out.collect(data.toString)
}
})
.print("flatMap>")
env.execute()
}
}
import org.apache.flink.util.Collector
/**
* flatMap, 传入一个数据, 输出0到多个数据, 可以实现filter + map的操作
*
*/
object FlatMapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//对于匿名函数, 如果有泛型的, 需要在形参上显示指定
.flatMap((data, out: Collector[String]) => {
if (data.==(1)) {
out.collect(data.toString)
}
})
.print("flatMap>")
env.execute()
}
}
如果手动设置也不行,需要使用匿名内部类,在声明类时,显示指定泛型
Java
Java由于泛型擦除的机制,Flink可能无法提取出输出的数据类型,因此在部分情况下需要手动设置,来告诉Flink框架
大部分情况下不需要手动设置,如果运行报错,可以手动设置TypeInformation数据
有两种方式可以设置TypeInformation信息
在调用DataStream API时通过形参手动传入
map方法的声明:public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType)
MapFunction<T, R> mapper, TypeInformation<R> outputType)
对于非泛型类型
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型
TypeInformation<Tuple> info = Types.TUPLE(Types.STRING, Types.DOUBLE);
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
调用returns方法返回
对于非泛型类型
DataStream<SomeType> result = stream
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
对于泛型类型
returns(Types.TUPLE(Types.Int, Types.GENERIC(SomeType.class)))
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
Types和TypeHint之间的区别
绝大多数情况下,Types能够满足显示设定数据类型的需求,如果仍然不满足可以使用TypeHint,优先使用Types
转换算子(transformation)
概述
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream
一个 Flink 程序的核心,其实就是所有的转换操作,它们决定了处理的业务逻辑
以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑
重点介绍基本的单数据流的转换,多流转换的内容后面介绍
transformation在Flink程序中的位置
基本转换算子
map
每来一条数据,就会调用map方法。将一种数据类型转换成另外一种数据类型
一般用作数据类型的转换,常见如从Kafka Source中读取String类型的数据,然后使用map转换成样例类
map前后数据类型可以相同,也可以不同,是将一个 DataStream 转换成另一个 DataStream
代码示例
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
/**
* map方法, 每来一条数据就会调用一次, 将一种数据类型转换成另外一种数据类型
* 一般用作数据类型的转换, 常见如从Kafka读取String的数据, 然后使用map转换成样例类
*/
object MapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(Seq(1, 2, 3))
//可以传入匿名函数
.map(data => data * 2)
//或者自定义类实现MapFunction接口, 重写map方法
.map(new MyMapFunction2)
.print("map test>>>").setParallelism(1)
env.execute()
}
}
/**
*
* @tparam I 输入的数据类型
* @tparam O 输出的数据类型
*/
class MyMapFunction2 extends MapFunction[Int, Int] {
override def map(value: Int) = value * 2
}
import org.apache.flink.streaming.api.scala._
/**
* map方法, 每来一条数据就会调用一次, 将一种数据类型转换成另外一种数据类型
* 一般用作数据类型的转换, 常见如从Kafka读取String的数据, 然后使用map转换成样例类
*/
object MapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(Seq(1, 2, 3))
//可以传入匿名函数
.map(data => data * 2)
//或者自定义类实现MapFunction接口, 重写map方法
.map(new MyMapFunction2)
.print("map test>>>").setParallelism(1)
env.execute()
}
}
/**
*
* @tparam I 输入的数据类型
* @tparam O 输出的数据类型
*/
class MyMapFunction2 extends MapFunction[Int, Int] {
override def map(value: Int) = value * 2
}
filter
对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。
一般用于数据的过滤,filter前后的数据类型相同,是将一个 DataStream 转换成另一个 DataStream
代码示例
import org.apache.flink.streaming.api.scala._
/**
* 对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。一般用于数据的过滤
*/
object FilterFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//使用匿名函数
.filter(data => data.==(1))
.print("filter >").setParallelism(1)
env.execute()
}
}
/**
* 对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。一般用于数据的过滤
*/
object FilterFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//使用匿名函数
.filter(data => data.==(1))
.print("filter >").setParallelism(1)
env.execute()
}
}
flatMap
flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
flatMap消费一个元素,可以产生 0 到多个元素,可以实现filter + map的操作。
前后的数据类型可以相同也可以不同,将DataStream转换成DataStream
代码示例
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* flatMap, 传入一个数据, 输出0到多个数据, 可以实现filter + map的操作
*
*/
object FlatMapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//对于匿名函数, 如果有泛型的, 需要在形参上显示指定
.flatMap((data, out: Collector[String]) => {
if (data.==(1)) {
out.collect(data.toString)
}
})
.print("flatMap>")
env.execute()
}
}
import org.apache.flink.util.Collector
/**
* flatMap, 传入一个数据, 输出0到多个数据, 可以实现filter + map的操作
*
*/
object FlatMapFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3)
//对于匿名函数, 如果有泛型的, 需要在形参上显示指定
.flatMap((data, out: Collector[String]) => {
if (data.==(1)) {
out.collect(data.toString)
}
})
.print("flatMap>")
env.execute()
}
}
聚合算子(Aggregation)
概述
基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。
而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息
这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并,这就是聚合操作
按键分区(keyBy)
如果要做聚合操作,肯定需要进行分组(类似于SQL中的GROUP BY),那么在Flink中就是keyBy操作
keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。
这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
基于不同的 key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区。
下一步聚合算子操作就将会在同一个子任务中进行处理key相同的数据
数据是如何被分区的呢?通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。
所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。
所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。
分区后从DataStream变成KeyedStream,调用聚合方法,从KeyedStream变成DataStream
keyBy方法的形参需要传入一个KeySelector<IN, KEY>,IN是输入的数据类型,KEY是key的数据类型
简单聚合
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。
Flink 为我们内置实现了一些最基本、最简单的聚合 API。例如sum、min、max等
sum
在输入流上,对指定的字段做叠加求和的操作
min
在输入流上,对指定的字段求最小值
max
在输入流上,对指定的字段求最大值
minBy
与 min()类似,在输入流上针对指定字段求最小值。
不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值
而 minBy()则会返回包含字段最小值的整条数据
maxBy
和minBy类似
代码示例
import org.apache.flink.streaming.api.scala._
/**
* keyBy之后使用简单聚合算子
* 例如sum、max、min等
*/
object TransTupleAggregationDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements(("a", 1), ("a", 3), ("b", 3), ("b", 4))
stream.keyBy(_._1).sum(1).print("sum position>")
stream.keyBy(_._1).sum("_2").print("sum field>")
stream.keyBy(_._1).max(1).print("max position>")
stream.keyBy(_._1).max("_2").print("max field>")
stream.keyBy(_._1).min(1).print("min position>")
stream.keyBy(_._1).min("_2").print("min field>")
stream.keyBy(_._1).maxBy(1).print("maxBy position>")
stream.keyBy(_._1).maxBy("_2").print("maxBy field>")
stream.keyBy(_._1).minBy(1).print("minBy position>")
stream.keyBy(_._1).minBy("_2").print("minBy field>")
env.execute()
}
}
/**
* keyBy之后使用简单聚合算子
* 例如sum、max、min等
*/
object TransTupleAggregationDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements(("a", 1), ("a", 3), ("b", 3), ("b", 4))
stream.keyBy(_._1).sum(1).print("sum position>")
stream.keyBy(_._1).sum("_2").print("sum field>")
stream.keyBy(_._1).max(1).print("max position>")
stream.keyBy(_._1).max("_2").print("max field>")
stream.keyBy(_._1).min(1).print("min position>")
stream.keyBy(_._1).min("_2").print("min field>")
stream.keyBy(_._1).maxBy(1).print("maxBy position>")
stream.keyBy(_._1).maxBy("_2").print("maxBy field>")
stream.keyBy(_._1).minBy(1).print("minBy position>")
stream.keyBy(_._1).minBy("_2").print("minBy field>")
env.execute()
}
}
归约聚合(reduce)
与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。
它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
代码示例
import com.yanggu.flink.lowlevelapi.pojo.Event
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import java.util.concurrent.TimeUnit
import scala.util.Random
object ReduceFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new ClickSource)
stream
.map(data => (data.name, 1L))
//DataStream => KeyedStream
.keyBy(_._1)
//KeyedStream => DataStream
//实现求和的功能(模仿sum)
.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
//这里返回true, 意味着所有的数据的key都相同, 被发到同一个slot上执行
.keyBy(_ => true)
//实现最大值的功能(模仿maxBy)
.reduce((v1, v2) => (v1._1, v1._2.max(v2._2)))
.print()
env.execute()
}
}
class ClickSource extends SourceFunction[Event] {
private var runFlag = true
override def run(ctx: SourceFunction.SourceContext[Event]) = {
val users = Array("Mary", "Alice", "Bob", "Cary")
val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")
val usersLength = users.length
val urlLength = urls.length
val random = new Random()
while (runFlag) {
ctx.collect(Event(users(random.nextInt(usersLength)), urls(random.nextInt(urlLength)), System.currentTimeMillis()))
TimeUnit.MILLISECONDS.sleep(100L)
}
}
override def cancel() = {
runFlag = false
}
}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import java.util.concurrent.TimeUnit
import scala.util.Random
object ReduceFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new ClickSource)
stream
.map(data => (data.name, 1L))
//DataStream => KeyedStream
.keyBy(_._1)
//KeyedStream => DataStream
//实现求和的功能(模仿sum)
.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
//这里返回true, 意味着所有的数据的key都相同, 被发到同一个slot上执行
.keyBy(_ => true)
//实现最大值的功能(模仿maxBy)
.reduce((v1, v2) => (v1._1, v1._2.max(v2._2)))
.print()
env.execute()
}
}
class ClickSource extends SourceFunction[Event] {
private var runFlag = true
override def run(ctx: SourceFunction.SourceContext[Event]) = {
val users = Array("Mary", "Alice", "Bob", "Cary")
val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")
val usersLength = users.length
val urlLength = urls.length
val random = new Random()
while (runFlag) {
ctx.collect(Event(users(random.nextInt(usersLength)), urls(random.nextInt(urlLength)), System.currentTimeMillis()))
TimeUnit.MILLISECONDS.sleep(100L)
}
}
override def cancel() = {
runFlag = false
}
}
用户自定义函数(UDF)
前言
对于大部分DataStream API而言,都是调用DataStream的方法,然后传入一个接口,或者是匿名函数
这些接口有一个共同特点:全部都以算子操作名称 + Function 命名
对于大部分操作而言,都需要传入一个用户自定义函数(UDF User Defined Function),实现相关操作的接口,来完成处理逻辑的定义
函数类(Function Classes)
对于各种算子而言都有对应的XXXFunction接口。例如MapFunction、FilterFunction等
需要自定义类实现对应的XXXFunction,重写抽象方法即可
当然也可以使用匿名内部类的方式来实现UDF
匿名函数(Lambda)
如果不想使用UDF或者匿名内部类,在Java中可以使用lambda表达式,在Scala中可以使用匿名函数
关于Flink能够正确提取出TypeInformation信息
Scala
导入隐式转换
import org.apache.flink.streaming.api.scala._
如果不行,在匿名函数的形参上手动指定泛型。例如flatMap算子
(data, out: Collector[String])
如果仍然不行,使用UDF或者匿名内部类
Java
Java由于泛型擦除的机制,Flink可能无法提取出输出的数据类型,因此在部分情况下需要手动设置,来告诉Flink框架
大部分情况下不需要手动设置,如果运行报错,可以手动设置TypeInformation数据
有两种方式可以设置TypeInformation信息
在调用DataStream API时通过形参手动传入
map方法的声明:public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType)
MapFunction<T, R> mapper, TypeInformation<R> outputType)
对于非泛型类型
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型
TypeInformation<Tuple> info = Types.TUPLE(Types.STRING, Types.DOUBLE);
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
调用returns方法返回
对于非泛型类型
DataStream<SomeType> result = stream
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
对于泛型类型
returns(Types.TUPLE(Types.Int, Types.GENERIC(SomeType.class)))
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
Types和TypeHint之间的区别
绝大多数情况下,Types能够满足显示设定数据类型的需求,如果仍然不满足可以使用TypeHint,优先使用Types
富函数类(Rich Function Classes)
概述
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。
富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等
富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能
复函数提供两个生命周期方法open()方法和close()方法
open()方法
在对应的map、filter等方法业务逻辑方法前被调用
在并行子任务开始后只会被调用一次,这时候上下文信息已经初始化好。
文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成
close()方法
在任务停止前调用,一般用来做清理工作
例如关闭数据库的链接等
代码示例
import com.yanggu.flink.lowlevelapi.pojo.SensorReading
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
* 测试自定义的RichFunction函数
* open()方法、close()方法生命周期方法在每个并行度上只会被调用一次
* getRuntimeContext()方法获取运行时上下文例如函 数执行的并行度, 任务的名字
*/
object UDFFunctionDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val stream = environment.fromCollection(List(
SensorReading("sensor_1", System.currentTimeMillis(), 35.8),
SensorReading("sensor_2", System.currentTimeMillis(), 45.8),
SensorReading("sensor_3", System.currentTimeMillis(), 65.8)
))
stream
//这里设置了map的并行度为2
.map(new MyMapFunction).setParallelism(2)
.print("测试自定义RichFunction").setParallelism(1)
//可以获取执行的结果
val result = environment.execute("测试自定义RichFunction")
//这里可以获取累加器的计算结果
val value = result.getAccumulatorResult[Int]("MyFunction的计数器")
println(s"MyFunction的计数器计算结果: $value")
}
}
/**
* Rich Function是Function函数的Rich版本,
* 拥有一些声明周期方法, 可以获取运行时的上下文, 因此可以实现一些更复杂的功能
*
* @tparam IN SensorReading 输入的数据类型
* @tparam Out SensorReading 输出的数据类型
*/
class MyMapFunction extends RichMapFunction[SensorReading, SensorReading] {
var subtask = 0
var intCounter: IntCounter = _
/**
* open()方法是rich function的初始化方法,
* 当一个算子例如map或者filter被调用之前open()会被执行, 且只会执行一次
* 注意这句话
* 在每个并行度上只会被调用一次
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//这里可以进行一些初始化的工作, 例如建立一个和hdfs的链接
//在这里可以获取上下文信息
this.subtask = getRuntimeContext.getIndexOfThisSubtask
this.intCounter = getRuntimeContext.getIntCounter("MyFunction的计数器")
//只会被打印2次, 设置了map的并行度为2
println(Thread.currentThread() + "open方法被执行了")
}
/**
* 默认生命周期方法, 最后一个方法, 做一些清理工作
* 注意这句话
* 在每个并行度上只会被调用一次, 而且是最后调用
*/
override def close(): Unit = {
//只会被打印2次, 设置了map的并行度为2
println(Thread.currentThread() + "close方法被执行了")
println(Thread.currentThread() + intCounter.toString)
}
/**
* 需要重写的核心方法
*
* @param value
* @return
*/
override def map(value: SensorReading): SensorReading = {
println(Thread.currentThread() + "map方法被执行了")
intCounter.add(1)
value
}
}
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
* 测试自定义的RichFunction函数
* open()方法、close()方法生命周期方法在每个并行度上只会被调用一次
* getRuntimeContext()方法获取运行时上下文例如函 数执行的并行度, 任务的名字
*/
object UDFFunctionDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val stream = environment.fromCollection(List(
SensorReading("sensor_1", System.currentTimeMillis(), 35.8),
SensorReading("sensor_2", System.currentTimeMillis(), 45.8),
SensorReading("sensor_3", System.currentTimeMillis(), 65.8)
))
stream
//这里设置了map的并行度为2
.map(new MyMapFunction).setParallelism(2)
.print("测试自定义RichFunction").setParallelism(1)
//可以获取执行的结果
val result = environment.execute("测试自定义RichFunction")
//这里可以获取累加器的计算结果
val value = result.getAccumulatorResult[Int]("MyFunction的计数器")
println(s"MyFunction的计数器计算结果: $value")
}
}
/**
* Rich Function是Function函数的Rich版本,
* 拥有一些声明周期方法, 可以获取运行时的上下文, 因此可以实现一些更复杂的功能
*
* @tparam IN SensorReading 输入的数据类型
* @tparam Out SensorReading 输出的数据类型
*/
class MyMapFunction extends RichMapFunction[SensorReading, SensorReading] {
var subtask = 0
var intCounter: IntCounter = _
/**
* open()方法是rich function的初始化方法,
* 当一个算子例如map或者filter被调用之前open()会被执行, 且只会执行一次
* 注意这句话
* 在每个并行度上只会被调用一次
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//这里可以进行一些初始化的工作, 例如建立一个和hdfs的链接
//在这里可以获取上下文信息
this.subtask = getRuntimeContext.getIndexOfThisSubtask
this.intCounter = getRuntimeContext.getIntCounter("MyFunction的计数器")
//只会被打印2次, 设置了map的并行度为2
println(Thread.currentThread() + "open方法被执行了")
}
/**
* 默认生命周期方法, 最后一个方法, 做一些清理工作
* 注意这句话
* 在每个并行度上只会被调用一次, 而且是最后调用
*/
override def close(): Unit = {
//只会被打印2次, 设置了map的并行度为2
println(Thread.currentThread() + "close方法被执行了")
println(Thread.currentThread() + intCounter.toString)
}
/**
* 需要重写的核心方法
*
* @param value
* @return
*/
override def map(value: SensorReading): SensorReading = {
println(Thread.currentThread() + "map方法被执行了")
intCounter.add(1)
value
}
}
物理分区(Physical Partitioning)
概述
“分区”(partitioning)操作就是要将数据进行重新分布,把数据发送到不同的slot中,进行下一步处理
keyBy是一种按照键的哈希值来进行重新分区的操作,能够保证key相同的数据,在一起进行处理
keyBy 是一种逻辑分区(logical partitioning)操作,也是一种软分区。无法精准地调配数据,告诉每个数据到底去哪里slot
更一般地,如果上下游算子的并行度不一致,这时候并不能使用one to one(forward)的方式,而是使用rebalance的方式,保证数据均匀
DataStream API提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区,把这些操作统称为“物理分区”操作
keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变
分区算子并不对数据进行转换处理,只是定义了数据的传输方式
常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)
默认的分区策略是rebalance(上下游算子并行度不同),如果并行度相同,则是one to one(forward),不需要重分区
随机分区(shuffle)
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区
因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同
代码示例
import org.apache.flink.streaming.api.scala._
/**
* 随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。
* 因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同
* 上游算子并行度为1, 下游为4,上游算子的数据会随机发送到下游,每次执行的结果都不同
*/
//shuffle:4> 5
//shuffle:2> 4
//shuffle:3> 3
//shuffle:1> 1
//shuffle:1> 2
object ShuffleDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1, 2, 3, 4, 5)
// 经洗牌后打印输出,并行度为 4
stream
.shuffle
.print("shuffle").setParallelism(4)
env.execute()
}
}
/**
* 随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。
* 因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同
* 上游算子并行度为1, 下游为4,上游算子的数据会随机发送到下游,每次执行的结果都不同
*/
//shuffle:4> 5
//shuffle:2> 4
//shuffle:3> 3
//shuffle:1> 1
//shuffle:1> 2
object ShuffleDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1, 2, 3, 4, 5)
// 经洗牌后打印输出,并行度为 4
stream
.shuffle
.print("shuffle").setParallelism(4)
env.execute()
}
}
轮询分区(Round-Robin)
rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。同时rebalance也是默认的分区算法
代码示例
import org.apache.flink.streaming.api.scala._
/**
* rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
*/
//rebalance:2> 1
//rebalance:3> 2
//rebalance:4> 3
//rebalance:1> 4
//rebalance:2> 5
object RebalanceDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1, 2, 3, 4, 5)
stream
//默认的分区算法
.rebalance
.print("rebalance").setParallelism(4)
env.execute()
}
}
/**
* rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
*/
//rebalance:2> 1
//rebalance:3> 2
//rebalance:4> 3
//rebalance:1> 4
//rebalance:2> 5
object RebalanceDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1, 2, 3, 4, 5)
stream
//默认的分区算法
.rebalance
.print("rebalance").setParallelism(4)
env.execute()
}
}
重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中
当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍且上游算子并行度大于1,rescale的效率明显会更高。
rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;
而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源
而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源
代码示例
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
* rescale是会将数据轮询发送到下游并行任务的一部分中
* rebalance是上游的所有子任务都与下游的子任务建立联系, 然后轮询发送数据, 是一种笛卡尔积的关系(M * N)
* 如果下游算子的并行度是上游的算子的整数倍, 那么只是上游算子只是与下游的部分算子建立联系, 然后轮询发送数据
* 具体为上游并行度为2(source0、source1), 下游并行度为4(rescale1、rescale2、rescale3、rescale4)
* source0中的数据为2、4、6、8
* source1中的数据为1、3、5、7
* 上游source0子任务和下游rescale1子任务、下游rescale2子任务建立联系, 同时轮询发送数据(2、4、6、8)
* 上游source1子任务和下游rescale3子任务、下游rescale4子任务建立联系, 同时轮询发送数据(1、3、5、7)
*
*/
//source:0> 2、4、6、8
//source:1> 1、3、5、7
//rescale:1> 2 rescale:2> 4 rescale:1> 6 rescale:2> 8
//rescale:3> 1 rescale:4> 3 rescale:3> 5 rescale:4> 7
object RescaleDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 这里使用了并行数据源的富函数版本
// 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
env.addSource(new RichParallelSourceFunction[Int] {
override def run(ctx: SourceFunction.SourceContext[Int]) = {
for (i <- 0 until 8) {
// 将奇数发送到索引为 1 的并行子任务
// 将偶数发送到索引为 0 的并行子任务
if ((i + 1) % 2 == getRuntimeContext.getIndexOfThisSubtask) {
ctx.collect(i + 1)
}
}
}
override def cancel() = {}
}).setParallelism(2)
.rescale
.print("rescale").setParallelism(4)
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
/**
* rescale是会将数据轮询发送到下游并行任务的一部分中
* rebalance是上游的所有子任务都与下游的子任务建立联系, 然后轮询发送数据, 是一种笛卡尔积的关系(M * N)
* 如果下游算子的并行度是上游的算子的整数倍, 那么只是上游算子只是与下游的部分算子建立联系, 然后轮询发送数据
* 具体为上游并行度为2(source0、source1), 下游并行度为4(rescale1、rescale2、rescale3、rescale4)
* source0中的数据为2、4、6、8
* source1中的数据为1、3、5、7
* 上游source0子任务和下游rescale1子任务、下游rescale2子任务建立联系, 同时轮询发送数据(2、4、6、8)
* 上游source1子任务和下游rescale3子任务、下游rescale4子任务建立联系, 同时轮询发送数据(1、3、5、7)
*
*/
//source:0> 2、4、6、8
//source:1> 1、3、5、7
//rescale:1> 2 rescale:2> 4 rescale:1> 6 rescale:2> 8
//rescale:3> 1 rescale:4> 3 rescale:3> 5 rescale:4> 7
object RescaleDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 这里使用了并行数据源的富函数版本
// 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
env.addSource(new RichParallelSourceFunction[Int] {
override def run(ctx: SourceFunction.SourceContext[Int]) = {
for (i <- 0 until 8) {
// 将奇数发送到索引为 1 的并行子任务
// 将偶数发送到索引为 0 的并行子任务
if ((i + 1) % 2 == getRuntimeContext.getIndexOfThisSubtask) {
ctx.collect(i + 1)
}
}
}
override def cancel() = {}
}).setParallelism(2)
.rescale
.print("rescale").setParallelism(4)
env.execute()
}
}
广播(broadcast)
将输入数据复制并发送到下游算子的所有并行任务中去
代码示例
import org.apache.flink.streaming.api.scala._
//可以看到,数据被复制然后广播到了下游的所有并行任务中去了。
//broadcast:1> 1
//broadcast:2> 1
//broadcast:3> 1
//broadcast:4> 1
object BroadcastDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1)
// 经广播后打印输出,并行度为 4
stream
.broadcast
.print("broadcast").setParallelism(4)
env.execute()
}
}
//可以看到,数据被复制然后广播到了下游的所有并行任务中去了。
//broadcast:1> 1
//broadcast:2> 1
//broadcast:3> 1
//broadcast:4> 1
object BroadcastDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据源,并行度为 1
val stream = env.fromElements(1)
// 经广播后打印输出,并行度为 4
stream
.broadcast
.print("broadcast").setParallelism(4)
env.execute()
}
}
全局分区(global)
通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1
代码示例
import org.apache.flink.streaming.api.scala._
/**
* 可以看到, 即使下游的print算子的并行度为2, 但是调用了global方法之后, 所有的数据都会发往下游的第一个算子
* 可以理解为强行将下游算子的并行度调整为1
*/
//global:1> 1
//global:1> 2
//global:1> 3
//global:1> 4
object GlobalDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements(1, 2, 3, 4)
stream.global
.print("global").setParallelism(2)
env.execute()
}
}
/**
* 可以看到, 即使下游的print算子的并行度为2, 但是调用了global方法之后, 所有的数据都会发往下游的第一个算子
* 可以理解为强行将下游算子的并行度调整为1
*/
//global:1> 1
//global:1> 2
//global:1> 3
//global:1> 4
object GlobalDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements(1, 2, 3, 4)
stream.global
.print("global").setParallelism(2)
env.execute()
}
}
自定义分区(Custom)
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段。
它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector
代码示例
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
/**
* 当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略
*/
//CustomPartition:1> 2
//CustomPartition:2> 1
object CustomPartitionDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 将自然数按照奇偶分区// 将自然数按照奇偶分区
env
.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
.partitionCustom(new Partitioner[Int]() {
def partition(key: Int, numPartitions: Int) = key % 2
}, data => data)
.print("CustomPartition").setParallelism(2)
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
/**
* 当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略
*/
//CustomPartition:1> 2
//CustomPartition:2> 1
object CustomPartitionDemo {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 将自然数按照奇偶分区// 将自然数按照奇偶分区
env
.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
.partitionCustom(new Partitioner[Int]() {
def partition(key: Int, numPartitions: Int) = key % 2
}, data => data)
.print("CustomPartition").setParallelism(2)
env.execute()
}
}
输出算子(Sink)
输出到文件
输出到Kafka
代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._
/**
* 把flink的DataStream数据输出到kafka中
*/
object KafkaSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaSink = KafkaSink.builder[String]()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("test-kafka-sink")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build()
//val kafkaSink = KafkaUtil.getKafkaSink("localhost:9092", "test-kafka-sink")
environment
.fromCollection(Seq(1, 2, 3, 4))
.map(data => (data * 2).toString)
.sinkTo(kafkaSink)
environment.execute("KafkaSinkDemo Job")
}
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._
/**
* 把flink的DataStream数据输出到kafka中
*/
object KafkaSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaSink = KafkaSink.builder[String]()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("test-kafka-sink")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build()
//val kafkaSink = KafkaUtil.getKafkaSink("localhost:9092", "test-kafka-sink")
environment
.fromCollection(Seq(1, 2, 3, 4))
.map(data => (data * 2).toString)
.sinkTo(kafkaSink)
environment.execute("KafkaSinkDemo Job")
}
输出到Redis
Redis 是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表
(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系
列常用的数据结构
(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系
列常用的数据结构
Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为
我们提供了 Flink-Redis 的连接工具
我们提供了 Flink-Redis 的连接工具
添加maven依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
代码示例
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* 把flink的DataStream数据输出到redis中
* https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
*/
object RedisSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("127.0.0.1")
.setDatabase(0)
.setPort(6379)
.build()
environment
.fromCollection(List(("1", "1"), ("2", "2"), ("3", "3")))
.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
environment.execute("RedisSinkDemo Job")
}
}
/**
* @tparam T redis处理的数据类型
*/
class RedisExampleMapper extends RedisMapper[(String, String)] {
/**
* 指定使用redis的数据类型
* 这里使用string, 可以使用set
*
* @return
*/
override def getCommandDescription: RedisCommandDescription = {
//HASH和SORTED_SET需要添加额外的key
new RedisCommandDescription(RedisCommand.HSET, "test-hash")
}
//这里的获取的key就是redis中内层hash的key, 如果是string其他的类型, 直接就是redis的key
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* 把flink的DataStream数据输出到redis中
* https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
*/
object RedisSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("127.0.0.1")
.setDatabase(0)
.setPort(6379)
.build()
environment
.fromCollection(List(("1", "1"), ("2", "2"), ("3", "3")))
.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
environment.execute("RedisSinkDemo Job")
}
}
/**
* @tparam T redis处理的数据类型
*/
class RedisExampleMapper extends RedisMapper[(String, String)] {
/**
* 指定使用redis的数据类型
* 这里使用string, 可以使用set
*
* @return
*/
override def getCommandDescription: RedisCommandDescription = {
//HASH和SORTED_SET需要添加额外的key
new RedisCommandDescription(RedisCommand.HSET, "test-hash")
}
//这里的获取的key就是redis中内层hash的key, 如果是string其他的类型, 直接就是redis的key
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}
输出到Elasticsearch
ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch
有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域
应用非常广泛。
有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域
应用非常广泛。
需要添加maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
常见将事件时间滑动窗口的聚合数据输出到ElasticSearch中
代码示例
import cn.hutool.json.JSONUtil
import com.yanggu.flink.lowlevelapi.pojo.Event
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType
import java.util.Collections
/**
* Sink输出到Elasticsearch中
* 使用GET http://ip:9200/clicks/_search可以查询到插入的数据
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/elasticsearch/#elasticsearch-sink
*/
object ElasticsearchSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataStream = env.fromElements(
Event("Mary", "./home", 1000L),
Event("Bob", "./cart", 2000L),
Event("Alice", "./prod?id=100", 3000L),
Event("Alice", "./prod?id=200", 3500L),
Event("Bob", "./prod?id=2", 2500L),
Event("Alice", "./prod?id=300", 3600L),
Event("Bob", "./home", 3000L),
Event("Bob", "./prod?id=1", 2300L),
Event("Bob", "./prod?id=3", 3300L))
//核心配置es的ip和端口
val httpHosts = Collections.singletonList(new HttpHost("192.168.1.144", 9200, "http"))
//如何处理处理, 一般直接插入即可
val elasticsearchSinkFunction = new ElasticsearchSinkFunction[Event] {
override def process(data: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
val request = Requests
.indexRequest()
.index("clicks")
.source(JSONUtil.toJsonStr(data), XContentType.JSON)
requestIndexer.add(request)
}
}
val elasticsearchSink = new ElasticsearchSink.Builder[Event](httpHosts, elasticsearchSinkFunction)
.build()
dataStream.addSink(elasticsearchSink)
env.execute()
}
}
import com.yanggu.flink.lowlevelapi.pojo.Event
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType
import java.util.Collections
/**
* Sink输出到Elasticsearch中
* 使用GET http://ip:9200/clicks/_search可以查询到插入的数据
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/elasticsearch/#elasticsearch-sink
*/
object ElasticsearchSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataStream = env.fromElements(
Event("Mary", "./home", 1000L),
Event("Bob", "./cart", 2000L),
Event("Alice", "./prod?id=100", 3000L),
Event("Alice", "./prod?id=200", 3500L),
Event("Bob", "./prod?id=2", 2500L),
Event("Alice", "./prod?id=300", 3600L),
Event("Bob", "./home", 3000L),
Event("Bob", "./prod?id=1", 2300L),
Event("Bob", "./prod?id=3", 3300L))
//核心配置es的ip和端口
val httpHosts = Collections.singletonList(new HttpHost("192.168.1.144", 9200, "http"))
//如何处理处理, 一般直接插入即可
val elasticsearchSinkFunction = new ElasticsearchSinkFunction[Event] {
override def process(data: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
val request = Requests
.indexRequest()
.index("clicks")
.source(JSONUtil.toJsonStr(data), XContentType.JSON)
requestIndexer.add(request)
}
}
val elasticsearchSink = new ElasticsearchSink.Builder[Event](httpHosts, elasticsearchSinkFunction)
.build()
dataStream.addSink(elasticsearchSink)
env.execute()
}
}
输出打MySQL(JDBC)
可以将Flink计算好的结果输出到关系型数据库(例如MySQL中)
添加maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
代码示例
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._
import com.yanggu.flink.lowlevelapi.pojo.Book
import java.sql.PreparedStatement
/**
* 将数据输出到mysql中, 使用官方提供的sink
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
*/
object JdbcSinkDemo01 {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = environment
.fromCollection(
Seq(Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017))
)
val sql = "insert into books (id, title, authors, year) values (?, ?, ?, ?)"
val function = new JdbcStatementBuilder[Book] {
override def accept(statement: PreparedStatement, book: Book): Unit = {
statement.setLong(1, book.id)
statement.setString(2, book.title)
statement.setString(3, book.authors)
statement.setInt(4, book.year)
}
}
//JDBC的sink是批处理, 为了提高运行效率。(当数据攒满多少个,或者超过多少时间就输出)
val executionOptions = JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build()
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/jdbc.properties"))
val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(tool.get("jdbc.url"))
.withDriverName(tool.get("jdbc.driverClassName"))
.withUsername(tool.get("jdbc.username"))
.withPassword(tool.get("jdbc.password"))
.build()
val jdbcSinkFunction = JdbcSink.sink[Book](sql, function, executionOptions, connectionOptions)
dataStream.addSink(jdbcSinkFunction)
environment.execute("JdbcSinkDemo01 Job")
}
}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._
import com.yanggu.flink.lowlevelapi.pojo.Book
import java.sql.PreparedStatement
/**
* 将数据输出到mysql中, 使用官方提供的sink
* https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
*/
object JdbcSinkDemo01 {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = environment
.fromCollection(
Seq(Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017))
)
val sql = "insert into books (id, title, authors, year) values (?, ?, ?, ?)"
val function = new JdbcStatementBuilder[Book] {
override def accept(statement: PreparedStatement, book: Book): Unit = {
statement.setLong(1, book.id)
statement.setString(2, book.title)
statement.setString(3, book.authors)
statement.setInt(4, book.year)
}
}
//JDBC的sink是批处理, 为了提高运行效率。(当数据攒满多少个,或者超过多少时间就输出)
val executionOptions = JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build()
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/jdbc.properties"))
val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(tool.get("jdbc.url"))
.withDriverName(tool.get("jdbc.driverClassName"))
.withUsername(tool.get("jdbc.username"))
.withPassword(tool.get("jdbc.password"))
.build()
val jdbcSinkFunction = JdbcSink.sink[Book](sql, function, executionOptions, connectionOptions)
dataStream.addSink(jdbcSinkFunction)
environment.execute("JdbcSinkDemo01 Job")
}
}
输出到RabbitMQ
代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
/**
* 把flink的DataStream数据输出到rabbitmq中
*/
object RabbitMQSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/rabbitmq.properties"))
val config = new RMQConnectionConfig.Builder()
.setHost(tool.get("rabbitmq.host"))
.setPort(tool.getInt("rabbitmq.port"))
.setVirtualHost(tool.get("rabbitmq.virtualHost"))
.setUserName(tool.get("rabbitmq.username"))
.setPassword(tool.get("rabbitmq.password"))
.build()
val rabbitmqSink = new RMQSink[String](config, tool.get("rabbitmq.queue"), new SimpleStringSchema)
environment
.fromCollection(List("{123}"))
.addSink(rabbitmqSink)
environment.execute("RabbitMQSinkDemo Job")
}
}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
/**
* 把flink的DataStream数据输出到rabbitmq中
*/
object RabbitMQSinkDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val tool = ParameterTool.fromPropertiesFile(getClass.getResourceAsStream("/rabbitmq.properties"))
val config = new RMQConnectionConfig.Builder()
.setHost(tool.get("rabbitmq.host"))
.setPort(tool.getInt("rabbitmq.port"))
.setVirtualHost(tool.get("rabbitmq.virtualHost"))
.setUserName(tool.get("rabbitmq.username"))
.setPassword(tool.get("rabbitmq.password"))
.build()
val rabbitmqSink = new RMQSink[String](config, tool.get("rabbitmq.queue"), new SimpleStringSchema)
environment
.fromCollection(List("{123}"))
.addSink(rabbitmqSink)
environment.execute("RabbitMQSinkDemo Job")
}
}
自定义Sink输出
代码示例
import cn.hutool.core.date.DateUtil
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import java.util.Date
object MySinkFunctionDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment
.fromCollection(List(1, 2, 3, 4))
.addSink(new RichSinkFunction[Int] {
/**
*
* @param value 对应的值
* @param context 上下文信息
*/
override def invoke(value: Int, context: SinkFunction.Context): Unit = {
println(s"slot: ${getRuntimeContext.getTaskNameWithSubtasks}, 运行时间: ${DateUtil.formatDateTime(new Date(context.currentProcessingTime()))}" +
s", 对应的值: $value")
}
})
environment.execute("SinkDemo01 Job")
}
}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import java.util.Date
object MySinkFunctionDemo {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment
.fromCollection(List(1, 2, 3, 4))
.addSink(new RichSinkFunction[Int] {
/**
*
* @param value 对应的值
* @param context 上下文信息
*/
override def invoke(value: Int, context: SinkFunction.Context): Unit = {
println(s"slot: ${getRuntimeContext.getTaskNameWithSubtasks}, 运行时间: ${DateUtil.formatDateTime(new Date(context.currentProcessingTime()))}" +
s", 对应的值: $value")
}
})
environment.execute("SinkDemo01 Job")
}
}
Flink中的时间和窗口
时间语义
Flink中的时间语义
哪种时间语义更重要
水位线(Watermark)
事件时间和窗口
什么是水位线
如何生成水位线
水位线生成策略
周期性生成
断点式生成
Flink 内置水位线生成器
有序流
乱序流
为什么要多减去1ms
maxTimestamp - outOfOrdernessMillis - 1
自定义水位线生成策略
水位线的最值
水位线的最小值是Long.MIN_VALUE
水位线的最大值是Long.MAX_VALUE
水位线的传递
水位线总结
水位线在事件时间的世界里面,承担了逻辑时钟的角色
窗口的计算和销毁、定时器的触发和执行都依赖于水位线
水位线是一种特殊的事件,通过代码插入的数据流里面,一般而言都是周期性的生成,然后跟随数据流向下游流动
水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。单位是毫秒
水位线往下游算子的传递类似于木桶效应,总是取所有分区水位线的最小值,以保证逻辑处理处理的正确性
对于无界数据流,Flink会在数据流开始时,插入一个负无穷大的水位线,保证水位线一定生成和向下游传递。
而在数据流结束的时候(或者程序结束的时候)插入一个正无穷大的水位线,保证所有的窗口闭合以及所有的定时器都被触发
而在数据流结束的时候(或者程序结束的时候)插入一个正无穷大的水位线,保证所有的窗口闭合以及所有的定时器都被触发
对于有界数据流,Flink只会插入两次水位线,在开始的时候插入负无穷的水位线,在结束的时候插入正无穷的水位线。
因为只需要插入两次水位线就能保证计算的正确性,无需在数据流的中间插入水位线了
因为只需要插入两次水位线就能保证计算的正确性,无需在数据流的中间插入水位线了
窗口(Window)
窗口的概念
窗口的分类
滚动窗口(Tumbling Window)
滑动窗口(Sliding Window)
会话窗口(Session Window)
全局窗口(Global Window)
窗口API概览
窗口分配器(Window Assigners)
窗口函数(Window Functions)
其他API
窗口的生命周期
迟到数据的处理
设置水位线延迟时间
允许窗口处理迟到数据
将迟到数据放入窗口侧输出流
处理函数(底层API)
基本处理函数(ProcessFunction)
ProcessFunction的功能和使用
Processfunction解析
处理函数的分类
按键分区处理函数(KeyedProcessFunction)
定时器(Timer)和定时服务(TimerService)
KeyedProcessFunction的使用
窗口处理函数(ProcessWindowFunction)
ProcessWindowFunction的使用
ProcessWindowFunction的解析
案例应用(TopN)
使用ProcessAllWindowFunction
使用KeyedProcessFunction
侧输出流(Side Output)
多流转换
分流操作
简单实现
使用侧输出流实现
UNION操作
CONNECT操作
基于时间的合流(双流连接JOIN操作)
Window Join
Interval Join
Window CoGroup
状态编程
Flink中的状态
有状态算子
状态的管理
状态的分类
按键分区状态(Keyed State)
基本概念和特点
支持的结构和类型
代码实现
状态生存时间(TTL)
算子状态(Operator State)
基本概念和特点
状态类型
代码实现
广播状态(Broadcast State)
基本用法
代码实现
状态持久化和状态后端
检查点(Checkpoint)
状态后端(State backend)
容错机制
检查点(Checkpoint)和保存点(Savepoint)
检查点的保存
从检查点恢复状态
检查点算法
检查点配置
保存点(Savepoint)
状态一致性
一致性的概念
一致性的级别
端到端的状态一致性
端到端精确一次性(end-to-end exactly-once)
输入端保证
输出端保证
Flink和Kafka连接时的精确一次性保证
Table API和Flink SQL
基本API
流处理中的表
时间属性和窗口
聚合(Aggregation)查询
连接(JOIN)查询
函数
连接到外部系统
Flink CEP
基本概念
模式API(Pattern API)
模式的检测处理
CEP的状态机实现
Flink CDC
CDC简介
DataStream方式
SQL方式
Flink调优
ParameterTool工具的使用
异步IO(Async IO)
0 条评论
下一页