Python对ElasticSearch获取数据及操作
使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下
Version
Python:2.7
ElasticSearch:6.3
代码:
#!/usr/bin/envpython
#-*-coding:utf-8-*-
"""
@Time:2018/7/4
@Author:LiuXueWen
@Site:
@File:ElasticSearchOperation.py
@Software:PyCharm
@Description:对elasticsearch数据的操作,包括获取数据,发送数据
"""
importelasticsearch
importjson
importUtil_Ini_Operation
classelasticsearch_data():
def__init__(self,hosts,username,password,maxsize,is_ssl):
#初始化ini操作脚本,获取配置文件
try:
#判断请求方式是否ssl加密
ifis_ssl=="true":
#获取证书地址
cert_pem=Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
es_ssl=elasticsearch.Elasticsearch(
#地址
hosts=hosts,
#用户名密码
http_auth=(username,password),
#开启ssl
use_ssl=True,
#确认有加密证书
verify_certs=True,
#对应的加密证书地址
client_cert=cert_pem
)
self.es=es_ssl
elifis_ssl=="false":
#创建普通类型的ES客户端
es_ordinary=elasticsearch.Elasticsearch(hosts,http_auth=(username,password),maxsize=int(maxsize))
self.es=es_ordinary
exceptExceptionase:
print(e)
defquery_data(self,keywords_list,date):
gte="now-"+str(date)
query_data={
#查询语句
"query":{
"bool":{
"must":[
{
"query_string":{
"query":keywords_list,
"analyze_wildcard":True
}
},
{
"range":{
"@timestamp":{
"gte":gte,
"lte":"now",
"format":"epoch_millis"
}
}
}
],
"must_not":[]
}
}
}
returnquery_data
#从es获取数据
defget_datas_by_query(self,index_name,keywords,param,date):
'''
:paramindex_name:索引名称
:paramkeywords:关键字词,数组
:paramparam:需要数据条件,例如_source
:paramdate:过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
:return:all_datas返回查询到的所有数据(已经过param过滤)
'''
all_datas=[]
#遍历所有的查询条件
forkeywords_listinkeywords:
#DSL语句
query_data=self.query_data(keywords_list,date)
res=self.es.search(
index=index_name,
body=query_data
)
forhitinres['hits']['hits']:
#获取指定的内容
response=hit[param]
#添加所有数据到数据集中
all_datas.append(response)
#返回所有数据内容
returnall_datas
#当索引不存在创建索引
defcreate_index(self,index_name):
'''
:paramindex_name:索引名称
:return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
'''
#获取索引的映射
#index_mapping=IndexMapping.index_mapping
##判断索引是否存在
#ifself.es.indices.exists(index=index_name)isnotTrue:
##创建索引
#res=self.es.indices.create(index=index_name,body=index_mapping)
##返回结果
#returnres
#else:
##返回索引名称
#returnindex_name
pass
#插入指定的单条数据内容
definsert_single_data(self,index_name,doc_type,data):
'''
:paramindex_name:索引名称
:paramdoc_type:文档类型
:paramdata:需要插入的数据内容
:return:执行结果
'''
res=self.es.index(index=index_name,doc_type=doc_type,body=data)
returnres
#向ES中新增数据,批量插入
definsert_datas(self,index_name):
'''
:desc通过读取指定的文件内容获取需要插入的数据集
:paramindex_name:索引名称
:return:插入成功的数据条数
'''
insert_datas=[]
#判断插入数据的索引是否存在
self.createIndex(index_name=index_name)
#获取插入数据的文件地址
data_file_path=self.ini.get_key_value("datafile","datafilepath")
#获取需要插入的数据集
withopen(data_file_path,"r+")asdata_file:
#获取文件所有数据
data_lines=data_file.readlines()
fordata_lineindata_lines:
#stringtojson
data_line=json.loads(data_line)
insert_datas.append(data_line)
#批量处理
res=self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
returnres
#从ES中在指定的索引中删除指定数据(根据id判断)
defdelete_data_by_id(self,index_name,doc_type,id):
'''
:paramindex_name:索引名称
:paramindex_type:文档类型
:paramid:唯一标识id
:return:删除结果信息
'''
res=self.es.delete(index=index_name,doc_type=doc_type,id=id)
returnres
#根据条件删除数据
defdelete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
'''
:paramindex_name:索引名称,为空查询所有索引
:paramdoc_type:文档类型,为空查询所有文档类型
:paramparam:过滤条件值
:paramgt_time:时间范围,大于该时间
:paramlt_time:时间范围,小于该时间
:return:执行条件删除后的结果信息
'''
#DSL语句
query_data={
#查询语句
"query":{
"bool":{
"must":[
{
"query_string":{
"query":param,
"analyze_wildcard":True
}
},
{
"range":{
"@timestamp":{
"gte":gt_time,
"lte":lt_time,
"format":"epoch_millis"
}
}
}
],
"must_not":[]
}
}
}
res=self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
returnres
#指定index中删除指定时间段内的全部数据
defdelete_all_datas(self,index_name,doc_type,gt_time,lt_time):
'''
:paramindex_name:索引名称,为空查询所有索引
:paramdoc_type:文档类型,为空查询所有文档类型
:paramgt_time:时间范围,大于该时间
:paramlt_time:时间范围,小于该时间
:return:执行条件删除后的结果信息
'''
#DSL语句
query_data={
#查询语句
"query":{
"bool":{
"must":[
{
"match_all":{}
},
{
"range":{
"@timestamp":{
"gte":gt_time,
"lte":lt_time,
"format":"epoch_millis"
}
}
}
],
"must_not":[]
}
}
}
res=self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
returnres
#修改ES中指定的数据
defupdate_data_by_id(self,index_name,doc_type,id,data):
'''
:paramindex_name:索引名称
:paramdoc_type:文档类型,为空表示所有类型
:paramid:文档唯一标识编号
:paramdata:更新的数据
:return:更新结果信息
'''
res=self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
returnres
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。