nginx日志导入elasticsearch的方法示例
将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分,以及将日志写入elasticsearch后的索引的创建。
1、配置nginx日志格式
log_formatmain'$remote_addr$http_x_forwarded_for[$time_local]$server_name$request' '$status$body_bytes_sent$http_referer' '"$http_user_agent"' '"$connection"' '"$http_cookie"' '$request_time' '$upstream_response_time';
2、安装配置filebeat,启用nginxmodule
tar-zxvffilebeat-6.2.4-linux-x86_64.tar.gz-C/usr/local cd/usr/local;ln-sfilebeat-6.2.4-linux-x86_64filebeat cd/usr/local/filebeat
启用nginx模块
./filebeatmodulesenablenginx
查看模块
./filebeatmoduleslist
创建配置文件
vim/usr/local/filebeat/blog_module_logstash.yml filebeat.modules: -module:nginx access: enabled:true var.paths:["/home/weblog/blog.cnfol.com_access.log"] #error: #enabled:true #var.paths:["/home/weblogerr/blog.cnfol.com_error.log"] output.logstash: hosts:["192.168.15.91:5044"]
启动filebeat
./filebeat-cblog_module_logstash.yml-e
3、配置logstash
tar-zxvflogstash-6.2.4.tar.gz/usr/local cd/usr/local;ln-slogstash-6.2.4logstash 创建一个nginx日志的pipline文件 cd/usr/local/logstash
logstash内置的模板目录
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
编辑grok-patterns添加一个支持多ip的正则
FORWORD(?:%{IPV4}[,]?[]?)+|%{WORD}
官方grok
http://grokdebug.herokuapp.com/patterns#
创建logstashpipline配置文件
#input{ #stdin{} #} #从filebeat接受数据 input{ beats{ port=>5044 host=>"0.0.0.0" } } filter{ #添加一个调试的开关 mutate{add_field=>{"[@metadata][debug]"=>true}} grok{ #过滤nginx日志 #match=>{"message"=>"%{NGINXACCESS_TEST2}"} #match=>{"message"=>'%{IPORHOST:clientip}#(?[^\#]*)#\[%{HTTPDATE:[@metadata][webtime]}\]#%{NOTSPACE:hostname}#%{WORD:verb}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}#%{NUMBER:response}#(?:%{NUMBER:bytes}|-)#(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)#(?:"(? [^#]*)")#(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)#(?:"(? [^#]*)")#%{NUMBER:request_time:float}#(?:%{NUMBER:upstream_response_time:float}|-)'} #match=>{"message"=>'(?:%{IPORHOST:clientip}|-)(?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-)\[%{HTTPDATE:[@metadata][webtime]}\](?:%{HOSTNAME:hostname}|-)%{WORD:method}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}%{NUMBER:response}(?:%{NUMBER:bytes}|-)(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)%{QS:agent}(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)(?:"(? [^#]*)")%{NUMBER:request_time:float}(?:%{NUMBER:upstream_response_time:float}|-)'} match=>{"message"=>'(?:%{IPORHOST:clientip}|-)%{FORWORD:http_x_forwarded_for}\[%{HTTPDATE:[@metadata][webtime]}\](?:%{HOSTNAME:hostname}|-)%{WORD:method}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}%{NUMBER:response}(?:%{NUMBER:bytes}|-)(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)%{QS:agent}(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)%{QS:cookie}%{NUMBER:request_time:float}(?:%{NUMBER:upstream_response_time:float}|-)'} } #将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp ruby{ #code=>"event.set('@read_timestamp',event.get('@timestamp'))" #将时区改为东8区 code=>"event.set('@read_timestamp',event.get('@timestamp').time.localtime+8*60*60)" } #将nginx的日志记录时间格式化 #格式化时间20/May/2015:21:05:56+0000 date{ locale=>"en" match=>["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ssZ"] } #将bytes字段由字符串转换为数字 mutate{ convert=>{"bytes"=>"integer"} } #将cookie字段解析成一个json #mutate{ #gsub=>["cookies",'\;',','] #} #如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip if[http_x_forwarded_for]=~","{ ruby{ code=>'event.set("http_x_forwarded_for",event.get("http_x_forwarded_for").split(",")[0])' } } #解析ip,获得ip的地理位置 geoip{ source=>"http_x_forwarded_for" ##只获取ip的经纬度、国家、城市、时区 fields=>["location","country_name","city_name","region_name"] } #将agent字段解析,获得浏览器、系统版本等具体信息 useragent{ source=>"agent" target=>"useragent" } #指定要删除的数据 #mutate{remove_field=>["message"]} #根据日志名设置索引名的前缀 ruby{ code=>'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])' } #将@timestamp格式化为2019.04.23 ruby{ code=>'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))' } #设置输出的默认索引名 mutate{ add_field=>{ #"[@metadata][index]"=>"%{@[metadata][index_pre]}_%{+YYYY.MM.dd}" "[@metadata][index]"=>"%{@[metadata][index_pre]}_%{@[metadata][index_day]}" } } #将cookies字段解析成json #mutate{ #gsub=>[ #"cookies",";",",", #"cookies","=",":" #] ##split=>{"cookies"=>","} #} #json_encode{ #source=>"cookies" #target=>"cookies_json" #} #mutate{ #gsub=>[ #"cookies_json",',','","', #"cookies_json",':','":"' #] #} #json{ #source=>"cookies_json" #target=>"cookies2" #} #如果grok解析存在错误,将错误独立写入一个索引 if"_grokparsefailure"in[tags]{ #if"_dateparsefailure"in[tags]{ mutate{ replace=>{ #"[@metadata][index]"=>"%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}" "[@metadata][index]"=>"%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}" } } #如果不存在错误就删除message }else{ mutate{remove_field=>["message"]} } } output{ if[@metadata][debug]{ #输出到rubydebuyg并输出metadata stdout{codec=>rubydebug{metadata=>true}} }else{ #将输出内容转换成"." stdout{codec=>dots} #将输出到指定的es elasticsearch{ hosts=>["192.168.15.160:9200"] index=>"%{[@metadata][index]}" document_type=>"doc" } } }
启动logstash
nohupbin/logstash-ftest_pipline2.conf&
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。