博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ Message Groups
阅读量:4604 次
发布时间:2019-06-09

本文共 2581 字,大约阅读时间需要 8 分钟。

与Exclusive Consumer相比,Message Groups的对消息分组的粒度更细。具有相同groupId的消息会被投送到同一个消费者,除非这个消费者挂了。

代码示例:

Mesasge message = session.createTextMessage("
hey
");// 设置groupIdmessage.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");// 设置sequencemessage.setIntProperty("JMSXGroupSeq", -1);producer.send(message);

对应的代码在 org.apache.activemq.broker.region.Queue 中:

// 判断消息能否分发给消费者,返回true表示可以// Subscription 表示消费者,QueueMessageReference 表示消息protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node)             throws Exception {    // 默认为true    boolean result = true;    // Keep message groups together.    // 获取消息的"JMSXGroupID"属性    String groupId = node.getGroupID();    // 获取消息的"JMSXGroupSeq"属性    int sequence = node.getGroupSequence();    if (groupId != null) {        // MessageGroupMap是一个Map,键是groupId,值是消费者        MessageGroupMap messageGroupOwners = getMessageGroupOwners();        // If we can own the first, then no-one else should own the        // rest.        if (sequence == 1) {            assignGroup(subscription, messageGroupOwners, node, groupId);        } else {            // Make sure that the previous owner is still valid, we may            // need to become the new owner.            ConsumerId groupOwner;            // 根据groupId取出消费者            groupOwner = messageGroupOwners.get(groupId);            if (groupOwner == null) {                assignGroup(subscription, messageGroupOwners, node, groupId);            } else {                if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {                    // A group sequence < 1 is an end of group signal.                    if (sequence < 0) {                        messageGroupOwners.removeGroup(groupId);                        subscription.getConsumerInfo().                        setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);                    }                } else {                    result = false;                }            }        }    }    return result;}// 往MessageGroupMap中插入键值对protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners,                         MessageReference n, String groupId) throws IOException {    messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());    Message message = n.getMessage();    message.setJMSXGroupFirstForConsumer(true);    subs.getConsumerInfo().        setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);}

 

转载于:https://www.cnblogs.com/allenwas3/p/8671402.html

你可能感兴趣的文章
Bogart BogartAutoCode.vb
查看>>
hdu - 2266 How Many Equations Can You Find (简单dfs)
查看>>
UIView属性
查看>>
将博客搬至CSDN
查看>>
远程服务器git搭建
查看>>
牛人们的博客地址
查看>>
Zabbix是什么?
查看>>
源码:COCO微博
查看>>
面向对象预习随笔
查看>>
大数据概念炒作周期模型
查看>>
排序模型
查看>>
Dede推荐文章与热点文章不显示?
查看>>
React 3
查看>>
Topshelf 使用
查看>>
Linux --Apache服务搭建
查看>>
20145325张梓靖 实验三 "敏捷开发与XP实践"
查看>>
JavaScript面试题
查看>>
[转帖]架构师眼中的高并发架构
查看>>
ios的一些开源资源
查看>>
HTTP 错误 500.21 - Internal Server Error 解决方案
查看>>