PostgreSQL 数据同步到ES 搭建操作
安装 python 开发包（python-devel，编译 multicorn 时需要其头文件和 libpython）
[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
准备中...                          ################################# [100%]
正在升级/安装...
   1:python-devel-2.7.5-58.el7        ################################# [100%]
[root@rtm2 Packages]# ls
安装multicorn
[root@rtm2 multicorn-1.3.5]# make
Python version is 2.7
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
.//preflight-check.sh
cp sql/multicorn.sql sql/multicorn--1.3.5.sql
[root@rtm2 multicorn-1.3.5]# make install
Python version is 2.7
...
安装pg-es-fdw-master
[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
[root@rtm2 pg-es-fdw-master]# ls
demo.sh  dite  LICENSE  README.md  setup.py
[root@rtm2 pg-es-fdw-master]# python setup.py build
running build
running build_py
creating build
creating build/lib
creating build/lib/dite
copying dite/__init__.py -> build/lib/dite
[root@rtm2 pg-es-fdw-master]# python setup.py install
running install
running bdist_egg
running egg_info
creating dite.egg-info
writing dite.egg-info/PKG-INFO
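安装插件 multicorn（注意：扩展（EXTENSION）和外部服务器（FOREIGN SERVER）都是按数据库创建的。下面的演示是在 postgres 库中执行的，而后面的外部表和触发器建在 test 库中，所以这些语句也需要在 test 库中执行，否则建外部表时会提示 server 不存在）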
[postgres@rtm2 ~]$ psql
psql (10.3)
Type "help" for help.
postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
---------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |       10 |           11 | f              | 1.0        |           |
(1 row)
postgres=# CREATE EXTENSION multicorn;
CREATE EXTENSION
postgres=# psql
postgres=# select * from pg_extension;
  extname  | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-----------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql   |       10 |           11 | f              | 1.0        |           |
 multicorn |       10 |         2200 | t              | 1.3.5      |           |
(2 rows)
postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS (wrapper 'dite.ElasticsearchFDW');
CREATE SERVER
postgres=#
配置 es（elasticsearch.yml、内核参数以及文件句柄/进程数限制）
[root@rtm2 config]# vi elasticsearch.yml
node.name: "es-node1"
network.host: 192.168.31.121
discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/sysctl.conf
vm.max_map_count=262144
[root@rtm2 config]# sysctl -p
[root@rtm2 config]# vi /etc/security/limits.conf
# End of file
root soft nofile 65536
root hard nofile 65536
root soft nproc 4096
root hard nproc 4096
~
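启动 es（注意：Elasticsearch 默认拒绝以 root 用户运行，需要切换到普通用户启动，上面 limits.conf 中的 root 也应换成该用户。另外，当前已在 bin 目录下时命令应为 ./elasticsearch；在 ES 安装根目录下才用 ./bin/elasticsearch）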
[root@rtm2 bin]# ls
elasticsearch      elasticsearch.in.bat  elasticsearch-service-mgr.exe  elasticsearch-service-x86.exe  plugin.bat
elasticsearch.bat  elasticsearch.in.sh   elasticsearch-service-x64.exe  plugin                         service.bat
[root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id bigint, age bigint) SERVER multicorn_es OPTIONS (host
test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
CREATE FOREIGN TABLE
test=#
创建触发器和外部表
test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
test$# BEGIN
test$# INSERT INTO pp_es (id, age) VALUES
test$# (NEW.id, NEW.age);
test$# RETURN NEW;
test$# END;
test$# $def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
CREATE TRIGGER
test=#
新增数据测试
test=# insert into pp (id, age) values (1, 11);
INSERT 0 1
test=# select * from pp;
 id | age
----+-----
  1 |  11
(1 row)
test=#
检查es数据
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
"took":104,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"failed":0
},
"hits":{
"total":2,
"max_score":1.0,
"hits":[{
"_index":"es-node1",
"_type":"pp",
"_id":"1",
"_score":1.0,
"_source":{"age":"11"}
},{
"_index":"es-node1",
"_type":"pp",
"_id":"2",
"_score":1.0,
"_source":{"age":"22"}
}]
}
}
[root@rtm2~]#
创建更新触发器（注意：下面的函数用 NEW.id 定位 ES 中的文档，因此只适用于 id 不变、只更新其他字段的情况；如果 id 本身被修改，ES 中的旧文档是按旧 id 存储的，WHERE 条件将匹配不到它）
test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
BEGIN
UPDATE pp_es SET
id = NEW.id,
age = NEW.age
where id = NEW.id;
RETURN NEW;
END;
$def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# ^C
test=#
test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
test(# FROM NEW.*) EXECUTE PROCEDURE updadeIndex_pp();
CREATE TRIGGER
test=#
更新表数据
test=# select * from pp;
 id | age
----+-----
  1 |  11
  2 |  22
  3 |  22
(3 rows)
test=# update pp a set a.age=33 where a.id=3;
ERROR:  column "a" of relation "pp" does not exist
LINE 1: update pp a set a.age=33 where a.id=3;
                        ^
test=# update pp set age=33 where id=3;
UPDATE 1
test=# select * from pp;
 id | age
----+-----
  1 |  11
  2 |  22
  3 |  33
(3 rows)
test=#
es查询变更
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
"took":4,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"failed":0
},
"hits":{
"total":3,
"max_score":1.0,
"hits":[{
"_index":"es-node1",
"_type":"pp",
"_id":"1",
"_score":1.0,
"_source":{"age":"11"}
},{
"_index":"es-node1",
"_type":"pp",
"_id":"2",
"_score":1.0,
"_source":{"age":"22"}
},{
"_index":"es-node1",
"_type":"pp",
"_id":"3",
"_score":1.0,
"_source":{"age":"33"}
}]
}
}
[root@rtm2~]#
补充:logstash同步pgsql数据到Elasticsearch
一、logstash 的基础配置这里就不再多说，主要分为三部分：input、filter、output。
二、配置步骤
1、input配置
# Input section of jdbc.conf.
# - stdin: accepts hand-typed events while testing. These events get no
#   "type" field (only the jdbc input below sets one).
#   NOTE(review): drop it for unattended runs.
# - jdbc: runs the query in jdbc.sql against PostgreSQL on a cron schedule and
#   emits one event per result row.
input {
  stdin {
  }
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
    jdbc_user => "postgres"
    # Overridable via the PGPASSWORD environment variable (or the Logstash
    # keystore); falls back to the original demo password when unset.
    jdbc_password => "${PGPASSWORD:zhang123}"
    # Forward slashes work on Windows and stay correct even when
    # config.support_escapes is enabled.
    jdbc_driver_library => "D:/logstash-6.4.0/bin/pgsql/postgresql-42.2.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 300000
    # Incremental sync: the last seen value of "id" is persisted and exposed
    # to the query as :sql_last_value, so jdbc.sql should filter with
    # "WHERE id > :sql_last_value" (TODO confirm against jdbc.sql).
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    statement_filepath => "D:/logstash-6.4.0/bin/pgsql/jdbc.sql"
    # Cron syntax, fields separated by spaces: run once every minute.
    # (The collapsed "*****" is not a valid cron expression.)
    schedule => "* * * * *"
    # Marks rows from this input so later stages can tell them from stdin.
    type => "jdbc"
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
2、filter配置
# Filter section of jdbc.conf: decode the JSON text carried in the "message"
# field into top-level event fields, and remove "message" afterwards.
# NOTE(review): rows from the jdbc input presumably have no "message" column,
# so this mainly affects events typed on stdin - confirm against jdbc.sql.
filter {
  json {
    source       => "message"
    remove_field => [ "message" ]
  }
}
3、output配置,就是elasticsearch的基本配置
# Output section of jdbc.conf: index every event into Elasticsearch and echo
# it to the console as one JSON document per line.
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test_out"
    # Index template installed (and overwritten) by Logstash at startup.
    # Forward slashes work on Windows and stay correct even when
    # config.support_escapes is enabled.
    template => "D:/logstash-6.4.0/bin/pgsql/es-template.json"
    template_name => "t-statistic-out-logstash"
    template_overwrite => true
    document_type => "out"
    # Use the row's "id" as the ES _id so re-synced rows overwrite their
    # previous copy instead of creating duplicates.
    # NOTE(review): events without an "id" field (e.g. typed on stdin) are
    # indexed under the literal _id "%{id}"; wrap this plugin in
    # `if [type] == "jdbc" { ... }` if stdin is only used for debugging.
    document_id => "%{id}"
  }
  stdout {
    codec => json_lines
  }
}
以上三部分合在一起，就是完整的 logstash 配置文件 jdbc.conf。
4、es-template.json的配置
{
  "template": "test_out*",
  "order": 1,
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "keyword"
        },
        "id": {
          "type": "keyword"
        },
        "name": {
          "type": "keyword"
        },
        "pp": {
          "type": "keyword"
        }
      }
    }
  },
  "aliases": {}
}
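最后，下载好 PostgreSQL 的 JDBC 连接驱动（官网可以下载），并准备好要同步的数据库表数据。statement_filepath 指向的 jdbc.sql 中写要执行的查询。由于配置了 use_column_value 并以 id 作为 tracking_column，查询应通过 :sql_last_value 做增量过滤，否则每次调度都会把整张表重新同步一遍。例如：SELECT * FROM 表名 WHERE id > :sql_last_value ORDER BY id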
启动命令：进入 logstash 的 bin 目录执行下面的命令。这里 logstash 配置文件都放在 bin 下的 pgsql 目录中（放在哪里都可以，只要 -f 指向正确的配置文件路径即可）。
logstash.bat -f ./pgsql/jdbc.conf
以上为个人经验,希望能给大家一个参考,也希望大家多多支持毛票票。如有错误或未考虑完全的地方,望不吝赐教。
