宋子宪博客

Szx

ActiveMQ发布订阅Topic中消息持久化模式小Demo
1.需要先运行一次消费者,等于像ActviMQ注册,等于订阅了2.然后再运行消息生产者3.主要运行过的消费者(订阅...
扫描右侧二维码阅读全文
19
2019/07

ActiveMQ发布订阅Topic中消息持久化模式小Demo

1.需要先运行一次消费者,等于像ActviMQ注册,等于订阅了
2.然后再运行消息生产者
3.主要运行过的消费者(订阅过),此时无论消费者是否在线,再此启动时都能收到消息,下次连接的时候也会把没有接收到的消息接收过来

消息消费者

/**
 * @author songzixian
 * @description Topic消费者
 */
public class JmsConsumer {

    public static final String ActiveMQ_URL="tcp://192.168.78.138:61616";
    public static final String Topic_NAME = "topic-szx";

      @Test
      public void test01() throws Exception{
        System.out.println("zhangsan订阅了");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();

        //用户wangwu订阅了
        connection.setClientID("wangwu");
        //3.创建会话session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Topic topic = session.createTopic(Topic_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"这是备注");

        //启动订阅
        connection.start();

        //设置等待
        Message message = topicSubscriber.receive();
        while(message != null){

            TextMessage textMessage = (TextMessage) message;
            //接收持久化订阅消息
            System.out.println("接收到了持久化topic:" + textMessage.getText());

            //设置持久化消息监听,不设置代表订阅者永久在线,可以设置指定时间例入设置3秒离线
            //message = topicSubscriber.receive("3000L");
            message = topicSubscriber.receive();
        }

      
        //关闭资源
        session.close();
        connection.close();


    }

}

再执行消息生产者

/**
 * @author songzixian
 * @create 2019-07-18 下午 6:20
 * @description Topic消息生产者
 */

public class JmsProduce_Topic  {

    public static final String ACTIVEMQ_URL = "tcp://192.168.78.138:61616";
    public static final String TOPIC_NAME="topic-szx";

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.创建会员session
        Connection connection = activeMQConnectionFactory.createConnection();


        //3.创建会员session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);

        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);

        //6设置订阅Topic消息的持久化
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //启动
        connection.start();

        //7.通过使用messageProducer生产3条消息发送到MQ的列队里
        for (int i = 1;i<=5;i++){

            //8.通过messageProducer发送给mq
            TextMessage textMessage = (TextMessage) session.createTextMessage("topic-szx"+i);
            textMessage.setText("songzixian.com");

            messageProducer.send(textMessage);

        }


        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("topic_name消息发送到mq");

    }
}

Last modification:July 19th, 2019 at 04:25 am
如果觉得这篇技术文章对您有帮助,可以请博主喝一杯饮料

Leave a Comment