Datax

1.DataX概述

DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(Mysql、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

2.DataX作用

DataX作为中间传输载体负责连接数据各种数据源

3.DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NoSQL、大数据计算系统都已经接入,支持如下:

类型 数据源 Reader(读) Writer(写)
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
通用 RDBMS
阿里云数仓数据库 ODPS
ADS
OSS
OCS
NoSQL 数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB

3.DataX使用

DataX的使用非常简单,用户仅需要根据自己同步数据的数据源和目的地的类型来选择相应的Reader和Writer插件即可,并将Reader和Writer插件的信息配置在一个json文件中,然后,在执行命令时,指定配置文件提交数据同步任务即可。

3.1DataX配置文件格式

这里以同步HDFS数据到MySQL为例,说明DataX的配置文件格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/base_province",
"defaultFS": "hdfs://hadoop102:8020",
"column": ["*"],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "replace",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8",
"table": [
"test_province"
]
}
]
}
}
}
]
}
}

Reader和Writer的具体参数可参考官方文档

3.2DataX命令执行

DataX的命令执行非常简单,只需要在命令行中执行如下命令:

python bin/datax.py job/base_province.json

3.2.1DataX传参

在生产环境中,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,用来对每日同步的数据加以分区,也就是说每日同步数据的目标路径不是固定的,因此DataX配置文件中的DHFSWriter插件中参数path的值应该是动态变化的。为实现这个业务需求,我们需要使用DataX的传参功能
DataX传参用法
① 首选在任务的json配置文件中使用${param}引用参数
② 然后在提交任务时使用-p “-Dparam=value” 传入参数值

4.DataX调优

4.1DataX运行流程

4.2DataX调度决策思路

举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。DataX的调度决策思路是:

  1. DataX Job根据分库分表切分策略(默认一个表一个Task,可以根据业务需要在reader插件中进行切分策略调整),将同步工作分成100个Task。
  2. DataX根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。

4.3DataX调优建议

  1. 速度控制:DataX中提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据需要控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
  2. 优化参数
参数 说明 注意事项
job.setting.speed.channel 并发数
job.setting.speed.record 总 record 限速 配置此参数,则必须配置单个 channel 的 record 限速参数
job.setting.speed.byte 总 byte 限速 配置此参数,则必须配置单个 channel 的 byte 限速参数
core.transport.channel.speed.record 单个 channel 的 record 限速,默认 10000 条/s
core.transport.channel.speed.byte 单个 channel 的 byte 限速,默认 1M/s

注意:如果配置了总record限速和总byte限速,channel并发数就会失效。因为配置了这两个参数后,实际的channel并发数是通过计算得到的
①计算公式:
并发数=min(byte_总⁄〖byte〗_单个channel , record_总⁄〖record〗_单个channel )
② 配置实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576 //单个channel byte限速1M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880 //总byte限速5M/s
}
},
...
}
}