分享好友 站长动态首页 网站导航

Flink CDC 在大健云仓的实践

2022-06-21 09:08 · 头闻号数据库

摘要:本文整理自大健云仓基础架构负责人、Flink CDC Maintainer 龚中强在 5 月 21 日 Flink CDC Meetup 的演讲。主要内容包括:

一、引入 Flink CDC 的背景

公司引入 CDC 技术,主要基于以下四个角色的需求:

CDC 是数据捕获变更的技术。广义上来说,但凡能够捕获数据变更的技术,都能被称为 CDC。但通常我们说的 CDC 技术主要面向数据库的变更。

CDC 的实现方式主要有两种,分别是基于查询和基于日志:

正如 Flink 的宣言 “实时即未来”,在如今的大背景下,实时性是亟待解决的重要问题。因此,我们将主流 CDC 基于日志的技术做了对比,如上图所示:

二、现今内部落地的业务场景

LDSS 库存管理的业务场景主要有以下四种:

上图为 LDSS 库存管理分单场景架构图。

首先,通过多数据源同步的应用向下拉取仓储系统、平台系统以及内部 ERP 系统数据,将所需数据抽取到 LDSS 系统的数据库中,以支撑 LDSS 系统订单、库存、物流三大模块的业务功能。

其次,需要产品信息、订单信息以及仓库信息才能进行有效的分单决策。多数据源定时同步任务基于 JDBC 查询,通过时间做筛选,同步变更的数据到 LDSS 系统中。LDSS 系统基于这些数据做分单决策,以获得最优解。

定时任务同步的代码,首先需要定义定时任务、定义定时任务的类、执行方法以及执行间隔。

上图左侧为定时任务的定义,右侧是定时任务的逻辑开发。首先,打开 Oracle 数据库进行查询,然后 upsert 到 MySQL 数据库,即完成了定时任务的开发。此处以接近原生 JDBC 的查询方式,将数据依次塞到对应的数据库表中,开发逻辑十分繁琐,也容易出现 bug。

因此,我们基于 Flink CDC 对其进行了改造。

上图为基于 Flink CDC 实现的实时同步场景,唯一的变化是将此前的多数据源同步应用程序换成了 Flink CDC 。

首先,通过 SqlServer CDC、MySQL CDC、Oracle CDC 分别连接抽取对应仓储平台、 ERP 系统数据库的表数据,然后通过 Flink 提供的 JDBC connector 写入到 LDSS 系统的 MySQL 数据库中。能够通过 SqlServer CDC、MySQL CDC、Oracle CDC 将异构数据源转化为统一的 Flink 内部类型,再往下游写。

此架构相比于之前的架构,对业务系统没有侵入性,而且实现较为简单。

我们引入了 MySQL CDC 和 SqlServer CDC 分别连接 B2B 平台的 MySQL 数据库以及仓储系统的 SqlServer 数据库,然后将抽取到的数据通过 JDBC Connector 写入到 LDSS 系统的 MySQL 数据库。

通过以上改造,得益于 Flink CDC 赋予其实时的能力,不需要管理繁杂的定时任务。

基于 Flink CDC 同步代码的实现分为以下三步:

  1. 第一步,定义源表 —— 需要同步的表;
  2. 第二步,定义目标表 —— 需要写入数据的目标表;
  3. 第三步,通过 insert select 语句,即可完成 CDC 同步任务的开发。

上述开发模式非常简单,逻辑清晰。此外,依托 Flink CDC 的同步任务和 Flink 架构,还获得了失败重试、分布式、高可用、全量增量一致性切换等特性。

三、未来内部推广及平台化建设

上图为平台架构图。

左侧 source 是由 Flink CDC + Flink 提供的源端,能够通过丰富的源端抽取数据,通过数据平台上的开发写入到目标端。目标端又依托于 Flink 的强大生态,能够很好地支撑数据湖、关系型数据库、MQ 等。

Flink 目前有两种运行方式,一种是国内比较流行的 Flink on Yarn,另一种是 Flink on Kubernets。中间部分的数据平台向下管理 Flink 集群,以向上支撑 SQL 在线开发、任务开发、血缘管理、任务提交、在线 Notebook 开发、权限和配置以及对任务性能的监控和告警,同时也能够对数据源做到很好的管理。

数据同步的需求在公司内部特别旺盛,需要通过平台来提高开发效率,加快交付速度。而且平台化之后,可以统一公司内部的数据同步技术,收拢同步技术栈,减少维护成本。

平台化的目标如下:

  1. 能够很好地管理数据源、表等元信息;
  2. 任务的整个生命周期都可以在平台上完成;
  3. 实现任务的性能观测以及告警;
  4. 简化开发,快速上手,业务开发人员经过简单培训即可上手开发同步任务。

平台化能带来以下三个方面的收益:

  1. 收拢数据同步任务,统一来管理;
  2. 平台管理维护同步任务的全生命周期;
  3. 专门的团队负责,团队能够专注前沿的数据集成技术。

有了平台之后,即可快速落地应用更多的业务场景。

有了平台的助力,相信 Flink CDC 能够在公司内部更好地释放它的能力。

