#1 前言 前一篇介绍了JMS有两种通信模型,一种是点对点通信,另一种是发布/订阅模型,本篇将会继续探讨这两种模型。本篇文章需要按照严谨的实验顺序才能获得相同的结果,这是因为消息持久化和持久订阅这两个特性的原因,在文章结尾和下一篇文章会做解答。** 所有的实验在启动之前都必须到管理后台删除相关的队列或者topic,否则数据也可能不同 **
在 中,由于设置了JMS通信端口的密码,所以下文在建立连接工厂时,都使用了密码。
为了方便,提取了一个专门创建connection的管理类。
public class ActiveMQManager { private static ConnectionFactory connectionFactory; static { // connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://192.168.88.18:61616"); connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); } public static Connection createConnection() throws JMSException { return connectionFactory.createConnection(); }}
OK,下面开始正题吧。
#2 点对点模型 本节将实现消费者通过队列"test-queue"给消费者发布消息,消费者以异步方式获取消息并打印消息。先把消息生产者和消费者的代码贴一下。
##2.1 实验代码 ** 消息生产者 **:
public class Producer { public static final String QUEUE_NAME = "test-queue"; public static void main(String[] args) { System.out.println("Producer started!"); String message_body = "消息 : " + System.currentTimeMillis(); try { //获取连接 Connection connection = ActiveMQManager.createConnection(); //启动连接 connection.start(); //开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //建立队列 Queue queue = session.createQueue(QUEUE_NAME); //获取消息生产者对象 MessageProducer producer = session.createProducer(queue); //建立消息对象 Message message = session.createTextMessage(message_body); //发送消息 producer.send(message); System.out.println("成功发送消息:" + message_body); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Producer end!"); }}
** 消息消费者 **:
public class Consumer { public static void main(String[] args) throws IOException { System.out.println("Consumer started!"); try { //获取连接 Connection connection = ActiveMQManager.createConnection(); //启动连接 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列,与消息生产者使用的队列名一致 Queue queue = session.createQueue(Producer.QUEUE_NAME); //建立消费者 MessageConsumer consumer = session.createConsumer(queue); //监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Consumer 获取消息 ---->" + text); } catch (Exception e) { e.printStackTrace(); } } }); // 线程一直等待 System.in.read(); consumer.close(); session.close(); connection.stop(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer end!"); }}
2.2 启动程序和小结论
** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者 **
消息可以被消费者正常接收。** (2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者 ** 消息可以被消费者正常接收,通过查看消息内容,可以知道该消息是停止MQ服务之前发送的。
** (3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者 ** 消息可以被消费者正常接收。
看到这里,大家可能会感到奇怪,为什么MQ服务重启后,之前发布的消息仍然可以被后面启动的消费者收到呢?这与activeMQ的持久化消息机制有关。下一篇会对此做解答。
#3 发布/订阅模型 本节将定义两个消息订阅者,它们共同订阅名叫"test-topic"这个主题,消息生产者发布消息后,两个消费者都能接收到同一个消息。
##2.1 实验代码 下面是生产者和订阅者的代码。
** 消息生产者 **
public class Producer { public static final String TOPIC_NAME = "test-topic"; public static void main(String[] args) { System.out.println("Producer started!"); String message_body = "消息 : " + System.currentTimeMillis(); try { //获取连接 Connection connection = ActiveMQManager.createConnection(); //启动连接 connection.start(); //开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //建立话题 Topic topic = session.createTopic(TOPIC_NAME); //获取消息生产者对象 MessageProducer producer = session.createProducer(topic); //建立消息对象 Message message = session.createTextMessage(message_body); //发送消息 producer.send(message); System.out.println("成功发送消息:" + message_body); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Producer end!"); }}
** 消息订阅者 ** :
public class Subscriber1 { public static void main(String[] args) throws IOException { System.out.println("Subscriber1 started!"); try { //获取连接 Connection connection = ActiveMQManager.createConnection(); //启动连接 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立话题 Topic topic = session.createTopic(Producer.TOPIC_NAME); //建立消费者 MessageConsumer consumer = session.createConsumer(topic); //监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Subscriber1 获取消息 ---->" + text); } catch (Exception e) { e.printStackTrace(); } } }); // 线程一直等待 System.in.read(); consumer.close(); session.close(); connection.stop(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Subscriber1 end!"); }}
3.2 启动程序和小结论
** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动两个消息订阅者 **
消息订阅者没有接收到任何消息。** (2)顺序:启动MQ broker服务 -> 启动两个消息订阅者 -> 启动消息发布者 **
两个消息订阅者都可以被消费者正常接收。** (3)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动两个消息订阅者 **
wtf!消息没有接收到,如果是重要的消息,那岂不是要哭死了。
对于第三条,由于MQ服务重启导致订阅者消息获取不到,activeMQ提供了解决方法,即持久主题订阅(durable topic subscription)机制,嗯,下一篇将会对该机制详细解读。