dubbo+zipkin调用链监控(二)

*:first-child {
margin-top: 0 !important; }
body > *:last-child {
margin-bottom: 0 !important; }

a {
color: #4183C4; }
a.absent {
color: #cc0000; }
a.anchor {
display: block;
padding-left: 30px;
margin-left: -30px;
cursor: pointer;
position: absolute;
top: 0;
left: 0;
bottom: 0; }

h1, h2, h3, h4, h5, h6 {
margin: 20px 0 10px;
padding: 0;
font-weight: bold;
-webkit-font-smoothing: antialiased;
cursor: text;
position: relative; }

h1:hover a.anchor, h2:hover a.anchor, h3:hover a.anchor, h4:hover a.anchor, h5:hover a.anchor, h6:hover a.anchor {
background: url() no-repeat 10px center;
text-decoration: none; }

h1 tt, h1 code {
font-size: inherit; }

h2 tt, h2 code {
font-size: inherit; }

h3 tt, h3 code {
font-size: inherit; }

h4 tt, h4 code {
font-size: inherit; }

h5 tt, h5 code {
font-size: inherit; }

h6 tt, h6 code {
font-size: inherit; }

h1 {
font-size: 28px;
color: black; }

h2 {
font-size: 24px;
border-bottom: 1px solid #cccccc;
color: black; }

h3 {
font-size: 18px; }

h4 {
font-size: 16px; }

h5 {
font-size: 14px; }

h6 {
color: #777777;
font-size: 14px; }

p, blockquote, ul, ol, dl, li, table, pre {
margin: 15px 0; }

hr {
background: transparent url() repeat-x 0 0;
border: 0 none;
color: #cccccc;
height: 4px;
padding: 0;
}

body > h2:first-child {
margin-top: 0;
padding-top: 0; }
body > h1:first-child {
margin-top: 0;
padding-top: 0; }
body > h1:first-child + h2 {
margin-top: 0;
padding-top: 0; }
body > h3:first-child, body > h4:first-child, body > h5:first-child, body > h6:first-child {
margin-top: 0;
padding-top: 0; }

a:first-child h1, a:first-child h2, a:first-child h3, a:first-child h4, a:first-child h5, a:first-child h6 {
margin-top: 0;
padding-top: 0; }

h1 p, h2 p, h3 p, h4 p, h5 p, h6 p {
margin-top: 0; }

li p.first {
display: inline-block; }
li {
margin: 0; }
ul, ol {
padding-left: 30px; }

ul :first-child, ol :first-child {
margin-top: 0; }

dl {
padding: 0; }
dl dt {
font-size: 14px;
font-weight: bold;
font-style: italic;
padding: 0;
margin: 15px 0 5px; }
dl dt:first-child {
padding: 0; }
dl dt > :first-child {
margin-top: 0; }
dl dt > :last-child {
margin-bottom: 0; }
dl dd {
margin: 0 0 15px;
padding: 0 15px; }
dl dd > :first-child {
margin-top: 0; }
dl dd > :last-child {
margin-bottom: 0; }

blockquote {
border-left: 4px solid #dddddd;
padding: 0 15px;
color: #777777; }
blockquote > :first-child {
margin-top: 0; }
blockquote > :last-child {
margin-bottom: 0; }

img {
max-width: 100%; }

span.frame {
display: block;
overflow: hidden; }
span.frame > span {
border: 1px solid #dddddd;
display: block;
float: left;
overflow: hidden;
margin: 13px 0 0;
padding: 7px;
width: auto; }
span.frame span img {
display: block;
float: left; }
span.frame span span {
clear: both;
color: #333333;
display: block;
padding: 5px 0 0; }
span.align-center {
display: block;
overflow: hidden;
clear: both; }
span.align-center > span {
display: block;
overflow: hidden;
margin: 13px auto 0;
text-align: center; }
span.align-center span img {
margin: 0 auto;
text-align: center; }
span.align-right {
display: block;
overflow: hidden;
clear: both; }
span.align-right > span {
display: block;
overflow: hidden;
margin: 13px 0 0;
text-align: right; }
span.align-right span img {
margin: 0;
text-align: right; }
span.float-left {
display: block;
margin-right: 13px;
overflow: hidden;
float: left; }
span.float-left span {
margin: 13px 0 0; }
span.float-right {
display: block;
margin-left: 13px;
overflow: hidden;
float: right; }
span.float-right > span {
display: block;
overflow: hidden;
margin: 13px auto 0;
text-align: right; }

code, tt {
margin: 0 2px;
padding: 0 5px;
white-space: nowrap;
border: 1px solid #eaeaea;
background-color: #f8f8f8;
border-radius: 3px; }

pre code {
margin: 0;
padding: 0;
white-space: pre;
border: none;
background: transparent; }

.highlight pre {
background-color: #f8f8f8;
border: 1px solid #cccccc;
font-size: 13px;
line-height: 19px;
overflow: auto;
padding: 6px 10px;
border-radius: 3px; }

pre {
background-color: #f8f8f8;
border: 1px solid #cccccc;
font-size: 13px;
line-height: 19px;
overflow: auto;
padding: 6px 10px;
border-radius: 3px; }
pre code, pre tt {
background-color: transparent;
border: none; }

sup {
font-size: 0.83em;
vertical-align: super;
line-height: 0;
}

kbd {
display: inline-block;
padding: 3px 5px;
font-size: 11px;
line-height: 10px;
color: #555;
vertical-align: middle;
background-color: #fcfcfc;
border: solid 1px #ccc;
border-bottom-color: #bbb;
border-radius: 3px;
box-shadow: inset 0 -1px 0 #bbb
}

* {
-webkit-print-color-adjust: exact;
}
@media screen and (min-width: 914px) {
body {
margin:0 auto;
}
}
@media print {
table, pre {
page-break-inside: avoid;
}
pre {
word-wrap: break-word;
}
}
-->
code[class*="language-"],
pre[class*="language-"] {
background: #f5f2f0;
}

/* Inline code */
:not(pre) > code[class*="language-"] {
padding: .1em;
border-radius: .3em;
white-space: normal;
}

.token.comment,
.token.prolog,
.token.doctype,
.token.cdata {
color: slategray;
}

.token.punctuation {
color: #999;
}

.namespace {
opacity: .7;
}

.token.property,
.token.tag,
.token.boolean,
.token.number,
.token.constant,
.token.symbol,
.token.deleted {
color: #905;
}

.token.selector,
.token.attr-name,
.token.string,
.token.char,
.token.builtin,
.token.inserted {
color: #690;
}

.token.operator,
.token.entity,
.token.url,
.language-css .token.string,
.style .token.string {
color: #a67f59;
background: hsla(0, 0%, 100%, .5);
}

.token.atrule,
.token.attr-value,
.token.keyword {
color: #07a;
}

.token.function {
color: #DD4A68;
}

.token.regex,
.token.important,
.token.variable {
color: #e90;
}

.token.important,
.token.bold {
font-weight: bold;
}
.token.italic {
font-style: italic;
}

.token.entity {
cursor: help;
}
-->

去年的时候写过dubbo+zipkin调用链监控,最近看到zipkin2配合brave实现起来会比我之前的实现要简单很多,因为brave将很多交互的内容都封装起来了,不需要自己去写具体的实现,比如如何去构建span,如何去上报数据。

收集器抽象

由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。

AbstractZipkinCollectorConfiguration

主要是针对下面两种收集方式的一些配置上的定义,最核心的是Sender接口的定义,http与kafka是两类完全不同的实现。

public abstract Sender getSender();

其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数。

  • zipkinUrl

如果是http收集,那么对应的是zipkin api域名,如果是kafka,对应的是kafka集群的地址

  • topic

仅在收集方式为kafka是有效,http时传空值即可。

public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
    this.zipkinUrl=zipkinUrl;
    this.serviceName=serviceName;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上报方式,这里统一采用异常上传,并且配置上报的超时时间。

protected AsyncReporter<Span> spanReporter() {
    return AsyncReporter
            .builder(getSender())
            .closeTimeout(500, TimeUnit.MILLISECONDS)
            .build(SpanBytesEncoder.JSON_V2);
}

下面这两方法,是配合应用构建span使用的。

注意那个sampler()方法,默认是什么也不做的意思,我们要想看到数据就需要配置成Sampler.ALWAYS_SAMPLE,这样才能真正将数据上报到zipkin服务器。

protected Tracing tracing() {
    this.tracing= Tracing
            .newBuilder()
            .localServiceName(this.serviceName)
            .sampler(Sampler.ALWAYS_SAMPLE)
            .spanReporter(spanReporter())
            .build();
    return this.tracing;
}

protected Tracing getTracing(){
    return this.tracing;
}

HttpZipkinCollectorConfiguration

主要是实现getSender方法,可以借用OkHttpSender这个对象来快速构建,api版本采用v2。

public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {
        super(serviceName,zipkinUrl,null);
    }

    @Override
    public Sender getSender() {
        return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
    }
}

OkHttpSender这个类需要引用这个包

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

KafkaZipkinCollectorConfiguration

同样也是实现getSender方法

public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {
        super(serviceName,zipkinUrl,topic);
    }

    @Override
    public Sender getSender() {

        return KafkaSender
                .newBuilder()
                .bootstrapServers(super.getZipkinUrl())
                .topic(super.getTopic())
                .encoding(Encoding.JSON)
                .build();
    }
}

KafkaSender这个类需要引用这个包:

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka11</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工厂

由于上面创建了两个收集器配置类,使用时只能是其中之一,所以实际运行的实例需要根据配置来动态生成。ZipkinCollectorConfigurationFactory就是负责生成收集器实例的。

private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;

@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
    if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
        zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl(),
                traceConfig.getZipkinKafkaTopic());
    }
    else {
        zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl());
    }
}

