宋子宪博客

Szx

ActiveMQ消费者和生成者启动顺序
场景一:先生产, 启动1号消费者能消费吗答:可以场景二: 先生产, 启动1号消费者能消费,然后在启动消费者2号,这...
扫描右侧二维码阅读全文
18
2019/07

ActiveMQ消费者和生成者启动顺序

场景一:先生产, 启动1号消费者能消费吗
答:可以

场景二: 先生产, 启动1号消费者能消费,然后在启动消费者2号,这时2号还能消费吗?
答:2号不可以消费,因为启动1号时消息已被消费完,列队中已经没有消息,所以2号不能再消费

场景三:先启动1号消费者,再启动2号消费者,然后再生成,请问1号和2号都可以消费吗?
答:1号和2号都能消费到,因为这时候 列队中没生成消息,所以先启动1号和2号再生成,这时候1号和2号顺序是一致的,所以这时候生产时,1号和2号消费各平均消费一半

先执行消息消费者

/**
 * @author songzixian
 * @description ActiveMQ消费者
 */
public class JmsConsumer {

    public static final String ActiveMQ_URL="tcp://192.168.78.131:61616";
    public static final String QUEUE_NAME = "queue01";

      @Test
      public void test01() throws Exception{
        System.out.println("我是1号消费者");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();
        //3.创建会话session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Queue queue = session.createQueue(QUEUE_NAME);


        //5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        //使用监听器,有消息就处理没消息就关掉
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //如果列队消息不等于空,则执行
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到了消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        System.in.read();


        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();

        /**
         * 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
         */

    }

    @Test
    public void test02() throws Exception{
        System.out.println("我是2号消费者");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();
        //3.创建会话session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Queue queue = session.createQueue(QUEUE_NAME);


        //5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        //使用监听器,有消息就处理没消息就关掉
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //如果列队消息不等于空,则执行
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到了消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        System.in.read();


        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();

        /**
         * 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
         */


    }
}

再执行消息生成者

/**
 * @author songzixian
 * @description
 */
public class JmsProduce {

    public static final  String ACTIVEMQ_URL = "tcp://192.168.78.131:61616";
    public static final  String QUEUE_NAME= "queue01";


    public static void main(String[] args) throws Exception{
        //1.创建工厂,安装url地址采用默认账户密码
        ActiveMQConnectionFactory activeMQConnectionFactory  = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过连接工厂,获得连接Connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();

        //3.创建会话session (有两个参数,1.事物 2.签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是队列还是主题)
         Queue queue = session.createQueue(QUEUE_NAME);

         //5.创建消息生产者
         MessageProducer messageProducer = session.createProducer(queue);
         //6.通过使用messageProducer生产3条消息发送到MQ队列中
         for(int  q= 1;q<=50;q++){

             //7.创建消息
             TextMessage textMessage = session.createTextMessage("msg----"+q);//理解为一个字符串
             //8.通过消息生产者发布消息
             messageProducer.send(textMessage);
         }

         //9.关闭资源
         messageProducer.close();
         session.close();
         connection.close();

        System.out.println("消息发送成功!");
    }
}
Last modification:July 18th, 2019 at 05:58 am
如果觉得这篇技术文章对您有帮助,可以请博主喝一杯饮料

Leave a Comment