前言 用来学习MQ使用的 demo场景本文环境CentOS7/8、JDK8、RocketMQ 5.1.4ACL1.0稳定版支持任意时间戳定时消息包含服务器离线安装、JVM内存调优、开启ACL账号密码、启动/停止/重启命令、防火墙放行端口、SpringBoot整合带密码连接、生产者发送定时消息、消费者消费并调用业务接口适配课程定时推送场景。一、服务器前置准备1. 环境要求JDK 1.8 已配置环境变量服务器开放端口9876(NameServer)、10911(Broker)# 防火墙放行端口firewall-cmd --add-port9876/tcp--permanentfirewall-cmd --add-port10911/tcp--permanentfirewall-cmd--reload# 查看端口是否放行firewall-cmd --list-ports2. 创建安装目录mkdir-p/usr/local/rocketmqcd/usr/local/rocketmq二、下载解压RocketMQ1. 下载二进制包官网下载地址https://rocketmq.apache.org/dowloading/releases选择rocketmq-all-5.1.4-bin-release.zip上传到服务器/usr/local/rocketmq目录2. 解压unziprocketmq-all-5.1.4-bin-release.zipmvrocketmq-all-5.1.4-bin-release rocketmq5.1.4cdrocketmq5.1.4# 定义环境变量临时生效永久写入/etc/profileexportROCKETMQ_HOME/usr/local/rocketmq/rocketmq5.1.4三、修改JVM内存低配服务器必改否则启动OOM1. 修改NameServer内存vimbin/runserver.sh# 找到JAVA_OPT修改Xms XmxJAVA_OPT${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m2. 修改Broker内存vimbin/runbroker.shJAVA_OPT${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m四、开启ACL密码鉴权设置账号密码步骤1开启Broker ACL开关vimconf/broker.conf# 文件末尾追加aclEnabletrue# 允许自动创建Topic测试环境autoCreateTopicEnabletrue步骤2配置账号、密码、权限plain_acl.ymlvimconf/plain_acl.yml完整配置业务账号仅允许发送/消费课程推送Topicaccounts:# 管理员账号运维使用-accessKey:mq_adminsecretKey:Admin2026#RmqwhiteRemoteAddress:127.0.0.1,服务器内网IPadmin:true# 业务应用账号Java项目连接使用用户名密码-accessKey:course_push_appsecretKey:Push666888whiteRemoteAddress:0.0.0.0# 所有IP可访问生产填业务服务IPadmin:falsedefaultTopicPerm:DENYdefaultGroupPerm:DENY# 仅允许操作课程推送Topic发布PUB 订阅SUBtopicPerms:-course_push_topicPUB|SUBgroupPerms:-course-push-consumer-groupSUBglobalWhiteRemoteAddresses:-127.0.0.1accessKey 用户名secretKey 密码修改完成保存重启Broker生效五、RocketMQ 启动/停止/重启完整命令进入mq根目录执行cd /usr/local/rocketmq/rocketmq5.1.41. 启动顺序先NameServer后Broker启动NameServer# 后台启动输出日志到nohup.outnohupshbin/mqnamesrv# 查看启动日志出现 boot success 代表成功tail-f~/logs/rocketmqlogs/namesrv.log启动Broker绑定本机IP# 替换为你的服务器公网/内网IPnohupshbin/mqbroker-n127.0.0.1:9876# 查看Broker日志tail-f~/logs/rocketmqlogs/broker.log2. 停止服务顺序先Broker后NameServer# 停止Brokershbin/mqshutdown broker# 停止NameServershbin/mqshutdown namesrv3. 重启流程# 1.停止shbin/mqshutdown brokershbin/mqshutdown namesrv# 2.等待5秒sleep5# 3.重新启动nohupshbin/mqnamesrvsleep3nohupshbin/mqbroker-n127.0.0.1:98764. 验证服务连通内置工具测试exportNAMESRV_ADDR127.0.0.1:9876# 发送测试消息未开ACL可用开启ACL会报错需代码带AK/SKshbin/tools.sh org.apache.rocketmq.example.quickstart.Producershbin/tools.sh org.apache.rocketmq.example.quickstart.Consumer六、SpringBoot Java Demo带ACL密码定时消息生产消费1. Maven依赖 pom.xml?xml version1.0 encodingUTF-8?projectxmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.15/versionrelativePath//parentgroupIdcom.demo/groupIdartifactIdrocketmq-acl-demo/artifactIdversion1.0.0/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependencies!-- SpringBoot Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- RocketMQ Starter 带ACL支持 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.3/version/dependency!-- Redis 幂等防重复推送 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!-- JSON序列化 --dependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion2.0.32/version/dependency/dependencies/project2. application.yml 配置核心配置mq地址ACL账号密码spring:application:name:rocketmq-course-pushredis:host:127.0.0.1port:6379# mq 配置rocketmq:# 服务器 NameServer 地址替换为你的服务器IPname-server:127.0.0.1:9876producer:group:course-push-producer-group# ACL账号密码plain_acl.yml配置的业务账号access-key:course_push_appsecret-key:Push666888send-message-timeout:5000consumer:access-key:course_push_appsecret-key:Push6668883. 消息实体类 CoursePushMsg.javapackagecom.demo.entity;importlombok.Data;DatapublicclassCoursePushMsg{privateLongcourseId;privateStringtitle;privateStringcontent;privateLongendTimeStamp;// 课程结束时间戳毫秒}4. 生产者服务发送定时消息 CoursePushProducer.javapackagecom.demo.service;importcom.alibaba.fastjson2.JSON;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.producer.SendMessageWithTimeStampRequest;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.nio.charset.StandardCharsets;ServicepublicclassCoursePushProducer{AutowiredprivateRocketMQTemplaterocketMQTemplate;privatestaticfinalStringTOPICcourse_push_topic;/** * 发送课程结束定时推送消息指定精确到期时间戳 * param msg 课程消息实体 */publicvoidsendCourseTimedMsg(CoursePushMsgmsg){byte[]bodyJSON.toJSONString(msg).getBytes(StandardCharsets.UTF_8);SendMessageWithTimeStampRequestrequestSendMessageWithTimeStampRequest.builder().topic(TOPIC).body(body).deliveryTimeStamp(msg.getEndTimeStamp()).build();try{rocketMQTemplate.syncSendWithTimestamp(request);System.out.println(定时消息发送成功课程IDmsg.getCourseId());}catch(Exceptione){e.printStackTrace();thrownewRuntimeException(发送MQ定时消息失败);}}}5. 模拟推送接口客户端 PushApiClient.javapackagecom.demo.client;importorg.springframework.stereotype.Component;importorg.springframework.web.client.RestTemplate;importjavax.annotation.Resource;ComponentpublicclassPushApiClient{ResourceprivateRestTemplaterestTemplate;/** * 业务推送接口根据课程ID批量推送学员消息 */publicvoidpushNotice(LongcourseId){// 替换为你自己的推送服务接口地址Stringurlhttp://127.0.0.1:8080/api/push/send?courseIdcourseId;StringresprestTemplate.postForObject(url,null,String.class);System.out.println(调用推送接口完成返回resp);}}6. 消费者监听消息、校验课程、调用推送接口 CoursePushConsumer.javapackagecom.demo.consumer;importcom.alibaba.fastjson2.JSON;importcom.demo.client.PushApiClient;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjava.util.List;importjava.util.concurrent.TimeUnit;ComponentRocketMQMessageListener(topiccourse_push_topic,consumerGroupcourse-push-consumer-group)publicclassCoursePushConsumerimplementsMessageListenerConcurrently{AutowiredprivatePushApiClientpushApiClient;AutowiredprivateRedisTemplateString,StringredisTemplate;OverridepublicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContextcontext){for(MessageExtmsg:msgs){try{StringbodyStrnewString(msg.getBody());CoursePushMsgmsgDataJSON.parseObject(bodyStr,CoursePushMsg.class);LongcourseIdmsgData.getCourseId();longmsgScheduleTsmsg.getDeliveryTimestamp();longrealEndTsmsgData.getEndTimeStamp();// 1. 过滤旧消息修改课程结束时间产生的过期定时消息if(Math.abs(msgScheduleTs-realEndTs)60*1000){System.out.println(旧定时消息丢弃 courseId:courseId);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 2. Redis幂等防止重复推送StringredisKeypush:course:courseId;if(Boolean.TRUE.equals(redisTemplate.hasKey(redisKey))){System.out.println(该课程已推送跳过);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 3. 核心逻辑调用业务推送接口pushApiClient.pushNotice(courseId);// 4. 成功后写入幂等标记7天过期redisTemplate.opsForValue().set(redisKey,1,7,TimeUnit.DAYS);}catch(Exceptione){e.printStackTrace();// 消费异常MQ自动重试returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}7. 测试接口 Controller TestMqController.javapackagecom.demo.controller;importcom.demo.entity.CoursePushMsg;importcom.demo.service.CoursePushProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;importjava.time.LocalDateTime;importjava.time.ZoneOffset;importjava.time.format.DateTimeFormatter;RestControllerpublicclassTestMqController{AutowiredprivateCoursePushProducerproducer;/** * 测试发送定时推送消息 * 访问示例http://127.0.0.1:8080/send/push?courseId1001endTime2026-06-25 18:30:00 */GetMapping(/send/push)publicStringsendPushMsg(RequestParamLongcourseId,RequestParamStringendTime){DateTimeFormatterformatterDateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss);LocalDateTimelocalDateTimeLocalDateTime.parse(endTime,formatter);longtimeStamplocalDateTime.toInstant(ZoneOffset.of(8)).toEpochMilli();CoursePushMsgmsgnewCoursePushMsg();msg.setCourseId(courseId);msg.setTitle(课程已结束提醒);msg.setContent(本节课学习完成请完成课后作业);msg.setEndTimeStamp(timeStamp);producer.sendCourseTimedMsg(msg);return定时消息提交成功到期自动推送学员;}}8. 启动类 RocketMqApplication.javapackagecom.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.context.annotation.Bean;importorg.springframework.web.client.RestTemplate;SpringBootApplicationpublicclassRocketMqApplication{publicstaticvoidmain(String[]args){SpringApplication.run(RocketMqApplication.class,args);}BeanpublicRestTemplaterestTemplate(){returnnewRestTemplate();}}七、业务场景说明课程修改结束时间解决方案新增课程保存课程endTime到数据库调用接口发送定时消息修改课程结束时间更新数据库endTime重新调用发送接口生成新定时消息旧消息到期消费时时间不匹配直接丢弃不会重复推送取消课程推送数据库增加pushStatus2标记消费时查询状态直接跳过推送逻辑八、常见踩坑问题连接报错ACL权限不足检查yml的access-key、secret-key和plain_acl.yml完全一致确认topicPerms配置了PUB|SUB权限定时消息不生效RocketMQ版本必须5.x以上才支持syncSendWithTimestamp任意时间戳定时服务器外网无法连接防火墙放行9876、10911端口broker启动时绑定公网IPnohup sh bin/mqbroker -n 服务器公网IP:9876 重复推送Redis幂等标记 数据库推送状态双重兜底启动内存不足修改runserver.sh、runbroker.sh的Xms/Xmx降低内存九、总结生产环境必须开启ACL设置账号密码禁止裸奔无鉴权RocketMQ定时消息无法删除采用「数据库存真实时间消费过滤旧消息」方案适配课程修改时间场景SpringBoot starter自动携带ACL鉴权信息无需手动编写RPC钩子开发极简启停顺序先NameServer后Broker停止顺序相反重启前必须完整关闭进程