db-archiver 如何平替 Datax 成为 Databend 离线数据同步的最佳方案

2024/08/14 分享备份。

背景

db-archiver ( https://github.com/databendcloud/db-archiver ) 是我们自研的一个能够实现从 RDBMS 全量或增量 T+1 归档数据的工具,基于 db-archiver 目前可以实现 Mysql Or Tidb 的归档。

在此之前我们先来看一下目前在 databend 的生态中,我们提供实现数据同步的方案大概有这么几种,Flink CDC, kafka connect, debezium,Airbyte ,但是像 Flink CDC 等这几个方案,首先就需要用户已经有了 Flink 或者本身就搭建 kafka 等基础设施,而很多小公司都不会有人力去维护这么一套。或者用户只是想一次性迁移数据到 Databend,这种情况下就要求工具尽量简单,开箱即用,用完即走。能符合这个条件的,在这个图里只有 Datax 符合。

image.png

所以 Db-archiver 设计的初衷是为了平替 datax 。DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、databend 等各种异构数据源之间高效的数据同步功能。但是在实施过程中我们发现 Datax 有诸多的痛点,导致用户在数据迁移过程中困难重重, 体验很差。

db-archiver VS Datax

  1. 打包速度以及可执行文件大小

Datax 是java 写的,需要加载各种 jar 包,整个项目较为臃肿编译速度非常慢,且生成的可执行文件总共有 2 个多 G,使用起来非常灾难。

db-archiver 呢编译速度快,最终可执行文件只有 10M ,十分清爽。

2.开发以及 bug fix 的速度

Datax 的维护方是阿里,该项目目前基本处于一种半维护状态, issue 的解决以及 pr 的合并速度都极慢,所以我们每次 fix 问题之后,都是在 fork 的分支上打包给客户,鉴于第一条中提到的包大小有 2G 之多,整个过程对用户很不友好。

image.png

  1. 指标和日志

db-archiver 增加了非常多的指标和日志,对排查问题很有帮助,基本上哪里有问题,一眼就能定位出来。下面截取了一次同步过程中的部分日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
INFO[0011] upload by presigned url cost: 6076 ms
INFO[0012] upload by presigned url cost: 7262 ms
INFO[0012] thread-7: copy into cost: 1400 ms ingest_databend=IngestData
2024/08/13 15:43:58 thread-7: ingest 39999 rows (4201.005115 rows/s), 23639409 bytes (2482794.023097 bytes/s)
INFO[0012] thread-4: copy into cost: 1408 ms ingest_databend=IngestData
2024/08/13 15:43:58 thread-4: ingest 39999 rows (4125.188979 rows/s), 22799430 bytes (2437986.686345 bytes/s)
2024/08/13 15:43:58 Globla speed: total ingested 79998 rows (8315.510453 rows/s), 15279619 bytes (1588262.600387 bytes/s)
INFO[0012] condition: (id >= 1439993 and id < 1479992)
INFO[0012] thread-2: copy into cost: 1713 ms ingest_databend=IngestData
2024/08/13 15:43:58 thread-2: ingest 39999 rows (4095.170564 rows/s), 22799430 bytes (2420245.803072 bytes/s)
2024/08/13 15:43:58 Globla speed: total ingested 119997 rows (8162.421021 rows/s), 29719259 bytes (1559022.517010 bytes/s)
INFO[0012] condition: (id >= 839996 and id < 879995)
2024/08/13 15:43:58 Globla speed: total ingested 119997 rows (8087.586352 rows/s), 44158899 bytes (1544729.094295 bytes/s)
INFO[0012] condition: (id >= 439998 and id < 479997)
INFO[0012] thread-0: copy into cost: 1824 ms ingest_databend=IngestData
2024/08/13 15:43:58 thread-0: ingest 39999 rows (4035.346363 rows/s), 21726145 bytes (2384889.700537 bytes/s)
2024/08/13 15:43:58 thread-7: extract 39999 rows (0.000000 rows/s)
2024/08/13 15:43:58 thread-4: extract 39999 rows (0.000000 rows/s)
2024/08/13 15:43:58 Globla speed: total ingested 159996 rows (7927.205481 rows/s), 57525254 bytes (1514096.345894 bytes/s)
INFO[0013] condition: (id >= 40000 and id < 79999)
2024/08/13 15:43:58 thread-2: extract 39999 rows (0.000000 rows/s)
2024/08/13 15:43:58 thread-0: extract 39999 rows (0.000000 rows/s)
INFO[0013] upload by presigned url cost: 8293 ms
INFO[0013] thread-3: copy into cost: 1241 ms ingest_databend=IngestData
2024/08/13 15:43:59 thread-3: ingest 39999 rows (3748.537451 rows/s), 22799430 bytes (2215219.768763 bytes/s)
2024/08/13 15:43:59 Globla speed: total ingested 199995 rows (7375.987130 rows/s), 71964894 bytes (1408761.080845 bytes/s)
INFO[0013] condition: (id >= 639997 and id < 679996)
2024/08/13 15:43:59 thread-3: extract 39999 rows (40001.375235 rows/s)

image.png

  • 从 MySQL 中抽取 数据的速率
  • 每次 presign 的效率
  • upload stage
  • copy into
  • 每个线程的同步速率 (2024/08/13 15:43:58 thread-2: ingest 39999 rows (4095.170564 rows/s), 22799430 bytes (2420245.803072 bytes/s))
  • 全局的同步速率 (2024/08/13 15:43:59 Globla speed: total ingested 199995 rows (7375.987130 rows/s), 71964894 bytes (1408761.080845 bytes/s))

通过全局的同步速率我们可以确定当前的配置参数是不是最优的,比如 batchSize,thread 数量与机器的配置搭配是不是最优。

可执行文件大小 开发以及 bug fix 的速度 指标和日志 速率
db-archiver 10M 200w 数据 2min
Datax 2G 200w 数据 10min

总之我们自研 db-archiver 之后,整个过程更加的可控,再比如我们刚刚对接的一个客户,他们与 databend Cloud 不在同一个 region,同步数据走公网非常慢的问题,这时候我们增加 userStage 参数让用户创建并指定 external stage,这在 datax 中是无法通过参数配置的,所以这样也能够给到用户最佳的体验。

下面来看下 db-archiver 提供的两种同步模式。

两种模式

第一种是

根据 sourceSplitKey 同步数据

如果源表有自增主键,可以设置 sourceSplitKey为主键。db-archiver 将根据 sourceSplitKey 按照规则切分数据,并发同步数据到 Databend。这是性能最高的模式。后面会讲这个切分数据的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"sourceHost": "127.0.0.1",
"sourcePort": 3306,
"sourceUser": "root",
"sourcePass": "",
"sourceDB": "mydb",
"sourceTable": "test_table1",
"sourceWhereCondition": "id > 0",
"sourceSplitKey": "id",
"databendDSN": "<https://cloudapp:ajynnyvfk7ue@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443>",
"databendTable": "testSync.test_table1",
"batchSize": 40000,
"batchMaxInterval": 30,
"userStage": "USER STAGE",
"maxThread": 10
}

