如何在Springboot中使用Redis5的Stream

springboot 2.2.7

你换成StreamRecords.rawBytes 试试看。

ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "myVal".getBytes())).withStreamKey("mystream".getBytes());

要不然就是你的消费端出现了问题,导致乱码。

嗯,这次xrange出来是正常的,消费端也正常了

 1)    1)   "1601174685689-0"2)      1)    "name" 2)    "KevinBlandy"

看来是推送消息的序列化的问题

1 个赞

找到原因了:
由于我推送消息时用的RedisTemplate是这样的:

@Autowired
private RedisTemplate<String, Object> redisTemplate;

他对value用的是JdkSerializationRedisSerializer进行序列化,如下图,(这也就是为啥stream和key都能正常获取,这两个用的是StringRedisSerializer)

改为StringRedisTemplate推送消息就可以了

1 个赞

我现在遇到一个问题,前面有人也提到,就是pending挂起后,我应该怎么处理这些数据?我希望我能重新执行,我需要调用什么方法?redisTemplate.opsForStream().pending调用这个后,可以根据哪些方法找到对应的数据,然后重新发布

对于pending的消息,你可以获取后通过 XCLAIM 指令,重新指定它的消费者。至于spring-data-redisXCLAIM指令的实现方法,我就忘记了,太久不用。

好的,我也没找到这个方法 :joy:

我找到了,通过 RedisConnection ,可以获取到更底层的API。

StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.execute(new RedisCallback<Object>() {
	@Override
	public String doInRedis(RedisConnection connection) throws DataAccessException {
		// XCLAIM 指令的实现方法
		connection.streamCommands().xClaim(key, group, newOwner, options);
		connection.streamCommands().xClaim(key, group, newOwner, minIdleTime, recordIds);
		connection.streamCommands().xClaimJustId(key, group, newOwner, options);
		return null;
	}
});

好的 谢谢

1、我也遇到 @runghuei 位网友遇到的问题,

org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 2 second(s)

2、在网上找了很多资料,都是让改回jedis客户端,但是改回jedis客户端,好像又不支持steam
3、有没有哪位解决了问题的,我在官网上看说不让设置超时时间,我把timeout去掉,使用默认的60s,还是会不定时出现这个,服务端的timeout是0

您好,这个问题设置再长还是会出现问题的吧,不是2000或者5000的问题,请问您最后怎么解决的?

您好,我们的场景一样,请问能交流下您最后怎么解决的吗?自动断开连接

写的不错,学到东西了

抱歉,我这边还没解决

1 个赞

您好,请问您当成mq使用,客户端使用lettuce监听是否也会出现自动断开,能请教下您是怎么解决的吗

XREADGROUPmessage ,然后进行了一系列的业务增删改查,然后 XACK 的时候客户端与服务端失去了连接亦或超时了,导致 XACK 没有成功,但是 message 还在 Pending Entries List 中,这种时候可以报错,回滚一系列增删改查,但是下次连接上了的时候再去 XREADGROUP 的时候是先去Pending Entries List中把没有 XACKmessage 消费掉还是重新到GROUPXREAD出最新的消息到 Pending Entries List

昨天通过 Redis中的Stream类型数据 学习了下,如果消息一直存留在 Pending Entries List 中,超过一定时间未处理的话,消息是可以通过XCLAIM转移的,如果转移次数( Pending Entries Listdelivery counter属性)达到一定上限是可以认为是死信消息,可以通过XDEL + XACK将消息指定为已处理,然后发现spring-boot-starter-data-redis需要将spring-boot-starterversion升级到2.4.0(之前的版本没去验证过)才会在org.springframework.data.redis.core.StreamOperations中有PendingMessagesSummary pending(K key, String group);相关的API

1 个赞

用schedule写个定时器 1分钟左右查询一次就可以保持连接了,可以解决这个问题。

另外,感谢博主,redisstream的博客真的很少,博主这一篇是最有用的,另外博主是在哪查的资料能说下吗?

当时就是在百度随便看了一些redis stream 的文章,把这个弄清楚后,再去看spring-data-redis的api,就很简单了。顺着找各个功能点的实现就行,

参数错误了,我是说消息一直没有被确认,

    	/**
	 * Acknowledge the given record as processed.
	 *
	 * @param group name of the consumer group.
	 * @param record the {@link Record} to acknowledge.
	 * @return length of acknowledged records. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
	 */
	default Long acknowledge(String group, Record<K, ?> record) {
		return acknowledge(record.getStream(), group, record.getId());
	}

第一个参数是group,我的消息一直没有被确认。

2 个赞