jmockit 中文网 mq消息生产者

RocketMQ是我们常用的消息中间件,在运行单元测试时,我们可能不需要真正发送消息(除非是为了测试发送消息),也不想因为连结不上RocketMQ的Broker,NameServer而影响单元测试运行。
   那我们该如何Mock RocketMQ消息生产者呢?
  请看例子:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

//RocketMQ消息生产者 Mock 

public class RocetMQProducerMockingTest {

    // 把RocketMQ的生产者mock

    @BeforeClass

    public static void mockRocketMQ() {

        new RocketMQProducerMockUp();

    }

    @Test

    public void testSendRocketMQMessage() throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("test_producer");

        producer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");

        producer.start();

        for (int i = 0; i < 20; i++) {

            Message msg = new Message("testtopic""TagA", ("Hello " + i).getBytes());

            // 因为mq生产者已经mock,所以消息并不会真正的发送,即使nameServer连不上,也不影响单元测试的运行

            SendResult result = producer.send(msg);

            Assert.isTrue(result.getSendStatus() == SendStatus.SEND_OK);

            Assert.isTrue(result.getMsgId() != null);

        }

        producer.shutdown();

    }

}

最关键的类是RocketMQProducerMockUp,这个类改变了生产者默认实现。代码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

//MQ消息发送者 的MockUp(伪类) 

public class RocketMQProducerMockUp extends MockUp<DefaultMQProducer> {

    @Mock

    void init() throws MQClientException {

        // 构造函数也什么都不做

    }

    @Mock

    void start() throws MQClientException {

        // 启动,什么都不做 

    }

    @Mock

    void shutdown() {

        // 关闭,也什么都不做 

    }

    @Mock

    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException {

        // 欺骗调用方,返回不存在的消息队列,因为消息并不会真正发送嘛

        List<MessageQueue> queues = new ArrayList<MessageQueue>();

        MessageQueue q = new MessageQueue();

        q.setBrokerName("testbrokername");

        q.setQueueId(1);

        q.setTopic("testtopic");

        queues.add(q);

        return queues;

    }

    // 下面是对各个send方法的mock,都返回消息成功结果

    @Mock

    SendResult send(final Message msg)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    SendResult send(final Message msg, final long timeout)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    void send(final Message msg, final SendCallback sendCallback)

            throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void send(final Message msg, final SendCallback sendCallback, final long timeout)

            throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException {

    }

    @Mock

    SendResult send(final Message msg, final MessageQueue mq)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    SendResult send(final Message msg, final MessageQueue mq, final long timeout)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)

            throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)

            throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void sendOneway(final Message msg, final MessageQueue mq)

            throws MQClientException, RemotingException, InterruptedException {

    }

    @Mock

    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout)

            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return newSuccessSendResult();

    }

    @Mock

    void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback)

            throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback,

            final long timeout) throws MQClientException, RemotingException, InterruptedException {

        sendCallback.onSuccess(this.newSuccessSendResult());

    }

    @Mock

    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)

            throws MQClientException, RemotingException, InterruptedException {

    }

    @Mock

    TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter,

            final Object arg) throws MQClientException {

        return newTransactionSendResult();

    }

    private TransactionSendResult newTransactionSendResult() {

        TransactionSendResult success = new TransactionSendResult();

        success.setSendStatus(SendStatus.SEND_OK);

        success.setMsgId(UUID.randomUUID().toString());

        MessageQueue q = new MessageQueue();

        q.setBrokerName("testbrokername");

        q.setQueueId(1);

        q.setTopic("testtopic");

        success.setMessageQueue(q);

        success.setLocalTransactionState(LocalTransactionState.COMMIT_MESSAGE);

        return success;

    }

    private SendResult newSuccessSendResult() {

        SendResult success = new SendResult();

        success.setSendStatus(SendStatus.SEND_OK);

        success.setMsgId(UUID.randomUUID().toString());

        MessageQueue q = new MessageQueue();

        q.setBrokerName("testbrokername");

        q.setQueueId(1);

        q.setTopic("testtopic");

        success.setMessageQueue(q);

        return success;

    }

}

 

原文地址:https://www.cnblogs.com/funkboy/p/12012559.html

时间: 2024-10-27 01:12:35

jmockit 中文网 mq消息生产者的相关文章

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

阿里云ACE共创空间——MQ消息队列产品测试

一.产品背景消息队列是阿里巴巴集团自主研发的专业消息中间件. 产品基于高可用分布式集群技术,提供消息订阅和发布.消息轨迹查询.定时(延时)消息.资源统计.监控报警等一系列消息云服务,是企业级互联网架构的核心产品. MQ 目前提供 TCP .MQTT 两种协议层面的接入方式,支持 Java.C++ 以及 .NET 不同语言,方便不同编程语言开发的应用快速接入 MQ 消息云服务. 用户可以将应用部署在阿里云 ECS.企业自建云,或者嵌入到移动端.物联网设备中与 MQ 建立连接进行消息收发,同时本地开

MQ消息队列的12点核心原理总结

1. 消息生产者.消息者.队列 消息生产者Producer:发送消息到消息队列. 消息消费者Consumer:从消息队列接收消息. Broker:概念来自与Apache ActiveMQ,指MQ的服务端,帮你把消息从发送端传送到接收端. 消息队列Queue:一个先进先出的消息存储区域.消息按照顺序发送接收,一旦消息被消费处理,该消息将从队列中删除. 2.设计Broker主要考虑 1)消息的转储:在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机. 2)规范一种范式和通用的模式,以满

kafka 0.8.2 消息生产者 KafkaProducer

package com.hashleaf.kafka; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerReco

kafka 0.8.2 消息生产者 producer

package com.hashleaf.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 消息生产者 * @author xiaojf [email protected] * @since 2015-7-15 下午10:50:01 */

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

如何保证MQ消息必达

此文章属于笔记,原属58沈剑 一.MQ消息必达,架构上的两个核心设计点: 消息落地 消息超时.重传.确认 四大部件:发送端 接收端 服务端 固化存储组成 二.上半场消息必达以及消息重复问题 上半场的流程 发送端MQ-client 将消息发送给服务端MQ-server 服务端MQ-server将消息落地 服务端MQ-server 回ACK(表示确认) 2.如果3丢失 发送端在超时后,又会发送一遍,此时重发是MQ-client发起的,消息处理的是MQ-server 为了避免2 重复落地,对每条MQ消

【ActiveMQ】消息生产者自动注入报错:Could not autowire. No beans of &#39;JmsMessagingTemplate&#39; type found

使用ActiveMQ过程中,定义消息生产者: package com.sxd.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination;

MQ消息队列配置

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/