Python 调用 ES、Solr、Phoenix的示例代码
#!/usr/bin/envpython
#-*-coding:utf-8-*-
#*************************************
#@Time:2019/8/12
#@Author:ZhangFan
#@Desc:Library
#@File:MyDatabases.py
#@Update:2019/8/23
#*************************************
importelasticsearch
importphoenixdb
importpysolr
importpymysql
classMyELS(object):
"""
===================================================================
=====================MyELS=========================
===================================================================
"""
def__init__(self):
self.els_conn=None
defconnect_to_els(self,host,port):
"""
连接到ElasticSearch服务器.
"""
self.els_conn=elasticsearch.Elasticsearch([{'host':host,'port':port}])
print('Executing:ConnectToElasticSearch|%s'%self.els_conn)
defget_els_data(self,query,index):
"""
获取ElasticSearch数据
"""
print('Executing:Search|%s'%query)
try:
rst=self.els_conn.search(index=index,q=query)
returnrst['hits']
exceptExceptionase:
print('ElasticSearchError|%s'%e)
raiseException(e)
classMyPhoenix(object):
"""
===================================================================
=====================MyPhoenix======================
===================================================================
"""
def__init__(self):
self.phoenix_conn=None
self.phoenix_cursor=None
defconnect_to_phoenix(self,host,port=8765):
"""
连接到phoenix服务器
"""
address='http://{0}:{1}/'.format(host,port)
print('Executing:ConnectToPhoenix|%s'%address)
self.phoenix_conn=phoenixdb.connect(address,autocommit=True)
self.phoenix_cursor=self.phoenix_conn.cursor()
defset_schema(self,sql,schema):
"""
设置schema
"""
pre_sub,sub,fol_sub=sql.upper().partition('FROM')
fol_sub=''+schema+'.'+fol_sub.strip()
new_sql=''.join([pre_sub,sub,fol_sub])
returnnew_sql
defexecute_phoenix_sql(self,sql):
"""
执行sql语句
"""
#sql=self.set_schema(sql,schema)
print('Executing:Execute|%s'%sql)
self.phoenix_cursor.execute(sql)
defget_from_phoenix(self,sql):
"""
获取phoenix数据
"""
#sql=self.set_schema(sql,schema)
print('Executing:Query|%s'%sql)
try:
self.phoenix_cursor.execute(sql)
exceptExceptionase:
print('PhoenixError|%s'%e)
raiseException(e)
returnself.phoenix_cursor.fetchall()
defdisconnect_from_phoenix(self):
"""
断开phoenix连接
"""
print('Executing:DisconnectFromHBase')
self.phoenix_cursor.close()
self.phoenix_conn.close()
classMySolr(object):
"""
===================================================================
=====================MySolr=========================
===================================================================
"""
def__init__(self):
self.solr_conn=None
self.base_url=None
defconnect_to_solr(self,address,selector):
"""连接到solr服务器.
"""
self.base_url='http://{0}/solr/{1}/'.format(address,selector)
self.solr_conn=pysolr.Solr(self.base_url)
print('Executing:ConnectToSolr|%s'%self.base_url)
defget_solr_data(self,query):
"""
获取solr数据
"""
results=list()
print('Executing:Search|%s'%query)
try:
items=self.solr_conn.search(query)
foriteminitems:
results.append(item)
exceptExceptionase:
print('SolrError|%s'%e)
raiseException(e)
returnresults
defadd_solr_data(self,data):
"""
添加solr数据
"""
print('Executing:add|%s'%data)
try:
self.solr_conn.add([data])
self.solr_conn.commit()
exceptExceptionase:
print('SolrError|%s'%e)
raiseException(e)
defdel_solr_byId(self,data):
"""
删除solr数据
"""
print('Executing:del|%s'%data)
try:
self.solr_conn.delete(id=data)
self.solr_conn.commit()
exceptExceptionase:
print('SolrError|%s'%e)
raiseException(e)
if__name__=='__main__':
print('Thisistest.')
ms=MySolr()
me=MyELS()
mp=MyPhoenix()
以上就是Python调用ES、Solr、Phoenix的示例代码的详细内容,更多关于Python调用ES、Solr、Phoenix的资料请关注毛票票其它相关文章!