用python简单实现mysql数据同步到ElasticSearch的教程
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
思路:
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据
使用:
只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(数据库管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
#-*-coding:utf-8-*-
__author__="ZJL"
fromsqlalchemy.poolimportQueuePool
fromsqlalchemyimportcreate_engine
fromsqlalchemy.ormimportsessionmaker,scoped_session
importtraceback
importesconfig
#用于不需要回滚和提交的操作
deffind(func):
defwrapper(self,*args,**kwargs):
try:
returnfunc(self,*args,**kwargs)
exceptExceptionase:
print(traceback.format_exc())
print(str(e))
returntraceback.format_exc()
finally:
self.session.close()
returnwrapper
classMysqlManager(object):
def__init__(self):
mysql_connection_string=esconfig.mysql.get("mysql_connection_string")
self.engine=create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8',poolclass=QueuePool,
pool_recycle=3600)
#self.DB_Session=sessionmaker(bind=self.engine)
#self.session=self.DB_Session()
self.DB_Session=sessionmaker(bind=self.engine,autocommit=False,autoflush=True,expire_on_commit=False)
self.db=scoped_session(self.DB_Session)
self.session=self.db()
@find
defselect_all_dict(self,sql,keys):
a=self.session.execute(sql)
a=a.fetchall()
lists=[]
foriina:
iflen(keys)==len(i):
data_dict={}
fork,vinzip(keys,i):
data_dict[k]=v
lists.append(data_dict)
else:
returnFalse
returnlists
#关闭
defclose(self):
self.session.close()
aa.sql:
select CONVERT(c.`id`,CHAR)asid, c.`code`ascode, c.`project_name`asproject_name, c.`name`asname, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')asupdate_time, from`cc`c wheredate_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
bb.sql:
select CONVERT(c.`id`,CHAR)asid, CONVERT(c.`age`,CHAR)asage, c.`code`ascode, c.`name`asname, c.`project_name`asproject_name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')asupdate_time, from`bb`c wheredate_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
#-*-coding:utf-8-*-
#__author__="ZJL"
#sql文件名与es中的type名一致
mysql={
#mysql连接信息
"mysql_connection_string":"root:123456@127.0.0.1:3306/xxx",
#sql文件信息
"statement_filespath":[
#sql对应的es索引和es类型
{
"index":"a1",
"sqlfile":"aa.sql",
"type":"aa"
},
{
"index":"a1",
"sqlfile":"bb.sql",
"type":"bb"
},
],
}
#es的ip和端口
elasticsearch={
"hosts":"127.0.0.1:9200",
}
#字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识
db_field={
"aa":
("id",
"code",
"name",
"project_name",
"update_time",
),
"bb":
("id",
"code",
"age",
"project_name",
"name",
"update_time",
),
}
es_config={
#间隔多少秒同步一次
"sleep_time":10,
#为了解决服务器之间时间差问题
"time_difference":3,
#show_json用来展示导入的json格式数据,
"show_json":False,
}
mstes.py:
#-*-coding:utf-8-*-
#__author__="ZJL"
fromsql_manageimportMysqlManager
fromesconfigimportmysql,elasticsearch,db_field,es_config
fromelasticsearchimportElasticsearch
fromelasticsearchimporthelpers
importtraceback
importtime
classTongBu(object):
def__init__(self):
try:
#是否展示json数据在控制台
self.show_json=es_config.get("show_json")
#间隔多少秒同步一次
self.sleep_time=es_config.get("sleep_time")
#为了解决同步时数据更新产生的误差
self.time_difference=es_config.get("time_difference")
#当前时间,留有后用
self.datetime_now=""
#es的ip和端口
es_host=elasticsearch.get("hosts")
#连接es
self.es=Elasticsearch(es_host)
#连接mysql
self.mm=MysqlManager()
except:
print(traceback.format_exc())
deftongbu_es_mm(self):
try:
#同步开始时间
start_time=time.time()
print("start..............",time.strftime("%Y-%m-%d%H:%M:%S",time.localtime(start_time)))
#这个list用于批量插入es
actions=[]
#获得所有sql文件list
statement_filespath=mysql.get("statement_filespath",[])
ifself.datetime_now:
#当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化
#sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T
self.datetime_now=time.strftime("%Y-%m-%dT%H:%M:%S",time.localtime(time.time()-(self.sleep_time+self.time_difference)))
else:
self.datetime_now="1999-01-01T00:00:00"
ifstatement_filespath:
forfilepathinstatement_filespath:
#sql文件
sqlfile=filepath.get("sqlfile")
#es的索引
es_index=filepath.get("index")
#es的type
es_type=filepath.get("type")
#读取sql文件内容
withopen(sqlfile,"r")asopf:
sqldatas=opf.read()
#::datetime_now是一个自定义的特殊字符串用于增量更新
if"::datetime_now"insqldatas:
sqldatas=sqldatas.replace("::datetime_now",self.datetime_now)
else:
sqldatas=sqldatas
#es和sql字段的映射
dict_set=db_field.get(es_type)
#访问mysql,得到一个list,元素都是字典,键是字段名,值是数据
db_data_list=self.mm.select_all_dict(sqldatas,dict_set)
ifdb_data_list:
#将数据拼装成es的格式
fordb_dataindb_data_list:
action={
"_index":es_index,
"_type":es_type,
"@timestamp":time.strftime("%Y-%m-%dT%H:%M:%S",time.localtime(time.time())),
"_source":db_data
}
#如果没有id字段就自动生成
es_id=db_data.get("id","")
ifes_id:
action["_id"]=es_id
#是否显示json再终端
ifself.show_json:
print(action)
#将拼装好的数据放进list中
actions.append(action)
#list不为空就批量插入数据到es中
iflen(actions)>0:
helpers.bulk(self.es,actions)
exceptExceptionase:
print(traceback.format_exc())
else:
end_time=time.time()
print("end...................",time.strftime("%Y-%m-%d%H:%M:%S",time.localtime(start_time)))
self.time_difference=end_time-start_time
finally:
#报错就关闭数据库
self.mm.close()
defmain():
tb=TongBu()
#间隔多少秒同步一次
sleep_time=tb.sleep_time
#死循环执行导入数据,加上时间间隔
whileTrue:
tb.tongbu_es_mm()
time.sleep(sleep_time)
if__name__=='__main__':
main()
以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。