DataX使用总结_datax where-程序员宅基地

技术标签: DataX  MySQL  

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github,地址:https://github.com/alibaba/DataX。

 

DataX安装部署


1.下载压缩包:

下载页面地址:https://github.com/alibaba/DataX 在页面中【Quick Start】--->【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。解压后{datax}目录下有{bin conf job lib log log_perf plugin script tmp}几个目录。
2.安装

将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。
    JDK(1.6以上,推荐1.6)
    Python(推荐Python2.6.X)一定要为python2,因为后面执行datax.py的时候,里面的python的print会执行不了,导致运行不成功,会提示你print语法要加括号,python2中加不加都行 python3中必须要加,否则报语法错

另外要注意, 不要解压到C:\Program Files目录下或其他名字带空格的目录下,因为在cmd执行时会因为路径有空格导致找不到程序主文件。

DataX支持绝大部分种类数据库的数据转移,其数据转移的主要流程有三步:Reader -->transform-->writer

reader 从数据库读取需要转移的数据,transform在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,这些工作在这里进行,writer将读取并处理后的数据写入目标数据库。

DataX配置文件

DataX是通过读取配置文件进行数据转移,配置文件为json格式,这里以MySQL做示范,MySQL 2 MySQL

{
    "job": {
        
        "content": [
                    //reader过程配置信息
            {
                "reader": {   
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",//数据库用户名
                        "password": "root",
                        "column": [
                            "id",        //要读取的列元素 list
                            "name"
                        ],
                         "where":"",//可以添加筛选条件
                        "splitPk": "db_id",//数据分片
                        "connection": [
                            {
                                "table": [
                                    "table"//要读取的表名list,支持多个表的读取
                                ],
                                "querySql":[     //自定义筛选SQL
                                    "select reflect.id as id from user ,reflect where user.id = reflect.user_id",
                                ]
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database" //要读取的数据库URL 可以加数据库配置信息后缀
                                ]
                            }
                        ]
                    }
                },

                "writer": {  //writer过程配置信息
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",    //要写入的列list 注意要与读取的列一致
                        "name"
                        ],
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://120.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "root", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
            }
        ]
    }
}
  • 数据分片:如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。官方文档推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点,要注意目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

  • Where:添加where配置可以对要转移的数据进行筛选,比如可以选择只转移今天的数据,"where":"gmt_create > $bizdate "

MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据读取

querySql:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

类型转换

DataX 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

贴上我的一个测试数据转移配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "querySql":[
                                    "select reflect.id as id,reflect.user_id as user_id,user.mail as comment from user ,reflect where user.id = reflect.user_id",
                                ]
                            }
                        ], 
                        "password": "xxxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",//必选 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句 insert/replace/update  默认insert
                        "column": [
                        "id",
                        "user_id",
                        "comment"
                        ],
                        "preSql":"",   //可选 插入数据前执行的SQL
                        "postSql":"",  //可选 插入数据成功后执行的SQL
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

用到了连表查询信息然后作为一个字段值转移到目标数据库,这里要注意查询结果的列名和顺序要和writer里column里一样。

writeMode

  • 描述:控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句

  • 必选:是

  • 所有选项:insert/replace/update

  • 默认值:insert

启动命令

CMD下 进到你解压的dataX/bin目录下

python  datax.py  ..\job\mysql2mysql.json 

Transformer

下面说一下transform的使用,transform用于对读取的数据进行特殊定制化的需求场景,包括裁剪列、转换列等工作,主要使用的是五个对数据进行处理的方法,分别是dx_substr(),dx_pad(),dx_replace(),dx_filter(),dx_groovy(),官方文档对这几个方法的解释如下: 

dx_substr

参数:3个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。

返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"

dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

dx_pad

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。

返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符

举例:

         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz

         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

dx_replace

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。

返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"

dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"

dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

参数:

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。

返回:

    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。

举例:

dx_filter(1,"like","dataTest") 

dx_filter(1,">=","10") 

dx_groovy

参数。

    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage

备注:

    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):

 transform job示例:

"transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": 
                            {
                            "columnIndex":5,
                            "paras":["1","3"]
                            }  
                    },
                    {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":4,
                            "paras":["3","4","****"]
                            }  
                    },
                    {
                        "name": "dx_groovy",
                          "parameter": 
                            {
                               "code": "//groovy code//",  
                               "extraPackage":[
                               "import somePackage1;", 
                               "import somePackage2;"
                               ]                      
                            }  
                    }
                ]

要注意  Reader  Writer 和Transform 的配置都要写在content下面,Reader和writer的配置是用大括号,transform是写在[]里,因为transform可以包括多个数据处理,下面贴上一个包括reader writer transform Job ,

{
    "job": {

        "setting": {
            "speed": {
                "channel": "1"
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",
                        "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                },
                "transformer":[
                    {
                        "name":"dx_substr",
                        "parameter": 
                            {
                            "columnIndex":3,
                            "paras":["1","3"]
                            }  
                    },
                     {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":2,
                            "paras":["1","4","****"]
                            }  
                    }
                ]
            }
        ]

    }
}

打开CMD 执行命令

结果:

可以看到读取到两条符合筛选条件的数据并全部写入,transform两条数据全部成功,然后看一下数据库:

被读取数据库数据:

写入目标数据库结果:

可以看到第三条数据经过了dx_replace()替换,第四条经过了dx_substr()裁剪,数据转移成功。

PS:CMD中文乱码问题  使用HCP 65001即可

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_42050545/article/details/93883887

智能推荐

874计算机科学基础综合,2018年四川大学874计算机科学专业基础综合之计算机操作系统考研仿真模拟五套题...-程序员宅基地

文章浏览阅读1.1k次。一、选择题1. 串行接口是指( )。A. 接口与系统总线之间串行传送,接口与I/0设备之间串行传送B. 接口与系统总线之间串行传送,接口与1/0设备之间并行传送C. 接口与系统总线之间并行传送,接口与I/0设备之间串行传送D. 接口与系统总线之间并行传送,接口与I/0设备之间并行传送【答案】C2. 最容易造成很多小碎片的可变分区分配算法是( )。A. 首次适应算法B. 最佳适应算法..._874 计算机科学专业基础综合题型

XShell连接失败:Could not connect to '192.168.191.128' (port 22): Connection failed._could not connect to '192.168.17.128' (port 22): c-程序员宅基地

文章浏览阅读9.7k次,点赞5次,收藏15次。连接xshell失败,报错如下图,怎么解决呢。1、通过ps -e|grep ssh命令判断是否安装ssh服务2、如果只有客户端安装了,服务器没有安装,则需要安装ssh服务器,命令:apt-get install openssh-server3、安装成功之后,启动ssh服务,命令:/etc/init.d/ssh start4、通过ps -e|grep ssh命令再次判断是否正确启动..._could not connect to '192.168.17.128' (port 22): connection failed.

杰理之KeyPage【篇】_杰理 空白芯片 烧入key文件-程序员宅基地

文章浏览阅读209次。00000000_杰理 空白芯片 烧入key文件

一文读懂ChatGPT,满足你对chatGPT的好奇心_引发对chatgpt兴趣的表述-程序员宅基地

文章浏览阅读475次。2023年初,“ChatGPT”一词在社交媒体上引起了热议,人们纷纷探讨它的本质和对社会的影响。就连央视新闻也对此进行了报道。作为新传专业的前沿人士,我们当然不能忽视这一热点。本文将全面解析ChatGPT,打开“技术黑箱”,探讨它对新闻与传播领域的影响。_引发对chatgpt兴趣的表述

中文字符频率统计python_用Python数据分析方法进行汉字声调频率统计分析-程序员宅基地

