python基础教程:用python简单实现mysql数据同步到ElasticSearch的教程_老程序员阿福的博客-程序员宝宝

技术标签: python  python基础编程  数据库  大数据  

今天小编就为大家分享一篇用python简单实现mysql数据同步到ElasticSearch的教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个

思路:

网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据

使用:

只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(数据库管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滚和提交的操作
def find(func):
 def wrapper(self, *args, **kwargs):
  try:
   return func(self, *args, **kwargs)
  except Exception as e:
   print(traceback.format_exc())
   print(str(e))
   return traceback.format_exc()
  finally:
   self.session.close()
 return wrapper
class MysqlManager(object):
 def __init__(self):
  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
         pool_recycle=3600)
  # self.DB_Session = sessionmaker(bind=self.engine)
  # self.session = self.DB_Session()
  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
  self.db = scoped_session(self.DB_Session)
  self.session = self.db()
 @find
 def select_all_dict(self, sql, keys):
  a = self.session.execute(sql)
  a = a.fetchall()
  lists = []
  for i in a:
   if len(keys) == len(i):
    data_dict = {}
    for k, v in zip(keys, i):
     data_dict[k] = v
    lists.append(data_dict)
   else:
    return False
  return lists
 # 关闭
 def close(self):
  self.session.close()

aa.sql:

select 
 CONVERT(c.`id`,CHAR)    as id, 
 c.`code`   as code, 
 c.`project_name` as project_name, 
 c.`name`   as name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time, 
from `cc` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';

bb.sql:

select 
 CONVERT(c.`id`,CHAR)    as id, 
 CONVERT(c.`age`,CHAR)    as age, 
 c.`code`   as code, 
 c.`name`   as name, 
 c.`project_name` as project_name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, 
from `bb` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名与es中的type名一致
mysql = {
 # mysql连接信息
 "mysql_connection_string": "root:[email protected]:3306/xxx",
 # sql文件信息
 "statement_filespath":[
  # sql对应的es索引和es类型
  {
   "index":"a1",
   "sqlfile":"aa.sql",
   "type":"aa"
  },
  {
   "index":"a1",
   "sqlfile":"bb.sql",
   "type":"bb"
  },
 ],
}
# es的ip和端口
elasticsearch = {
 "hosts":"127.0.0.1:9200",
}
# 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识
db_field = {
  "aa":
   ("id",
   "code",
   "name",
   "project_name",
   "update_time",
   ),
 "bb":
  ("id",
   "code",
   "age",
   "project_name",
   "name",
   "update_time",
   ),
}
es_config = {
 # 间隔多少秒同步一次
 "sleep_time":10,
 # 为了解决服务器之间时间差问题
 "time_difference":3,
 # show_json 用来展示导入的json格式数据,
 "show_json":False,
}

mstes.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
 def __init__(self):
  try:
   # 是否展示json数据在控制台
   self.show_json = es_config.get("show_json")
   # 间隔多少秒同步一次
   self.sleep_time = es_config.get("sleep_time")
   # 为了解决同步时数据更新产生的误差
   self.time_difference = es_config.get("time_difference")
   # 当前时间,留有后用
   self.datetime_now = ""
   # es的ip和端口
   es_host = elasticsearch.get("hosts")
   # 连接es
   self.es = Elasticsearch(es_host)
   # 连接mysql
   self.mm = MysqlManager()
  except :
   print(traceback.format_exc())
 def tongbu_es_mm(self):
  try:
   # 同步开始时间
   start_time = time.time()
   print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   # 这个list用于批量插入es
   actions = []
   # 获得所有sql文件list
   statement_filespath = mysql.get("statement_filespath",[])
   if self.datetime_now:
    # 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化
    # sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T
    self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
   else:
    self.datetime_now = "1999-01-01T00:00:00"
   if statement_filespath:
    for filepath in statement_filespath:
     # sql文件
     sqlfile = filepath.get("sqlfile")
     # es的索引
     es_index = filepath.get("index")
     # es的type
     es_type = filepath.get("type")
     # 读取sql文件内容
     with open(sqlfile,"r") as opf:
      sqldatas = opf.read()
      # ::datetime_now是一个自定义的特殊字符串用于增量更新
      if "::datetime_now" in sqldatas:
       sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
      else:
       sqldatas = sqldatas
      # es和sql字段的映射
      dict_set = db_field.get(es_type)
      # 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据
      db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
      if db_data_list:
       # 将数据拼装成es的格式
       for db_data in db_data_list:
        action = {
         "_index": es_index,
         "_type": es_type,
         "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
         "_source": db_data
        }
        # 如果没有id字段就自动生成
        es_id = db_data.get("id", "")
        if es_id:
         action["_id"] = es_id
        # 是否显示json再终端
        if self.show_json:
         print(action)
        # 将拼装好的数据放进list中
        actions.append(action)
   # list不为空就批量插入数据到es中
   if len(actions) > 0 :
    helpers.bulk(self.es, actions)
  except Exception as e:
   print(traceback.format_exc())
  else:
   end_time = time.time()
   print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   self.time_difference = end_time-start_time
  finally:
   # 报错就关闭数据库
   self.mm.close()
