ActiveMQ消息持久机制中的JDBC小Demo
首先需要在ActiveMQ中配置JDBC数据源:https://songzixian.com/linuxcmot/859.html
然后执行消息生产类,消息会保存到数据库中,当运行消费者时会将数据库中的消息消费掉,被消费掉的消息自动删除
消息生产者类
/**
* @author songzixian
* @description 持久化jdbc消息生产者类
*/
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.78.138:61616";
public static final String QUEUE_NAME= "queue01";
public static void main(String[] args) throws Exception{
//1.创建工厂,安装url地址采用默认账户密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection
Connection connection = activeMQConnectionFactory.createConnection();
//启动
connection.start();
//3.创建会话session 进行手动签收 (有两个参数,1.事物 2.签收)
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息生产者
MessageProducer messageProducer = session.createProducer(queue);
//设置消息非持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//6.通过使用messageProducer生产3条消息发送到MQ队列中
for(int q= 1;q<=5;q++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg----"+q);//理解为一个字符串
//8.通过消息生产者发布消息
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
//提交事物
session.commit();
session.close();
connection.close();
System.out.println("消息发送成功!");
}
}
消息消费者类
/**
* @author songzixian
* @description 持久化jdbc消息消费者
*/
public class JmsConsumer {
public static final String ActiveMQ_URL="tcp://192.168.78.138:61616";
public static final String QUEUE_NAME = "queue01";
@Test
public void test01() throws Exception{
System.out.println("我是1号消费者");
//1.创建连接工厂,按照指定的url地址,采用默认帐号密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//启动
connection.start();
//3.创建会话session
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic);
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L);
if (textMessage != null){
System.out.println("消费者接收到了消息"+textMessage.getText());
//手动签收 注意:需要客户的调用acknwledge方法消息才不会被重复消费
//textMessage.acknowledge();
}else {
break;
}
}
//关闭资源
messageConsumer.close();
//提交事物
session.commit();
session.close();
connection.close();
}
}
当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »