Apache Flink:Checkpoint、Savepoint配置与实践

  • 时间:2018-10-13 22:54 作者:Aruen 来源:Aruen 阅读:876
  • 扫一扫,手机访问
摘要:Flink CheckpointCheckpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进

Flink Checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正由于故障带来的程序数据状态中断。这里,我们简单了解一下Flink Checkpoint机制,如官网下图所示:

Apache Flink:Checkpoint、Savepoint配置与实践

Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停解决Steam中新接收到的数据记录。由于一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了少量比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。

开启Checkpoint功能

开启Checkpoint功能,只要要在代码中进行配置就可,如下代码所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.6.1/flink-checkpoints"));

CheckpointConfig config = env.getCheckpointConfig();

config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

config.setCheckpointInterval(60000);

上面调用enableExternalizedCheckpoints设置为ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink解决程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint解决。上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个Checkpoint

默认情况下,假如设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,假如我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比方,我们发现最近4个小时数据记录解决有问题,希望将整个状态复原到4小时之前。

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,增加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

这样设置以后,运行Flink Job,并查看对应的Checkpoint在HDFS上存储的文件目录,示例如下所示:

hdfs dfs -ls /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/

Found 22 items

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:36 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:37 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:38 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:39 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:40 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:41 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:42 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:43 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:44 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:45 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:46 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:47 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:48 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:49 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:50 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:51 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:52 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:53 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:54 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:55 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:56 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared

drwxr-xr-x - hadoop supergroup 0 2018-10-10 14:57 /flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

可见,我们配置了state.checkpoints.num-retained的值为20,只保留了最近的20个Checkpoint。假如希望会退到某个Checkpoint点,只要要指定对应的某个Checkpoint路径就可实现。

从Checkpoint进行恢复

假如Flink程序异常失败,或者者最近一段时间内数据解决错误,我们可以将程序从某一个Checkpoint点,比方chk-860进行回放,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/flink-1.6.1/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar

程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据,如下所示:

hdfs dfs -ls /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e

Found 6 items

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:10 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:11 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:12 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:13 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:14 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared

drwxr-xr-x - hadoop supergroup 0 2018-10-10 15:15 /flink-1.6.1/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

从上面我们可以看到,前面Flink Job的ID为582e17d2cc343e6c56255d111bae0191,所有的Checkpoint文件都在以Job ID为名称的目录里面,当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID(这里是11bbc5d9933e4ff7e25198a760e9792e),而对应的Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863等等。

Flink Savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创立一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。

Flink程序中包含两种状态数据,一种是客户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创立或者者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比方在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创立Savepoint过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续升级/更新程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,假如我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。

而且,强烈建议手动为每个Operator设置ID,即便未来Flink应用程序可能会改动很大,比方替换原来的Operator实现、添加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以假如未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。

下面,我们以Flink官网文档中给定的例子,来看下如何设置Operator ID,代码如下所示:

DataStream stream = env.

// Stateful source (e.g. Kafka) with ID

.addSource(new StatefulSource())

.uid("source-id") // ID for the source operator

.shuffle()

// Stateful mapper with ID

.map(new StatefulMapper())

.uid("mapper-id") // ID for the mapper

// Stateless printing sink

.print(); // Auto-generated ID

创立Savepoint

创立一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定:

一种是,需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,增加如下配置,设置Savepoint存储目录,例如如下所示:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.6.1/flink-savepoints

另一种是,在手动执行savepoint命令的时候,指定Savepoint存储目录,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]

例如,正在运行的Flink Job对应的ID为40dcc6d2ba90f13930abce295de8d038,使用默认state.savepoints.dir配置指定的Savepoint目录,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

可以看到,在目录hdfs://namenode01.td.com/flink-1.6.1/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

为正在运行的Flink Job指定一个目录存储Savepoint数据,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

可以看到,在目录 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

从Savepoint恢复

现在,我们可以停掉Job 40dcc6d2ba90f13930abce295de8d038,而后通过Savepoint命令来恢复Job运行,命令格式如下所示:

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

可以看到,启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd。

Savepoint目录结构

下面,我们看一下Savepoint目录下面存储内容的结构,如下所示:

hdfs dfs -ls /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b

Found 5 items

-rw-r--r-- 3 hadoop supergroup 4935 2018-10-11 10:15 /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6

-rw-r--r-- 3 hadoop supergroup 4599 2018-10-11 10:15 /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159

-rw-r--r-- 3 hadoop supergroup 4976 2018-10-11 10:15 /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata

-rw-r--r-- 3 hadoop supergroup 4348 2018-10-11 10:15 /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c

-rw-r--r-- 3 hadoop supergroup 4724 2018-10-11 10:15 /flink-1.6.1/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的HDFS路径中,11bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串,而后savepoint-11bbc5-bd967f90709b作为存储此次Savepoint数据的根目录,最后savepoint-11bbc5-bd967f90709b目录下面_metadata文件包含了Savepoint的元数据信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目录下面其它文件的路径,这些文件内容都是序列化的状态信息。

  • 全部评论(0)
最新发布的资讯信息
【系统环境|】从谷歌到手机厂商都下决心了,要清除32位应用这匹“害群之马”(2025-10-17 05:41)
【系统环境|】Windows上使用QEMU创建aarch64(ARM64)虚拟机(2025-10-17 05:40)
【系统环境|】nodejs 如何安装在aarch64平台(2025-10-17 05:39)
【系统环境|】常用git命令-从远程更新代码合并分支、提交代码等(2025-10-17 05:38)
【系统环境|】技术干货|常用的 Git 功能和选项(2025-10-17 05:38)
【系统环境|】掌握git命令,图解一目了然(2025-10-17 05:37)
【系统环境|】总结几个常用的Git命令的使用方法(2025-10-17 05:36)
【系统环境|】这篇 Git 教程太清晰了,很多 3 年经验程序员都收藏了(2025-10-17 05:35)
【系统环境|】Git常用命令及操作指南(2025-10-17 05:35)
【系统环境|】「实用」盘点那些开发中最常用的Git命令(2025-10-17 05:34)
手机二维码手机访问领取大礼包
返回顶部