博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ - 初体验,探讨JMS通信模型
阅读量:6689 次
发布时间:2019-06-25

本文共 6619 字,大约阅读时间需要 22 分钟。

hot3.png

#1 前言 前一篇介绍了JMS有两种通信模型,一种是点对点通信,另一种是发布/订阅模型,本篇将会继续探讨这两种模型。本篇文章需要按照严谨的实验顺序才能获得相同的结果,这是因为消息持久化和持久订阅这两个特性的原因,在文章结尾和下一篇文章会做解答。** 所有的实验在启动之前都必须到管理后台删除相关的队列或者topic,否则数据也可能不同 **

在 中,由于设置了JMS通信端口的密码,所以下文在建立连接工厂时,都使用了密码。

为了方便,提取了一个专门创建connection的管理类。

public class ActiveMQManager {    private static ConnectionFactory connectionFactory;    static {        // connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://192.168.88.18:61616");        connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");    }    public static Connection createConnection() throws JMSException {        return connectionFactory.createConnection();    }}

OK,下面开始正题吧。

#2 点对点模型 本节将实现消费者通过队列"test-queue"给消费者发布消息,消费者以异步方式获取消息并打印消息。先把消息生产者和消费者的代码贴一下。

##2.1 实验代码 ** 消息生产者 **:

public class Producer {    public static final String QUEUE_NAME = "test-queue";    public static void main(String[] args) {        System.out.println("Producer started!");        String message_body = "消息 : " + System.currentTimeMillis();        try {            //获取连接            Connection connection = ActiveMQManager.createConnection();            //启动连接            connection.start();            //开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);            //建立队列            Queue queue = session.createQueue(QUEUE_NAME);            //获取消息生产者对象            MessageProducer producer = session.createProducer(queue);            //建立消息对象            Message message = session.createTextMessage(message_body);            //发送消息            producer.send(message);            System.out.println("成功发送消息:" + message_body);            producer.close();            session.close();            connection.close();        } catch (Exception e) {            e.printStackTrace();        }        System.out.println("Producer end!");    }}

** 消息消费者 **:

public class Consumer {    public static void main(String[] args) throws IOException {        System.out.println("Consumer started!");        try {            //获取连接            Connection connection = ActiveMQManager.createConnection();            //启动连接            connection.start();            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            //建立队列,与消息生产者使用的队列名一致            Queue queue = session.createQueue(Producer.QUEUE_NAME);            //建立消费者            MessageConsumer consumer = session.createConsumer(queue);            //监听消息            consumer.setMessageListener(new MessageListener() {                @Override                public void onMessage(Message message) {                    try {                        TextMessage textMessage = (TextMessage) message;                        String text = textMessage.getText();                        System.out.println("Consumer 获取消息 ---->" + text);                    } catch (Exception e) {                        e.printStackTrace();                    }                }            });            // 线程一直等待            System.in.read();            consumer.close();            session.close();            connection.stop();        } catch (Exception e) {            e.printStackTrace();        }        System.out.println("Consumer end!");    }}

2.2 启动程序和小结论

** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者 **

消息可以被消费者正常接收。

** (2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者 ** 消息可以被消费者正常接收,通过查看消息内容,可以知道该消息是停止MQ服务之前发送的。

** (3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者 ** 消息可以被消费者正常接收。

看到这里,大家可能会感到奇怪,为什么MQ服务重启后,之前发布的消息仍然可以被后面启动的消费者收到呢?这与activeMQ的持久化消息机制有关。下一篇会对此做解答。

#3 发布/订阅模型 本节将定义两个消息订阅者,它们共同订阅名叫"test-topic"这个主题,消息生产者发布消息后,两个消费者都能接收到同一个消息。

##2.1 实验代码 下面是生产者和订阅者的代码。

** 消息生产者 **

public class Producer {    public static final String TOPIC_NAME = "test-topic";    public static void main(String[] args) {        System.out.println("Producer started!");        String message_body = "消息 : " + System.currentTimeMillis();        try {            //获取连接            Connection connection = ActiveMQManager.createConnection();            //启动连接            connection.start();            //开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);            //建立话题            Topic topic = session.createTopic(TOPIC_NAME);            //获取消息生产者对象            MessageProducer producer = session.createProducer(topic);            //建立消息对象            Message message = session.createTextMessage(message_body);            //发送消息            producer.send(message);            System.out.println("成功发送消息:" + message_body);            producer.close();            session.close();            connection.close();        } catch (Exception e) {            e.printStackTrace();        }        System.out.println("Producer end!");    }}

** 消息订阅者 ** :

public class Subscriber1 {    public static void main(String[] args) throws IOException {        System.out.println("Subscriber1 started!");        try {            //获取连接            Connection connection = ActiveMQManager.createConnection();            //启动连接            connection.start();            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            //建立话题            Topic topic = session.createTopic(Producer.TOPIC_NAME);            //建立消费者            MessageConsumer consumer = session.createConsumer(topic);            //监听消息            consumer.setMessageListener(new MessageListener() {                @Override                public void onMessage(Message message) {                    try {                        TextMessage textMessage = (TextMessage) message;                        String text = textMessage.getText();                        System.out.println("Subscriber1 获取消息 ---->" + text);                    } catch (Exception e) {                        e.printStackTrace();                    }                }            });            // 线程一直等待            System.in.read();            consumer.close();            session.close();            connection.stop();        } catch (Exception e) {            e.printStackTrace();        }        System.out.println("Subscriber1 end!");    }}

3.2 启动程序和小结论

** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动两个消息订阅者 **

消息订阅者没有接收到任何消息。

** (2)顺序:启动MQ broker服务 -> 启动两个消息订阅者 -> 启动消息发布者 **

两个消息订阅者都可以被消费者正常接收。

** (3)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动两个消息订阅者 **

wtf!消息没有接收到,如果是重要的消息,那岂不是要哭死了。

对于第三条,由于MQ服务重启导致订阅者消息获取不到,activeMQ提供了解决方法,即持久主题订阅(durable topic subscription)机制,嗯,下一篇将会对该机制详细解读。

代码

转载于:https://my.oschina.net/thinwonton/blog/888627

你可能感兴趣的文章
oo第三次博客作业
查看>>
人工智能简介
查看>>
PAT (Advanced Level) 1075. PAT Judge (25)
查看>>
08. Web大前端时代之:HTML5+CSS3入门系列~H5 Web存储
查看>>
MongoDB复制
查看>>
jdk1.8-LinkedList源码分析
查看>>
【转】Linux世界驰骋——文件系统和设备管理
查看>>
Arcgis 抽稀矢量数据
查看>>
BZOJ 3524主席树裸题 (雾)
查看>>
IO多路复用
查看>>
爬取抽屉热搜榜文章
查看>>
MySQL 之【视图】【触发器】【存储过程】【函数】【事物】【数据库锁】【数据库备份】...
查看>>
杭电ACM--2008数值统计
查看>>
面向对象复习
查看>>
hibernate 异常
查看>>
实现一个圆形进度条
查看>>
多线程(初级篇)
查看>>
验证码识别技术 Captcha Decode Technology
查看>>
Window下通过charles代理抓取iphone/android手机Https请求乱码问题处理
查看>>
优化SQl语句的十个重要步骤
查看>>