kafka debezium mysql 日志订阅_订阅mysql blog日志-程序员宅基地

技术标签: kafka  mysql  大数据  

mysql

[mysqld]
server-id         = 1
log_bin           = mysql-bin
default-time-zone = '+8:00'
binlog_format = ROW
binlog_row_image  = FULL 
gtid_mode =ON
enforce_gtid_consistency = ON
log-slave-updates=1 # 5.6 版本需要开启该参数

debezium

下载 debezium 的 mysql插件
https://debezium.io/documentation/reference/1.2/connectors/mysql.html#mysql-connector-configuration-properties_debezium
下的Download the Debezium
MySQL connector plug-in.
解压到\kafka_2.13-2.8.0\plugins

kafka

启动zk kafka

nohup ./zookeeper-server-start.sh ../config/zookeeper.properties >> zook.log &
nohup ./kafka-server-start.sh ../config/server.properties  >> kafka.log &

启动kafka-connnector

nohup ./connect-distributed.sh ../config/connect-distributed.properties  >> distributed.log &

附:删除连接

curl -X DELETE
http://192.168.80.133:8083/connectors/sqlserver-syncdb-connector

查看插件是否生效

http://192.168.80.133:8083/connector-plugins

主题列表

./kafka-topics.sh --list --zookeeper 192.168.80.133:2181

生成者

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic
oracle-sync-topic

消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092
–from-beginning --topic mysql80.cdc_demo.customers

连接方式一:简约库表正则

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
    "name": "mysql-while-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "192.168.80.135",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "123456",
      "database.server.id": "1",
      "database.server.name": "mysql80",
      "database.whitelist": "cdc_demo.*",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "mysql.history.sync",
      "include.schema.changes": "true"
    }
 }'

自动生成的主题

[root@hd03 bin]# ./kafka-topics.sh --list --zookeeper  192.168.80.133:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql.history.sync
mysql80
mysql80.cdc_demo.customers
mysql80.cdc_demo.t_user

对应的增删改日志

{
    "before":{
    "id":1,"first_name":"23","last_name":"啊多发点是","email":"1"},"after":{
    "id":1,"first_name":"23","last_name":"11","email":"1"},"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql80","ts_ms":1658205507000,"snapshot":"false","db":"cdc_demo","table":"customers","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:6","file":"DESKTOP-5G0JOCT-bin.000010","pos":1689,"row":0,"thread":16,"query":null},"op":"u","ts_ms":1658205507576,"transaction":null}
{
    "before":null,"after":{
    "id":2,"first_name":"123","last_name":"123123","email":"123"},"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql80","ts_ms":1658205525000,"snapshot":"false","db":"cdc_demo","table":"customers","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:7","file":"DESKTOP-5G0JOCT-bin.000010","pos":2030,"row":0,"thread":16,"query":null},"op":"c","ts_ms":1658205525239,"transaction":null}
{
    "before":{
    "id":2,"first_name":"123","last_name":"123123","email":"123"},"after":null,"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql80","ts_ms":1658205529000,"snapshot":"false","db":"cdc_demo","table":"customers","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:8","file":"DESKTOP-5G0JOCT-bin.000010","pos":2348,"row":0,"thread":16,"query":null},"op":"d","ts_ms":1658205529284,"transaction":null}
null

连接方式二:指定库的list

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
    
"name": "dbhistory.mysql",
"config": {
    
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.80.135",
"database.port": "3306",
"database.user": "root",
"database.password": "123456",
"database.server.id": "3306",
"database.server.name": "mysql81",
"database.include.list": "cdc_demo",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.mysql",
"include.schema.changes": "true"
}
}'
命令成功后一大堆的主题生成
[root@hd03 bin]# ./kafka-topics.sh --list --zookeeper  192.168.80.133:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbhistory.mysql
mysql81
mysql81.cdc_demo.customers
mysql81.cdc_demo.t_user
mysql81.sakila.actor
mysql81.sakila.address
mysql81.sakila.category
mysql81.sakila.city
mysql81.sakila.country
mysql81.sakila.customer
mysql81.sakila.film
mysql81.sakila.film_actor
mysql81.sakila.film_category
mysql81.sakila.film_text
mysql81.sakila.inventory
mysql81.sakila.language
mysql81.sakila.payment
mysql81.sakila.rental
mysql81.sakila.staff
mysql81.sakila.store
mysql81.world.city
mysql81.world.country
mysql81.world.countrylanguage

