MySQL 与 Elasticsearch 数据不对称问题解决办法
MySQL与Elasticsearch数据不对称问题解决办法
jdbc-input-plugin只能实现数据库的追加,对于elasticsearch增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。
当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。
这里有一个数据表article,mtime字段定义了ONUPDATECURRENT_TIMESTAMP所以每次更新mtime的时间都会变化
mysql>descarticle;
+-------------+--------------+------+-----+--------------------------------+-------+
|Field|Type|Null|Key|Default|Extra|
+-------------+--------------+------+-----+--------------------------------+-------+
|id|int(11)|NO||0||
|title|mediumtext|NO||NULL||
|description|mediumtext|YES||NULL||
|author|varchar(100)|YES||NULL||
|source|varchar(100)|YES||NULL||
|content|longtext|YES||NULL||
|status|enum('Y','N')|NO||'N'||
|ctime|timestamp|NO||CURRENT_TIMESTAMP||
|mtime|timestamp|YES||ONUPDATECURRENT_TIMESTAMP||
+-------------+--------------+------+-----+--------------------------------+-------+
7rowsinset(0.00sec)
logstash增加mtime的查询规则
jdbc{
jdbc_driver_library=>"/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class=>"com.mysql.jdbc.Driver"
jdbc_connection_string=>"jdbc:mysql://localhost:3306/cms"
jdbc_user=>"cms"
jdbc_password=>"password"
schedule=>"*****"#定时cron的表达式,这里是每分钟执行一次
statement=>"select*fromarticlewheremtime>:sql_last_value"
use_column_value=>true
tracking_column=>"mtime"
tracking_column_type=>"timestamp"
record_last_run=>true
last_run_metadata_path=>"/var/tmp/article-mtime.last"
}
创建回收站表,这个事用于解决数据库删除,或者禁用status='N'这种情况的。
CREATETABLE`elasticsearch_trash`( `id`int(11)NOTNULL, `ctime`timestampNULLDEFAULTCURRENT_TIMESTAMP, PRIMARYKEY(`id`) )ENGINE=InnoDBDEFAULTCHARSET=utf8
为article表创建触发器
CREATEDEFINER=`dba`@`%`TRIGGER`article_BEFORE_UPDATE`BEFOREUPDATEON`article`FOREACHROW BEGIN --此处的逻辑是解决文章状态变为N的时候,需要将搜索引擎中对应的数据删除。 IFNEW.status='N'THEN insertintoelasticsearch_trash(id)values(OLD.id); ENDIF; --此处逻辑是修改状态到Y的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。 IFNEW.status='Y'THEN deletefromelasticsearch_trashwhereid=OLD.id; ENDIF; END CREATEDEFINER=`dba`@`%`TRIGGER`article_BEFORE_DELETE`BEFOREDELETEON`article`FOREACHROW BEGIN --此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insertintoelasticsearch_trash(id)values(OLD.id); END
接下来我们需要写一个简单地Shell每分钟运行一次,从elasticsearch_trash数据表中取出数据,然后使用curl命令调用elasticsearchrestful接口,删除被收回的数据。
你还可以开发相关的程序,这里提供一个Springboot定时任务例子。
实体
packagecn.netkiller.api.domain.elasticsearch;
importjava.util.Date;
importjavax.persistence.Column;
importjavax.persistence.Entity;
importjavax.persistence.Id;
importjavax.persistence.Table;
@Entity
@Table
publicclassElasticsearchTrash{
@Id
privateintid;
@Column(columnDefinition="TIMESTAMPDEFAULTCURRENT_TIMESTAMP")
privateDatectime;
publicintgetId(){
returnid;
}
publicvoidsetId(intid){
this.id=id;
}
publicDategetCtime(){
returnctime;
}
publicvoidsetCtime(Datectime){
this.ctime=ctime;
}
}
仓库
packagecn.netkiller.api.repository.elasticsearch; importorg.springframework.data.repository.CrudRepository; importcom.example.api.domain.elasticsearch.ElasticsearchTrash; publicinterfaceElasticsearchTrashRepositoryextendsCrudRepository{ }
定时任务
packagecn.netkiller.api.schedule;
importorg.elasticsearch.action.delete.DeleteResponse;
importorg.elasticsearch.client.transport.TransportClient;
importorg.elasticsearch.rest.RestStatus;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.scheduling.annotation.Scheduled;
importorg.springframework.stereotype.Component;
importcom.example.api.domain.elasticsearch.ElasticsearchTrash;
importcom.example.api.repository.elasticsearch.ElasticsearchTrashRepository;
@Component
publicclassScheduledTasks{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ScheduledTasks.class);
@Autowired
privateTransportClientclient;
@Autowired
privateElasticsearchTrashRepositoryalasticsearchTrashRepository;
publicScheduledTasks(){
}
@Scheduled(fixedRate=1000*60)//60秒运行一次调度任务
publicvoidcleanTrash(){
for(ElasticsearchTrashelasticsearchTrash:alasticsearchTrashRepository.findAll()){
DeleteResponseresponse=client.prepareDelete("information","article",elasticsearchTrash.getId()+"").get();
RestStatusstatus=response.status();
logger.info("delete{}{}",elasticsearchTrash.getId(),status.toString());
if(status==RestStatus.OK||status==RestStatus.NOT_FOUND){
alasticsearchTrashRepository.delete(elasticsearchTrash);
}
}
}
}
Springboot启动主程序。
packagecn.netkiller.api;
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
importorg.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
publicclassApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(Application.class,args);
}
}
以上就是MySQL与Elasticsearch数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!