跟着小程学微服务-自己动手扩展分布式调用链

一、说在前面

微服务是当下最火的词语,现在很多公司都在推广微服务,当服务越来越多的时候,我们是否会纠结以下几个问题:

  • 面对一笔超时的订单,究竟是哪一步处理时间超长呢?
  • 数据由于并发莫名篡改,到底都谁有重大嫌疑呢?
  • 处理遗漏了一笔订单,曾经是哪个环节出错把它落下了?
  • 系统莫名的报错,究竟是哪一个服务报的错误?
  • 每个服务那么多实例服务器,如何快速定位到是哪一个实例服务器报错的呢?

现在很多系统都要求可用性达到99.9%以上,那么我们除了增加系统健壮性减少故障的同时,我们又如何在真正发生故障的时候,快速定位和解决问题,也将是我们的重中之重。

在做微服务框架选择的时候,Spring Cloud无疑是当下最火的,但是因为Spring Cloud是近二年的后起新秀,以及在使用方式上面的差别,目前在很多中小企业还是以dubbo为主,不过遗憾的是,dubbo从官方来讲已经不维护了,很多公司都是自己再去维护,那么今天我就来给大家介绍一下,我们是如何通过修改dubbo源码实现了分布式调用链的第一阶段:调用链日志的打印。

二、什么是分布式调用链

1、什么是调用链

基于Google Dapper论文,用户每次请求都会生成一个全局ID(traceId),通过它将不同系统的“孤立”的日志串在一起,重组成调用链。

2、调用链的调用过程

  1. 当用户发起一个请求时,首先到达前端A服务,然后分别对B服务和C服务进行RPC调用。
  2. B服务处理完给A做出响应,但是C服务还需要和后端的D服务和E服务交互之后再返还给A服务,最后由A服务来响应用户的请求。

3、对整个调用过程的追踪

  1. 请求到来生成一个全局TraceID,通过TraceID可以串联起整个调用链,一个TraceID代表一次请求。
  2. 除了TraceID外,还需要SpanID用于记录调用父子关系。每个服务会记录下Parent id和Span id,通过他们可以组织一次完整调用链的父子关系。
  3. 一个没有Parent id的span成为root span,可以看成调用链入口。
  4. 所有这些ID可用全局唯一的64位整数表示;
  5. 整个调用过程中每个请求都要透传TraceID和SpanID。
  6. 每个服务将该次请求附带的TraceID和附带的SpanID作为Parent id记录下,并且将自己生成的SpanID也记录下。
  7. 要查看某次完整的调用则只要根据TraceID查出所有调用记录,然后通过Parent id和Span id组织起整个调用父子关系。

最终的TraceId和SpanId的调用关系图如下所示:

三、基于Dubbo的实现

1、Dubbo的调用过程

在我们分析源码的时候,有一行代码是:

Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

这行代码实际上是利用SPI机制,动态加载指定的Protocol注入到ProtocolFilterWrapper中,再通过Wrapper访问到可执行的Invoker对象,Dubbo默认使用的是DubboProtocol最终通过netty的方式进行通信,具体调用过程请看下图:

可以看到基本的流程是:

InvokerInvocationHandler ->ClusterInvoker ->LoadBalance -> ProtocolFilterWrapper -> Protocol -> DubboInvoker

而在调用链的实现过程中技术难点主要是有二个:

  • 在哪里暂存调用链
  • 调用链信息如何传递

2、Dubbo协议下的调用链传递过程

那么在默认的Dubbo协议下,实现调用链的过程很简单只需要在应用项目或者Dubbo源码中使用如下代码就可以实现调用链的传递。

RpcContext.getContext().setAttachment(CallChainContext.TRACEID, traceIdValue);

RpcInvocation rpcInvocation = (RpcInvocation) inv;
rpcInvocation.setAttachment(CallChainContext.TRACEID, traceIdValue);
rpcInvocation.setAttachment(CallChainContext.SPANID, spanIdValue);