对应主题下的增删改

{
    "before":{
    "id":2,"name":"11"},"after":{
    "id":2,"name":"爱的色放"},"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql81","ts_ms":1658205137000,"snapshot":"false","db":"cdc_demo","table":"t_user","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:3","file":"DESKTOP-5G0JOCT-bin.000010","pos":764,"row":0,"thread":16,"query":null},"op":"u","ts_ms":1658205137957,"transaction":null}
{
    "before":null,"after":{
    "id":223,"name":"123"},"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql81","ts_ms":1658205159000,"snapshot":"false","db":"cdc_demo","table":"t_user","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:4","file":"DESKTOP-5G0JOCT-bin.000010","pos":1079,"row":0,"thread":16,"query":null},"op":"c","ts_ms":1658205159470,"transaction":null}
{
    "before":{
    "id":223,"name":"123"},"after":null,"source":{
    "version":"1.2.5.Final","connector":"mysql","name":"mysql81","ts_ms":1658205163000,"snapshot":"false","db":"cdc_demo","table":"t_user","server_id":1,"gtid":"cfda4e73-040a-11ed-a4c4-000c29d36977:5","file":"DESKTOP-5G0JOCT-bin.000010","pos":1376,"row":0,"thread":16,"query":null},"op":"d","ts_ms":1658205163561,"transaction":null}
null

附:

1. default-time-zone 需要指定 否则,报错

Unable to connect: The server time zone value '�й���׼ʱ��' is unrecognized or represents more than on
解决
[mysqld] 
default-time-zone = '+8:00'
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/dk18stone/article/details/125870001

智能推荐

mysql-5.7.18 linux安装,MYSQL数据库使用YUM在Linux(CentOS 7)下安装mysql 5.7.18的教程详解...-程序员宅基地

文章浏览阅读56次。《MYSQL数据库使用YUM在Linux(CentOS 7)下安装mysql 5.7.18的教程详解》要点:本文介绍了MYSQL数据库使用YUM在Linux(CentOS 7)下安装mysql 5.7.18的教程详解,希望对您有用。如果有疑问,可以联系我们。MYSQL实例项目需要使用MySQL,由于以前都是在windows下傻瓜式安装,基本没有遇到什么问题,但是这次是在服务器上安装,由于到Linu..._linux mysql-5.7.18安装包

知识积累,概要(杂项)_知识积累曲线-程序员宅基地

文章浏览阅读252次。直接进入正文,简单概括,划重点!!! 堆与栈:栈存储较小和短暂的数据,堆存储较大和较长的数据。栈是自行维护的,堆需要考虑垃圾回收。 GC方面:垃圾回收主要是指堆上的内存分配和回收。触发机制有3种:1.内存不够,自动触发。2.定时触发(依据平台)。3.GC强制执行内存分配流程:当需要获取一个足够大的内存块时,检测是否有足够的空闲内存 OFF--》 开始GC,清理..._知识积累曲线

i2c的时钟延展问题-程序员宅基地

文章浏览阅读4.5k次。结论: (即在模拟i2c主:在主设置SCL为高后,要超时判断SCL是否为高,再发后面的时序) 现象(如下图):由于在发送读命令之后,即ACk之后,下面从设备需要准备数据时间,(大约10us,一个时钟的时间),此时还在I2C中断中,因此SCLK上是被拉低。由于主设备,并未检查该SCLK信号,导致下一个数据的第一个时钟信号被拉低,而不知道,而当做有效信号采样,结果导致数据采用出错;我们自己_i2c的时钟延展

mt6735通用recovery_山寨4G V8主板MT6735线刷机包-程序员宅基地