通过构建函数将我们的配置类TraceConfig注入进来,然后根据发送方式来构建实例。另外提供一个辅助函数:

public Tracing getTracing(){
    return this.zipkinCollectorConfiguration.getTracing();
}

过滤器

在dubbo的过滤器中实现数据上传的功能逻辑相对简单,一般都在invoke方法执行前记录数据,然后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是通过发http请求实现需要编码,现在可以直接借用brave所提供的span来帮助我们完成,有两重要的方法:

  • finish

方法源码如下,在完成的时候会填写上完成的时间并上报数据,这一般应用于同步调用场景。

public void finish(TraceContext context, long finishTimestamp) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(Long.valueOf(finishTimestamp));
            this.reporter.report(span.toSpan());
        }
    }
}
  • flush 与上面finish方法的不同点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,比如dubbo所提供的oneway方式调用。
public void flush(TraceContext context) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((Long)null);
            this.reporter.report(span.toSpan());
        }
    }
}

消费者

做为消费方,有一个核心功能就是将traceId以及spanId传递到服务提供方,这里还是通过dubbo提供的附加参数方式实现。

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if(!RpcTraceContext.getTraceConfig().isEnabled()){
        return invoker.invoke(invocation);
    }

    ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
            SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
    Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

    if(null==RpcTraceContext.getTraceId()){
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(IdUtils.get());
        RpcTraceContext.setParentId(null);
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    else {
        RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    TraceContext traceContext= TraceContext.newBuilder()
            .traceId(RpcTraceContext.getTraceId())
            .parentId(RpcTraceContext.getParentId())
            .spanId(RpcTraceContext.getSpanId())
            .sampled(true)
            .build();

    Span span=tracer.toSpan(traceContext).start();

    invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
    invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));

    Result result = invoker.invoke(invocation);

    span.finish();

    return result;
}

