侧边栏壁纸
博主头像
coydone博主等级

记录学习,分享生活的个人站点

  • 累计撰写 306 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

ActiveMQ高级特性

coydone
2022-06-16 / 0 评论 / 0 点赞 / 624 阅读 / 10,341 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-01,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

异步投递及确认成功

同步发送

ActiveMQ官方说异步发送是很多模式下默认的传输方式,但是在发送非事物持久化消息的时候默认使用的是同步发送模式。同步发送时,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者,这个确认消息暗示生产者 broker 已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中。

同步发送持久消息能够提供更好的可靠性,但这潜在地影响了程序的响应速度,因为在接受到 broker 的确认消息之前应用程序或线程会被阻塞。如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。

异步发送

使用不同的模式对send 方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ 吞吐量的重要因素,使用异步发送可以提高系统的性能。在默认大多数情况下,AcitveMQ 是以异步模式发送消息。例外的情况:在没有使用事务的情况下,生产者以PERSISTENT 传送模式发送消息。在这种情况下,send 方法都是同步的,并且一直阻塞直到ActiveMQ 发回确认消息:消息已经存储在持久性数据存储中。这种确认机制保证消息不会丢失,但会造成生产者阻塞从而影响反应时间。高性能的程序一般都能容忍在故障情况下丢失少量数据。如果编写这样的程序,可以通过使用异步发送来提高吞吐量(甚至在使用PERSISTENT 传送模式的情况下)。

设置异步发送:

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("ACTIVEMQ_URL");
activeMQConnectionFactory.setUseAsyncSend(true);

异步发送丢失消息的场景

生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果服务端突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。正确的异步发送方法是需要接收回调的。

producer.send有带上AsyncCallback的方法。该方法中需要重写onSuccess方法和onException方法。onSuccess方法就是表示这条消息成功发送到MQ上,并接收到了MQ持久化后的回调。onException表示MQ返回一个入队异常的回执。在上面的示例中用的是CountDownLatch类在onSuccess中记录。主要是因为onSuccess方法中只能引用final对象。

代码测试:

public class TestMessageProducer {
    private static final String BROKER_URL = "tcp://192.168.81.130:61616";
    //private static final String BROKER_URL = "tcp://192.168.81.130:61616,tcp://192.168.81.130:61617,tcp://192.168.81.130:61618";
    private static final String QUEUE_NAME = "queue-test";
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        ((ActiveMQConnectionFactory) factory).setAlwaysSyncSend(true);//开启异步发送
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);

        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
        System.out.println(producer.getClass().getSimpleName());
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = session.createTextMessage("我是一个消息----helo");
        //        producer.send(message); //因为上面的持久的方式,所以这种试用方式默认是使用同步模式

        producer.send(queue, message, new AsyncCallback() {
            @Override
            public void onSuccess() {
                System.out.println("消费发送成功,所处线程:"+Thread.currentThread().getName());
            }

            @Override
            public void onException(JMSException e) {
                System.out.println("消费发送失败,所处线程:"+Thread.currentThread().getName());
            }
        });
        System.out.println("主程序所处线程为:"+Thread.currentThread().getName());
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送成功");
    }
}

延迟投递和定时投递

概述

5.4版的ActiveMQ在ActiveMQ消息代理中内置了一个可选的持久性调度程序。通过在“ Xml配置”中将broker schedulerSupport属性设置为true 可以启用此功能。ActiveMQ客户端可以通过使用以下消息属性来利用延迟传递。

http://activemq.apache.org/delay-and-schedule-message-delivery.html

属性说明

测试案例:延时20秒

TextMessage message = session.createTextMessage("我是一个消息----helo");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,20000);
producer.send(message);

测试案例:延时10秒,投递10次,间隔5秒

TextMessage message = session.createTextMessage("我是一个消息----helo");
//多少毫秒后入队
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10000);
//隔多少毫秒再投
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,5000);
//总共投多少次
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,10);
producer.send(message);

测试案例:使用CRON表达式

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message);

其它说明

1、CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒。

2、此处Cron是Unix系统中任务调度器,它使用一个字符串来表示一个任务何时需要被执行。而不是Quartz里边的那个Cron表达式。

消费重试机制

ActiveMQ会在什么情况下重新发送消息?

ActiveMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。消息重发的情况有以下几种。

1、事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发。

2、使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发。

3、所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发。

4、消息被消费者拉取之后,超时没有响应ack,消息会被broker重发