在DubboInvoker中最终通信的时候会将上述代码的RpcInvocation对象传递出去,那么我们只需要在接收端获取既可。

3、Hessian协议下的调用链传递过程

大家都知道,Dubbo在实现通信的协议上使用的有Netty、Hessian、Rest等方式,由于我们项目的特殊性,目前采用的是Dubbo的Hessian协议。

先看如下代码:

 protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {

        HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
        String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
        if ("httpclient".equals(client)) {
            hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
        } else if (client != null && client.length() > 0 && ! Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
            throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
        }
        int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        hessianProxyFactory.setConnectTimeout(timeout);
        hessianProxyFactory.setReadTimeout(timeout);
        return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
    }

通过代码可以看到,实际上在使用Hessian通信的时候并没有将RpcInvocation里面设定的TraceId和SpanId传递出去,调用在这一块中止了。

那我们如何自己来实现呢?

  • 第一步、我们在Dubbo源码中自己实现了一个Filter,用来产生TraceId和SpanId,以及最后的清理工作,请看代码如下:
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        // 将请求转换成HttpServletRequest请求
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;

        try {
            archieveId(request);
        } catch (Throwable e) {
            log.log(Level.SEVERE, "traceId或spanId解析出错!", e);
        }

        try {
            chain.doFilter(request, response);
        } catch (IOException e) {
            //还原线程名称
            throw e;
        } catch (ServletException e) {
            //还原线程名称
            throw e;
        } finally {
            CallChainContext.getContext().clearContext();
        }
    }

在Filter中产生TraceId和SpanId以后,会将二个值放到我们封装好的CallChainContext中进行暂存。

  • 第二步、我们将HessianProxyFactory进行继承改造
public class HessianProxyWrapper extends HessianProxy {
    private static final long serialVersionUID = 353338409377437466L;

    private static final Logger log = Logger.getLogger(HessianProxyWrapper.class
            .getName());

    public HessianProxyWrapper(URL url, HessianProxyFactory factory, Class<?> type) {
        super(url, factory, type);
    }

    protected void addRequestHeaders(HessianConnection conn) {
        super.addRequestHeaders(conn);
        conn.addHeader("traceId", CallChainContext.getContext().getTraceId());
        conn.addHeader("spanId", CallChainContext.getContext().getSpanId());
    }
}

我们将CallChainContext中暂存的TraceId和SpanId放入到Hessian的header中。

继承Dubbo的HessianProxyFactory这个类,新类名是HessianProxyFactoryWrapper,在create方法中将HessianProxy替换为新封装的HessianProxyWrapper,代码如下:

public Object create(Class<?> api, URL url, ClassLoader loader) {
        if (api == null)
            throw new NullPointerException(
                    "api must not be null for HessianProxyFactory.create()");
        InvocationHandler handler = null;
                //将HessianProxy修改为HessianProxyWrapper
        handler = new HessianProxyWrapper(url, this, api);

        return Proxy.newProxyInstance(loader, new Class[] { api,
                HessianRemoteObject.class }, handler);
    }

修改后的HessianProtocol的代码如下:

   protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
       //新继承的
        HessianProxyFactoryWrapper hessianProxyFactory = new HessianProxyFactoryWrapper();

        String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
        if ("httpclient".equals(client)) {
            hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
        } else if (client != null && client.length() > 0 && ! Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
            throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
        }
        int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        hessianProxyFactory.setConnectTimeout(timeout);
        hessianProxyFactory.setReadTimeout(timeout);
        return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
    }

通过以上方式可以将我们产生的TraceId和SpanId通过Hessian的方式传递出去,我们在接收请求的时候,只需要使用如下代码的方式就可以获取到二个值。

String traceIdValue = request.getHeader("traceId");
String spanIdValue = request.getHeader("spanId"); 
  • 第三步、如何打印调用链信息

我们在项目中使用的是Logback的方式打印日志,首先想到的是继承一个ClassicConverter对象,实现Logback的自定义格式转换器,参考代码如下:

