PostgreSQL 数据同步到ES 搭建操作
安装python和dev开发包
[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
准备中...                          ################################# [100%]
正在升级/安装...
   1:python-devel-2.7.5-58.el7        ################################# [100%]
[root@rtm2 Packages]# ls
安装multicorn
[root@rtm2 multicorn-1.3.5]# make
Python version is 2.7
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
.//preflight-check.sh
cp sql/multicorn.sql sql/multicorn--1.3.5.sql
[root@rtm2 multicorn-1.3.5]# make install
Python version is 2.7
...
安装pg-es-fdw-master
[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
[root@rtm2 pg-es-fdw-master]# ls
demo.sh  dite  LICENSE  README.md  setup.py
[root@rtm2 pg-es-fdw-master]# python setup.py build
running build
running build_py
creating build
creating build/lib
creating build/lib/dite
copying dite/__init__.py -> build/lib/dite
[root@rtm2 pg-es-fdw-master]# python setup.py install
running install
running bdist_egg
running egg_info
creating dite.egg-info
writing dite.egg-info/PKG-INFO
安装插件multicorn
[postgres@rtm2 ~]$ psql
psql (10.3)
Type "help" for help.

postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
---------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |       10 |           11 | f              | 1.0        |           |
(1 row)

postgres=# CREATE EXTENSION multicorn;
CREATE EXTENSION
postgres=# psql
postgres=# select * from pg_extension;
  extname  | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-----------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql   |       10 |           11 | f              | 1.0        |           |
 multicorn |       10 |         2200 | t              | 1.3.5      |           |
(2 rows)

postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS (wrapper 'dite.ElasticsearchFDW');
CREATE SERVER
postgres=#
配置es
[root@rtm2 config]# vi elasticsearch.yml
node.name: "es-node1"
network.host: 192.168.31.121
discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/sysctl.conf
vm.max_map_count=262144
sysctl -p
[root@rtm2 config]# vi /etc/security/limits.conf
# End of file
root soft nofile 65536
root hard nofile 65536
root soft nproc 4096
root hard nproc 4096
~
启动es
[root@rtm2 bin]# ls
elasticsearch      elasticsearch.in.bat  elasticsearch-service-mgr.exe  elasticsearch-service-x86.exe  plugin.bat
elasticsearch.bat  elasticsearch.in.sh   elasticsearch-service-x64.exe  plugin                         service.bat
[root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id bigint, age bigint) SERVER multicorn_es OPTIONS (host
test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
CREATE FOREIGN TABLE
test=#
创建触发器和外部表
test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
test$# BEGIN
test$# INSERT INTO pp_es (id, age) VALUES
test$# (NEW.id, NEW.age);
test$# RETURN NEW;
test$# END;
test$# $def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
CREATE TRIGGER
test=#
新增数据测试
test=# insert into pp (id, age) values (1, 11);
INSERT 0 1
test=# select * from pp;
 id | age
----+-----
  1 |  11
(1 row)

test=#
检查es数据
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
  "took": 104,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1.0,
    "hits": [{
      "_index": "es-node1",
      "_type": "pp",
      "_id": "1",
      "_score": 1.0,
      "_source": {"age": "11"}
    }, {
      "_index": "es-node1",
      "_type": "pp",
      "_id": "2",
      "_score": 1.0,
      "_source": {"age": "22"}
    }]
  }
}
[root@rtm2 ~]#
创建更新触发器
test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
BEGIN
UPDATE pp_es SET
id = NEW.id,
age = NEW.age
where id = NEW.id;
RETURN NEW;
END;
$def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# ^C
test=#
test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
test(# FROM NEW.*) EXECUTE PROCEDURE updadeIndex_pp();
CREATE TRIGGER
test=#
更新表数据
test=# select * from pp;
 id | age
----+-----
  1 |  11
  2 |  22
  3 |  22
(3 rows)

test=# update pp a set a.age=33 where a.id=3;
ERROR:  column "a" of relation "pp" does not exist
LINE 1: update pp a set a.age=33 where a.id=3;
                        ^
test=# update pp set age=33 where id=3;
UPDATE 1
test=# select * from pp;
 id | age
----+-----
  1 |  11
  2 |  22
  3 |  33
(3 rows)

test=#
es查询变更
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1.0,
    "hits": [{
      "_index": "es-node1",
      "_type": "pp",
      "_id": "1",
      "_score": 1.0,
      "_source": {"age": "11"}
    }, {
      "_index": "es-node1",
      "_type": "pp",
      "_id": "2",
      "_score": 1.0,
      "_source": {"age": "22"}
    }, {
      "_index": "es-node1",
      "_type": "pp",
      "_id": "3",
      "_score": 1.0,
      "_source": {"age": "33"}
    }]
  }
}
[root@rtm2 ~]#
补充:logstash同步pgsql数据到Elasticsearch
一、对于logstash的配置我就不再多说,主要是三部分:input、filter、output的配置
二、配置步骤
1、input配置
input {
  stdin {
  }
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
    jdbc_user => "postgres"
    jdbc_password => "zhang123"
    jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "300000"
    use_column_value => "true"
    tracking_column => "id"
    statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"
    schedule => "* * * * *"
    type => "jdbc"
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
2、filter配置
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
3、output配置,就是elasticsearch的基本配置
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test_out"
    template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"
    template_name => "t-statistic-out-logstash"
    template_overwrite => true
    document_type => "out"
    document_id => "%{id}"
  }
  stdout {
    codec => json_lines
  }
}
以上就是整个logstash的jdbc.conf
4、es-template.json的配置
{
  "template": "t-statistis-out-template",
  "order": 1,
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "_all": {"enabled": false},
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {"type": "string", "index": "not_analyzed"}
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {"type": "string", "index": "not_analyzed"}
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "keyword"
        },
        "id": {
          "type": "keyword"
        },
        "name": {
          "type": "keyword"
        },
        "pp": {
          "type": "keyword"
        }
      }
    }
  },
  "aliases": {}
}
最后就是下载好pgsql的连接驱动,这个在官网可以下载;并配置好自己的数据库表格的数据
启动命令:进入到logstash的bin目录下,自己的logstash配置都是放在bin的pgsql这个目录下面(这个自己随意创建位置都可以)
logstash.bat -f ./pgsql/jdbc.conf
以上为个人经验,希望能给大家一个参考,也希望大家多多支持毛票票。如有错误或未考虑完全的地方,望不吝赐教。