手动控制SpringBoot的RabbitMQ消费者

背景

使用@RabbitHandler和@RabbitListener注解并指定消息队列后,默认会在应用启动的时候立即就会去消费消息队列中的消息,然而在处理某些场景下,我们可能需要手动停止监听或者手动开始监听,比如我们在调试应用的时候,会经常在本地的IDE中启动项目,这样本地代码和线上代码就成为了两个消费者,这时如果再有消息到达队列,则两个本地代码和线上代码会以“交替”的方式并行消费消息,导致消息被本地代码消费掉。

这不是我们想要的结果。比较理想的方案是不让RabbitMQ的Listener随应用的启动自动监听,而是通过修改配置中心(像config server或ctrip apollo)或者配置文件的配置并立即让配置生效,通过这样的“热修改”方式来动态启动和停止监听器。

步骤

设置autoStartup为false,禁止监听器自启动

1
2
3
4
5
6
@RabbitHandler
@RabbitListener(queues = "MyQueue", autoStartup= "false", id = "tonyListener")
public void sendNotice(Message message, Channel channel) throws IOException
{
... ...
}

根据listener ID获取对应容器,进行设置 stop/start

1
2
3
4
5
6
7
8
9
@Resource
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("tonyListener");
if(!container.isRunning())
{
container.start();
log.info("started container");
}

附录

Apollo如何实现配置“热更新”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class MyMQListener implements ApplicationRunner 
{
// 在application.properties文件中配置mqListenerEnabled为true或false作为默认的配置
@Value("${mqListenerEnabled}")
private boolean mqListenerEnabled;

@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;


@Override
public void run(ApplicationArguments args) throws Exception
{
MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("tonyListener");
if (mqListenerEnabled)
{
container.start();
}
else
{
container.stop();
}
}

// 监听apollo的修改
@ApolloConfigChangeListener
private void someChangeHandler(ConfigChangeEvent changeEvent)
{
logger.info("配置中心的配置发生变化");
if(changeEvent.isChanged("mqListenerEnabled") // 这里的mqListenerEnabled是Apollo中的一个配置
{
ConfigChange value = changeEvent.getChange(config);
if(value.getChangeType().equals(PropertyChangeType.MODIFIED)) // 如果是配置项发生变化
{
String newValue = value.getNewValue().toString(); // 获得变化后的新值

MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer(type);
if ("true".equals(newValue))
{
container.start();
}
else
{
container.stop();
}
}
}
}
}

Spring Boot配置动态更新

SpringBoot想实现配置动态更新需要借助SpringClound的配置组件,使用这样的方式去构建工程,确切的说你的工程已经从Spring Boot工程变成了Spring Cloud工程。而引入SpringCloud的核心目的就是这个@RefreshScope注解。被@RefreshScope修饰的注解才会被动态刷新属性值。

添加依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

开启动态上下文刷新

1
2
3
4
5
6
7
@Service
@RefreshScope
public class TestService
{
@Value("${switch.option}")
private String switchOption;
}

手动发送刷新请求

1
curl -X POST http://localhost:8080/actuator/refresh

RabbitListenerEndpointRegistry常用方法

停止所有队列的监听

1
2
3
4
5
6
7
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

public void stopAll()
{
this.rabbitListenerEndpointRegistry.stop();
}

获得监听容器监听了哪些队列

1
Collection<MessageListenerContainer> listenerContainers = this.rabbitListenerEndpointRegistry.getListenerContainers();

停止指定的队列的监听

1
2
MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer("监听器的ID");
container.stop();

参考