public class CallChainConverter extends ClassicConverter {

    @Override
    public String convert(ILoggingEvent event) {
        Map<String,String> globalMap = CallChainContext.getContext().get();
        StringBuilder builder = new StringBuilder();

        if(null == globalMap) {
            globalMap = new HashMap<String, String>();
            CallChainContext.getContext().add(globalMap);
        } else {
            String traceId = globalMap.get("traceId");
            String spainId = globalMap.get("spanId");

            if(traceId == null) {
                traceId = String.valueOf(Thread.currentThread().getId());
            }

            if(spainId == null) {
                spainId = "1";
            }

            builder.append("GUID[");
            builder.append(traceId);
            builder.append("] - LEVEL[");
            builder.append(spainId);
            builder.append("] ");
        }

        return builder.toString();
    }
}

在Logback配置文件中进行如下修改:

<conversionRule conversionWord="callContext"  converterClass="com.ulpay.dubbox.core.util.CallChainConverter" />

<layout class="com.ulpay.dubbox.core.util.CallChainPatternLayout">
    <pattern>%d %-5p %c [%t] %callContext - %m%n</pattern>
</layout>

最终打印的日志格式如下样式:

[RMI TCP Connection(127.0.0.1:2181)] GUID[760a1fedd7ab4ff8a309cebaa01cc61d] - LEVEL[15.27.1]  - [执行时间] - [xxx项目] - [xxx服务.xxx方法] - 耗时[7]毫秒

4、采集日志信息实现分布式调用链界面展示

一个最简单的demo示意图如下:

  • 通过logstash采集日志到kafka
  • kafka负责提供数据给Hbase
  • 通过Hbase进行数据分析

最终效果展示图如下:

四、总结

对于分布式调用链来说,目前市面上有很多开源的工具,比如:pinpoint,Cat以及sky-walking等等,将这些工具与我们扩展的调用链日志结合起来将起到更好的效果。

出于公司的考虑,以上的代码采用的是伪代码,但也具有一定参考价值,我写这篇文章的目的也是希望能够给大家提供一些思路,希望大家能够多提建议,我会持续改进。

时间: 2024-07-30 03:51:02

跟着小程学微服务-自己动手扩展分布式调用链的相关文章

小程聊微服务-基于dubbo的mock测试系统

一.说在前面 基于微服务或者SOA的自动化测试系统每个公司都有自己的特有的,我今天就主要介绍一下,我们研发的一套mock测试系统. 二.目前面临的问题 1.测试人员面临的测试问题 我公司目前用的是基于Dubbo的微服务改造,服务之间的调用链路冗长,每个服务又是单独的团队在维护,每个团队又在不断的演进和维护各个服务,那么对测试人员将是非常大的挑战. 测试人员每次进行功能测试的时候,测试用例每次都需要重新写一遍,无法将测试用例的数据沉淀,尤其是做自动化测试的时候,测试人员准备测试数据就需要很长时间,

小程聊微服务-增艺眼中的自己主动化測试

假设说"生活不仅仅有眼前的苟且,还有诗和远方"的话,那么自己主动化測试可以说是非常多測试人员心中的"诗和远方". "诗和远方"OR"禁果" 測试自己主动化,须要持续改进.但因为其本身是一种过于激动人心的想法:用程序去測试程序--解放了測试人员的生产力.节省大量的人力成本.这就有点"禁果"的意思了. 一个常见的行动模式是:在实施自己主动化測试时,设定一些量化指标,比如依据业务.接口.模块设置的覆盖率. 技术团

从0开始学微服务

作为一名IT从业者,懈怠是一件奢侈的事情,因为在IT圈,原地踏步就等于退步. “微服务”这个名词已经广为流传,但是我觉得大部分的人也许同我一样,仅仅只是处于对这个概念的认知上:是的!今天我希望跟大家一起揭开它的神秘面纱:) <从 0 开始学微服务>专栏希望能够用通俗易懂的语言帮助你理解以上几个问题,同时也是希望能够由浅入深.由表及里系统为你讲解微服务的各个关键环节,帮你上手微服务. 四个核心模块. 入门微服务:将介绍微服务体系的基本原理和组成,帮你解答什么是微服务.什么时候适合微服务改造.微服