def main():
 tb = TongBu()
 # 间隔多少秒同步一次
 sleep_time = tb.sleep_time
 # 死循环执行导入数据,加上时间间隔
 while True:
  tb.tongbu_es_mm()
  time.sleep(sleep_time)
if __name__ == '__main__':
 main()

写到这里,给大家推荐一个资源很全的python学习聚集地,点击进入,这里有资深程序员分享以前学习心得,学习笔记,还有一线企业的工作经验,且给大家精心整理一份python零基础到项目实战的资料,每天给大家讲解python最新的技术,前景,学习需要留言的小细节
以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/haoxun11/article/details/105036677

智能推荐

js如何获取IP地址?教你4种方法_js获取ip地址_君君啊�的博客-程序员宝宝

1、js取得IP地址的方法一<script src="http://pv.sohu.com/cityjson?ie=utf-8">\</script> <script type="text/javascript"> document.write(returnCitySN["cip"]+','+returnCitySN["cname"]) </script>2、js取得IP地址的方法二<script language="javascript"

Clickhouse rpm离线 集群安装_警告:clickhouse-client-22.2.2.1-2.noarch.rpm: 头v4 rs_思无邪_TongCC的博客-程序员宝宝

环境: centos7.5dockerapache-01dockerapache-02dockerapache-03由于服务器环境是离线的。需要采用离线安装。在线安装可以参考下列博客:https://blog.csdn.net/oppo62258801/article/details/1036018701.下载...

Dos命令大全完整版_♬ 小邢同学的博客-程序员宝宝

DOS(磁盘操作系统)命令,是DOS操作系统的命令,是一种面向磁盘的操作命令,主要包括目录操作类命令、磁盘操作类命令、文件操作类命令和其它命令。DOS命令不区分大小写,比如C盘的Program Files,在dos命令中完全可以用"progra~1"代替,加上英文引号是因为名称的中间有空格(即多于一个词),这一点是初学者经常忽略的。常用命令:(1)查看目录内容命令 DIR(2)指定可执行文件搜索目录 PATH(3)创建目录命令 MD(4)打开指定目录命令 CD(5)删除当前指定的子目录命令 R

影像组学资料_迷路在代码中的博客-程序员宝宝

pyradiomics官网Feature ClassesCurrently supports the following feature classes:First Order Statistics (19 features,一阶统计量) 体素强度的一阶统计特征值,包括平均值,最大、小值等, 1. Energy; 2. Total Energy; 3. Entropy; 4. Minimum,numpy.min(Array); 5. 10th percentile,numpy....

Renew_suhido的博客-程序员宝宝

[quote]Again, you can’t connect the dots looking forward; you can only connect them looking backwards. So you have to trust that the dots will somehow connect in your future. You have to trust in some...

