Apache Camel 路由消息时去重

使用Apache Camel进行消息去重

题图:from Zoommy

为什么要消息去重

虚拟场景:

财务系统靠人力系统的通知向你发放薪水,正常情况是每月一次。不过,某天人力系统因为某些原因,重复发送了通知,(可能是以为财务系统没有收到通知,也可能是其他原因)。你以为你会收到两份薪水?

别逗了,公司不会允许这种事情发生的。财务系统针对同一个通知(3月发给你工资),不论收到几次,都只会处理一次。

这么做的原因显而易见,而这样的财务系统就是一个幂等消费者。

幂等消费者,Idempotent Consumer

什么是Idempotent Consumer?这是EIPs中的一个概念,指代幂等消息消费者,也即对于重复消息的处理应该有幂等的结果。

这就涉及到了消息消费者如何处理重复消息的问题。EIPs中只有一系列需要解决的系统整合问题的定义,而Apache Camel却是实实在在提供了几乎所有这些问题的解决方案。

针对幂等消费者的问题,Apache Camel提供了IdempotentConsumer类作为解决方案。

IdempotentConsumer类

Apache Camel提供idempotentConsumer类来解决幂等消费者的问题,这个类的构造函数通常需要两个参数,一个唯一ID的表达式,一种存储方式IdempotentRepository(redis,jpa等)。唯一的Message Exchange ID用来标识消息身份。而存储方式就是如何存储这些消息ID。

构造函数举例如下:

1
2
3
4
5
6
// 可见,该构造方法只需要一个MessageID表达式和一种用来存储MessageID的存储方式
public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
addOutput(answer);
return answer;
}

实际上,他工作的原理更像是一个消息过滤器。收到消息后去IdempotentRepository中查找Message ID是否已经存在,如果不存在,就会把Message ID放入IdempotentRepository,并且继续处理消息,如果消息处理失败则会再把Message ID从IdempotentRepository中移除。

默认IdempotentCusumer会丢弃重复消息,但是Camel允许我们自定义处理重复消息。具体方式可以参考官网,链接见文末。

Apache Camel提供了多种IdempotentRepository以供选择,常用的有:

  1. MemoryIdempotentRepository 基于内存
  2. RedisStringIdempotentRepository 基于Redis
  3. JpaMessageIdRepository 基于Jpa
  4. JdbcMessageIdRepository 基于Jdbc

更多请参见官网,链接见文末。

使用RedisStringIdempotentRepository进行去重

使用redis需要添加依赖compile("org.apache.camel:camel-spring-redis:2.17.0")

该依赖提供两个类RedisStringIdempotentRepository和RedisIdempotentRepository。前者是以<String,String>类型存入redis,后者是<String, Set>类型存入。前者支持设置超时时间。两者的构造方法都很简单,只需要提供一个RedisTemplate<String, String>对象和一个处理进程名。他们会根据处理进程名与Message ID生成key存储在redis中。

配置示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Camel路由配置
*/
@Configuration
public class CamelConfiguration {
@Resource
private RedisTemplate redisTemplate;
@Bean
RouteBuilder myRouter() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// 使用idempotent和camel-redis组件进行去重
// 设置处理进程名为camel-idem
RedisStringIdempotentRepository idempotentRepo = new RedisStringIdempotentRepository(redisTemplate, "camel-idem");
// 设置超时为20秒
idempotentRepo.setExpiry(20);
// 根据taskId和tdrId去重
// 将unique头设置为FINISH+taskId的值+tdrId的值
// 将unique头作为Message ID进行去重
from("jms:queue:" + Destination.TASK_FINISH_PUBLISH)
.setHeader("unique")
.simple("FINISH${body[taskId]}${body[tdrId]}")
.idempotentConsumer(header("unique"), idempotentRepo)
.to(
"jms:queue:" + Destination.CONSUMER_TASK_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_GROWTH_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_APPINFO_TASK_FINISH
);
// 根据taskId去重
from("jms:queue:" + Destination.TASK_UNDELIVER_PUBLISH)
.setHeader("unique")
.simple("UNDELIVER${body[taskId]}")
.idempotentConsumer(header("unique"), idempotentRepo)
.to(
"jms:queue:" + Destination.CONSUMER_TASK_TASK_UNDELIVER,
"jms:queue:" + Destination.CONSUMER_GROWTH_TASK_UNDELIVER
);
}
};
}
}

待解决问题

如果是做消息队列的消息去重,但是消息队列又开启了ack消息确认处理成功的机制,那么在队列进行失败消息重发时,Apache Camel应该如何处理?

暂时想到的解决方案是为去重设置合理的超时时间,保证队列重发时,Camel对该消息的去重也已超时。

Apache Camel Idempotent文档

链接地址:http://camel.apache.org/idempotent-consumer.html