java微服务分布式调用链APM监控

几种分布式调用链监控组件的比较微服务架构下,服务按照不同的维度进行拆分,一次请求请求往往需要涉及到多个服务.互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发.可能使用不同的编程语言来实现.有可能布在了几千台服务器,横跨多个不同的数据中心.因此,就需要一些可以帮助理解系统行为.用于分析性能问题的工具,以便发生故障的时候,能够快速定位和解决问题. 分布式调用链监控组件在这样的环境下产生了.最出名的是谷歌公开的论文提到的 Dapper .开发Dapper是为了收集更多的复杂分

微服务-使用Redis实现分布式缓存

在单体中对于key信息和用户信息是放在内存中放的,通过session进行管理. 微服务是要放在分布式缓存中,以实现服务的无状态化. @Autowired private StringRedisTemplate redisTemplate; @Value("${file.prefix}") private String imgPrefix; /** * 1.首先通过缓存获取 * 2.不存在将从通过数据库获取用户对象 * 3.将用户对象写入缓存,设置缓存时间5分钟 * 4.返回对象 * @

跟着小程来学微服务--微服务思想

前言 一直对微服务非常感兴趣,因为公司的架构改造正好有机会能够接触微服务,买来一些书,请教了很多微服务大牛同时自己也做了很多总结,写成了80页ppt,算是我对微服务的一个认识吧,微服务本身不同的人有不同的理解,而我就从我自己的角度来谈谈微服务是什么. 目前市面上的不少书或者不少相关文章写的都是框架的使用,或者架构的介绍,其实对于刚入门不久的同学来说很容易造成微服务就是一堆框架和组件的堆砌,于是今天我将从理论和实践的角度来说说微服务. 现代互联网的方向是当企业发展到一定规模后,一定是大规模.云计算

从0开始学微服务(二) --- 2020年04月

各种框架的选型 1.注册中心选型 注册与发现有解决方案有两种: 1).应用内注册与发现的方式,最典型的是 Eureka 注册中心. Eureka 主要三个组件:Eureka Server 实现服务信息注册.存储.查询,Eureka Client 集成在服务端的注册中心 SDK,实现服务注册.反注册,以及服务订阅.服务更新等功能. 2).应用外注册与发现的方式,有 Consul.Nacos. Nacos 有一个简单 UI 的客户端,实现服务注册信息的存储,以及各种参数的配置. Nacos 支持基于

微服务实践分享(8) 控制调用中心

1.熔断 在微服务领域,熔断机制是从消费端保护微服务提供者的措施,当微服务的运行质量低于某个临界值时,启动熔断机制,暂停微服务调用一段时间,以保障后端的微服务不会因为持续过负荷而宕机. 2.降级 服务降级主要包括容错降级和屏蔽降级 屏蔽降级:1)throw null 不发起远程调用,直接返回空 2)throw exception 不发起远程调用,直接抛出指定异常 3)execute bean 不发起远程调用,直接执行本地模拟接口实现 服务降级是可逆操作,当系统压力恢复到一定值不需要降级服务时,要

【分布式事务】微服务架构下的分布式事务问题

一.基本概念 ACID理论:关系型数据库的事务满足 ACID 的特性,具有 ACID 特性的数据库支持数据的强一致性,保证了数据本身不会出现不一致.适用于传统的单体架构. CAP理论:在分布式系统下, 包含三个要素:Consistency(一致性).Availability(可用性).Partition tolerance(分区容错性),并且三者不可兼得.分布式系统要求保证分区容错性,只能在数据强一致性(C)和可用性(A)之间做平衡,即选择CP或者AP.比如Zookeeper为CP系统保证强一致