Amazon SQS 消息队列服务

Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证。 sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格有序的,即消息接收的顺序是按照消息发送的顺序来的, 而标准队列是尽最大可能有序, 即不保证一定为有序, 此外FIFO还保证了消息在一定时间内不能重复发出,即使是重复发了, 它也不会把消息发送到队列上。

队列操作

创建队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
CreateQueueRequest create_request = new CreateQueueRequest(QUEUE_NAME)
        .addAttributesEntry("DelaySeconds", "60")
        .addAttributesEntry("MessageRetentionPeriod", "86400");

try {
    sqs.createQueue(create_request);
} catch (AmazonSQSException e) {
    if (!e.getErrorCode().equals("QueueAlreadyExists")) {
        throw e;
    }
}
列出队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
ListQueuesResult lq_result = sqs.listQueues();
System.out.println("Your SQS Queue URLs:");
for (String url : lq_result.getQueueUrls()) {
    System.out.println(url);
}
获取队列Url
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
删除队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
sqs.deleteQueue(queue_url);

消息操作

发送消息
SendMessageRequest send_msg_request = new SendMessageRequest()
        .withQueueUrl(queueUrl)
        .withMessageBody("hello world")
        .withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
批量发送消息
SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
        .withQueueUrl(queueUrl)
        .withEntries(
                new SendMessageBatchRequestEntry(
                        "msg_1", "Hello from message 1"),
                new SendMessageBatchRequestEntry(
                        "msg_2", "Hello from message 2")
                        .withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
获取消息
List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
删除消息
for (Message m : messages) {
    sqs.deleteMessage(queueUrl, m.getReceiptHandle());
}

使用JMS方法

发送消息
public class TextMessageSender {
public static void main(String args[]) throws JMSException {
    ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);

    ExampleCommon.setupLogging();

    // Create the connection factory based on the config
    SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                    .withRegion(config.getRegion().getName())
                    .withCredentials(config.getCredentialsProvider())
            );

    // Create the connection
    SQSConnection connection = connectionFactory.createConnection();

    // Create the queue if needed
    ExampleCommon.ensureQueueExists(connection, config.getQueueName());

    // Create the session
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );

    sendMessages(session, producer);

    // Close the connection. This closes the session automatically
    connection.close();
    System.out.println( "Connection closed" );
}

private static void sendMessages( Session session, MessageProducer producer ) {
    BufferedReader inputReader = new BufferedReader(
        new InputStreamReader( System.in, Charset.defaultCharset() ) );

    try {
        String input;
        while( true ) {
            System.out.print( "Enter message to send (leave empty to exit): " );
            input = inputReader.readLine();
            if( input == null || input.equals("" ) ) break;

            TextMessage message = session.createTextMessage(input);
            producer.send(message);
            System.out.println( "Send message " + message.getJMSMessageID() );
        }
    } catch (EOFException e) {
        // Just return on EOF
    } catch (IOException e) {
        System.err.println( "Failed reading input: " + e.getMessage() );
    } catch (JMSException e) {
        System.err.println( "Failed sending message: " + e.getMessage() );
        e.printStackTrace();
    }
}
}
同步接收消息
public class SyncMessageReceiver {
public static void main(String args[]) throws JMSException {
ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);

ExampleCommon.setupLogging();

// Create the connection factory based on the config
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
        new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard()
                .withRegion(config.getRegion().getName())
                .withCredentials(config.getCredentialsProvider())
        );

// Create the connection
SQSConnection connection = connectionFactory.createConnection();

// Create the queue if needed
ExampleCommon.ensureQueueExists(connection, config.getQueueName());

// Create the session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

connection.start();

receiveMessages(session, consumer);

// Close the connection. This closes the session automatically
connection.close();
System.out.println( "Connection closed" );
}

private static void receiveMessages( Session session, MessageConsumer consumer ) {
try {
    while( true ) {
        System.out.println( "Waiting for messages");
        // Wait 1 minute for a message
        Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
        if( message == null ) {
            System.out.println( "Shutting down after 1 minute of silence" );
            break;
        }
        ExampleCommon.handleMessage(message);
        message.acknowledge();
        System.out.println( "Acknowledged message " + message.getJMSMessageID() );
    }
} catch (JMSException e) {
    System.err.println( "Error receiving from SQS: " + e.getMessage() );
    e.printStackTrace();
}
}
}
异步接收消息
public class AsyncMessageReceiver {
public static void main(String args[]) throws JMSException, InterruptedException {
    ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);

    ExampleCommon.setupLogging();          

