手动控制SpringBoot的RabbitMQ消费者
背景
使用@RabbitHandler
和@RabbitListener
注解并指定消息队列后,默认会在应用启动的时候立即就会去消费消息队列中的消息,然而在处理某些场景下,我们可能需要手动停止监听或者手动开始监听,比如我们在调试应用的时候,会经常在本地的IDE中启动项目,这样本地代码和线上代码就成为了两个消费者,这时如果再有消息到达队列,则两个本地代码和线上代码会以“交替”的方式并行消费消息,导致消息被本地代码消费掉。
这不是我们想要的结果。比较理想的方案是不让RabbitMQ的Listener随应用的启动自动监听,而是通过修改配置中心(像config server或ctrip apollo)或者配置文件的配置并立即让配置生效,通过这样的“热修改”方式来动态启动和停止监听器。
步骤
设置autoStartup为false,禁止监听器自启动
1 2 3 4 5 6
| @RabbitHandler @RabbitListener(queues = "MyQueue", autoStartup= "false", id = "tonyListener") public void sendNotice(Message message, Channel channel) throws IOException { ... ... }
|
根据listener ID获取对应容器,进行设置 stop/start
1 2 3 4 5 6 7 8 9
| @Resource private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("tonyListener"); if(!container.isRunning()) { container.start(); log.info("started container"); }
|
附录
Apollo如何实现配置“热更新”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class MyMQListener implements ApplicationRunner { @Value("${mqListenerEnabled}") private boolean mqListenerEnabled; @Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @Override public void run(ApplicationArguments args) throws Exception { MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("tonyListener"); if (mqListenerEnabled) { container.start(); } else { container.stop(); } } @ApolloConfigChangeListener private void someChangeHandler(ConfigChangeEvent changeEvent) { logger.info("配置中心的配置发生变化"); if(changeEvent.isChanged("mqListenerEnabled") { ConfigChange value = changeEvent.getChange(config); if(value.getChangeType().equals(PropertyChangeType.MODIFIED)) { String newValue = value.getNewValue().toString(); MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer(type); if ("true".equals(newValue)) { container.start(); } else { container.stop(); } } } } }
|
Spring Boot配置动态更新
SpringBoot想实现配置动态更新需要借助SpringClound的配置组件,使用这样的方式去构建工程,确切的说你的工程已经从Spring Boot
工程变成了Spring Cloud
工程。而引入SpringCloud的核心目的就是这个@RefreshScope
注解。被@RefreshScope
修饰的注解才会被动态刷新属性值。
添加依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies>
|
开启动态上下文刷新
1 2 3 4 5 6 7
| @Service @RefreshScope public class TestService { @Value("${switch.option}") private String switchOption; }
|
手动发送刷新请求
1
| curl -X POST http://localhost:8080/actuator/refresh
|
RabbitListenerEndpointRegistry常用方法
停止所有队列的监听
1 2 3 4 5 6 7
| @Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
public void stopAll() { this.rabbitListenerEndpointRegistry.stop(); }
|
获得监听容器监听了哪些队列
1
| Collection<MessageListenerContainer> listenerContainers = this.rabbitListenerEndpointRegistry.getListenerContainers();
|
停止指定的队列的监听
1 2
| MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("监听器的ID"); container.stop();
|
参考