Hystrix 监控数据聚合 Turbine【Finchley 版】

原文地址:https://windmt.com/2018/04/17/spring-cloud-6-turbine/

上一篇我们介绍了使用 Hystrix Dashboard 来展示 Hystrix 用于熔断的各项度量指标。通过 Hystrix Dashboard,我们可以方便的查看服务实例的综合情况,比如:服务调用次数、服务调用延迟等。但是仅通过 Hystrix Dashboard 我们只能实现对服务当个实例的数据展现,在生产环境我们的服务是肯定需要做高可用的,那么对于多实例的情况,我们就需要将这些度量指标数据进行聚合。下面,在本篇中,我们就来介绍一下另外一个工具:Turbine。

准备工作

在开始使用 Turbine 之前,我们先回顾一下上一篇中实现的架构,如下图所示:

其中,我们构建的内容包括:

  • eureka-server:服务注册中心
  • eureka-producer:服务提供者
  • eureka-consumer-hystrix:使用 Feign 和 Hystrix 实现的服务消费者
  • hystrix-dashboard:用于展示eureka-consumer-hystrix服务的 Hystrix 数据

创建 Turbine

下面,我们将在上述架构基础上,引入 Turbine 来对服务的 Hystrix 数据进行聚合展示。 这里我们将分别介绍两种聚合方式。

通过 HTTP 收集聚合

创建一个标准的 Spring Boot 工程,命名为:turbine。

POM 配置

在 pom.xml 中添加以下依赖

1234
<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-turbine</artifactId></dependency>

启动类

在启动类上使用@EnableTurbine注解开启 Turbine

12345678
@EnableTurbine@SpringBootApplicationpublic class TurbineApplication {

public static void main(String[] args) {        SpringApplication.run(TurbineApplication.class, args);    }}

配置文件

在 application.yml 加入 Eureka 和 Turbine 的相关配置

123456789101112131415
spring:  application:    name: turbineserver:  port: 8080management:  port: 8081eureka:  client:    service-url:      defaultZone: http://localhost:7000/eureka/turbine:  app-config: eureka-consumer-hystrix  cluster-name-expression: new String("default")  combine-host-port: true

参数说明

  • turbine.app-config参数指定了需要收集监控信息的服务名;
  • turbine.cluster-name-expression 参数指定了集群名称为 default,当我们服务数量非常多的时候,可以启动多个 Turbine 服务来构建不同的聚合集群,而该参数可以用来区分这些不同的聚合集群,同时该参数值可以在 Hystrix 仪表盘中用来定位不同的聚合集群,只需要在 Hystrix Stream 的 URL 中通过 cluster 参数来指定;
  • turbine.combine-host-port参数设置为true,可以让同一主机上的服务通过主机名与端口号的组合来进行区分,默认情况下会以 host 来区分不同的服务,这会使得在本地调试的时候,本机上的不同服务聚合成一个服务来统计。

注意:new String("default")这个一定要用 String 来包一下,否则启动的时候会抛出异常:

1
org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field ‘default‘ cannot be found on object of type ‘com.netflix.appinfo.InstanceInfo‘ - maybe not public or not valid?

测试

在完成了上面的内容构建之后,我们来体验一下 Turbine 对集群的监控能力。分别启动

  • eureka-server
  • eureka-producer
  • eureka-consumer-hystrix
  • turbine
  • hystrix-dashboard

访问 Hystrix Dashboard 并开启对 http://localhost:8080/turbine.stream 的监控,这时候,我们将看到针对服务 eureka-consumer-hystrix 的聚合监控数据。

此时的架构如下图所示:

通过消息代理收集聚合

Spring Cloud 在封装 Turbine 的时候,还实现了基于消息代理的收集实现。所以,我们可以将所有需要收集的监控信息都输出到消息代理中,然后 Turbine 服务再从消息代理中异步的获取这些监控信息,最后将这些监控信息聚合并输出到 Hystrix Dashboard 中。通过引入消息代理,我们的 Turbine 和 Hystrix Dashoard 实现的监控架构可以改成如下图所示的结构:

从图中我们可以看到,这里多了一个重要元素:RabbitMQ。对于 RabbitMQ 的安装与基本时候我们可以查看之前的 MQ 系列,这里不做过多的说明。下面,我们可以来构建一个新的应用来实现基于消息代理的 Turbine 聚合服务。

关于通过 MQ 的聚合,在 Finchley.RC1 版本下有好多坑,好在最后能正常运行了。

Turbine Stream

UPDATED:2018-06-01
Finchley.RC2 已经出了,下边提到的 bug 都已经被修复了,直接添加 @EnableTurbineStream 就可以正常使用了。最新代码实例请看:https://github.com/zhaoyibo/spring-cloud-study/tree/master/turbine-stream

需要注意一点的是,Turbine Stream 默认的端口已经从 8989 改为 8080 了。

UPDATED:2018-05-18
以下关于 Turbine Stream 的内容仅适用于 Finchley.RC1 版本。今天尝试一下最新的 Finchley.BUILD-SNAPSHOT 发现 netty 的默认端口已经从 8989 改到 8080,并且需要依赖spring-cloud-starter-netflix-hystrix,因为目前 BUILD-SNAPSHOT 依旧有 bug 不确定他们会怎么改,我就暂时先不更新了。等到 RC2 release 的时候再来更新一发。

创建一个标准的 Spring Boot 工程,命名为:turbine-stream-rabbitmq

POM

12345678
<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

配置文件

1234567
spring:  application:    name: turbine-stream-rabbitmqeureka:  client:    service-url:      defaultZone: http://localhost:7000/eureka/

启动类

1234567891011121314
@SpringBootApplication@EnableTurbineStreampublic class TurbineStreamRabbitmqApplication {

public static void main(String[] args) {        SpringApplication.run(TurbineStreamRabbitmqApplication.class, args);    }

@Bean    public ConfigurableCompositeMessageConverter integrationArgumentResolverMessageConverter(CompositeMessageConverterFactory factory) {        return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());    }

}

改造服务调用者

以之前的 eureka-consumer-hystrix 项目为基础,在 pom.xml 里加入以下依赖

12345678
<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-netflix-hystrix-stream</artifactId></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

再在启动类上加上@EnableHystrix注解

12345678910
@EnableHystrix@EnableFeignClients@SpringBootApplicationpublic class EurekaConsumerHystrixApplication {

public static void main(String[] args) {        SpringApplication.run(EurekaConsumerHystrixApplication.class, args);    }

}

测试

分别启动 eureka-consumer-hystrix、turbine-stream-rabbitmq 这两个项目,然后在 RabbitMQ 的管理后台可以看到,自动创建了一个 Exchange 和 Queue


看到这还是挺高兴的,但是……

当访问了一下 consumer 中的接口后,就开始了艰辛的爬坑路程……

遇到的坑

依赖

这个 Turbine Stream 之前应该是叫 Turbine AMQP,当时有个 spring-cloud-starter-turbine-amqp 依赖可以用,里边包装了相关的依赖,但是现在它被 deprecated 了,让用 spring-cloud-starter-netflix-turbine-stream 来代替,这就得靠自己来组合了。而坑主要就出在这里,至于哪些是必须的,哪些是添加了后就出问题的,还有依赖冲突的问题,都得靠自己来搞了。

JsonParseException

相关 issue:https://github.com/spring-cloud/spring-cloud-netflix/issues/2858

Turbine Stream 从 RabbitMQ 取数据的时候抛出以下异常:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
com.fasterxml.jackson.core.JsonParseException: Unexpected character (‘,‘ (code 44)): Expected space separating root-level values at [Source: (String)"123,34,111,114,105,103,105,110,34,58,123,34,104,111,115,116,34,58,34,49,55,50,46,49,54,46,49,48,54,46,57,51,34,44,34,112,111,114,116,34,58,57,48,49,51,44,34,115,101,114,118,105,99,101,73,100,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,34,44,34,105,100,34,58,34,97,112,112,108,105,99,97,116,105,111,110,45,49,34,125,44,34,101,118,101,110,116,34,58,34,109,101,115,115,97,103,101,34,44,34,100,97,116,97,34,58,123,34,116,121,112,101,34,58,34,72,121,11"[truncated 6105 chars]; line: 1, column: 5]    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1798) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:663) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:561) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:608) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1654) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parsePosNumber(ReaderBasedJsonParser.java:1297) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:744) ~[jackson-core-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4129) ~[jackson-databind-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988) ~[jackson-databind-2.9.3.jar:2.9.3]    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2992) ~[jackson-databind-2.9.3.jar:2.9.3]    at org.springframework.cloud.netflix.turbine.stream.HystrixStreamAggregator.sendToSubject(HystrixStreamAggregator.java:73) ~[spring-cloud-netflix-turbine-stream-2.0.0.M8.jar:2.0.0.M8]    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:120) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:111) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:384) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:89) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:116) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:371) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:157) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.util.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:614) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.util.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:605) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:468) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:312) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:104) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:59) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:217) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:na]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:214) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:785) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:769) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1010) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

实际的 json 串类似以下格式:

1
{"origin":{"host":"172.16.106.93","port":9013,"serviceId":"eureka-consumer-hystrix","id":"application-1"},"event":"message","data":{"type":"HystrixCommand","name":"eureka-consumer-hystrix.HelloRemote#hello(String)","group":"eureka-producer","currentTime":1523938311789,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":1,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}}

上边这串字符串转为 byte[] 后就是

1
123,34,111,114,105,103,105,110,34,58,123,34,104,111,115,116,34,58,34,49,55,50,46,49,54,46,49,48,54,46,57,51,34,44,34,112,111,114,116,34,58,57,48,49,51,44,34,115,101,114,118,105,99,101,73,100,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,34,44,34,105,100,34,58,34,97,112,112,108,105,99,97,116,105,111,110,45,49,34,125,44,34,101,118,101,110,116,34,58,34,109,101,115,115,97,103,101,34,44,34,100,97,116,97,34,58,123,34,116,121,112,101,34,58,34,72,121,115,116,114,105,120,67,111,109,109,97,110,100,34,44,34,110,97,109,101,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,46,72,101,108,108,111,82,101,109,111,116,101,35,104,101,108,108,111,40,83,116,114,105,110,103,41,34,44,34,103,114,111,117,112,34,58,34,101,117,114,101,107,97,45,112,114,111,100,117,99,101,114,34,44,34,99,117,114,114,101,110,116,84,105,109,101,34,58,49,53,50,51,57,51,56,56,57,49,55,53,48,44,34,105,115,67,105,114,99,117,105,116,66,114,101,97,107,101,114,79,112,101,110,34,58,102,97,108,115,101,44,34,101,114,114,111,114,80,101,114,99,101,110,116,97,103,101,34,58,48,44,34,101,114,114,111,114,67,111,117,110,116,34,58,48,44,34,114,101,113,117,101,115,116,67,111,117,110,116,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,67,111,108,108,97,112,115,101,100,82,101,113,117,101,115,116,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,69,120,99,101,112,116,105,111,110,115,84,104,114,111,119,110,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,105,108,117,114,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,70,97,105,108,117,114,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,82,101,106,101,99,116,105,111,110,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,83,117,99,99,101,115,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,82,101,115,112,111,110,115,101,115,70,114,111,109,67,97,99,104,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,101,109,97,112,104,111,114,101,82,101,106,101,99,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,104,111,114,116,67,105,114,99,117,105,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,117,99,99,101,115,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,84,104,114,101,97,100,80,111,111,108,82,101,106,101,99,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,84,105,109,101,111,117,116,34,58,48,44,34,99,117,114,114,101,110,116,67,111,110,99,117,114,114,101,110,116,69,120,101,99,117,116,105,111,110,67,111,117,110,116,34,58,49,44,34,108,97,116,101,110,99,121,69,120,101,99,117,116,101,95,109,101,97,110,34,58,48,44,34,108,97,116,101,110,99,121,69,120,101,99,117,116,101,34,58,123,34,48,34,58,48,44,34,50,53,34,58,48,44,34,53,48,34,58,48,44,34,55,53,34,58,48,44,34,57,48,34,58,48,44,34,57,53,34,58,48,44,34,57,57,34,58,48,44,34,57,57,46,53,34,58,48,44,34,49,48,48,34,58,48,125,44,34,108,97,116,101,110,99,121,84,111,116,97,108,95,109,101,97,110,34,58,48,44,34,108,97,116,101,110,99,121,84,111,116,97,108,34,58,123,34,48,34,58,48,44,34,50,53,34,58,48,44,34,53,48,34,58,48,44,34,55,53,34,58,48,44,34,57,48,34,58,48,44,34,57,53,34,58,48,44,34,57,57,34,58,48,44,34,57,57,46,53,34,58,48,44,34,49,48,48,34,58,48,125,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,82,101,113,117,101,115,116,86,111,108,117,109,101,84,104,114,101,115,104,111,108,100,34,58,50,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,83,108,101,101,112,87,105,110,100,111,119,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,53,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,69,114,114,111,114,84,104,114,101,115,104,111,108,100,80,101,114,99,101,110,116,97,103,101,34,58,53,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,70,111,114,99,101,79,112,101,110,34,58,102,97,108,115,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,70,111,114,99,101,67,108,111,115,101,100,34,58,102,97,108,115,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,83,116,114,97,116,101,103,121,34,58,34,84,72,82,69,65,68,34,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,84,105,109,101,111,117,116,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,49,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,73,110,116,101,114,114,117,112,116,79,110,84,105,109,101,111,117,116,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,80,111,111,108,75,101,121,79,118,101,114,114,105,100,101,34,58,110,117,108,108,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,83,101,109,97,112,104,111,114,101,77,97,120,67,111,110,99,117,114,114,101,110,116,82,101,113,117,101,115,116,115,34,58,49,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,102,97,108,108,98,97,99,107,73,115,111,108,97,116,105,111,110,83,101,109,97,112,104,111,114,101,77,97,120,67,111,110,99,117,114,114,101,110,116,82,101,113,117,101,115,116,115,34,58,49,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,109,101,116,114,105,99,115,82,111,108,108,105,110,103,83,116,97,116,105,115,116,105,99,97,108,87,105,110,100,111,119,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,49,48,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,114,101,113,117,101,115,116,67,97,99,104,101,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,114,101,113,117,101,115,116,76,111,103,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,114,101,112,111,114,116,105,110,103,72,111,115,116,115,34,58,49,125,125

从上边异常可以看出,这其实就是将这个 byte[] 转为 String 的时候出错了。

在源码里找了好久,最后发现原来是启动的时候要初始化一个 ConfigurableCompositeMessageConverter,但是这个类默认的只提供以下 4 个MessageConverter

  • MappingJackson2MessageConverter
  • ByteArrayMessageConverter
  • ObjectStringMessageConverter
  • GenericMessageConverter

这 4 个 Converter 处理 byte[] -> String 的时候都会出问题(上边的异常就是MappingJackson2MessageConverter这个抛出的)。说到这你可能会问了,byte[] 转 String 有这么难吗?不就一个 new String(bytes) 就解决了。我也这么想啊,就差自己动手写了。

这时候发现了CompositeMessageConverterFactory,从名字上可以看出就是 MessageConverter的工厂类,似乎是一根救命稻草。它里边默认提供了 7 个 Converter,第一个就是ApplicationJsonMessageMarshallingConverter,看了里边的实现,这不正是我需要的嘛!

123456
if (message.getPayload() instanceof byte[] &&  targetClass.isAssignableFrom(String.class)) {    result = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);}else {    result = super.convertFromInternal(message, targetClass, conversionHint);}

所以就自己手动注入这个 bean

1234
@Beanpublic ConfigurableCompositeMessageConverter integrationArgumentResolverMessageConverter(CompositeMessageConverterFactory factory) {    return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());}