重发指的是消息经过broker重新进行转发给消费者,经过测试,1和2的情况消息重发会发送给原来的消费者,3和4可以转发消息给别的消费者。累计次数超过设置的maximumRedeliveries时消息都会都会进入死信队列。

消息的重发时间间隔和重发次数:间隔 1,次数 6 (6次之后进入死信队列)。

1、collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15。

2、maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为6。

3、maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为true时有效(V5.5) ,假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。

4、initialRedeliveryDelay:初始重发延迟时间,默认1000L。

5、redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效, 默认1000L。

6、useCollisionAvoidance:启用防止冲突功能,默认false。

7、useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false。

8、backoffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。默认是5。

Java配置

public static void main(String[] args) throws JMSException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    // 是否在每次尝试重新发送失败后,增长这个等待时间
    redeliveryPolicy.setUseExponentialBackOff(true);
    // 重发次数,默认为6次 这里设置为1次
    redeliveryPolicy.setMaximumRedeliveries(2);
    // 重发时间间隔,默认为1秒
    redeliveryPolicy.setMaximumRedeliveries(1000);
    // 第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
    redeliveryPolicy.setBackOffMultiplier(2);
    // 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为
    // 20ms,
    // 第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
    redeliveryPolicy.setMaximumRedeliveryDelay(1000);
    connectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    Queue queue = session.createQueue(QUEUE_NAME);
    MessageConsumer consumer = session.createConsumer(queue);

    TextMessage message = (TextMessage) consumer.receive(1000);
    System.out.println("消费者接收到消息:"+message.getText());
    consumer.close();
    session.close();
    connection.close();
    System.out.println("消息消费成功");
}

SpringBoot配置:在配置类中

@Bean
@Primary
public ActiveMQConnectionFactory activeMQConnectionFactory(){
    ActiveMQConnectionFactory acticeMQConnectionFactory = new ActiveMQConnectionFactory(username,password,borkerUrl);
    acticeMQConnectionFactory.setUseAsyncSend(true);//同步还是异步
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    // 是否在每次尝试重新发送失败后,增长这个等待时间
    redeliveryPolicy.setUseExponentialBackOff(true);
    // 重发次数,默认为6次 这里设置为1次
    redeliveryPolicy.setMaximumRedeliveries(2);
    // 重发时间间隔,默认为1秒
    redeliveryPolicy.setMaximumRedeliveries(1000);
    // 第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
    redeliveryPolicy.setBackOffMultiplier(2);
    // 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为
    // 20ms,
    // 第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
    redeliveryPolicy.setMaximumRedeliveryDelay(1000);
    connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
}

死信队列

简介

官方介绍:http://activemq.apache.org/message-redelivery-and-dlq-handling.html。

DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。

就是一条消息现次被重发了多次后(默认6次),将会被activemq移入"死信队列"。开发人员可以在这个Queue中查看处理出错的消息,进行人工处理。

开发中的情况

一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。

核心业务队列,就是处理正常的消息,另一个就是处理处理异常情况的。

由上图中可以看到,在第三方物流系统故障期间,所有的订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程。监控第三方物流系统是否正常。能否请求。不停的监视。一旦发现对方恢复正常。这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。

配置方式

activemq.xml

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="DLQ."
                                                  useQueueForQueueMessages="true" />
                </deadLetterStrategy>
            </policyEntry>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

配置说明:

queuePrefix==代表死信队列的前缀 最后的名字为DLQ.队列名。

userQueueForTopicMessages 表示是否把Topic的DealLetter保存到Queue中,默认为true。

userQueueForQueneMessages 表示是否把Queue的DealLetter保存到Queue中,默认为true。

重试多次,看失败6次之后进入死信队列的名字

防止重复调用

官网文档:http://activemq.apache.org/redelivery-policy。

网络延时传输中,会造成进行MQ重试。在重试过程中。可能会有重复消费的问题。

如果消息是做数据库的插入操作。给这个消息做一个唯一主键,那么就算出理重复消费的情况。就会有主键冲突,避免数据库出现脏数据。

如果上面两种情况还不行,准备一个第三方服务来做消费记录。如redis,给消息分配一个全局的ID,只要消费过该消息,把<id,Message>发K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录就可以了。

ActiveMQ应用场景

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行、并行。

a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

a、可以控制活动的人数。

b、可以缓解短时间内高流量压垮应用。

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

日志采集客户端,负责日志数据采集,定时写受写入Kafka队列。

Kafka消息队列,负责日志数据的接收,存储和转发。

日志处理应用:订阅并消费kafka队列中的日志数据 。

消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

0

评论区