根据 sourceSplitTimeKey 同步数据

某些情况下用户的源表没有自增主键,但是有时间列,这个时候可以设置 sourceSplitTimeKey 同步数据。db-archiver 将根据 sourceSplitTimeKey 分割数据。

sourceSplitTimeKey 必须与 timeSplitUnit 一起设置。timeSplitUnit 是切片数据的时间颗粒度,可以是 minute, hour, day。由于数据在时间上的密集度用户是最了解的,用户就可以自定义timeSplitUnit 按时间列分割数据,以达到最合理的同步效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"sourceHost": "127.0.0.1",
"sourcePort": 3306,
"sourceUser": "root",
"sourcePass": "123456",
"sourceDB": "mydb",
"sourceTable": "my_table2",
"sourceWhereCondition": "t1 >= '2024-06-01' and t1 < '2024-07-01'",
"sourceSplitKey": "",
"sourceSplitTimeKey": "t1",
"timeSplitUnit": "minute",
"databendDSN": "<https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443>",
"databendTable": "testSync.my_table2",
"batchSize": 2,
"batchMaxInterval": 30,
"userStage": "~",
"deleteAfterSync": false,
"maxThread": 10
}

压测的过程中按照 sourceSplitTimeKey 同步性能不如按照主键 id,且并发的话会对 MySQL 的性能造成影响,为了保障MySQL 不受影响,所以这个模式不支持并发。

