Python对ElasticSearch获取数据及操作
使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下
Version
Python:2.7
ElasticSearch:6.3
代码:
#!/usr/bin/envpython #-*-coding:utf-8-*- """ @Time:2018/7/4 @Author:LiuXueWen @Site: @File:ElasticSearchOperation.py @Software:PyCharm @Description:对elasticsearch数据的操作,包括获取数据,发送数据 """ importelasticsearch importjson importUtil_Ini_Operation classelasticsearch_data(): def__init__(self,hosts,username,password,maxsize,is_ssl): #初始化ini操作脚本,获取配置文件 try: #判断请求方式是否ssl加密 ifis_ssl=="true": #获取证书地址 cert_pem=Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs") es_ssl=elasticsearch.Elasticsearch( #地址 hosts=hosts, #用户名密码 http_auth=(username,password), #开启ssl use_ssl=True, #确认有加密证书 verify_certs=True, #对应的加密证书地址 client_cert=cert_pem ) self.es=es_ssl elifis_ssl=="false": #创建普通类型的ES客户端 es_ordinary=elasticsearch.Elasticsearch(hosts,http_auth=(username,password),maxsize=int(maxsize)) self.es=es_ordinary exceptExceptionase: print(e) defquery_data(self,keywords_list,date): gte="now-"+str(date) query_data={ #查询语句 "query":{ "bool":{ "must":[ { "query_string":{ "query":keywords_list, "analyze_wildcard":True } }, { "range":{ "@timestamp":{ "gte":gte, "lte":"now", "format":"epoch_millis" } } } ], "must_not":[] } } } returnquery_data #从es获取数据 defget_datas_by_query(self,index_name,keywords,param,date): ''' :paramindex_name:索引名称 :paramkeywords:关键字词,数组 :paramparam:需要数据条件,例如_source :paramdate:过去时间范围,字符串格式,例如过去30分钟内数据,"30m" :return:all_datas返回查询到的所有数据(已经过param过滤) ''' all_datas=[] #遍历所有的查询条件 forkeywords_listinkeywords: #DSL语句 query_data=self.query_data(keywords_list,date) res=self.es.search( index=index_name, body=query_data ) forhitinres['hits']['hits']: #获取指定的内容 response=hit[param] #添加所有数据到数据集中 all_datas.append(response) #返回所有数据内容 returnall_datas #当索引不存在创建索引 defcreate_index(self,index_name): ''' :paramindex_name:索引名称 :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称 ''' #获取索引的映射 #index_mapping=IndexMapping.index_mapping ##判断索引是否存在 #ifself.es.indices.exists(index=index_name)isnotTrue: ##创建索引 #res=self.es.indices.create(index=index_name,body=index_mapping) ##返回结果 #returnres #else: ##返回索引名称 #returnindex_name pass #插入指定的单条数据内容 definsert_single_data(self,index_name,doc_type,data): ''' :paramindex_name:索引名称 :paramdoc_type:文档类型 :paramdata:需要插入的数据内容 :return:执行结果 ''' res=self.es.index(index=index_name,doc_type=doc_type,body=data) returnres #向ES中新增数据,批量插入 definsert_datas(self,index_name): ''' :desc通过读取指定的文件内容获取需要插入的数据集 :paramindex_name:索引名称 :return:插入成功的数据条数 ''' insert_datas=[] #判断插入数据的索引是否存在 self.createIndex(index_name=index_name) #获取插入数据的文件地址 data_file_path=self.ini.get_key_value("datafile","datafilepath") #获取需要插入的数据集 withopen(data_file_path,"r+")asdata_file: #获取文件所有数据 data_lines=data_file.readlines() fordata_lineindata_lines: #stringtojson data_line=json.loads(data_line) insert_datas.append(data_line) #批量处理 res=self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True) returnres #从ES中在指定的索引中删除指定数据(根据id判断) defdelete_data_by_id(self,index_name,doc_type,id): ''' :paramindex_name:索引名称 :paramindex_type:文档类型 :paramid:唯一标识id :return:删除结果信息 ''' res=self.es.delete(index=index_name,doc_type=doc_type,id=id) returnres #根据条件删除数据 defdelete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time): ''' :paramindex_name:索引名称,为空查询所有索引 :paramdoc_type:文档类型,为空查询所有文档类型 :paramparam:过滤条件值 :paramgt_time:时间范围,大于该时间 :paramlt_time:时间范围,小于该时间 :return:执行条件删除后的结果信息 ''' #DSL语句 query_data={ #查询语句 "query":{ "bool":{ "must":[ { "query_string":{ "query":param, "analyze_wildcard":True } }, { "range":{ "@timestamp":{ "gte":gt_time, "lte":lt_time, "format":"epoch_millis" } } } ], "must_not":[] } } } res=self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True) returnres #指定index中删除指定时间段内的全部数据 defdelete_all_datas(self,index_name,doc_type,gt_time,lt_time): ''' :paramindex_name:索引名称,为空查询所有索引 :paramdoc_type:文档类型,为空查询所有文档类型 :paramgt_time:时间范围,大于该时间 :paramlt_time:时间范围,小于该时间 :return:执行条件删除后的结果信息 ''' #DSL语句 query_data={ #查询语句 "query":{ "bool":{ "must":[ { "match_all":{} }, { "range":{ "@timestamp":{ "gte":gt_time, "lte":lt_time, "format":"epoch_millis" } } } ], "must_not":[] } } } res=self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True) returnres #修改ES中指定的数据 defupdate_data_by_id(self,index_name,doc_type,id,data): ''' :paramindex_name:索引名称 :paramdoc_type:文档类型,为空表示所有类型 :paramid:文档唯一标识编号 :paramdata:更新的数据 :return:更新结果信息 ''' res=self.es.update(index=index_name,doc_type=doc_type,id=id,body=data) returnres
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。