文章浏览阅读3.2k次。版本号V8_F23_4G_V3_NP_H_V1.0.4_VOL_user本站提示:下载软件包请点击普通下载或者高速下载即可--升级工具包内自带!关于ROM官方首发优化信息:(手机官网)优化手机的来电归属地错误问题!增强信号差BUG,修复已知BUG!修复充电过慢问题,提高微信抢红包速率!提高系统安全性及性能!解决3G信号问题 增强3G接收!最后感谢大家对本站大力支持!刷机过程中遇到任何问题请联系本站..._mtk6735刷机包

codeforces 148D之概率DP_codeforces 概率-程序员宅基地

文章浏览阅读2.4k次。http://codeforces.com/problemset/problem/148/DD. Bag of micetime limit per test2 secondsmemory limit per test256 megabytesinputstandard inputoutputstandard outp_codeforces 概率

在java中实现订餐系统_Java实现简单订餐系统-程序员宅基地

文章浏览阅读4.1k次,点赞2次,收藏28次。本文实例为大家分享了Java实现简单订餐系统的具体代码,供大家参考,具体内容如下import java.util.Scanner;import java.util.*;public class OrderingMsg {public static void main(String[] args) {// 数据主体:一组订单信息String[] names = new String[4]; // 订..._java美每胃订餐系统

随便推点

Windows下使用QT+OpenCV完成人脸检测(获取摄像头的数据进行检测)_qtopencv人脸检测-程序员宅基地

文章浏览阅读2.1k次。Windows版本: Win10 X64OpenCV版本: 2.4.13.6QT版本: 5.12。_qtopencv人脸检测

python的快速排序算法_python快速排序算法代码-程序员宅基地

文章浏览阅读443次。该函数首先检查数组的长度是否小于等于1,如果是,则直接返回该数组。否则,它会选择第一个元素作为枢纽值,然后将数组分成两部分,一部分是小于枢纽值的元素,另一部分是大于枢纽值的元素。最后,它会对两部分分别递归调用快速排序函数,并将结果拼接起来。该算法的时间复杂度为O(nlogn),其中n是数组的长度。它在最坏情况下的时间复杂度为O(n^2),但通常情况下都能够达到O(nlogn)的效率。_python快速排序算法代码

Python的random模块-程序员宅基地

文章浏览阅读2.1w次,点赞18次,收藏135次。【Python模块学习】3、random模块参考:1、官网;2、别人的 以下是random模块的方法: 1 random.seed(a=None, version=2) # 初始化伪随机数生成器。如果未提供a或者a=None,则使用系统时间为种子。如果a是一个整数,则作为种子。 2 random.getstate() # 返回一个当前生成器的内部状态的对象 3_python的random模块

面试技巧之单面_单面流程准备要点-程序员宅基地

文章浏览阅读6.3k次。单面的要点:Credibility(信任感,自己做的事-懂行+有料)Reliability(可靠-首先别迟到+说到做到)Intimacy(亲密感,企业文化、企业新闻)Self-Orientation(从自己的利益考虑)(A*B*C)/D 最终,我能给公司带来什么单面一般一开始都会有自我介绍:自己信息+面试职位自己能力和该公司的职位要求的匹配点,这点非_单面流程准备要点

PPT-程序员宅基地

文章浏览阅读330次。1.好的 PPT字要少,因为相比于选择题与判断题观众不喜欢阅读题。2.PPT做的好的人,一定是要站在观众角度进行思考的人。3.增加PPT的审美。站酷、花瓣网。海报、VI、画册、平面、演示。4.PPT初始化设置:撤销次数设置:文件--选项--高级--编辑选项--最多可取消操作数。自动保存:文件--选项--保存--勾选自动恢复版本。幻灯片比例调整:设计--右...

http://liveforlinux.blog.51cto.com/3337218/1056484-程序员宅基地

文章浏览阅读522次。awk系列3--比较全面在各大网站看到的 自己整理的awk学习实例[root@localhostopt]#catgrade.txtM.Tansley05/9948311Green84044J.Lulu06/9948317green92426P.Bunny02/99..._456mmmcon