ActiveMQ发布订阅Topic中消息持久化模式小Demo
1.需要先运行一次消费者,等于像ActviMQ注册,等于订阅了
2.然后再运行消息生产者
3.主要运行过的消费者(订阅过),此时无论消费者是否在线,再此启动时都能收到消息,下次连接的时候也会把没有接收到的消息接收过来
消息消费者
/**
* @author songzixian
* @description Topic消费者
*/
public class JmsConsumer {
public static final String ActiveMQ_URL="tcp://192.168.78.138:61616";
public static final String Topic_NAME = "topic-szx";
@Test
public void test01() throws Exception{
System.out.println("zhangsan订阅了");
//1.创建连接工厂,按照指定的url地址,采用默认帐号密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//用户wangwu订阅了
connection.setClientID("wangwu");
//3.创建会话session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic);
Topic topic = session.createTopic(Topic_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"这是备注");
//启动订阅
connection.start();
//设置等待
Message message = topicSubscriber.receive();
while(message != null){
TextMessage textMessage = (TextMessage) message;
//接收持久化订阅消息
System.out.println("接收到了持久化topic:" + textMessage.getText());
//设置持久化消息监听,不设置代表订阅者永久在线,可以设置指定时间例入设置3秒离线
//message = topicSubscriber.receive("3000L");
message = topicSubscriber.receive();
}
//关闭资源
session.close();
connection.close();
}
}
再执行消息生产者
/**
* @author songzixian
* @create 2019-07-18 下午 6:20
* @description Topic消息生产者
*/
public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.78.138:61616";
public static final String TOPIC_NAME="topic-szx";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建会员session
Connection connection = activeMQConnectionFactory.createConnection();
//3.创建会员session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6设置订阅Topic消息的持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//启动
connection.start();
//7.通过使用messageProducer生产3条消息发送到MQ的列队里
for (int i = 1;i<=5;i++){
//8.通过messageProducer发送给mq
TextMessage textMessage = (TextMessage) session.createTextMessage("topic-szx"+i);
textMessage.setText("songzixian.com");
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("topic_name消息发送到mq");
}
}