数据切片算法

对于 Databend 的数据同步没有太多可以分享的,核心的链路就是 源表(MySQL) → NDJSON→ Stage→ Copy into→ databend table。

比较核心的是数据切片的算法,这关乎数据同步的准确性、同步的效率,下面就重点讲下这个。

db-archiver 会取源表的主键为 sourceSplitKey 。将根据 sourceSplitKey 分割数据,并行同步数据到 Databend。

sourceSplitTimeKey 用于按时间列分割数据。sourceSplitTimeKeysourceSplitKey 至少需要设置一个。

Sync data according to the sourceSplitKey

下面详细讲一下 db-archiver 按照 id primary key 对数据切片的算法原理:

  1. 首先用户会传入一个 sourceWhereCondition 作为整体的数据范围圈定,比如 ‘id> 100 and id < 10000000’,这样就可以获取范围内的 min(id) 和 max(id)
  2. 根据上面拿到的 min, max id,再根据分配的线程数量计算出每个线程需要处理的数据范围:
1
2
3
4
5
6
7
8
9
10
rangeSize := (maxSplitKey - minSplitKey) / s.cfg.MaxThread
for i := 0; i < s.cfg.MaxThread; i++ {
lowerBound := minSplitKey + rangeSize*i
upperBound := lowerBound + rangeSize
if i == s.cfg.MaxThread-1 {
// Ensure the last condition includes maxSplitKey
upperBound = maxSplitKey
}
conditions = append(conditions, []int{lowerBound, upperBound})
}

注意处理这里的边界条件,当达到最后一个线程的时候,该线程内的 upperBound 直接等于 maxSplitKey,这里线程内的范围是一个全闭区间。

  1. 将上面得到的每个线程内的数据范围数组,分别再按照 batchSize 切分
1
2
3
4
5
6
if (minSplitKey + s.cfg.BatchSize - 1) >= allMax {
conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s <= %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, allMax))
return conditions
}
conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s < %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, minSplitKey+s.cfg.BatchSize-1))
minSplitKey += s.cfg.BatchSize - 1

这里也要处理一些边界条件,否则会导致切片范围有误最终导致同步过去的数据量不准确。

对于最后一次 (minSplitKey + s.cfg.BatchSize - 1) >= maxSplitKey,的情况,如果最小的 key 都比整个范围最大的 maxKey 大了,就直接返回。

如果当前的 maxSplitKey 恰好与 maxKey 相等,那这就是最后一批,需要保证右侧区间是闭区间,这样才能保证整个数据是连续的。

其余的情况就是左闭右开的数据区间。最终的切分效果就是:[1,100), [100, 200), [200,300), [300, 400)……, [9900, 10000]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for{
...
if (minSplitKey + s.cfg.BatchSize - 1) >= maxSplitKey {
if minSplitKey > allMax {
return conditions // minkey > allMax return directly
}
if maxSplitKey == allMax {
conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s <= %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, maxSplitKey)) // corner case, must <=x<=
} else {
conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s < %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, maxSplitKey)) // other condition is [)
}
break
}
}
  1. 在每个线程中,对按照 batchSize 切分后的数据进行同步
1
2
3
for _, condition := range conditions {
// processing
}

整个切片的流程可以参考下图:

image.png

Sync data according to the sourceSplitTimeKey

