详解SpringBoot+Dubbo集成ELK实战
前言
一直以来,日志始终伴随着我们的开发和运维过程。当系统出现了Bug,往往就是通过Xshell连接到服务器,定位到日志文件,一点点排查问题来源。
随着互联网的快速发展,我们的系统越来越庞大。依赖肉眼分析日志文件来排查问题的方式渐渐凸显出一些问题:
- 分布式集群环境下,服务器数量可能达到成百上千,如何准确定位?
- 微服务架构中,如何根据异常信息,定位其他各服务的上下文信息?
- 随着日志文件的不断增大,可能面临在服务器上不能直接打开的尴尬。
- 文本搜索太慢、无法多维度查询等
面临这些问题,我们需要集中化的日志管理,将所有服务器节点上的日志统一收集,管理,访问。
而今天,我们的手段的就是使用ElasticStack来解决它们。
一、什么是ElasticStack?
或许有人对Elastic感觉有一点点陌生,它的前生正是ELK,ElasticStack是ELKStack的更新换代产品。
ElasticStack分别对应了四个开源项目。
Beats
Beats平台集合了多种单一用途数据采集器,它负责采集各种类型的数据。比如文件、系统监控、Windows事件日志等。
Logstash
Logstash是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据。没错,它既可以采集数据,也可以转换数据。采集到了非结构化的数据,通过过滤器把他格式化成友好的类型。
Elasticsearch
Elasticsearch是一个基于JSON的分布式搜索和分析引擎。作为ElasticStack的核心,它负责集中存储数据。我们上面利用Beats采集数据,通过Logstash转换之后,就可以存储到Elasticsearch。
Kibana
最后,就可以通过Kibana,对自己的Elasticsearch中的数据进行可视化。
本文的实例是通过SpringBoot+Dubbo的微服务架构,结合ElasticStack来整合日志的。架构如下:
注意,阅读本文需要了解ELK组件的基本概念和安装。本文不涉及安装和基本配置过程,重点是如何与项目集成,达成上面的需求。
二、采集、转换
1、FileBeat
在SpringBoot项目中,我们首先配置Logback,确定日志文件的位置。
${user.dir}/logs/order.log ${user.dir}/logs/order.%d{yyyy-MM-dd}.log 7
Filebeat提供了一种轻量型方法,用于转发和汇总日志与文件。
所以,我们需要告诉FileBeat日志文件的位置、以及向何处转发内容。
如下所示,我们配置了FileBeat读取usr/local/logs路径下的所有日志文件。
-type:log #Changetotruetoenablethisinputconfiguration. enabled:true #Pathsthatshouldbecrawledandfetched.Globbasedpaths. paths: -/usr/local/logs/*.log
然后,告诉FileBeat将采集到的数据转发到Logstash。
#-----------------------------Logstashoutput-------------------------------- output.logstash: #TheLogstashhosts hosts:["192.168.159.128:5044"]
另外,FileBeat采集文件数据时,是一行一行进行读取的。但是FileBeat收集的文件可能包含跨越多行文本的消息。
例如,在开源框架中有意的换行:
2019-10-2920:36:04.427INFOorg.apache.dubbo.spring.boot.context.event.WelcomeLogoApplicationListener ::DubboSpringBoot(v2.7.1):https://github.com/apache/incubator-dubbo-spring-boot-project ::Dubbo(v2.7.1):https://github.com/apache/incubator-dubbo ::Discussgroup:dev@dubbo.apache.org
或者Java异常堆栈信息:
2019-10-2921:30:59.849INFOcom.viewscenes.order.controller.OrderControllerhttp-nio-8011-exec-2开始获取数组内容... java.lang.IndexOutOfBoundsException:Index:3,Size:0 atjava.util.ArrayList.rangeCheck(ArrayList.java:657) atjava.util.ArrayList.get(ArrayList.java:433)
所以,我们还需要配置multiline,以指定哪些行是单个事件的一部分。
multiline.pattern指定要匹配的正则表达式模式。
multiline.negate定义是否为否定模式。
multiline.match如何将匹配的行组合到事件中,设置为after或before。
听起来可能比较饶口,我们来看一组配置:
#TheregexpPatternthathastobematched.Theexamplepatternmatchesalllinesstartingwith[ multiline.pattern:'^\<|^[[:space:]]|^[[:space:]]+(at|\.{3})\b|^java.' #Definesifthepatternsetunderpatternshouldbenegatedornot.Defaultisfalse. multiline.negate:false #Matchcanbesetto"after"or"before".Itisusedtodefineiflinesshouldbeappendtoapattern #thatwas(not)matchedbeforeorafteroraslongasapatternisnotmatchedbasedonnegate. #Note:AfteristheequivalenttopreviousandbeforeistheequivalenttotonextinLogstash multiline.match:after
上面配置文件说的是,如果文本内容是以<或空格或空格+at+包路径或java.开头,那么就将此行内容当做上一行的后续,而不是当做新的行。
就上面的Java异常堆栈信息就符合这个正则。所以,FileBeat会将
java.lang.IndexOutOfBoundsException:Index:3,Size:0 atjava.util.ArrayList.rangeCheck(ArrayList.java:657) atjava.util.ArrayList.get(ArrayList.java:433)
这些内容当做开始获取数组内容...的一部分。
2、Logstash
在Logback中,我们打印日志的时候,一般会带上日志等级、执行类路径、线程名称等信息。
有一个重要的信息是,我们在ELK查看日志的时候,是否希望将以上条件单独拿出来做统计或者精确查询?
如果是,那么就需要用到Logstash过滤器,它能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。
那么,这时候就要先看我们在项目中,配置了日志以何种格式输出。
比如,我们最熟悉的JSON格式。先来看Logback配置:
{"log_time":"%d{yyyy-MM-ddHH:mm:ss.SSS}","level":"%level","logger":"%logger","thread":"%thread","msg":"%m"}
没错,Logstash过滤器中正好也有一个JSON解析插件。我们可以这样配置它:
input{ stdin{} } filter{ json{ source=>"message" } } output{ stdout{} }
这么一段配置就是说利用JSON解析器格式化数据。我们输入这样一行内容:
{ "log_time":"2019-10-2921:45:12.821", "level":"INFO", "logger":"com.viewscenes.order.controller.OrderController", "thread":"http-nio-8011-exec-1", "msg":"接收到订单数据." }
Logstash将会返回格式化后的内容:
但是JSON解析器并不太适用,因为我们打印的日志中msg字段本身可能就是JSON数据格式。
比如:
{ "log_time":"2019-10-2921:57:38.008", "level":"INFO", "logger":"com.viewscenes.order.controller.OrderController", "thread":"http-nio-8011-exec-1", "msg":"接收到订单数据.{"amount":1000.0,"commodityCode":"MK66923","count":5,"id":1,"orderNo":"1001"}" }
这时候JSON解析器就会报错。那怎么办呢?
Logstash拥有丰富的过滤器插件库,或者你对正则有信心,也可以写表达式去匹配。
正如我们在Logback中配置的那样,我们的日志内容格式是已经确定的,不管是JSON格式还是其他格式。
所以,笔者今天推荐另外一种:Dissect。
Dissect过滤器是一种拆分操作。与将一个定界符应用于整个字符串的常规拆分操作不同,此操作将一组定界符应用于字符串值。Dissect不使用正则表达式,并且速度非常快。
比如,笔者在这里以|当做定界符。
input{ stdin{} } filter{ dissect{ mapping=>{ "message"=>"%{log_time}|%{level}|%{logger}|%{thread}|%{msg}" } } } output{ stdout{} }
然后在Logback中这样去配置日志格式:
%d{yyyy-MM-ddHH:mm:ss.SSS}|%level|%logger|%thread|%m%n
最后同样可以得到正确的结果:
到此,关于数据采集和格式转换都已经完成。当然,上面的配置都是控制台输入、输出。
我们来看一个正儿八经的配置,它从FileBeat中采集数据,经由dissect转换格式,并将数据输出到elasticsearch。
input{ beats{ port=>5044 } } filter{ dissect{ mapping=>{ "message"=>"%{log_time}|%{level}|%{logger}|%{thread}|%{msg}" } } date{ match=>["log_time","yyyy-MM-ddHH:mm:ss.SSS"] target=>"@timestamp" } } output{ elasticsearch{ hosts=>["192.168.216.128:9200"] index=>"logs-%{+YYYY.MM.dd}" } }
不出意外的话,打开浏览器我们在Kibana中就可以对日志进行查看。比如我们查看日志等级为DEBUG的条目:
试想一下,我们在前端发送了一个订单请求。如果后端系统是微服务架构,可能会经由库存系统、优惠券系统、账户系统、订单系统等多个服务。如何追踪这一个请求的调用链路呢?
1、MDC机制
首先,我们要了解一下MDC机制。
MDC-MappedDiagnosticContexts,实质上是由日志记录框架维护的映射。其中应用程序代码提供键值对,然后可以由日志记录框架将其插入到日志消息中。
简而言之,我们使用了MDC.PUT(key,value),那么Logback就可以在日志中自动打印这个value。
在SpringBoot中,我们就可以先写一个HandlerInterceptor,拦截所有的请求,来生成一个traceId。
@Component publicclassTraceIdInterceptorimplementsHandlerInterceptor{ Snowflakesnowflake=newSnowflake(1,0); @Override publicbooleanpreHandle(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler){ MDC.put("traceId",snowflake.nextIdStr()); returntrue; } @Override publicvoidpostHandle(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler,ModelAndViewmodelAndView){ MDC.remove("traceId"); } @Override publicvoidafterCompletion(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler,Exceptionex){} }
然后在Logback中配置一下,让这个traceId出现在日志消息中。
%d{yyyy-MM-ddHH:mm:ss.SSS}|%level|%logger|%thread|%X{traceId}|%m%n
2、DubboFilter
另外还有一个问题,就是在微服务架构下我们怎么让这个traceId来回透传。
熟悉Dubbo的朋友可能就会想到隐式参数。是的,我们就是利用它来完成traceId的传递。
@Activate(group={Constants.PROVIDER,Constants.CONSUMER},order=99) publicclassTraceIdFilterimplementsFilter{ @Override publicResultinvoke(Invoker>invoker,Invocationinvocation)throwsRpcException{ Stringtid=MDC.get("traceId"); StringrpcTid=RpcContext.getContext().getAttachment("traceId"); booleanbind=false; if(tid!=null){ RpcContext.getContext().setAttachment("traceId",tid); }else{ if(rpcTid!=null){ MDC.put("traceId",rpcTid); bind=true; } } try{ returninvoker.invoke(invocation); }finally{ if(bind){ MDC.remove("traceId"); } } } }
这样写完,我们就可以愉快的查看某一次请求所有的日志信息啦。比如下面的请求,订单服务和库存服务两个系统的日志。
四、总结
本文介绍了ElasticStack的基本概念。并通过一个SpringBoot+Dubbo项目,演示如何做到日志的集中化管理、追踪。
事实上,Kibana具有更多的分析和统计功能。所以它的作用不仅限于记录日志。
另外ElasticStack性能也很不错。笔者在一台虚拟机上,记录了100+万条用户数据,index大小为1.1G,查询和统计速度也不逊色。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。