首先需要在ActiveMQ中配置JDBC数据源:https://songzixian.com/linuxcmot/859.html
然后执行消息生产类,消息会保存到数据库中,当运行消费者时会将数据库中的消息消费掉,被消费掉的消息自动删除
消息生产者类

/**
 * @author songzixian
 * @description 持久化jdbc消息生产者类
 */
public class JmsProduce {

    public static final  String ACTIVEMQ_URL = "tcp://192.168.78.138:61616";
    public static final  String QUEUE_NAME= "queue01";


    public static void main(String[] args) throws Exception{
        //1.创建工厂,安装url地址采用默认账户密码
        ActiveMQConnectionFactory activeMQConnectionFactory  = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过连接工厂,获得连接Connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();

        //3.创建会话session 进行手动签收 (有两个参数,1.事物 2.签收)
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是队列还是主题)
         Queue queue = session.createQueue(QUEUE_NAME);

         //5.创建消息生产者
         MessageProducer messageProducer = session.createProducer(queue);
         //设置消息非持久化
         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

         //6.通过使用messageProducer生产3条消息发送到MQ队列中
         for(int  q= 1;q<=5;q++){

             //7.创建消息
             TextMessage textMessage = session.createTextMessage("msg----"+q);//理解为一个字符串
             //8.通过消息生产者发布消息
             messageProducer.send(textMessage);

         }

         //9.关闭资源
         messageProducer.close();

         //提交事物
         session.commit();
         session.close();
         connection.close();

        System.out.println("消息发送成功!");
    }
}

消息消费者类

/**
 * @author songzixian
 * @description 持久化jdbc消息消费者
 */
public class JmsConsumer {

    public static final String ActiveMQ_URL="tcp://192.168.78.138:61616";
    public static final String QUEUE_NAME = "queue01";

      @Test
      public void test01() throws Exception{
        System.out.println("我是1号消费者");

        //1.创建连接工厂,按照指定的url地址,采用默认帐号密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);

        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动
        connection.start();
        //3.创建会话session
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

        //4.创建目的地(具体是列队还是主题topic);
        Queue queue = session.createQueue(QUEUE_NAME);


        //5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        while (true){
            TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L);
            if (textMessage != null){
                System.out.println("消费者接收到了消息"+textMessage.getText());

                //手动签收 注意:需要客户的调用acknwledge方法消息才不会被重复消费
                //textMessage.acknowledge();
            }else {
                break;
            }
        }

        //关闭资源
        messageConsumer.close();
        //提交事物
        session.commit();
        session.close();
        connection.close();
    }
}

Last modification:July 21, 2019
如果觉得这篇技术文章对你有用,请随意赞赏