提供者

@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if(!RpcTraceContext.getTraceConfig().isEnabled()){
            return invoker.invoke(invocation);
        }

        Map<String, String> attaches = invocation.getAttachments();
        if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
            return invoker.invoke(invocation);
        }

        Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
        Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));

        attaches.remove(RpcTraceContext.TRACE_ID_KEY);
        attaches.remove(RpcTraceContext.SPAN_ID_KEY);
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(traceId);
        RpcTraceContext.setParentId(spanId);
        RpcTraceContext.setSpanId(IdUtils.get());

        ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
                SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
        Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

        TraceContext traceContext= TraceContext.newBuilder()
                .traceId(RpcTraceContext.getTraceId())
                .parentId(RpcTraceContext.getParentId())
                .spanId(RpcTraceContext.getSpanId())
                .sampled(true)
                .build();
        Span span = tracer.toSpan(traceContext).start();

        Result result = invoker.invoke(invocation);

        span.finish();

        return result;

    }

异常流程

上面无论是消费者的过滤器还是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,如果出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。所以需要针对此问题做优化:可以在finally块中执行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消费者在调用服务时,异步调用问题

上面过滤器中调用span.finish都是基于同步模式,而由于dubbo除了同步调用外还提供了两种调用方式

  • 异步调用 通过callback机制的异步
  • oneway

只发起请求并不等待结果的异步调用,无callback一说

针对上面两类异步再加上同步调用,我们要想准确记录服务真正的时间,需要在消费方的过滤器中做如下处理:

创建一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里无论是成功还是失败。

private class AsyncSpanCallback implements ResponseCallback{

    private Span span;

    public AsyncSpanCallback(Span span){
        this.span=span;
    }

    @Override
    public void done(Object o) {
        span.finish();
    }

    @Override
    public void caught(Throwable throwable) {
        span.finish();
    }
}

再在调用invoke方法时,如果是oneway方式,则调用flush方法结果,如果是同步则直接调用finish方法,如果是异步则在回调时调用finish方法。


Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isOneway) {
        span.flush();
    }
    else if(!isAsync) {
        span.finish();
    }
}

待完善问题

过滤器中生成span的方式应该有更好的方法,还没有对brave做过多研究,后续想办法再优化下。另外我测试的场景是consumer调用provider,provider内部再调用provider2,我测试时发现第三步调用传递的parentId好像有点小问题,后续需要再确认下。

代码下载

https://github.com/jiangmin168168/jim-framework

原文地址:https://www.cnblogs.com/ASPNET2008/p/9757980.html

时间: 2024-07-31 02:50:01

dubbo+zipkin调用链监控(二)的相关文章

