导包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:
consumer:
consume-message-batch-max-size: 1
consume-mode: ORDERLY
consume-thread-max: 1
group: myGroup
topics: topic1
name-server: 106.52.60.215:9876
producer:
group: myGroup
topics: topic1
生产者发送消息
同步发现消息
在Spring Boot中,可以使用RocketMQTemplate
来发送消息。设置消息的延迟级别,可以使用RocketMQTemplate
的send(Message message, long timeout, int delayLevel)
方法,其中delayLevel
为延迟级别,单位为秒
RocketMQ
支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* <p>
* Description: 消息生产者
* </p>
*
* @author songzixian
* @version v2.0.0
* @create 2023-03-11 21:10
* @see com.songzixain.controller
*/
@Slf4j
@RestController
public class MyProducer1 {
@Value("${rocketmq.producer.topics}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* description 同步发送延迟消息
*
* @param: []
* @return
* @Date 2023/3/11
*/
@GetMapping("syncSendTest")
public void sendDelayMsg() {
Blog blog = new Blog();
blog.setBlogName("宋子宪博客");
blog.setUrl("www.songzixian.com");
// delayTimeLevel代表延迟级别 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
int delayTimeLevel = 3;
Message<Blog> message = MessageBuilder.withPayload(blog)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel)
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);
log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
log.info("发送结果:{}", sendResult);
}
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* <p>
* Description: 消息生产者
* </p>
*
* @author songzixian
* @version v2.0.0
* @create 2023-03-11 11:34
* @see com.songzixain.controller
*/
@Slf4j
@RestController
public class DemoProducers {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* description 延迟消息发送
*
* @param: [user]
* @return
* @Date 2023/3/11
*/
@RequestMapping("/asyncSendTest")
public String asyncSendTest(){
Blog blog = new Blog();
blog.setBlogName("宋子宪博客");
blog.setUrl("www.songzixian.com");
// 构建消息体
Message<Blog> msg = MessageBuilder.withPayload(blog).build();
rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {
// 发送成功
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
}
// 发送失败
@Override
public void onException(Throwable throwable) {
log.info("消息发送失败,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
}
// ps:3 代表第三个延迟10s 延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
},3000,3);
return "发送成功";
}
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* <p>
* Description: 消息生产者
* </p>
*
* @author songzixian
* @version v2.0.0
* @create 2023-03-11 11:34
* @see com.songzixain.controller
*/
@Slf4j
@RestController
public class MyProducer2 {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.topics}")
private String topic;
/**
* description 延迟消息发送
* 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。
* 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)
* ,第四个参数是发送消息的重试次数。
* @param: [user]
* @return
* @Date 2023/3/11
*/
@RequestMapping("/asyncSendTest")
public String asyncSendTest(){
Blog blog = new Blog();
blog.setBlogName("宋子宪博客");
blog.setUrl("www.songzixian.com");
// 构建消息体
Message<Blog> msg = MessageBuilder.withPayload(blog).build();
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
// 发送成功
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
}
// 发送失败
@Override
public void onException(Throwable throwable) {
log.info("消息发送失败,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
}
// ps:3 代表第三个延迟10s 延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
},3000,4);
return "发送成功";
}
}
启动测试
启动请求:http://localhost:8081/asyncSendTest
控制台打印
可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟
延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。
通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。