Python定时从Mysql提取数据存入Redis的实现
设计思路:
1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来
2.然后实例化redis类,将数据简单解析后逐条传入redis队列
3.定时器设计每天凌晨12点开始跑
ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。
#-*-coding:utf-8-*-
importMySQLdb
importschedule
importtime
importdatetime
importrandom
importstring
importredis
#getthedatafrommysql
classFromSql(object):
def__init__(self,conn):
self.conn=conn
defacquire(self):
cursor=self.conn.cursor()
try:
sql="SELECT*FROMtestWHERETO_DAYS(NOW())-TO_DAYS(t)<=1"
cursor.execute(sql)
rs=cursor.fetchall()
#print(rs)
foreveinrs:
print('%s,%s,%s,%s'%eve)
copy_rs=rs
cursor.close()
returncopy_rs
exceptExceptionase:
print("Theerror:%s"%e)
classRedisQueue(object):
def__init__(self,name,namespace='queue',**redis_kwargs):
"""Thedefaultconnectionparametersare:host='localhost',port=6379,db=0"""
self.__db=redis.Redis(**redis_kwargs)
self.key='%s:%s'%(namespace,name)
defqsize(self):
returnself.__db.llen(self.key)
defput(self,item):
self.__db.rpush(self.key,item)
defget(self,block=True,timeout=None):
ifblock:
item=self.__db.blpop(self.key,timeout=timeout)
else:
item=self.__db.lpop(self.key)
ifitem:
item=item[1]
returnitem
defget_nowait(self):
returnself.get(False)
if__name__=="__main__":
#connectmysqldb
conn_sql=MySQLdb.connect(
host='127.0.0.1',
port=3306,
user='root',
passwd='',
db='test',
charset='utf8'
)
defjob_for_redis():
get_data=FromSql(conn_sql)
data=get_data.acquire()
q=RedisQueue('test',host='localhost',port=6379,db=0)
forsingle_dataindata:
formeta_datainsingle_data:
q.put(meta_data)
print(meta_data)
print("Alldatahadbeeninserted.")
"""
try:
schedule.every().day.at("00:00").do(job_for_redis)
exceptExceptionase:
print('Error:%s'%e)
#finally:
#conn.close()
whileTrue:
schedule.run_pending()
time.sleep(1)
"""
补充知识:python定时获取汇率存入数据库
python定时任务:
我们可以使用轻量级的第三方模块schedule。首先先安装:pipinstallschedule
定时任务的的小测试:
importschedule
importtime
defjob():
print("I'mworking...")
schedule.every(10).minutes.do(job)#每隔10分钟执行一次任务
schedule.every().hour.do(job)#每隔一小时执行一次任务
schedule.every().day.at("10:30").do(job)#每天10:30执行一次任务
schedule.every(5).to(10).days.do(job)#每5-10天执行一次任务
schedule.every().monday.do(job)#每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job)#每周三13:15执行一次任务
whileTrue:
schedule.run_pending()
获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)
importpymysql
importschedule
importtime
importrequests
importpandas
fromsqlalchemyimportcreate_engine
#获取美元的所有外汇
defjob():
content='美元'
url='http://www.boc.cn/sourcedb/whpj/index.html'#外汇数据地址
html=requests.get(url).content.decode('utf-8')
index=html.index(''+content+' ')
str=html[index:index+300]
result=re.findall('(.*?) ',str)
print("币种:"+result[0])
print("现汇买入价:"+result[1])
print("现钞买入价:"+result[2])
print("现汇卖出价:"+result[3])
print("现钞卖出价:"+result[4])
print("中行结算价:"+result[5])
print("发布时间:"+result[6]+''+result[7])
#本地地址数据库账号密码数据库名
db=pymysql.connect('localhost','root','root','pinyougoudb')
cursor=db.cursor()
#sql语句
sql="updatetb_moneysethuiBuy=%s,chaoBuy=%s,huiSale=%s,chaoSale=%s,centerResult=%s,publishTime='%s'wheretypeId='%s'"%(result[1],result[2],result[3],result[4],result[5],result[6]+''+result[7],result[0])
cursor.execute(sql)
db.commit()
print('success')
#查询语句,将存入的数据查出来
#sqlalchemy进行数据库初始化
engine=create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
sql='''select*fromtb_money'''
#pandas进行数据库读写
df=pandas.read_sql_query(sql,engine)
print(df)
db.commit()
#每隔几分中刷新一次
#schedule.every(0.1).minutes.do(job)
#每天什么时候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)
#一直循环知道满足条件执行
whileTrue:
schedule.run_pending()
以上这篇Python定时从Mysql提取数据存入Redis的实现就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。