上图展示了 SqlServer CDC 的原理。

社区同学使用了当前版本的 SqlServer CDC 后,主要反馈的问题有以下三个:

  1. 快照过程中锁表:锁表操作对于 DBA 和在线应用都是不可忍受的, DBA 无法接受数据库被夯住,同时也会影响在线应用。
  2. 快照过程中不能 checkpoint:不能 checkpoint 就意味着快照过程中一旦失败,只能重新开始跑快照过程,这对于大表非常不友好。
  3. 快照过程只支持单并发:千万级、上亿级的大表,在单并发的情况下需要同步十几甚至几十个小时,极大束缚了 SqlServer CDC 的应用场景。

我们针对上述问题做了实践和改进,参考社区 2.0 版本 MySQL CDC 并发无锁算法的思想,对 SqlServer CDC 进行了优化,最终实现了快照过程中无锁,实现一致性快照;快照过程中支持 checkpoint ;快照过程中支持并发,加速快照过程。在大表同步的情况下,并发优势尤为明显。

但是由于 2.2 版本社区将 MySQL 的并发无锁思想抽象成了统一公共的框架,SqlServer CDC 需要重新适配这套通用框架后才能贡献给社区。

提问&解答

Q1需要开启 SqlServer 自己的 CDC 吗?

是的,SqlServer CDC 的功能就是基于 SqlServer 数据库自己的 CDC 特性实现的。

Q2物化视图通过什么方式去刷新定时任务触发器?

通过 Flink CDC 将需要生成物化视图的 SQL 放在 Flink 里运行,通过原表的变动触发计算,然后同步到物化视图表里。

Q3平台化是怎么做的?

平台化参考了社区众多的开源项目以及优秀的开源平台,比如 StreamX、Dlink 等优秀的开源项目。

Q4SqlServer CDC 在消费 transaction log 时有瓶颈吗?

SqlServer 并没有直接消费 log,其原理是 SqlServer capture process 去匹配 log 内哪些表开启了 CDC ,然后将这些表从日志里捞到开启 CDC 表的变更数据,再转插到 change table 里,最后通过开启 CDC 之后数据库生成的 CDC query function 获取到数据的变更。

Q5Flink CDC 高可用如何保障同步任务过多或密集处理方案?

Flink 的高可用依赖于 Flink 特性比如 checkpoint 等来保证。同步任务过多或处理方案密集的情况,建议使用多套 Flink 下游集群,然后根据同步的实时性区分对待,将任务发布到相应的集群中。

Q6中间需要 Kafka 吗?

取决于同步任务或数仓架构是否需要将中间数据做 Kafka 落地。

Q7一个数据库中有多张表,可以放到一个任务里运行吗?

取决于开发方式。如果是 SQL 的开发方式,要实现一次性写多表只能通过多个任务。但 Flink CDC 提供了另外一种比较高阶的开发方式 DataStream ,可以将多表放到一个任务里运行。

Q8Flink CDC 支持读取 Oracle 从库的日志吗?

目前还无法实现。

Q9通过 CDC 同步后两个端的数据质量如何监控,如何比对?

目前只能通过定时抽样来做数据质量的检查,数据质量问题一直是业内比较棘手的问题。

Q10大健云仓用的什么调度系统?系统如何与 Flink CDC 集合?

使用 XXL Job 作为分布式的任务调度,CDC 没有用到定时任务。

Q11如果采集增删表,SqlServer CDC 需要重启吗?

SqlServer CDC 目前不支持动态加表的功能。

Q12同步任务会影响系统性能吗?

基于 CDC 做同步任务肯定会影响系统性能,尤其是快照过程对数据库会有影响,进而影响应用系统。社区将来会做限流、对所有 connector 做并发无锁的实现,都是为了扩大 CDC 的应用场景以及易用性。

Q13全量和增量的 savepoint 怎么处理?

(未通过并发无锁框架实现的连接器)全量过程中不可以触发 savepoint,增量过程中如果需要停机发布,可通过 savepoint 恢复任务。

Q14CDC 同步数据到 Kafka ,而 Kafka 里面存的是 Binlog ,如何保存历史数据和实时数据?

将 CDC 同步的数据全部 Sync 到 Kafka,保留的数据取决于 Kafka log 的清理策略,可以全部保留。

Q15CDC 会对 Binlog 的日志操作类型进行过滤吗?会影响效率吗?

即使有过滤操作,对性能影响也不大。

Q16CDC 读 MySQL 初始化快照阶段,多个程序读不同的表会有程序报错无法获取锁表的权限,这是什么原因?

建议先查看 MySQL CDC 是不是使用老的方式实现,可以尝试新版本的并发无锁实现。

Q17MySQL 上亿大表全量和增量如何衔接?

建议阅读雪尽老师在 2.0 的相关博客,非常简单清晰地介绍了并发无锁如何实现一致性快照,完成全量和增量的切换。

免责声明:本平台仅供信息发布交流之途,请谨慎判断信息真伪。如遇虚假诈骗信息,请立即举报

举报
反对 0
打赏 0
更多相关文章

评论

0

收藏

点赞