nginx日志导入elasticsearch的方法示例
将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分,以及将日志写入elasticsearch后的索引的创建。
1、配置nginx日志格式
log_formatmain'$remote_addr$http_x_forwarded_for[$time_local]$server_name$request' '$status$body_bytes_sent$http_referer' '"$http_user_agent"' '"$connection"' '"$http_cookie"' '$request_time' '$upstream_response_time';
2、安装配置filebeat,启用nginxmodule
tar-zxvffilebeat-6.2.4-linux-x86_64.tar.gz-C/usr/local cd/usr/local;ln-sfilebeat-6.2.4-linux-x86_64filebeat cd/usr/local/filebeat
启用nginx模块
./filebeatmodulesenablenginx
查看模块
./filebeatmoduleslist
创建配置文件
vim/usr/local/filebeat/blog_module_logstash.yml filebeat.modules: -module:nginx access: enabled:true var.paths:["/home/weblog/blog.cnfol.com_access.log"] #error: #enabled:true #var.paths:["/home/weblogerr/blog.cnfol.com_error.log"] output.logstash: hosts:["192.168.15.91:5044"]
启动filebeat
./filebeat-cblog_module_logstash.yml-e
3、配置logstash
tar-zxvflogstash-6.2.4.tar.gz/usr/local cd/usr/local;ln-slogstash-6.2.4logstash 创建一个nginx日志的pipline文件 cd/usr/local/logstash
logstash内置的模板目录
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
编辑grok-patterns添加一个支持多ip的正则
FORWORD(?:%{IPV4}[,]?[]?)+|%{WORD}
官方grok
http://grokdebug.herokuapp.com/patterns#
创建logstashpipline配置文件
#input{
#stdin{}
#}
#从filebeat接受数据
input{
beats{
port=>5044
host=>"0.0.0.0"
}
}
filter{
#添加一个调试的开关
mutate{add_field=>{"[@metadata][debug]"=>true}}
grok{
#过滤nginx日志
#match=>{"message"=>"%{NGINXACCESS_TEST2}"}
#match=>{"message"=>'%{IPORHOST:clientip}#(?[^\#]*)#\[%{HTTPDATE:[@metadata][webtime]}\]#%{NOTSPACE:hostname}#%{WORD:verb}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}#%{NUMBER:response}#(?:%{NUMBER:bytes}|-)#(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)#(?:"(?[^#]*)")#(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)#(?:"(?[^#]*)")#%{NUMBER:request_time:float}#(?:%{NUMBER:upstream_response_time:float}|-)'}
#match=>{"message"=>'(?:%{IPORHOST:clientip}|-)(?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-)\[%{HTTPDATE:[@metadata][webtime]}\](?:%{HOSTNAME:hostname}|-)%{WORD:method}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}%{NUMBER:response}(?:%{NUMBER:bytes}|-)(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)%{QS:agent}(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)(?:"(?[^#]*)")%{NUMBER:request_time:float}(?:%{NUMBER:upstream_response_time:float}|-)'}
match=>{"message"=>'(?:%{IPORHOST:clientip}|-)%{FORWORD:http_x_forwarded_for}\[%{HTTPDATE:[@metadata][webtime]}\](?:%{HOSTNAME:hostname}|-)%{WORD:method}%{URIPATHPARAM:request}HTTP/%{NUMBER:httpversion}%{NUMBER:response}(?:%{NUMBER:bytes}|-)(?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-)%{QS:agent}(?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-)%{QS:cookie}%{NUMBER:request_time:float}(?:%{NUMBER:upstream_response_time:float}|-)'}
}
#将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
ruby{
#code=>"event.set('@read_timestamp',event.get('@timestamp'))"
#将时区改为东8区
code=>"event.set('@read_timestamp',event.get('@timestamp').time.localtime+8*60*60)"
}
#将nginx的日志记录时间格式化
#格式化时间20/May/2015:21:05:56+0000
date{
locale=>"en"
match=>["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ssZ"]
}
#将bytes字段由字符串转换为数字
mutate{
convert=>{"bytes"=>"integer"}
}
#将cookie字段解析成一个json
#mutate{
#gsub=>["cookies",'\;',',']
#}
#如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
if[http_x_forwarded_for]=~","{
ruby{
code=>'event.set("http_x_forwarded_for",event.get("http_x_forwarded_for").split(",")[0])'
}
}
#解析ip,获得ip的地理位置
geoip{
source=>"http_x_forwarded_for"
##只获取ip的经纬度、国家、城市、时区
fields=>["location","country_name","city_name","region_name"]
}
#将agent字段解析,获得浏览器、系统版本等具体信息
useragent{
source=>"agent"
target=>"useragent"
}
#指定要删除的数据
#mutate{remove_field=>["message"]}
#根据日志名设置索引名的前缀
ruby{
code=>'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
}
#将@timestamp格式化为2019.04.23
ruby{
code=>'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
}
#设置输出的默认索引名
mutate{
add_field=>{
#"[@metadata][index]"=>"%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
"[@metadata][index]"=>"%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
}
}
#将cookies字段解析成json
#mutate{
#gsub=>[
#"cookies",";",",",
#"cookies","=",":"
#]
##split=>{"cookies"=>","}
#}
#json_encode{
#source=>"cookies"
#target=>"cookies_json"
#}
#mutate{
#gsub=>[
#"cookies_json",',','","',
#"cookies_json",':','":"'
#]
#}
#json{
#source=>"cookies_json"
#target=>"cookies2"
#}
#如果grok解析存在错误,将错误独立写入一个索引
if"_grokparsefailure"in[tags]{
#if"_dateparsefailure"in[tags]{
mutate{
replace=>{
#"[@metadata][index]"=>"%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
"[@metadata][index]"=>"%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
}
}
#如果不存在错误就删除message
}else{
mutate{remove_field=>["message"]}
}
}
output{
if[@metadata][debug]{
#输出到rubydebuyg并输出metadata
stdout{codec=>rubydebug{metadata=>true}}
}else{
#将输出内容转换成"."
stdout{codec=>dots}
#将输出到指定的es
elasticsearch{
hosts=>["192.168.15.160:9200"]
index=>"%{[@metadata][index]}"
document_type=>"doc"
}
}
}
启动logstash
nohupbin/logstash-ftest_pipline2.conf&
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。