文章浏览阅读259次。用Python数据分析方法进行汉字声调频率统计分析木合塔尔·沙地克;布合力齐姑丽·瓦斯力【期刊名称】《电脑知识与技术》【年(卷),期】2017(013)035【摘要】该文首先用Python程序,自动获取基本汉字字符集中的所有汉字,然后用汉字拼音转换工具pypinyin把所有汉字转换成拼音,最后根据所有汉字的拼音声调,统计并可视化拼音声调的占比.【总页数】2页(13-14)【关键词】数据分析;数据可..._汉字声调频率统计

linux输出信息调试信息重定向-程序员宅基地

文章浏览阅读64次。最近在做一个android系统移植的项目,所使用的开发板com1是调试串口,就是说会有uboot和kernel的调试信息打印在com1上(ttySAC0)。因为后期要使用ttySAC0作为上层应用通信串口,所以要把所有的调试信息都给去掉。参考网上的几篇文章,自己做了如下修改,终于把调试信息重定向到ttySAC1上了,在这做下记录。参考文章有:http://blog.csdn.net/longt..._嵌入式rootfs 输出重定向到/dev/console

随便推点

uniapp 引入iconfont图标库彩色symbol教程_uniapp symbol图标-程序员宅基地

文章浏览阅读1.2k次,点赞4次,收藏12次。1,先去iconfont登录,然后选择图标加入购物车 2,点击又上角车车添加进入项目我的项目中就会出现选择的图标 3,点击下载至本地,然后解压文件夹,然后切换到uniapp打开终端运行注:要保证自己电脑有安装node(没有安装node可以去官网下载Node.js 中文网)npm i -g iconfont-tools(mac用户失败的话在前面加个sudo,password就是自己的开机密码吧)4,终端切换到上面解压的文件夹里面,运行iconfont-tools 这些可以默认也可以自己命名(我是自己命名的_uniapp symbol图标

C、C++ 对于char*和char[]的理解_c++ char*-程序员宅基地

文章浏览阅读1.2w次,点赞25次,收藏192次。char*和char[]都是指针,指向第一个字符所在的地址,但char*是常量的指针,char[]是指针的常量_c++ char*

Sublime Text2 使用教程-程序员宅基地

文章浏览阅读930次。代码编辑器或者文本编辑器,对于程序员来说,就像剑与战士一样,谁都想拥有一把可以随心驾驭且锋利无比的宝剑,而每一位程序员,同样会去追求最适合自己的强大、灵活的编辑器,相信你和我一样,都不会例外。我用过的编辑器不少,真不少~ 但却没有哪款让我特别心仪的,直到我遇到了 Sublime Text 2 !如果说“神器”是我能给予一款软件最高的评价,那么我很乐意为它封上这么一个称号。它小巧绿色且速度非

对10个整数进行按照从小到大的顺序排序用选择法和冒泡排序_对十个数进行大小排序java-程序员宅基地

文章浏览阅读4.1k次。一、选择法这是每一个数出来跟后面所有的进行比较。2.冒泡排序法,是两个相邻的进行对比。_对十个数进行大小排序java

物联网开发笔记——使用网络调试助手连接阿里云物联网平台(基于MQTT协议)_网络调试助手连接阿里云连不上-程序员宅基地

文章浏览阅读2.9k次。物联网开发笔记——使用网络调试助手连接阿里云物联网平台(基于MQTT协议)其实作者本意是使用4G模块来实现与阿里云物联网平台的连接过程,但是由于自己用的4G模块自身的限制,使得阿里云连接总是无法建立,已经联系客服返厂检修了,于是我在此使用网络调试助手来演示如何与阿里云物联网平台建立连接。一.准备工作1.MQTT协议说明文档(3.1.1版本)2.网络调试助手(可使用域名与服务器建立连接)PS:与阿里云建立连解释,最好使用域名来完成连接过程,而不是使用IP号。这里我跟阿里云的售后工程师咨询过,表示对应_网络调试助手连接阿里云连不上

<<<零基础C++速成>>>_无c语言基础c++期末速成-程序员宅基地

文章浏览阅读544次,点赞5次,收藏6次。运算符与表达式任何高级程序设计语言中,表达式都是最基本的组成部分,可以说C++中的大部分语句都是由表达式构成的。_无c语言基础c++期末速成