redis stream 消息队列的最新解决方案

可以实现消息队列的工具有很多,例如:

ZeroMQ、Posix、SquirrelMQ、Redis、QDBM、Tokyo Tyrant、HTTPSQS等(linux平台下)

各自具备各自的特性,不在展开讨论。

其中redis 的订阅的发布,在spring boot是如何实现的,我们来看看代码:

依赖:

  1. 

org.springframework.boot spring-boot-starter-parent 2.1.11.RELEASE

   2. 

org.springframework.boot spring-boot-starter-data-redis com.fasterxml.jackson.core jackson-databind org.apache.commons commons-pool2 org.projectlombok lombok

配置:

spring: application: name: example profiles: test redis: host: localhost port: 6379 password: database: 0 lettuce: pool: 10 min-idle: 10 max-active: 20 max-wait: 10000

注册监听器:

监听消息:

发送信息:

redisTemplate.convertAndSend(channel,message)

这样下来,感觉似乎很完美,但是,无法保证消息真的/一定被消费了,例如客户端关机,宕机,或重启,都会导致数据丢失。因为,这些信息一旦发出,就没有历史记录。专业点就是,不可持久化 和没有ack确认机制。

那么 redis 5.x版本 的stream数据结构很好的解决了这个问题,不但可以发布订阅,还可以能持久化,最重要的是 有ack机制,极大程度上保证了数据的一致性。

好了,尝试搞起来!

进入官网:spring.io

选择spring data

打开 spring data redis 的文档

最新版本 2020-7-20发布的 2.3.2版本。其对应的srping cloud Hoxton SR6,spring boot 2.3.2.RELEASE

依赖和配置和之前一样,

注意的是:暂时只支持 lettuce 连接池,

下面看看如何实现。

先实现监听器接口:StreamListener

只有一个方法,onMessage(V var) ===>你可以 lombok实现也行。

我们下面做个测试接收下消息。

然后 注册 监听器:

发送消息:

上面的意思是有两种方式,一个是传入字节数组,另一个则是 Map, 也可以说是任何对象。

翻开 StringRecord 源码:

看官方提供的第二个构造 接受的是 string → json

再看 StreamRecords.string(data),其参数为 Map<String,String>

最后测试下:

简单的从web端push一条消息。

查看结果,如果报错:

可能是 redis版本问题,更新redis 版本 5x以上

接下来,在消费完消息后 加上ack确认机制:

结果:

image

分组 group 可以在客户端创建 ,也可以程序启动时初始化:

StreamOperations streamOperations = redisTemplate.opsForStream();
streamOperations.createGroup(“mystream”, “group-name”);
参数:stream,group,另一个构造自行尝试吧。

最后别忘记了在注册监听器加上 group;

这个 name 就是当前消费者的名字,随便写。比如写当前当前ip 192.168.0.110

redis stream 的消息队列最新解决方案 演示到此结束了,其中会有点坑需要自己爬,祝你好运!

你得把图片复制过来,重新粘贴一遍。。。。简书的资源做了防盗链。在社区加载不出来 :joy:

经过测试,springboot项目中,当消费端没有ack后,并没有重新消费,而是继续向下消费。这个问题怎么解决呢? 我试过改变读取策略为“0-0”,每次重启都会从头读没有ack的消息,但是也是只会消费一次就一直往下继续。