可以实现消息队列的工具有很多,例如:
ZeroMQ、Posix、SquirrelMQ、Redis、QDBM、Tokyo Tyrant、HTTPSQS等(linux平台下)
各自具备各自的特性,不在展开讨论。
其中redis 的订阅的发布,在spring boot是如何实现的,我们来看看代码:
依赖:
1.
org.springframework.boot spring-boot-starter-parent 2.1.11.RELEASE
2.
org.springframework.boot spring-boot-starter-data-redis com.fasterxml.jackson.core jackson-databind org.apache.commons commons-pool2 org.projectlombok lombok
配置:
spring: application: name: example profiles: test redis: host: localhost port: 6379 password: database: 0 lettuce: pool: 10 min-idle: 10 max-active: 20 max-wait: 10000
注册监听器:
监听消息:
发送信息:
redisTemplate.convertAndSend(channel,message)
这样下来,感觉似乎很完美,但是,无法保证消息真的/一定被消费了,例如客户端关机,宕机,或重启,都会导致数据丢失。因为,这些信息一旦发出,就没有历史记录。专业点就是,不可持久化 和没有ack确认机制。
那么 redis 5.x版本 的stream数据结构很好的解决了这个问题,不但可以发布订阅,还可以能持久化,最重要的是 有ack机制,极大程度上保证了数据的一致性。
好了,尝试搞起来!
进入官网:spring.io
选择spring data
打开 spring data redis 的文档
最新版本 2020-7-20发布的 2.3.2版本。其对应的srping cloud Hoxton SR6,spring boot 2.3.2.RELEASE
依赖和配置和之前一样,
注意的是:暂时只支持 lettuce 连接池,
下面看看如何实现。
先实现监听器接口:StreamListener
只有一个方法,onMessage(V var) ===>你可以 lombok实现也行。
我们下面做个测试接收下消息。
然后 注册 监听器:
发送消息:
上面的意思是有两种方式,一个是传入字节数组,另一个则是 Map, 也可以说是任何对象。
翻开 StringRecord 源码:
看官方提供的第二个构造 接受的是 string → json
再看 StreamRecords.string(data),其参数为 Map<String,String>
最后测试下:
简单的从web端push一条消息。
查看结果,如果报错:
可能是 redis版本问题,更新redis 版本 5x以上
接下来,在消费完消息后 加上ack确认机制:
结果:
分组 group 可以在客户端创建 ,也可以程序启动时初始化:
StreamOperations streamOperations = redisTemplate.opsForStream();
streamOperations.createGroup(“mystream”, “group-name”);
参数:stream,group,另一个构造自行尝试吧。
最后别忘记了在注册监听器加上 group;
这个 name 就是当前消费者的名字,随便写。比如写当前当前ip 192.168.0.110