其实这个 Bean 在ContentTypeConfiguration中已经声明了但没效果,我只是原封不动的 copy 出来。

1234
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)public ConfigurableCompositeMessageConverter configurableCompositeMessageConverter(CompositeMessageConverterFactory factory){    return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());}

Fatal Exception thrown on Scheduler.Worker thread

这个的异常信息如下:

123456789101112131415161718192021222324252627282930313233343536
Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)    at java.util.concurrent.FutureTask.run(FutureTask.java:266)    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)    at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.AbstractMethodError    at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)    at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)    at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)    at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)    at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)    at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)    at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:120)    at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:585)    at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:283)    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)    at rx.internal.schedulers.SchedulePeriodicHelper$1.call(SchedulePeriodicHelper.java:72)    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)    ... 7 more

同时在 Hystrix Dashboard 中 monitor 相应的地址的会提示 “Unable to connect to Command Metric Stream.” 并在 Console 里报错:

1
EventSource‘s response has a MIME type ("text/plain") that is not "text/event-stream". Aborting the connection.

这个问题真不知道怎么搞了,先弃坑了,有空了再研究吧。

2018-05-06 更新:
这个问题可以详见 Github 上的 这个 issus 感谢 @MadeInChina

为了本文的完整性,我这里也说明下这个问题的解决思路和解决办法。

