使用Apache Camel进行消息去重
题图:from Zoommy
为什么要消息去重
虚拟场景:
财务系统靠人力系统的通知向你发放薪水,正常情况是每月一次。不过,某天人力系统因为某些原因,重复发送了通知,(可能是以为财务系统没有收到通知,也可能是其他原因)。你以为你会收到两份薪水?
别逗了,公司不会允许这种事情发生的。财务系统针对同一个通知(3月发给你工资),不论收到几次,都只会处理一次。
这么做的原因显而易见,而这样的财务系统就是一个幂等消费者。
幂等消费者,Idempotent Consumer
什么是Idempotent Consumer?这是EIPs中的一个概念,指代幂等消息消费者,也即对于重复消息的处理应该有幂等的结果。
这就涉及到了消息消费者如何处理重复消息的问题。EIPs中只有一系列需要解决的系统整合问题的定义,而Apache Camel却是实实在在提供了几乎所有这些问题的解决方案。
针对幂等消费者的问题,Apache Camel提供了IdempotentConsumer类作为解决方案。
IdempotentConsumer类
Apache Camel提供idempotentConsumer类来解决幂等消费者的问题,这个类的构造函数通常需要两个参数,一个唯一ID的表达式,一种存储方式IdempotentRepository(redis,jpa等)。唯一的Message Exchange ID用来标识消息身份。而存储方式就是如何存储这些消息ID。
构造函数举例如下:
|
|
实际上,他工作的原理更像是一个消息过滤器。收到消息后去IdempotentRepository中查找Message ID是否已经存在,如果不存在,就会把Message ID放入IdempotentRepository,并且继续处理消息,如果消息处理失败则会再把Message ID从IdempotentRepository中移除。
默认IdempotentCusumer会丢弃重复消息,但是Camel允许我们自定义处理重复消息。具体方式可以参考官网,链接见文末。
Apache Camel提供了多种IdempotentRepository以供选择,常用的有:
- MemoryIdempotentRepository 基于内存
- RedisStringIdempotentRepository 基于Redis
- JpaMessageIdRepository 基于Jpa
- JdbcMessageIdRepository 基于Jdbc
更多请参见官网,链接见文末。
使用RedisStringIdempotentRepository进行去重
使用redis需要添加依赖compile("org.apache.camel:camel-spring-redis:2.17.0")
该依赖提供两个类RedisStringIdempotentRepository和RedisIdempotentRepository。前者是以<String,String>
类型存入redis,后者是<String, Set>
类型存入。前者支持设置超时时间。两者的构造方法都很简单,只需要提供一个RedisTemplate<String, String>
对象和一个处理进程名。他们会根据处理进程名与Message ID生成key存储在redis中。
配置示例代码如下
|
|
待解决问题
如果是做消息队列的消息去重,但是消息队列又开启了ack消息确认处理成功的机制,那么在队列进行失败消息重发时,Apache Camel应该如何处理?
暂时想到的解决方案是为去重设置合理的超时时间,保证队列重发时,Camel对该消息的去重也已超时。