    // Create the connection factory based on the config
    SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                    .withRegion(config.getRegion().getName())
                    .withCredentials(config.getCredentialsProvider())
            );

    // Create the connection
    SQSConnection connection = connectionFactory.createConnection();

    // Create the queue if needed
    ExampleCommon.ensureQueueExists(connection, config.getQueueName());

    // Create the session
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

    ReceiverCallback callback = new ReceiverCallback();
    consumer.setMessageListener( callback );

    // No messages are processed until this is called
    connection.start();

    callback.waitForOneMinuteOfSilence();
    System.out.println( "Returning after one minute of silence" );

    // Close the connection. This closes the session automatically
    connection.close();
    System.out.println( "Connection closed" );
}

private static class ReceiverCallback implements MessageListener {
    // Used to listen for message silence
    private volatile long timeOfLastMessage = System.nanoTime();

    public void waitForOneMinuteOfSilence() throws InterruptedException {
        for(;;) {
            long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;
            long remainingTillOneMinuteOfSilence =
                TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;
            if( remainingTillOneMinuteOfSilence < 0 ) {
                break;
            }
            TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            ExampleCommon.handleMessage(message);
            message.acknowledge();
            System.out.println( "Acknowledged message " + message.getJMSMessageID() );
            timeOfLastMessage = System.nanoTime();
        } catch (JMSException e) {
            System.err.println( "Error processing message: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}
}


https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-messages.html
https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html

原文地址:https://www.cnblogs.com/helloz/p/9314915.html

时间: 2024-08-01 00:22:24

Amazon SQS 消息队列服务的相关文章

2.OpenStack-安装消息队列服务

安装消息队列服务(安装在控制器上) yum install rabbitmq-server -y systemctl start mariadb.service 配置消息队列服务 systemctl enable rabbitmq-server.service systemctl restart rabbitmq-server.service 修改密码 rabbitmqctl change_password guest Abcd1234 Creating user "openstack"

C#中使用消息队列服务

C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy.blog.51cto.com/609721/184323 http://www.cnblogs.com/isdavid/archive/2012/08/16/2642867.html http://www.cnblogs.com/beniao/archive/2008/06/26/1229934.h

浅析腾讯云分布式高可靠消息队列服务CMQ架构

在分布式大行其道的今天,我们在系统内部.平台之间广泛运用消息中间件进行数据交换及解耦.CMQ是腾讯云内部自研基于的高可靠.强一致.可扩展分布式消息队列,在腾讯内部包括微信手机QQ业务红包.腾讯话费充值.广告订单等都有广泛使用.目前已上线腾讯云对外开放,本文对腾讯云CMQ核心技术原理进行分享介绍. CMQ消息队列主要适用于金融.交易.订单等对可靠性.可用性有较高要求的业务场景. 以腾讯充值系统为例,该充值系统通过CMQ 对交易模块.发货部分.结算系统进行异步解耦.削峰填谷,一方面大大降低了模块间耦

NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

简单消息队列服务 HTTPSQS

HTTPSQS(HTTP?Simple?Queue?Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储. 队列(Queue)又称先进先出表(First In First Out),即先进入队列的元素,先从队列中取出.加入元素的一头叫"队头",取出元素的一头叫"队尾".利用消息队列可以很好地异步处理数据传送和存储,当你频繁地向数据库

使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,"消息队列"是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时

HTTP协议级消息队列服务组件UCMQ

UCMQ是一款轻量的HTTP协议级消息队列服务组件,项目的最初原型来自"张宴"的HTTPSQS. 基本特性: 支持标准的HTTP协议( GET/POST方法),支持长连接(keep-alive): 请求响应非常快速,入队列.出队列速度超过10000次/秒: 每个UCMQ实例支持多队列,队列通过操作接口自动创建: 单个队列默认限制存储100w未读消息,可以不限制(非必要建议限制): 可以在不停止服务的情况下便捷地修改单个队列的属性(大小限制/只读锁/延时队列/同步频率): 可以实时查看队

【转】NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

消息队列服务Kafka揭秘:痛点、优势以及适用场景

摘要: 消息队列Kafka是一个分布式的.高吞吐量.高可扩展性消息队列服务,广泛用于日志收集.监控数据聚合.流式数据处理.在线和离线分析等,是大数据生态中不可或缺的产品之一,阿里云提供全托管服务,用户无需部署运维,更专业.更可靠.更安全.本文就将带你走进消息队列Kafka. 摘要:消息队列Kafka是一个分布式的.高吞吐量.高可扩展性消息队列服务,广泛用于日志收集.监控数据聚合.流式数据处理.在线和离线分析等,是大数据生态中不可或缺的产品之一,阿里云提供全托管服务,用户无需部署运维,更专业.更可