How to Capture DataX Execution Results and Save Them to a Database with Python
Run the DataX jobs from a wrapper script, scheduled in crontab at 1:00 AM every day (the timing matters later). The job_start and job_finish lines are echoed by the script itself, to make it easy to tell which table each block of log output belongs to:
#!/bin/bash
source /etc/profile

user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"

jobfile=(
job_table_a.json
job_table_b.json
)

for filename in ${jobfile[@]}
do
    # note: the two spaces after the colon keep ${filename} as the 5th
    # space-separated field, which the log parser below relies on
    echo "job_start:  "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
    python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
    echo "job_finish:  "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/
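Note how the wrapper passes credentials with -p "-Dname=value": inside each job file they can then be referenced as ${name} placeholders. A hypothetical excerpt of what the reader section of job_table_a.json might look like (the exact parameter names depend on the reader/writer plugins in use):

{
  "job": {
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "${user1}",
          "password": "${pass1}"
        }
      }
    }]
  }
}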
DataX execution log:
job_start:  2018-08-08 01:13:28 job_table_a.json
任务启动时刻 : 2018-08-08 01:13:28
任务结束时刻 : 2018-08-08 01:14:49
任务总计耗时 : 81s
任务平均流量 : 192.82KB/s
记录写入速度 : 1998rec/s
读出记录总数 : 159916
读写失败总数 : 0
job_finish:  2018-08-08 01:14:49 job_table_a.json
job_start:  2018-08-08 01:14:49 job_table_b.json
任务启动时刻 : 2018-08-08 01:14:50
任务结束时刻 : 2018-08-08 01:15:01
任务总计耗时 : 11s
任务平均流量 : 0B/s
记录写入速度 : 0rec/s
读出记录总数 : 0
读写失败总数 : 0
job_finish:  2018-08-08 01:15:01 job_table_b.json
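The parser shown further below extracts fields from these lines with plain string splits rather than full regular expressions. The timestamp lines are the fiddliest case: each line is split on '刻' (the last character of the label), then on spaces. A minimal sketch of that extraction:

# -*- coding: UTF-8 -*-
# Minimal sketch of the timestamp extraction used by the parser below.
line = u'任务启动时刻 : 2018-08-08 01:13:28'
parts = line.split(u'刻')[1].strip().split(' ')  # [':', '2018-08-08', '01:13:28']
print(parts[1] + ' ' + parts[2])                 # -> 2018-08-08 01:13:28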
Next, parse this information and save it to the database. Create the target table:
CREATE TABLE `datax_job_result` (
  `log_file` varchar(200) DEFAULT NULL,
  `job_file` varchar(200) DEFAULT NULL,
  `start_time` datetime DEFAULT NULL,
  `end_time` datetime DEFAULT NULL,
  `seconds` int(11) DEFAULT NULL,
  `traffic` varchar(50) DEFAULT NULL,
  `write_speed` varchar(50) DEFAULT NULL,
  `read_record` int(11) DEFAULT NULL,
  `failed_record` int(11) DEFAULT NULL,
  `job_start` varchar(200) DEFAULT NULL,
  `job_finish` varchar(200) DEFAULT NULL,
  `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
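Once the parser below has been running for a few days, the table can be queried directly for monitoring. For example, a simple check for runs that had failed records or moved no data (a hypothetical query, not from the original setup):

SELECT job_file, start_time, seconds, read_record, failed_record
FROM datax_job_result
WHERE failed_record > 0 OR read_record = 0
ORDER BY start_time DESC;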
Now schedule the following script, also once per day, at a time when the DataX jobs are known to have finished. Because the DataX jobs run at 1:00 AM, the script picks up the newest log file created within the last 82,800 seconds (23 hours × 3,600 s), i.e. the latest log produced that day, so it finds the right file no matter what hour it runs at.
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1
import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt

def save_to_db(df):
    engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8")
    df.to_sql("datax_job_result", engine, index=False, if_exists='append')

def get_the_latest_file(path):
    # Seconds since the epoch for now (d2) and for 23 hours ago (d1).
    t0 = dt.datetime.utcfromtimestamp(0)
    d2 = (dt.datetime.now() - t0).total_seconds()
    d1 = d2 - 82800
    for (dirpath, dirnames, filenames) in os.walk(path):
        for filename in sorted(filenames, reverse=True):
            if filename.endswith(".log"):
                f = os.path.join(dirpath, filename)
                ctime = os.stat(f)[-1]  # last field of os.stat() is st_ctime
                if ctime >= d1 and ctime <= d2:
                    return f

def get_job_result_from_logfile(path):
    result = pd.DataFrame(columns=['log_file', 'job_file', 'start_time', 'end_time',
                                   'seconds', 'traffic', 'write_speed', 'read_record',
                                   'failed_record', 'job_start', 'job_finish'])
    log_file = get_the_latest_file(path)
    index = 0
    with open(log_file, "r") as content:
        for line in content:
            result.loc[index, 'log_file'] = log_file
            if re.compile(r'job_start').match(line):
                # the filename is the 5th space-separated field of the echoed line
                result.loc[index, 'job_file'] = line.split(' ')[4].strip()
                result.loc[index, 'job_start'] = line.strip()
            elif re.compile(r'任务启动时刻').match(line):
                fields = line.split('刻')[1].strip().split(' ')
                result.loc[index, 'start_time'] = fields[1].strip() + ' ' + fields[2].strip()
            elif re.compile(r'任务结束时刻').match(line):
                fields = line.split('刻')[1].strip().split(' ')
                result.loc[index, 'end_time'] = fields[1].strip() + ' ' + fields[2].strip()
            elif re.compile(r'任务总计耗时').match(line):
                result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s', '')
            elif re.compile(r'任务平均流量').match(line):
                result.loc[index, 'traffic'] = line.split(':')[1].strip()
            elif re.compile(r'记录写入速度').match(line):
                result.loc[index, 'write_speed'] = line.split(':')[1].strip()
            elif re.compile(r'读出记录总数').match(line):
                result.loc[index, 'read_record'] = line.split(':')[1].strip()
            elif re.compile(r'读写失败总数').match(line):
                result.loc[index, 'failed_record'] = line.split(':')[1].strip()
            elif re.compile(r'job_finish').match(line):
                result.loc[index, 'job_finish'] = line.strip()
                index = index + 1  # one row per job: advance after job_finish
            else:
                pass
    save_to_db(result)

get_job_result_from_logfile("/opt/datax/job/log")
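As a quick sanity check after a run, the saved rows can be read back with pandas, reusing the same connection string as save_to_db (a sketch; adjust the credentials to your environment):

import sqlalchemy
import pandas as pd

# Read back the most recent rows; assumes the same MySQL instance as above.
engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test")
df = pd.read_sql("SELECT job_file, start_time, seconds, read_record, failed_record "
                 "FROM datax_job_result ORDER BY insert_time DESC LIMIT 10", engine)
print(df)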