这个异常信息关键的一行是是第 22 行

1
at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)

我们就打开 TurbineStreamConfiguration 看一下这 106 行到底写了点啥

如果这时你点进去看 ServerSentEvent 这个类,问题其实就明了了(我当时就是困在了response.writeAndFlush这个方法上,根本没注意到 ServerSentEvent)

这个类直接报错,因为没有完整实现 ByteBufHolder 里边的方法,从这你已经能看出来应该是相关依赖的问题了。我们这里注意一下这个类所在的 JAR 包 io.reactivex:rxnetty:0.4.9,然后再看一下它实现的这个接口

这个接口的变更记录可以看这里。最后一次的修改是在 2016.5.17 添加了几个方法,也就是说 ByteBufHolder 在io.netty:netty-buffer:4.1.0.Final(2016.5.25)就已经被修改了,更不用说io.netty:netty-buffer:4.1.23.Final(2018.4.4)了。而io.reactivex:rxnetty:0.4.9是一个 2015.5.6 发布的 jar 包。

那我们就用io.reactivex:rxnetty:0.4.20最新的版本来试试,pom.xml 里添加以下内容

123456
<dependency>    <groupId>io.reactivex</groupId>    <artifactId>rxnetty</artifactId>    <version>0.4.20</version>    <scope>runtime</scope></dependency>

再次启动并测试

12
$ curl http://localhost:8989curl: (52) Empty reply from server

发现依旧报错,控制台里错误如下

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
2018-05-06 23:29:37.056  WARN 52477 --- [o-eventloop-3-2] i.n.c.AbstractChannelHandlerContext      : An exception ‘{}‘ [enable DEBUG level for full stacktrace] was thrown by a user handler‘s exceptionCaught() method while handling the following exception:

java.lang.NoSuchMethodError: rx.internal.operators.NotificationLite.instance()Lrx/internal/operators/NotificationLite;    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:243) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:241) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.UnicastContentSubject$State.<init>(UnicastContentSubject.java:197) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:132) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:122) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:117) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter$RequestState.createRxRequest(ServerRequestResponseConverter.java:176) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter$RequestState.access$100(ServerRequestResponseConverter.java:168) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:87) ~[rx-netty-0.3.18.jar:na]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) [netty-codec-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241) [netty-handler-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59) [rx-netty-0.3.18.jar:na]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.23.Final.jar:4.1.23.Final]    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.23.Final.jar:4.1.23.Final]    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-05-06 23:29:37.057 ERROR 52477 --- [o-eventloop-3-2] i.r.netty.server.DefaultErrorHandler     : Unexpected error in RxNetty.

java.lang.NullPointerException: null    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:160) ~[rx-netty-0.3.18.jar:na]    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:96) ~[rx-netty-0.3.18.jar:na]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) [netty-codec-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241) [netty-handler-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59) [rx-netty-0.3.18.jar:na]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.1.23.Final.jar:4.1.23.Final]    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.23.Final.jar:4.1.23.Final]    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.23.Final.jar:4.1.23.Final]    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

就仅看异常栈的第一行,你会发现 UnicastContentSubject 这个类在当前环境下有三个

将三个都打开来看一下,会发现io.reactivex:rxnetty那两个看起来问题不大,而com.netflix.rxnetty:rx-netty这个直接又是报错,依旧是未完全实现接口。

