宋子宪博客

ActiveMQ发布订阅者模式Topic小demo

Topic发布订阅模式,一条信息可以被多个消费者使用
先启动消费者1号和消费者2号

/**
 * @author songzixian
 * @description ActiveMQ消费者
 */
public class JmsConsumer {

    public static final String ActiveMQ_URL="tcp://192.168.78.131:61616";
    public static final String Topic_NAME = "topic-szx";

      @Test
      public void test01() throws Exception{
        System.out.println("我是1号消费者");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();
        //3.创建会话session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Topic topic = session.createTopic(Topic_NAME);


        //5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);

        //使用监听器,有消息就处理没消息就关掉
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //如果列队消息不等于空,则执行
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到了消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        System.in.read();


        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();

        /**
         * 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
         */

    }

    @Test
    public void test02() throws Exception{
        System.out.println("我是2号消费者");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();
        //3.创建会话session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Topic topic = session.createTopic(Topic_NAME);


        //5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);

        //使用监听器,有消息就处理没消息就关掉
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //如果列队消息不等于空,则执行
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到了消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        System.in.read();


        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();

        /**
         * 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
         */

    }
}

再启动消息消费者

/**
 * @author songzixian
 * @create 2019-07-18 下午 6:20
 * @description ActiveMQ消息生产者
 */

public class JmsProduce_Topic  {

    public static final String ACTIVEMQ_URL = "tcp://192.168.78.131:61616";
    public static final String TOPIC_NAME="topic-szx";

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.创建会员session
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3.创建会员session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);

        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);

        //6.通过使用messageProducer生产3条消息发送到MQ的列队里
        for (int i = 1;i<=5;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("szx's blog-msg-----"+i);
            //8.通过messageProducer发送给mq
            messageProducer.send(textMessage);

        }

        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("topic_name消息发送到mq");

    }
}

消费者1号

消费者2号

当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »