ActiveMQ发布订阅者模式Topic小demo
Topic发布订阅模式,一条信息可以被多个消费者使用
先启动消费者1号和消费者2号
/**
* @author songzixian
* @description ActiveMQ消费者
*/
public class JmsConsumer {
public static final String ActiveMQ_URL="tcp://192.168.78.131:61616";
public static final String Topic_NAME = "topic-szx";
@Test
public void test01() throws Exception{
System.out.println("我是1号消费者");
//1.创建连接工厂,按照指定的url地址,采用默认帐号密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//启动
connection.start();
//3.创建会话session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic);
Topic topic = session.createTopic(Topic_NAME);
//5.创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//使用监听器,有消息就处理没消息就关掉
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果列队消息不等于空,则执行
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到了消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
/**
* 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
*/
}
@Test
public void test02() throws Exception{
System.out.println("我是2号消费者");
//1.创建连接工厂,按照指定的url地址,采用默认帐号密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//启动
connection.start();
//3.创建会话session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic);
Topic topic = session.createTopic(Topic_NAME);
//5.创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//使用监听器,有消息就处理没消息就关掉
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果列队消息不等于空,则执行
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到了消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
/**
* 先生产 只启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗
*/
}
}
再启动消息消费者
/**
* @author songzixian
* @create 2019-07-18 下午 6:20
* @description ActiveMQ消息生产者
*/
public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.78.131:61616";
public static final String TOPIC_NAME="topic-szx";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建会员session
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会员session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是列队还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6.通过使用messageProducer生产3条消息发送到MQ的列队里
for (int i = 1;i<=5;i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("szx's blog-msg-----"+i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("topic_name消息发送到mq");
}
}
消费者1号
消费者2号