python3实现从kafka获取数据,并解析为json格式,写入到mysql中
项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中。
配置信息:
[Global] kafka_server=xxxxxxxxxxx:9092 kafka_topic=mes consumer_group=test100 passwd=tracking port=3306 host=xxxxxxxxxx user=track schema=track dd_socket= dd_host=xxxxxxxxxxxx dd_port=3306 dd_user=xxxxxxxxx dd_passwd=xxxxxxxx
代码又长又丑,半吊子,只完成了面向过程的编程,没做到对象,将就看,有问题可以联系我
代码:
#encoding=utf-8 importdatetime importconfigparser importre importpymysql fromvertica_pythonimportconnect importvertica_python importjson fromconfluent_kafkaimportConsumer,KafkaError importcsv importlogging importos importtime importsignal importsys #写日志 logging.basicConfig(filename=os.path.join(os.getcwd(),'log_tracking.txt'),level=logging.WARN,filemode='a',format='%(asctime)s-%(levelname)s:%(message)s') defwriteErrorLog(errSrc,errType,errMsg): try: v_log_file='err_tracking.log'; v_file=open(v_log_file,'a') v_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d%H:%M:%S")+"-"+errSrc+"-"+errType+":"+errMsg+'\n') v_file.flush() exceptExceptionasdata: v_err_file=open('err_tracking.log','a') v_err_file.write(str(data)+'\n') v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d%H:%M:%S")+"-"+errSrc+"-"+errType+":"+errMsg+'\n') v_err_file.flush() v_err_file.close() finally: v_file.close() classRH_Consumer: #读取配置文件的配置信息,并初始化一些类需要的变量 def__init__(self): self.config=configparser.ConfigParser() self.config.read('config.ini') self.host=self.config.get('Global','host') self.user=self.config.get('Global','user') self.passwd=self.config.get('Global','passwd') self.schema=self.config.get('Global','schema') self.port=int(self.config.get('Global','port')) self.kafka_server=self.config.get('Global','kafka_server') self.kafka_topic=self.config.get('Global','kafka_topic') self.consumer_group=self.config.get('Global','consumer_group') self.dd_host=self.config.get('Global','dd_host') self.dd_user=self.config.get('Global','dd_user') self.dd_passwd=self.config.get('Global','dd_passwd') self.dd_port=int(self.config.get('Global','dd_port')) self.dd_socket=self.config.get('Global','dd_socket') self.operation_time=datetime.datetime.now() self.stop_flag=0 self.src_table_name=[] self.__init_db() self.__init_mes_db() self._get_all_src_table() #连接写入目标数据库 def__init_db(self): try: self.conn_info={'host':self.host,'port':self.port,'user':self.user,'password':self.passwd,'db':'tracking'} self.mysql_db=pymysql.connect(**self.conn_info,charset="utf8") self.mysql_cur=self.mysql_db.cursor() exceptExceptionasdata: writeErrorLog('__init_db','Error',str(data)) #连接生产数据库,用于获取相关维度信息 def__init_mes_db(self): try: self.mes_mysql_db=pymysql.connect(host=self.dd_host,user=self.dd_user,passwd=self.dd_passwd,port=self.dd_port,unix_socket=self.dd_socket,charset="utf8") self.mes_mysql_cur=self.mes_mysql_db.cursor() exceptExceptionasdata: writeErrorLog('__init_db','Error',str(data)) #关闭数据库 def_release_db(self): self.mysql_cur.close() self.mysql_db.close() self.mes_mysql_cur.close() self.mes_mysql_db.close() #获取所有的配置表信息(需要获取的表) def_get_all_src_table(self): try: #获取table的信息 select_src_table_names="selectdistinctsrc_table_namefromtracking.tracking_table_mapping_rule" self.mysql_cur.execute(select_src_table_names) rows=self.mysql_cur.fetchall() foriteminrows: self.src_table_name.append(item[0]) returnself.src_table_name exceptExceptionasdata: writeErrorLog('_get_all_src_table','Error',str(data)) logging.error('_get_all_src_table:'+str(data)) #获取src表的目标表信息 def_get_tgt_table_name(self,table_name,table_schema): try: #获取table的信息(table_name是schema|tablename) select_tgt_table_names="selectdistincttgt_table_namefromtracking.tracking_table_mapping_rulewheresrc_table_name='%s'andsrc_table_schema='%s'"%(table_name,table_schema) self.mysql_cur.execute(select_tgt_table_names) rows=self.mysql_cur.fetchall() tgt_table_names=[] foriteminrows: tgt_table_names.append(item[0]) returntgt_table_names exceptExceptionasdata: writeErrorLog('_get_tgt_table_name','Error',str(data)) logging.error('_get_tgt_table_name:'+str(data)) #根据获取到输入的table_name,读取表的配置信息会以json格式返回获取到的数据 def_get_config(self,table_name,tgt_table_name,table_schema): try: #获取table的信息(table_name是schema|tablename) select_table_config="selectcoalesce(src_system,'')assrc_system,coalesce(src_table_schema,'')assrc_table_schema,coalesce(src_table_name,'')assrc_table_name,coalesce(tgt_operation,'{}')astgt_operation,active_flag,coalesce(tgt_system,'')astgt_system,coalesce(tgt_table_schema,'')astgt_table_schema,coalesce(tgt_table_name,'')astgt_table_namefromtracking.tracking_table_mapping_rulewheresrc_table_name='%s'andtgt_table_name='%s'andsrc_table_schema='%s'"%(table_name,tgt_table_name,table_schema) self.mysql_cur.execute(select_table_config) rows=self.mysql_cur.fetchall() foriteminrows: self.src_system=item[0] self.src_table_schema=item[1] self.src_table_name=item[2] self.tgt_operation=item[3] self.active_flag=item[4] self.tgt_system=item[5] self.tgt_table_schema=item[6] self.tgt_table_name=item[7] #解析出self.tgt_operation中以后所需要的数据 self.tgt_operation=eval(self.tgt_operation) result_data={'src_system':self.src_system, 'src_table_schema':self.src_table_schema, 'src_table_name':self.src_table_name, 'tgt_operation':self.tgt_operation, 'active_flag':self.active_flag, 'tgt_system':self.tgt_system, 'tgt_table_schema':self.tgt_table_schema, 'tgt_table_name':self.tgt_table_name, #解析出来的self.tgt_operation里的信息 'source_primary_key':self.tgt_operation['source_primary_key'], 'source_all_column':self.tgt_operation['source_all_column'], 'target_primary_key':self.tgt_operation['target_primary_key'], 'target_column':self.tgt_operation['target_column'], 'source_level':self.tgt_operation['source_level']} returnresult_data exceptExceptionasdata: writeErrorLog('_get_config','Error',str(data)+':tableisnotavailable') logging.error('_get_config:'+str(data)) #主方法的入口 def_do(self): try: #配置consumer的信息,可以配置很多其他信息 c=Consumer({ 'bootstrap.servers':self.kafka_server, 'group.id':self.consumer_group, 'default.topic.config':{ 'auto.offset.reset':'smallest', 'enable.auto.commit':False} }) #定义消费kafka中的主题 c.subscribe([self.kafka_topic]) whileTrue: msg=c.poll(1.0) ifmsgisNone: continue ifmsg.error(): ifmsg.error().code()==KafkaError._PARTITION_EOF: continue else: print(msg.error()) break text=msg.value().decode(encoding="utf-8") #kfk_text=eval(text) kfk_text=json.loads(text) #此处判断kfk数据是否在配置表中,如果在则进行下一步,如果不在则忽略 #添加异常处理目的是为了如果这条数据写入有问题,就不commit,方便下次处理还可以继续消费 try: kfk_table=kfk_text['table'] ifkfk_tablein['order_mails']: print(type(text),text) logging.warning('--------------startexectabletime:'+str(time.strftime("%Y-%m-%d%H:%M:%S",time.localtime()))+'---------------------') kfk_text=str(kfk_text) kfk_text=kfk_text.replace(":None",":''") kfk_text=eval(kfk_text) kfk_datas=kfk_text['data'] kfk_type=kfk_text['type'] kfk_old=kfk_text['old'] logging.warning('table_name:'+str(kfk_table)+'table_type:'+kfk_type) ifkfk_type=='UPDATE': continue print('update') fori,datainenumerate(kfk_datas): kfk_text['data']=eval("["+str(data)+"]") kfk_text['old']=eval("["+str(kfk_old[i])+"]") self._get_rh_from_kafka(kfk_text) else: print('insert') fordatainkfk_datas: kfk_text['data']=eval("["+str(data)+"]") print(type(kfk_text),kfk_text) self._get_rh_from_kafka(kfk_text) logging.warning('----------------endexectabletime:'+str(time.strftime("%Y-%m-%d%H:%M:%S",time.localtime()))+'---------------') c.commit() exceptExceptionasdata: writeErrorLog('_do','excedataError',str(data)) logging.error('_do:'+str(data)) #如果停止程序 ifself.stop_flag==1: self._exit_consumer() c.close() exceptExceptionasdata: print(data) writeErrorLog('_do','Error',str(data)) logging.error('_do:'+str(data)) def_trans_path(self,tgt_path): new_tgt_path=tgt_path.replace('.','\".\"').replace('$\".','$.')+'\"' returnnew_tgt_path #此方法用来获取kafka中的数据, def_get_rh_from_kafka(self,kfk_text): try: #解析获取到的kfk中的数据流 self.kfk_tb_schema=kfk_text["database"]#schema self.kfk_tb_name=kfk_text["table"]#table_name self.kfk_data=kfk_text['data'][0]#data self.kfk_type=kfk_text['type']#数据类型type self.kfk_es=time.strftime("%Y-%m-%d%H:%M:%S",time.localtime(float(kfk_text['es']/1000)))#数据表更的时间 #获取kfk传递过来src表的配置信息,读取配置表信息-----可能为空需要添加判断 tgt_table_names=self._get_tgt_table_name(self.kfk_tb_name,self.kfk_tb_schema) iflen(tgt_table_names)!=0: fortgt_table_name_for_configintgt_table_names: tb_config=self._get_config(self.kfk_tb_name,tgt_table_name_for_config,self.kfk_tb_schema) tgt_pk_key=tb_config['target_primary_key']#目标表的主键(order_no/order_item_id) tgt_schema=tb_config['tgt_table_schema']#目标表的schema tgt_table_name=tb_config['tgt_table_name']#目标表的名称(目前只有两个目标表tracking_order,tracking_order_item) src_table_name=tb_config['src_table_name']#源表的名称(schema|table_name) src_table_schema=tb_config['src_table_schema'] tgt_columns=tb_config['target_column']#获取插入到目标表中字段的配置信息(例如该表在order_info的插入路径等配置信息) src_level=tb_config['source_level']#源表的level,目前有三种root,leaf,father src_pk_key=tb_config['source_primary_key']#源表的主键 src_pk_value=self.kfk_data[src_pk_key]#源表的主键值(从kfk中获取到) tgt_operation=tb_config['tgt_operation']#源表的其他配置,在下面处理时候再进行解析 #处理的逻辑是,将表类型分为三类,root,leaf,father分开处理,分别处理其insert,update和delete的操作 ifself.kfk_type=='INSERT':#判断kfk的操作类型是INSERT,UPDATE,DELETE ifsrc_level=='root':#判断该数据是否是root表 tgt_pk_value=self.kfk_data[tgt_pk_key]#如果是root表,则获取目标表表的主键的值(和src_pk_value的值相同) foritemintgt_columns:#item取值范围:order_info、order_progress等循环插入列,按照配置分别写入,因为可能一张表在不同列中插入位置不同 tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#拼成如下形式,目的为了_get_data_from_kfk传入参数,例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#将字符串转换成dict类型 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: tgt_path=list(tgt_columns[item]['target_path'].values())[0]#表在配置中,写入目标表的路径 (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,tgt_pk_value)#调用方法,返回三种格式的json,为了不同的写入方式传参 #调用将kfk中数据入库的方法 self._insert_data_from_kfk_for_root(tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,item,table_insert_data,tgt_path)#将kfk中主数据写入数据库 self._insert_father_data(src_table_schema,src_table_name,insert_data,tgt_path,tgt_pk_value,item,catalog_type,tgt_table_name_for_config)#将主数据涉及到父表写入 #子表insert思路:通过配置表找到上层关联表的键值,通过键值到数据库中查找到子表属于的记录(order_no/order_item_id)的值,从而可以确认子表的写入的绝对路径(拼上表名称或者是拼上键对应值),然后按照路径写入,补全父表 elifsrc_level=='leaf':#判断kfk的操作类型是INSERT,UPDATE,DELETE parent_pk_info=tgt_operation['parent_pk_key'] foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"##拼成如下形式,目的为了_get_data_from_kfk传入参数,例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#将字符串转换成dict类型 ifstr(tgt_columns[item]['target_path'])=='{}':#因为子节点可能不会每一列都会配置写入信息(这个是不是不判断也可以,只要不配置即可,如果判断,root中也需要判断吗?) logging.warning(str(item)+"isnull,pleasecheck") else: tgt_path=list(tgt_columns[item]['target_path'].keys())[0]#获取写入的路径 (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,src_pk_value)##调用方法,返回三种格式的json,为了不同的写入方式传参 (parent_tgt_path,tgt_pk_value_new)=self._get_tgt_info_for_leaf(item,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)#获取子节点表的需要写入的目标表的主键的值和上一层的写入真实绝对路径 tgt_path_true=parent_tgt_path+"."+src_table_name#获取子表写入的绝对路径(一直到子表的表名的路径) self._insert_data_from_kfk_for_leaf(tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value_new,item,table_insert_data_for_leaf,tgt_path_true,src_pk_value,insert_data)#将从kafka获取的数据入库 tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#获取子表写入的绝对路径(一直到子表的主键值的路径) self._insert_father_data(src_table_name,insert_data,tgt_path_new,tgt_pk_value_new,item,catalog_type,tgt_table_name_for_config)#递归,写入子表的父表信息 elifsrc_level=='father':#针对父表数据在主表和子表数据之后产生的情况 foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#tgt_column例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#拼接目标列和目标列的值的信息 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: tgt_paths=list(tgt_columns[item]['target_path'].values()) (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,src_pk_value)#从kafka获取的需要插入的json串 if'product'insrc_table_name.lower(): catalog_type='PRODUCT' elif'service'insrc_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' fortgt_pathintgt_paths: tgt_info_for_father=self._get_tgt_info_for_father(tgt_path,src_pk_key,src_pk_value,tgt_pk_key,tgt_schema,tgt_table_name,item,catalog_type) iflen(tgt_info_for_father)==0: logging.warning('cannotavailablethedataoftherootandleaftable') else: foriinrange(len(tgt_info_for_father)): tgt_pk_value_new=tgt_info_for_father[i][0] tgt_path_new=('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._insert_data_from_db(tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value_new,item,tgt_path_new,insert_data) self._insert_father_data(src_table_name,insert_data,tgt_path_new,tgt_pk_value_new,item,catalog_type,tgt_table_name_for_config) elifself.kfk_type=='UPDATE':#update处理方式 #主表update思路:找到更新的记录,将需要更新的字段按照配置的路径更新(主表的路径不存在多层),再补全父表,写入历史纪录 ifsrc_level=='root':#判断是否是root表 tgt_pk_value=self.kfk_data[tgt_pk_key]##如果是root表,则获取目标表表的主键的值(和src_pk_value的值相同) foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#tgt_column例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#拼接目标列和目标列的值的信息 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: update_columns=kfk_text['old'][0]#获取kfk中变更信息 tgt_path=list(tgt_columns[item]['target_path'].values())[0] (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,tgt_pk_value) self._update_data(tgt_schema,tgt_table_name,tgt_pk_key,src_table_name,update_columns,insert_data,tgt_path,tgt_pk_value,item,catalog_type,tgt_table_name_for_config,src_table_schema)#更新数据 #将变更历史写入 if'alter_column'inlist(tgt_columns[item].keys()): record_history_column=tgt_columns[item]['alter_column'] self._insert_history_data(update_columns,insert_data,tgt_path,record_history_column,self.kfk_es,item,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value) else: logging.warning(str(item)+"alter_columnisnotavailable") #子表update思路:通过配置表找到上层关联表的键值,通过键值到数据库中查找到子表属于的记录(order_no/order_item_id)的值,从而可以确认子表的写入的绝对路径(拼上表名称或者是拼上键对应值),然后按照路径更新对饮的字段,补全父表 elifsrc_level=='leaf':##判断是否是root表 parent_pk_info=tgt_operation['parent_pk_key'] foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#tgt_column例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#拼接目标列和目标列的值的信息 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: update_columns=kfk_text['old'][0]#获取到变更信息 tgt_path=list(tgt_columns[item]['target_path'].keys())[0] (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,src_pk_value)#从kafka获取的需要插入的json串 (parent_tgt_path,tgt_pk_value_new)=self._get_tgt_info_for_leaf(item,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)#获取子表上一层主键路径 tgt_path_true=parent_tgt_path+"."+src_table_name##获取子表写入的绝对路径(一直到子表的表名的路径) tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#获取子表写入的绝对路径(一直到子表的主键值) self._update_data(tgt_schema,tgt_table_name,tgt_pk_key,src_table_name,update_columns,insert_data,tgt_path_new,tgt_pk_value_new,item,catalog_type,tgt_table_name_for_config,src_table_schema) if'alter_column'inlist(tgt_columns[item].keys()): record_history_column=tgt_columns[item]['alter_column'] self._insert_history_data(update_columns,insert_data,tgt_path_new,record_history_column,self.kfk_es,item,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value_new) else: logging.warning(str(item)+"alter_columnisnotavailable") #父表更新的思路:从配置表获取所有目标路径,循环每一个路径,通过模糊匹配找到所有的目标主键值及准确路径,然后一条条更新,并将涉及的下一级信息补全 elifsrc_level=='father':#判断该数据是否是kfk入库信息如果不是就pass foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#tgt_column例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#拼接目标列和目标列的值的信息 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: update_columns=kfk_text['old'][0]#获取到变更信息 tgt_paths=list(tgt_columns[item]['target_path'].values()) (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)=self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,src_pk_value)#从kafka获取的需要插入的json串 if'product'insrc_table_name.lower(): catalog_type='PRODUCT' elif'service'insrc_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' fortgt_pathintgt_paths: tgt_info_for_father=self._get_tgt_info_for_father(tgt_path,src_pk_key,src_pk_value,tgt_pk_key,tgt_schema,tgt_table_name,item,catalog_type) foriinrange(len(tgt_info_for_father)): tgt_pk_value_new=tgt_info_for_father[i][0] tgt_path_new=('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._update_data(tgt_schema,tgt_table_name,tgt_pk_key,src_table_name,update_columns,insert_data,tgt_path_new,tgt_pk_value_new,item,catalog_type,tgt_table_name_for_config,src_table_schema) #删除操作思路:root表直接删除所有的记录,leaf删除按照路径删除目标,再加上判断如果子节点中没有数据,将对应的表名的字段删除 elifself.kfk_type=='DELETE': ifsrc_level=='root': tgt_pk_value=self.kfk_data[tgt_pk_key] self._delete_data_for_root(tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name) elifsrc_level=='leaf':# parent_pk_info=tgt_operation['parent_pk_key'] foritemintgt_columns:#item取值范围:order_info、order_progress、order_operation、order_adjudgement tgt_column="{'"+item+"'"+":"+str(tgt_columns[item]['source_column'])+"}"#tgt_column例如{"order_info":["order_no","cust_no"]} tgt_column=eval(tgt_column)#拼接目标列和目标列的值的信息 ifstr(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+"isnull,pleasecheck") else: tgt_path=list(tgt_columns[item]['target_path'].keys())[0] (parent_tgt_path,tgt_pk_value_new)=self._get_tgt_info_for_leaf(item,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)#获取子表上一层主键路径 tgt_path_true=parent_tgt_path+"."+src_table_name#获取子表上一层表的路径 tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"' self._delete_data_for_leaf(tgt_schema,tgt_table_name,item,tgt_path_new,tgt_pk_key,tgt_pk_value_new,tgt_path_true) exceptExceptionasdata: writeErrorLog('_get_rh_from_kafka','Error',str(data)) logging.error('_get_rh_from_kafka:'+str(data)) def_get_tgt_info_for_father(self,tgt_path,src_pk_key,src_pk_value,tgt_pk_key,tgt_schema,tgt_table_name,tgt_column,catalog_type): try: tgt_path_true=tgt_path+"."+src_pk_key ifcatalog_type=='0': select_sql_for_father="select"+tgt_pk_key+",json_search("+tgt_column+",\'all\',\'"+src_pk_value+"\',null,\'"+tgt_path_true+"\')from"+tgt_schema+"."+tgt_table_name+"wherejson_extract(json_extract("+tgt_column+",\'"+tgt_path_true+"\'),\'$[0]\')=\'"+src_pk_value+"\';" else: select_sql_for_father="select"+tgt_pk_key+",json_search("+tgt_column+",\'all\',\'"+src_pk_value+"\',null,\'"+tgt_path_true+"\')from"+tgt_schema+"."+tgt_table_name+"wherejson_extract(json_extract("+tgt_column+",\'"+tgt_path_true+"\'),\'$[0]\')=\'"+src_pk_value+"\'andjson_extract("+tgt_column+",\'$."+tgt_table_name+".type=\'"+catalog_type+"\';" self.mysql_cur.execute(select_sql_for_father) tgt_info_for_father=self.mysql_cur.fetchall() returntgt_info_for_father exceptExceptionasdata: writeErrorLog('_get_tgt_info_for_father','Error',str(data)) logging.error('_get_tgt_info_for_father:'+str(data)) def_delete_data_for_root(self,tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name): try: delete_sql="deletefrom"+tgt_schema+"."+tgt_table_name+"where"+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_delete_data_for_root','Error',str(data)) logging.error('_delete_data_for_root:'+str(data)) def_delete_data_for_leaf(self,tgt_schema,tgt_table_name,tgt_column,tgt_path,tgt_pk_key,tgt_pk_value,tgt_path_true): try: delete_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_remove("+tgt_column+",\'"+tgt_path+"\')where"+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() select_sql="selectjson_extract("+tgt_column+",\'"+tgt_path_true+"\')from"+tgt_schema+"."+tgt_table_name+"where"+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(select_sql) tgt_column_value=self.mysql_cur.fetchall()[0][0] iftgt_column_value==r'{}': table_delete_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_remove("+tgt_column+",\'"+tgt_path_true+"\')where"+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(table_delete_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_delete_data_for_leaf','Error',str(data)) logging.error('_delete_data_for_leaf:'+str(data)) def_insert_history_data(self,update_columns,insert_data,tgt_path,record_history_column,data_time,tgt_column,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value): try: update_columns_key=list(update_columns.keys()) foriteminrecord_history_column: ifiteminupdate_columns_key: tgt_path_for_column=tgt_path+'.alter_data.'+item tgt_path_for_alter=tgt_path+'.alter_data' select_sql_for_alter_column_path='selectjson_extract('+tgt_column+',\''+tgt_path_for_column+'\')'+'from'+tgt_schema+'.'+tgt_table_name+'where'+tgt_pk_key+'=\''+str(tgt_pk_value)+'\';' select_sql_for_alter_path='selectjson_extract('+tgt_column+',\''+tgt_path_for_alter+'\')'+'from'+tgt_schema+'.'+tgt_table_name+'where'+tgt_pk_key+'=\''+str(tgt_pk_value)+'\';' self.mysql_cur.execute(select_sql_for_alter_column_path) tgt_path_vlaue_for_column=self.mysql_cur.fetchall() self.mysql_cur.execute(select_sql_for_alter_path) tgt_path_vlaue_for_alter=self.mysql_cur.fetchall() old_data=update_columns[item] new_data=eval(insert_data)[item] iftgt_path_vlaue_for_alter[0][0]==None: history_data='{\"'+item+'\":[{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}]}' insert_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_insert("+tgt_column+",\'"+tgt_path_for_alter+"\',cast(\'"+history_data+"\'asjson))where"+tgt_pk_key+"='"+str(tgt_pk_value)+"';" else: iftgt_path_vlaue_for_column[0][0]==None: history_data='[{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}]' insert_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_insert("+tgt_column+",\'"+tgt_path_for_column+"\',cast(\'"+history_data+"\'asjson))where"+tgt_pk_key+"='"+str(tgt_pk_value)+"';" else: history_data='{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}' insert_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_array_append("+tgt_column+",\'"+tgt_path_for_column+"\',cast(\'"+history_data+"\'asjson))where"+tgt_pk_key+"='"+str(tgt_pk_value)+"';" self.mysql_cur.execute(insert_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_insert_history_data','Error',str(data)) logging.error('_insert_history_data:'+str(data)) #将kfk中的数据,进行转换,转换成不同的写入方式需要的json格式 def_get_data_from_kfk(self,text,tgt_column,src_table_name,src_table_pk_value): try: tgt_column_json=tgt_column#传入的目标表的列名称 tgt_column_key='' forkeyintgt_column_json:#循环tgt_column中的key值 json_column_key='{' foritemintgt_column_json[key]: json_column_key+='"'+item+'":"'+text['data'][0][item].replace('"',r'\\"')+'",' tgt_column_item=json_column_key[:-1] tgt_column_key+=tgt_column_item+'},' if'type'intext['data'][0]: catalog_type=text['data'][0]['type'] else: catalog_type='0' table_insert_data='{\"'+src_table_name+'\":'+tgt_column_key[:-1]+'}'#拼接成如下带有表名和主键值格式{"order":{"order_no":"100"}} insert_data=tgt_column_key[:-1]#拼接成如下不带表名和不带主键值的格式{"order_no":"100"} table_insert_data_for_leaf='{\"'+src_table_pk_value+'\":'+insert_data+'}'#拼接成如下带有主键值格式的{"100":{"order_no":"100"}} print(insert_data) return(table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type)#返回数据 exceptExceptionasdata: writeErrorLog('_get_data_from_kfk','Error',str(data)) logging.error('_get_data_from_kfk:'+str(data)) def_insert_data_from_kfk_for_root(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data,tgt_path): try: #先判断主键是否存在,如果存在则插入其他数据,如果不存在,则先插入主键信息 select_tb_count='selectcount(*)from'+tgt_schema+"."+tgt_table_name+'where'+tgt_table_pk+'=\''+tgt_table_value+'\';' #判断列中是否存在数据 select_tb_column_count='selectcasewhencoalesce('+tgt_column+',\'\')=\'\'then1else0endfrom'+tgt_schema+"."+tgt_table_name+'where'+tgt_table_pk+'=\''+tgt_table_value+'\';' self.mysql_cur.execute(select_tb_count) tb_count=self.mysql_cur.fetchall() self.mysql_cur.execute(select_tb_column_count) tb_column_count=self.mysql_cur.fetchall() #判断是否存在数据,如果不存在,则先插入主键(order_no/order_item_id)再将数据写入到列中 iftb_count[0][0]==0: insert_pk_sql="insertinto"+tgt_schema+"."+tgt_table_name+"("+tgt_table_pk+")values('"+tgt_table_value+"')" self.mysql_cur.execute(insert_pk_sql) self.mysql_db.commit() update_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=cast('"+table_insert_data+"'asjson)where"+tgt_table_pk+"='"+tgt_table_value+"';" else: #如果主键存在,列为空,则需要直接写入带有table_name格式的json iftb_column_count[0][0]==1:#当目标字段为空 update_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=cast('"+table_insert_data+"'asjson)where"+tgt_table_pk+"='"+tgt_table_value+"';" else: #如果主键存在,列不为空,则需要使用json_insert方法写入带有table_name格式的json update_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_insert("+tgt_column+",\'"+tgt_path+"\',cast(\'"+table_insert_data+"\'asjson))where"+tgt_table_pk+"=\'"+tgt_table_value+"\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_insert_data_from_kfk_for_root','Error',str(data)) logging.error('_insert_data_from_kfk_for_root:'+str(data)) def_get_tgt_pk_value_for_leaf(self,tgt_table_pk,tgt_schema,tgt_table_name,tgt_column,tgt_path,parent_pk_value): try: select_tgt_pk_sql="select"+tgt_table_pk+"from"+tgt_schema+"."+tgt_table_name+"wherejson_extract("+tgt_column+",\'"+tgt_path+"\')=\'"+parent_pk_value+"\';" self.mysql_cur.execute(select_tgt_pk_sql) tgt_pk_value=self.mysql_cur.fetchall()[0][0] returntgt_pk_value exceptExceptionasdata: writeErrorLog('_get_tgt_pk_value_for_leaf','Error',str(data)) logging.error('_get_tgt_pk_value_for_leaf:'+str(data)) #获取子节点表的需要写入的目标表的主键的值和上一层的写入真实绝对路径 def_get_tgt_info_for_leaf(self,tgt_column,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,kafka_data): try: if_tgt_path='.'.join(tgt_path.split('.')[:-1]) i=0 json_search_sql='' where_sql='' ifif_tgt_path=='$': forparent_pk_keyinlist(parent_pk_info.keys()): parent_pk_value=kafka_data[parent_pk_info[parent_pk_key]] json_search_sql+=",json_search("+tgt_column+",'one','"+str(parent_pk_value)+"',null,'"+tgt_path+"."+parent_pk_key+"')astgt_path"+str(i) where_sql+="tgt_path"+str(i)+"isnotnulland" i=i+1 else: forparent_pk_keyinlist(parent_pk_info.keys()): parent_pk_value=kafka_data[parent_pk_info[parent_pk_key]] json_search_sql+=",json_search("+tgt_column+",'one','"+str(parent_pk_value)+"',null,'"+tgt_path+".*."+parent_pk_key+"')astgt_path"+str(i) where_sql+="tgt_path"+str(i)+"isnotnulland" i=i+1 select_sql="select"+tgt_pk_key+",tgt_path0from(select"+tgt_pk_key+json_search_sql+"from"+tgt_schema+"."+tgt_table_name+")twhere"+where_sql[:-4]+";" self.mysql_cur.execute(select_sql) rows=self.mysql_cur.fetchall()[0] tgt_path_new=('.'.join(rows[1].split('.')[:-1]))[1:] tgt_pk_value_new=rows[0] return(tgt_path_new,tgt_pk_value_new) exceptExceptionasdata: writeErrorLog('_get_tgt_info_for_leaf','Error',str(data)) logging.error('_get_tgt_info_for_leaf:'+str(data)) def_insert_data_from_kfk_for_leaf(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data_for_leaf,tgt_path,src_pk_value,insert_data): try: select_tb_column_key='selectcasewhencoalesce(json_extract('+tgt_column+',\''+tgt_path+'\'),\'\')=\'\'then1else0endfrom'+tgt_schema+"."+tgt_table_name+'where'+tgt_table_pk+'=\''+str(tgt_table_value)+'\';' self.mysql_cur.execute(select_tb_column_key) column_key_data=self.mysql_cur.fetchall() ifcolumn_key_data[0][0]==1:#当主键存在并且目标字段不为空路径不存在, tgt_path_new=tgt_path tgt_insert_data=table_insert_data_for_leaf else: tgt_path_new=tgt_path+r'.\"'+str(src_pk_value)+r'\"' tgt_insert_data=insert_data update_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_insert("+tgt_column+",\'"+tgt_path_new+"\',cast(\'"+tgt_insert_data+"\'asjson))where"+tgt_table_pk+"=\'"+str(tgt_table_value)+"\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_insert_data_from_kfk_for_leaf','Error',str(data)) logging.error('_insert_data_from_kfk_for_leaf:'+str(data)) #将父表数据写入(父表数据从生产库中获取,按照对应的配置路径写入数据库中) def_insert_father_data(self,src_table_schema,scr_table_name,insert_data,src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config): try: src_config_data=self._get_config(scr_table_name,tgt_table_name_for_config,src_table_schema)#获取初始表的配置信息(此处获取是为了递归时候传入下一层的表名,获取对应的配置信息) src_foreign_info=src_config_data['target_column'][tgt_column]['source_foreign_info']#从数据库配置表中获取source_foreign_info的信息,也就是外键的信息,包括外键,外键的表,以及外键表中的主键名称 iflen(json.dumps(src_foreign_info))==2:#当没有外键的时候,配置表只存在‘{}'长度为2,就不需要向下递归执行,对应的source_foreign_info=[],长度为2 logging.warning(scr_table_name+":Recursiveover") else: forsrc_pk_keyinsrc_foreign_info:#获取当前表与下层父表的关联键(例如customer表的配置获取到org_id,"source_foreign_info":{"org_id":{"customer.organization":"org_id"}}) foreign_table_name_tmp=list(src_foreign_info[src_pk_key].keys())[0]#获取外键对应的表名foreign_table_name(organization),(每次传入的key对应一个外键表,只存在一个列,order_info,所以取第一个元素即可) foreign_table_schema=foreign_table_name_tmp.split('.')[0] foreign_table_name_tmp=foreign_table_name_tmp.split('.')[1] if'#'inforeign_table_name_tmp: foreign_table_name=foreign_table_name_tmp.replace('#',catalog_type).lower() else: foreign_table_name=foreign_table_name_tmp foreign_table_pk_key=list(src_foreign_info[src_pk_key].values())[0]#获取外键对应的表的关联键foreign_table_key,即org_id foreign_datas=self._get_config(foreign_table_name,tgt_table_name_for_config,foreign_table_schema)#获取外键表的配置信息,以便下面获取配置表的信息 foreign_column=foreign_datas['target_column'][tgt_column]#获取要插入的目标表列是order_info/order_progress)(organization写入目标表的列的配置信息) foreign_schema=foreign_datas['src_table_schema']#获取表的schema(organization的原始srcschema) foreign_table_pk_value=eval(str(insert_data))[src_pk_key]#获取外键对应的value(即organization在kfk数据中对应的值) #获取外键对应表的配置信息(写入数据库需要用) tgt_schema=foreign_datas['tgt_table_schema'] tgt_table_name=foreign_datas['tgt_table_name'] tgt_pk_key=foreign_datas['target_primary_key'] tgt_pk_value=root_pk_value#目标表主键的值 #获取数据,并在其中获取后,写入数据库(此处部分参数是为了给insert服务) forforeign_pathinforeign_column['target_path']: src_tgt_path=list(foreign_path.keys())[0] foreign_tgt_path=list(foreign_path.values())[0] ifre.sub('.\"\S*?\"',r'*',src_path)==src_tgt_pathandre.sub('.\"\S*?\"',r'*',src_path)+'.'+src_pk_key==foreign_tgt_path: next_src_path=src_path+'.'+src_pk_key next_insert_data=self._get_data_from_db(foreign_column,foreign_table_name,foreign_schema,foreign_table_pk_key,foreign_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,next_src_path) self._insert_father_data(foreign_table_schema,foreign_table_name,next_insert_data,next_src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config) else: logging.warning(foreign_table_name+':havenonextlevel') exceptExceptionasdata: writeErrorLog('_insert_father_data','Error',str(data)) logging.error('_insert_father_data:'+str(data)) #从数据库中获取数据,并将获取到的数据,直接插入数据库中,返回递归需要使用的数据 def_get_data_from_db(self,src_tgt_column,src_table_name,src_table_schema,src_table_pk_key,src_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,tgt_path): try: result_data='{' src_column=src_tgt_column['source_column']#读取需要获取的字段 iflen(src_column)==0: logging(str(src_column)+'lengthequal0error') else: foriteminsrc_column:#拼接好sql语句,获取数据 select_sql1='concat(\'' select_sql1+=u'"'+item+'":"\',coalesce('+item+',\'\'),\'",' select_sql1=select_sql1[:-1]+'\')' select_sql="select"+select_sql1+"from"+src_table_schema+"."+src_table_name+"where"+src_table_pk_key+"=\'"+src_table_pk_value+"\';" #使用execute方法执行SQL语句 self.mes_mysql_cur.execute(select_sql) #使用fetchone()方法获取一条数据 data=self.mes_mysql_cur.fetchall() iflen(data)==0: result_data+='' else: result_data+=data[0][0]+',' ifresult_data!='{': tgt_value=result_data[:-1]+'}' else: tgt_value=result_data+'\"'+src_table_pk_key+'\":\"'+src_table_pk_value+'\"}' self._insert_data_from_db(tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,tgt_column,tgt_path,tgt_value)#将获取到的父表数据写入数据库 returntgt_value#返回写入的数据,和真是的写入路径(因为路径在配置表中层数多的是用*代替的,不是真正的绝对路径,这里返回的是绝对路径) exceptExceptionasdata: writeErrorLog('_get_data_from_db','Error',str(data)) logging.error('_get_data_from_db:'+str(data)) #将父表写入数据库 def_insert_data_from_db(self,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,tgt_column,tgt_path,tgt_value): try: insert_sql="update"+tgt_schema+"."+tgt_table_name+"set"+tgt_column+"=json_replace("+tgt_column+",\'"+tgt_path+"\',cast(\'"+tgt_value+"\'asjson))where"+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" #self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1")) self.mysql_cur.execute(insert_sql) self.mysql_db.commit() exceptExceptionasdata: writeErrorLog('_insert_data_from_db','Error',str(data)) logging.error('_insert_data_from_db:'+str(data)) #当变更数据为外键时,补全外键对应的信息 def_update_data(self,tgt_schema,tgt_table_name,tgt_pk_key,src_table_name,update_columns,insert_data,src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config,src_table_schema): try: #判断是否涉及外键信息,判断变更的字段是否在外键信息里,将在的组成新的外键json,在调用_get_data_from_db进行更新数据 insert_data=json.loads(insert_data) forupdate_columninupdate_columns:# ifupdate_columninlist(insert_data.keys()): update_column_data='\"'+insert_data[update_column]+'\"' tgt_path=src_path+'.'+update_column self._insert_data_from_db(tgt_schema,tgt_table_name,tgt_pk_key,root_pk_value,tgt_column,tgt_path,update_column_data) self._insert_father_data(src_table_schema,src_table_name,insert_data,src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config) exceptExceptionasdata: writeErrorLog('_update_data','Error',str(data)) logging.error('_update_data:'+str(data)) #退出消费消息 def_exit_consumer(self): self._release_db() sys.exit() defexit_program(signum,frame): logging.info("ReceivedSignal:%satframe:%s"%(signum,frame)) p.stop_flag=1 defmain(): #实例化对象 p=RH_Consumer() signal.signal(signal.SIGTERM,exit_program) #whileTrue: p._do() main()
以上这篇python3实现从kafka获取数据,并解析为json格式,写入到mysql中就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。