dubbo+zipkin调用链监控

图片描述(最多50字)收集器抽象 由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象. AbstractZipkinCollectorConfiguration 主要是针对下面两种收集方式的一些配置上的定义,最核心的是Sender接口的定义,http与kafka是两类完全不同的实现. public abstract Sender getSender();其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数. zipkinUrl如果是http收集,那么对应

调用链监控

一.背景 以前都是单体应用,都在一个系统内完成.而现在都是微服务,一个请求进来,需要调用多个服务才能完成.出了问题,我们很难定位到底在哪个环节出了问题. 二.作用 1.快速定位问题.通过调用链监控系统,我们能很快定位到哪个服务出了问题. 2.项目拓扑图.当服务越来越复杂时,我们都无法准确知道服务之间都依赖关系.通过调用链监控系统,我们能清晰的生成项目的网络拓扑图. 3.优化系统.通过调用链监控系统,我们可以随时监控哪些请求慢了,在哪个环节慢了,系统的瓶颈等等,从而作出相应的优化. 三.原理 我们

.Net Core 商城微服务项目系列(十):使用SkyWalking构建调用链监控(2019-02-13 13:25)

SkyWalking的安装和简单使用已经在前面一篇介绍过了,本篇我们将在商城中添加SkyWalking构建调用链监控. 顺带一下怎么把ES设置为Windows服务,cd到ES的bin文件夹,运行elasticsearch-service.bat install. 首先我们需要在每个服务里通过NuGet引用SkyAPM.Agent.AspNetCore,完成之后我们添加配置文件skyapm.json,可以通过SkyWalking的脚本命令自动生成,也可以手动新建,这里贴一下: { "SkyWalk

spring cloud 学习(8) - sleuth &amp; zipkin 调用链跟踪

业务复杂的微服务架构中,往往服务之间的调用关系比较难梳理,一次http请求中,可能涉及到多个服务的调用(eg: service A -> service B -> service C...),如果想分析各服务间的调用关系,以及各服务的响应耗时,找出有性能瓶颈的服务,这时zipkin就派上用场,它是Twitter公司开源的一个tracing系统,官网地址为: http://zipkin.io/ , spring cloud可以跟它无疑集成. 使用步骤: 一.微服务方 1.1 添加依赖jar包 c

springboot 项目添加jaeger调用链监控

1.添加maven依赖<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-spring-cloud-starter</artifactId> <version>0.1.8</version> </dependency> <dependency> <groupId>com.uber.j

java微服务分布式调用链APM监控

几种分布式调用链监控组件的比较微服务架构下,服务按照不同的维度进行拆分,一次请求请求往往需要涉及到多个服务.互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发.可能使用不同的编程语言来实现.有可能布在了几千台服务器,横跨多个不同的数据中心.因此,就需要一些可以帮助理解系统行为.用于分析性能问题的工具,以便发生故障的时候,能够快速定位和解决问题. 分布式调用链监控组件在这样的环境下产生了.最出名的是谷歌公开的论文提到的 Dapper .开发Dapper是为了收集更多的复杂分

从Consumer分析Dubbo调用链

入手 继上一篇不成熟的源码分析经历之后,为了搞清楚Consumer是如何与Provider通信的,于是又一言不合翻看起了源码.好,进入正题,依旧从RegistryDirectory这个核心类入手: // 这里的入参urls是所有可用的provider的url private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvo

跟着小程学微服务-自己动手扩展分布式调用链

一.说在前面 微服务是当下最火的词语,现在很多公司都在推广微服务,当服务越来越多的时候,我们是否会纠结以下几个问题: 面对一笔超时的订单,究竟是哪一步处理时间超长呢? 数据由于并发莫名篡改,到底都谁有重大嫌疑呢? 处理遗漏了一笔订单,曾经是哪个环节出错把它落下了? 系统莫名的报错,究竟是哪一个服务报的错误? 每个服务那么多实例服务器,如何快速定位到是哪一个实例服务器报错的呢? 现在很多系统都要求可用性达到99.9%以上,那么我们除了增加系统健壮性减少故障的同时,我们又如何在真正发生故障的时候,快

服务化改造实践(三) | Dubbo + Zipkin

随着业务的发展,应用的规模不断的扩大,传统的应用架构无法满足诉求,服务化架构改造势在必行,以 Dubbo 为代表的分布式服务框架成为了服务化改造架构中的基石.随着微服务理念逐渐被大众接受,应用进一步向更细粒度拆分,并且,不同的应用由不同的开发团队独立负责,整个分布式系统变得十分复杂.没有人能够清晰及时的知道当前系统整体的依赖关系.当出现问题时,也无法及时知道具体是链路上的哪个环节出了问题. 在这个背景下,Google 发表了 Dapper 的论文,描述了如何通过一个分布式追踪系统解决上述问题.基