448. 找到所有数组中消失的数字 & 442. 数组中重复的数据(数组)&剑指 Offer 03. 数组中重复的数字【S】& 41. 缺失的第一个正数【H】_hashset index上的值_zcc今天好好学习了吗的博客-程序员宝宝

448. 找到所有数组中消失的数字1. 使用额外空间可以想到用HashSet,遍历数组将所有的内容,放在set中,最后遍历[1,n],看哪些数字没有出现2. 原地修改题目要求:不使用额外空间——不能用哈希表等存储;复杂度为O(N),那么就是常量次遍历。我的方法:一个萝卜一个坑(初始思路来自哪里找不到了)想反例:如果正好是1~n个数字,有且仅出现一次,那么经过遍历之后一定能顺序排列,即1-1,2-2,…n-n(注意这边的index是从1开始的)如果存在数字重复,那么一定有数组不存在的,数字重复

随便推点

关于个人对UART+DMA +串口空闲中断的理解与疑惑_dma空闲中断 modbus_yl浪迹天涯的博客-程序员宝宝

以下只是个人根据自己实际项目的使用在论坛里看到的一些交流的理解,欢迎大家指出其中理解错误的地方。1. 我为什么选择了 串口+DMA+空闲中断模式 在网上去搜索会看到很多的资料与简介,甚至还有许多实例程序,这里我就不在重复介绍。当初我选择这种模式来进行处理接收串口数据有以下几个原因。1. 实际中我是用串口去接收一种船舶上发出的报文数据(AIS数据),你当成GPS就行了。这种数据...

记一次利用动态调试so包破解协议 (2)_Kawa103的博客-程序员宝宝

继续上一篇https://blog.csdn.net/u014476720/article/details/83650566的操作上一篇只分析出so包里面的 buildParam2 里面的加密方式,这一篇来看看buildParam1的加密方式这里分析是已用户主页的接口对应在分析buildParam1里面的加密方式的时候比较坑,可能是工具的原因,又或者是开发者设置的阻碍吧ida打...

Open SQL 增刪查改(CRUD)_eley的博客-程序员宝宝

ABAP 中 OPEN SQL中的查看操作上篇文件已有過介紹,此處將不再整理。接下來就來看看其它動作的相關語法。    1.UPDATE(修改操作)    UPDATE实现对数据的更新操作,语法如下:    UPDATE SET f1...fn (WHERE ).     UPDATE FROM TABLE (WHERE ).    [For Example]

Learn C++学习笔记:第M章—最常用的智能指针:std::unique_ptr & std::make_unique_不要熬夜多喝热水的博客-程序员宝宝

1、基本使用介绍前面已经介绍了很多智能指针了,它就是一个类。这么重要的类当然不需要我们手写,有现成的可以用,它就是:std::unique_ptr①、所在头文件这是一个封装好的类,包含在头文件<memory>中。②、初始化初始的方式也跟正常的类使用一样:std::unique_ptr<YourClass> res{ new YourClass() };③、智能指针之间的复制std::unique_ptr是实现了移动语义的功能的并且禁用复制语义,在智能指针和智能

C++ MFC深入详解之----设置控件背景透明_mfc控件透明背景_阿尔兹的博客-程序员宝宝

首先要添加OnCtlColor函数1.该函数在VC++中的添加方法为:快捷键Ctrl+W或右击空白处打开ClassWizard对话框(也可以菜单->view->ClassWizard),在Message Maps里的Messages框中找到OnCtlColor,点击Add Function添加在VS 类视图中右键点击你的DLG类点击属性,在上面找到消息图标,滑动,找到OnCtl...

html表格列中加列,记一次LayUI中Table动态添加列数据_weixin_39662228的博客-程序员宝宝

这次在开发中遇到,有列数不固定的情况。废话不多说,先上图,在上代码。下面上JS代码function SearchData() {var dYear = $("#DYear").val();var beginWeek = $("#DSWeek").val();var endWeek = $("#DEWeek").val();var params = {};params = CreateParamDa...

推荐文章

热门文章

相关标签