根据 DEBUG 的信息来看,而实际使用的就是com.netflix.rxnetty:rx-netty:0.3.18里边的这个,那就分析一下依赖吧,把这个残疾的老古董(2014.11.6)给排除掉

123456789101112131415161718192021222324252627282930313233343536373839
$ mvn dependency:tree -Dverbose -Dincludes=*:*netty*[INFO] Scanning for projects...[INFO][INFO] ----------< com.windmt:spring-cloud-turbine-stream-rabbitmq >-----------[INFO] Building turbine-stream-rabbitmq 0.0.1-SNAPSHOT[INFO] --------------------------------[ jar ]---------------------------------[INFO][INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli) @ spring-cloud-turbine-stream-rabbitmq ---[INFO] Verbose not supported since maven-dependency-plugin 3.0[INFO] com.windmt:spring-cloud-turbine-stream-rabbitmq:jar:0.0.1-SNAPSHOT[INFO] +- org.springframework.cloud:spring-cloud-starter-netflix-turbine-stream:jar:2.0.0.RC1:compile[INFO] |  +- org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:jar:2.0.0.RC1:compile[INFO] |  |  \- org.springframework.cloud:spring-cloud-starter-netflix-ribbon:jar:2.0.0.RC1:compile[INFO] |  |     \- com.netflix.ribbon:ribbon:jar:2.2.5:compile[INFO] |  |        +- com.netflix.ribbon:ribbon-transport:jar:2.2.5:runtime[INFO] |  |        |  +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime[INFO] |  |        |  \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime[INFO] |  |        \- io.reactivex:rxnetty:jar:0.4.9:runtime[INFO] |  \- com.netflix.turbine:turbine-core:jar:2.0.0-DP.2:compile[INFO] |     \- com.netflix.rxnetty:rx-netty:jar:0.3.18:compile[INFO] |        +- io.netty:netty-codec-http:jar:4.1.23.Final:compile[INFO] |        |  \- io.netty:netty-codec:jar:4.1.23.Final:compile[INFO] |        \- io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile[INFO] |           +- io.netty:netty-common:jar:4.1.23.Final:compile[INFO] |           +- io.netty:netty-buffer:jar:4.1.23.Final:compile[INFO] |           +- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile[INFO] |           \- io.netty:netty-transport:jar:4.1.23.Final:compile[INFO] |              \- io.netty:netty-resolver:jar:4.1.23.Final:compile[INFO] \- org.springframework.cloud:spring-cloud-stream-binder-rabbit:jar:2.0.0.RELEASE:compile[INFO]    \- org.springframework.boot:spring-boot-starter-amqp:jar:2.0.1.RELEASE:compile[INFO]       \- org.springframework.amqp:spring-rabbit:jar:2.0.3.RELEASE:compile[INFO]          \- com.rabbitmq:http-client:jar:2.0.1.RELEASE:compile[INFO]             \- io.projectreactor.ipc:reactor-netty:jar:0.7.6.RELEASE:compile[INFO]                +- io.netty:netty-handler:jar:4.1.23.Final:compile[INFO]                \- io.netty:netty-handler-proxy:jar:4.1.23.Final:compile[INFO]                   \- io.netty:netty-codec-socks:jar:4.1.23.Final:compile[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------

最终 pom.xml 里的依赖坐标如下(这也是能正常启动 Turbine Stream 的最小配置了,Spring Cloud 的版本为 Finchley.RC1):

1234567891011121314151617181920212223242526
<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>    <exclusions>        <exclusion>            <groupId>com.netflix.rxnetty</groupId>            <artifactId>rx-netty</artifactId>        </exclusion>        <exclusion>            <groupId>io.reactivex</groupId>            <artifactId>rxnetty</artifactId>        </exclusion>    </exclusions></dependency>

<dependency>    <groupId>io.reactivex</groupId>    <artifactId>rxnetty</artifactId>    <version>0.4.20</version>    <scope>runtime</scope></dependency>

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

再次测试就能正常收到 SSE 了

123456
$ curl http://localhost:8989event: messagedata: {"rollingCountFallbackFailure":0,"rollingCountFallbackSuccess":0,"propertyValue_circuitBreakerRequestVolumeThreshold":"20","propertyValue_circuitBreakerForceOpen":false,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":"10000","latencyTotal_mean":0,"type":"HystrixCommand","rollingCountResponsesFromCache":0,"TypeAndName":"TypeAndName=>HystrixCommand_consumer-feign-hystrix-stream.HelloRemote#hello(String)","rollingCountTimeout":0,"propertyValue_executionIsolationStrategy":"THREAD","instanceId":"application-1","rollingCountFailure":0,"rollingCountExceptionsThrown":0,"latencyExecute_mean":0,"isCircuitBreakerOpen":false,"errorCount":0,"group":"producer","rollingCountSemaphoreRejected":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountShortCircuited":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerSleepWindowInMilliseconds":"5000","currentConcurrentExecutionCount":0,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":"10","errorPercentage":0,"rollingCountThreadPoolRejected":0,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_requestCacheEnabled":true,"rollingCountFallbackRejection":0,"propertyValue_requestLogEnabled":true,"rollingCountSuccess":0,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":"10","InstanceKey":"InstanceKey=>application-1","propertyValue_circuitBreakerErrorThresholdPercentage":"50","propertyValue_circuitBreakerForceClosed":false,"name":"consumer-feign-hystrix-stream.HelloRemote#hello(String)","reportingHosts":1,"propertyValue_executionIsolationThreadPoolKeyOverride":"null","propertyValue_executionIsolationThreadTimeoutInMilliseconds":"1000"}

event: messagedata: {"currentCorePoolSize":10,"currentLargestPoolSize":8,"currentActiveCount":0,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":"10000","currentMaximumPoolSize":10,"currentQueueSize":0,"type":"HystrixThreadPool","currentTaskCount":8,"TypeAndName":"TypeAndName=>HystrixThreadPool_producer","currentCompletedTaskCount":8,"rollingMaxActiveThreads":0,"instanceId":"application-1","InstanceKey":"InstanceKey=>application-1","name":"producer","reportingHosts":1,"currentPoolSize":8,"propertyValue_queueSizeRejectionThreshold":"5","rollingCountThreadsExecuted":0}

控制台也不报错了

1234
2018-05-06 23:57:25.833  INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : SSE Request Received2018-05-06 23:57:25.843  INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : Starting aggregation2018-05-06 23:57:32.115  INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : Unsubscribing RxNetty server connection2018-05-06 23:57:32.116  INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : Unsubscribing aggregation.

然后在 Hystrix Dashboard 的地址栏里输入 http://localhost:8989 就能看到了(/turbine.stream可加可不加,如果要修改端口号,在 Spring Boot 的配置文件中修改 turbine.stream.port

原文地址:https://www.cnblogs.com/eyesfree/p/10365445.html

时间: 2024-10-11 09:13:27

Hystrix 监控数据聚合 Turbine【Finchley 版】的相关文章

Spring Cloud(四):服务容错保护 Hystrix【Finchley 版】

Spring Cloud(四):服务容错保护 Hystrix[Finchley 版] 发表于 2018-04-15 |  更新于 2018-05-07 | 分布式系统中经常会出现某个基础服务不可用造成整个系统不可用的情况,这种现象被称为服务雪崩效应.为了应对服务雪崩,一种常见的做法是手动服务降级.而 Hystrix 的出现,给我们提供了另一种选择. Hystrix [h?st'r?ks] 的中文含义是 "豪猪",豪猪周身长满了刺,能保护自己不受天敌的伤害,代表了一种防御机制,这与 Hy

Spring Cloud(九):配置中心(消息总线)【Finchley 版】

Spring Cloud(九):配置中心(消息总线)[Finchley 版] 发表于 2018-04-19 |  更新于 2018-05-07 | 我们在 Spring Cloud(七):配置中心(Git.Refresh) 中讲到,如果需要客户端获取到最新的配置信息需要执行refresh,我们可以利用 Webhook 的机制每次提交代码发送请求来刷新客户端,当客户端越来越多的时候,需要每个客户端都执行一遍,这种方案就不太适合了.使用 Spring Cloud Bus 可以完美解决这一问题. Sp

Spring Cloud(十一):服务网关 Zuul(过滤器)【Finchley 版】

Spring Cloud(十一):服务网关 Zuul(过滤器)[Finchley 版] 发表于 2018-04-23 |  更新于 2018-05-07 | 在上篇文章中我们了解了 Spring Cloud Zuul 作为网关所具备的最基本功能:路由(Router).本文我们将关注 Spring Cloud Zuul 的另一核心功能:过滤器(Filter). Filter 的作用 我们已经能够实现请求的路由功能,所以我们的微服务应用提供的接口就可以通过统一的 API 网关入口被客户端访问到了.但

Spring Cloud(十二):分布式链路跟踪 Sleuth 与 Zipkin【Finchley 版】

Spring Cloud(十二):分布式链路跟踪 Sleuth 与 Zipkin[Finchley 版] 发表于 2018-04-24 | 随着业务发展,系统拆分导致系统调用链路愈发复杂一个前端请求可能最终需要调用很多次后端服务才能完成,当整个请求变慢或不可用时,我们是无法得知该请求是由某个或某些后端服务引起的,这时就需要解决如何快读定位服务故障点,以对症下药.于是就有了分布式系统调用跟踪的诞生. 现今业界分布式服务跟踪的理论基础主要来自于 Google 的一篇论文<Dapper, a Larg

跟我学Spring Cloud(Finchley版)-13-通用方式使用Hystrix

本节详细讲解使用Hystrix的通用方式. 简介 Hystrix是由Netflix开源的一个延迟和容错库,用于隔离访问远程系统.服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性.Hystrix主要通过以下几点实现延迟和容错. 包裹请求 使用HystrixCommand(或HystrixObservableCommand)包裹对依赖的调用逻辑,每个命令在独立线程中执行.这使用到了设计模式中的"命令模式". 跳闸机制 当某服务的错误率超过一定阈值时,Hystrix可以自动或者手

微服务熔断限流Hystrix之流聚合

简介 上一篇介绍了 Hystrix Dashboard 监控单体应用的例子,在生产环境中,监控的应用往往是一个集群,我们需要将每个实例的监控信息聚合起来分析,这就用到了 Turbine 工具.Turbine有一个重要的功能就是汇聚监控信息,并将汇聚到的监控信息提供给Hystrix Dashboard来集中展示和监控. 流程 实验 工程说明 工程名 端口 作用 eureka-server 8761 注册中心 service-hi 8762 服务提供者 service-consumer 8763 服

通过 JMX 获取Hadoop/HBase监控数据

概述 说到对Hadoop和 HBase的集群监控,大家知道的和用的最多的可能还是第三方的监控工具,cacti,ganglia,zabbix之类的.玩的深一些的,会用 zenoss之类的.这些工具确实不错,也能发挥很大的作用,但时间长了总感觉监控粒度还是比较粗,不够详细.毕竟是第三方的监控,即便Hadoop自带 了ganglia的接口,也还是觉得不够. 其实Hadoop本身是带有监控接口的,各公司的发行版还有自己定制的接口,不过可能知道的人就不太多了. 其实这个接口特别简单,但是非常详细,也非常方

如何从Zabbix数据库中获取监控数据

做过Zabbix的同学都知道,Zabbix通过专用的Agent或者SNMP收集相关的监控数据,然后存储到数据库里面实时在前台展示.Zabbix监控数据主要分为以下两类: 历史数据:history相关表,从history_uint表里面可以查询到设备监控项目的最大,最小和平均值,即存储监控数据的原始数据. 趋势数据:trends相关表,趋势数据是经过Zabbix计算的数据,数据是从history_uint里面汇总的,从trends_uint可以查看到监控数据每小时最大,最小和平均值流量. Zabb

XenApp/XenDesktop监控数据查询、提取

在XenDesktop中,Director为管理员提供了整个平台的监控和健康状态的信息,让管理员方便的了解Citrix的平台的运行状态以及实时发生的故障.这些监控数据从哪儿来?在XenDesktop的先前版本中,Director中的大多数数据都是通过直接访问Broker Service API来检索的.使用此API的缺点是此服务不包括对使用的历史信息的检索,即只能检索实时会话信息.如果管理员需要向以前的监控组件Edgesight那样检索历史数据,比如管理员可能想知道目前有多少会话处于活动状态,以