我们在给客户演示的时候得知,有些表是没有指定自增的 id,这种情况下上面按照 primary key id 进行数据同步的算法就失效了,所以我们需要支持按照时间字段切片同步。

  1. 跟 id 一样,也是先根据 where condition 确定 min, max time 的数据范围
  2. 第二步跟 id 有所区别,由于是按照时间字段切片,所以再使用 batchSize 来分割会让问题复杂并且容易出错, 并且数据在时间上的密集度用户是最了解的,所以这里我们引入新的字段 timeSplitUnit ,其取值有:
1
2
3
4
5
6
7
8
9
10
11
12
switch StringToTimeSplitUnit[c.TimeSplitUnit] {
caseMinute:
return 10 * time.Minute
caseQuarter:
return 15 * time.Minute
caseHour:
return 2 * time.Hour
caseDay:
return 24 * time.Hour
default:
return 0
}

这样可以让用户根据不同的数据密集程度,来决定用什么时间范围来切分。

1
2
3
4
5
6
7
8
9
10
if minTime.After(maxTime) {
conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s <= '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, maxTime.Format("2006-01-02 15:04:05")))
break
}
if minTime.Equal(maxTime) {
conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s <= '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, maxTime.Format("2006-01-02 15:04:05")))
break
}
conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s < '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, minTime.Add(s.cfg.GetTimeRangeBySplitUnit()).Format("2006-01-02 15:04:05")))
minTime = minTime.Add(s.cfg.GetTimeRangeBySplitUnit())
  1. 根据 batchSize 分批同步数据

这里取了个巧把 batchSize 切分放到了 SQL 里面,可以结合 offset 可以减小对源端的压力。

具体流程如下图:

image.png

演示

归档可视化((TODO))

为了进一步方便用户使用,简化数据进入 databend 的流程,我们计划将 db-archiver 可视化。

能够让用户通过在界面上配置同步任务,管理同步任务,点点点就能完成数据从外部迁移到 Databend。

配置页

配置的流程

image.png

任务管理页面

同步任务的界面如下:

image.png

也可以参考 dataworks 的任务界面:

image.png

可以查看目标任务的基本信息及运行情况。

  • 任务名称:为您展示任务的名称。单击任务名称,即可进入目标任务的详情页面。
  • 任务ID:为您展示任务的ID。
  • 运行状态:任务的运行状态,包括Runing, Stop, Success, Fail..等状态。
  • 开始运行:任务的开始运行时间。
  • 结束运行:任务的结束运行时间。
  • 运行时长:任务的运行时长,单位为秒。
  • 任务类型:任务的类型。
  • 运行速率: 数据导入的速率
  • 同步进度:数据同步的进度百分比

同样可以操作任务,对任务进行开始,暂停,继续,结束等操作。

前端直接在 https://github.com/databendcloud/db-archiver 项目中开发,然后通过 go embed 将前端一同打包,这样用户直接使用一个二进制启动项目即可。

存在问题

由于 db-archiver 可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:根据splitKey 进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是完整的一致的数据快照信息。

针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,我们提供几个解决思路给用户,用户可以自行选择:

  1. 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。
  2. 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。

不过 db-archiver 适用的是归档场景,基本能够保证当前为静态数据。

RoadMap

  • 支持 RDBMS 的更多数据源,比如 pg, oracle 等
  • 支持分库分表到一张表
  • 可视化归档

最后,希望 db-archiver 能够取代 datax ,成为日后我们给客户推荐离线数据同步的最佳方案。

我的分享就到这里,感谢大家。


-------------The End-------------

本文标题:db-archiver 如何平替 Datax 成为 Databend 离线数据同步的最佳方案

文章作者:cloud sjhan

发布时间:2024年08月14日 - 18:08

最后更新:2024年08月14日 - 18:08

原始链接:https://cloudsjhan.github.io/2024/08/14/db-archiver-如何平替-Datax-成为-Databend-离线数据同步的最佳方案/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

cloud sjhan wechat
subscribe to my blog by scanning my public wechat account
坚持原创技术分享,您的支持将鼓励我继续创作!
0%
;