springboot 2.2.7
你换成StreamRecords.rawBytes
试试看。
ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "myVal".getBytes())).withStreamKey("mystream".getBytes());
要不然就是你的消费端出现了问题,导致乱码。
嗯,这次xrange出来是正常的,消费端也正常了
1) 1) "1601174685689-0"2) 1) "name" 2) "KevinBlandy"
看来是推送消息的序列化的问题
找到原因了:
由于我推送消息时用的RedisTemplate是这样的:
@Autowired
private RedisTemplate<String, Object> redisTemplate;
他对value用的是JdkSerializationRedisSerializer进行序列化,如下图,(这也就是为啥stream和key都能正常获取,这两个用的是StringRedisSerializer)
改为StringRedisTemplate推送消息就可以了
我现在遇到一个问题,前面有人也提到,就是pending挂起后,我应该怎么处理这些数据?我希望我能重新执行,我需要调用什么方法?redisTemplate.opsForStream().pending调用这个后,可以根据哪些方法找到对应的数据,然后重新发布
对于pending
的消息,你可以获取后通过 XCLAIM
指令,重新指定它的消费者。至于spring-data-redis
中XCLAIM
指令的实现方法,我就忘记了,太久不用。
好的,我也没找到这个方法
我找到了,通过 RedisConnection
,可以获取到更底层的API。
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.execute(new RedisCallback<Object>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
// XCLAIM 指令的实现方法
connection.streamCommands().xClaim(key, group, newOwner, options);
connection.streamCommands().xClaim(key, group, newOwner, minIdleTime, recordIds);
connection.streamCommands().xClaimJustId(key, group, newOwner, options);
return null;
}
});
好的 谢谢
1、我也遇到 @runghuei 位网友遇到的问题,
org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 2 second(s)
2、在网上找了很多资料,都是让改回jedis客户端,但是改回jedis客户端,好像又不支持steam
3、有没有哪位解决了问题的,我在官网上看说不让设置超时时间,我把timeout去掉,使用默认的60s,还是会不定时出现这个,服务端的timeout是0
您好,这个问题设置再长还是会出现问题的吧,不是2000或者5000的问题,请问您最后怎么解决的?
您好,我们的场景一样,请问能交流下您最后怎么解决的吗?自动断开连接
写的不错,学到东西了
抱歉,我这边还没解决
您好,请问您当成mq使用,客户端使用lettuce监听是否也会出现自动断开,能请教下您是怎么解决的吗
XREADGROUP
了 message
,然后进行了一系列的业务增删改查,然后 XACK
的时候客户端与服务端失去了连接亦或超时了,导致 XACK
没有成功,但是 message
还在 Pending Entries List
中,这种时候可以报错,回滚一系列增删改查,但是下次连接上了的时候再去 XREADGROUP
的时候是先去Pending Entries List
中把没有 XACK
的message
消费掉还是重新到GROUP
中XREAD
出最新的消息到 Pending Entries List
中
昨天通过 Redis中的Stream类型数据 学习了下,如果消息一直存留在 Pending Entries List
中,超过一定时间未处理的话,消息是可以通过XCLAIM
转移的,如果转移次数( Pending Entries List
中delivery counter
属性)达到一定上限是可以认为是死信消息,可以通过XDEL
+ XACK
将消息指定为已处理,然后发现spring-boot-starter-data-redis
需要将spring-boot-starter
的version
升级到2.4.0
(之前的版本没去验证过)才会在org.springframework.data.redis.core.StreamOperations
中有PendingMessagesSummary pending(K key, String group);
相关的API
,
用schedule写个定时器 1分钟左右查询一次就可以保持连接了,可以解决这个问题。
另外,感谢博主,redisstream的博客真的很少,博主这一篇是最有用的,另外博主是在哪查的资料能说下吗?
当时就是在百度随便看了一些redis stream 的文章,把这个弄清楚后,再去看spring-data-redis的api,就很简单了。顺着找各个功能点的实现就行,
参数错误了,我是说消息一直没有被确认,
/**
* Acknowledge the given record as processed.
*
* @param group name of the consumer group.
* @param record the {@link Record} to acknowledge.
* @return length of acknowledged records. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
*/
default Long acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getStream(), group, record.getId());
}
第一个参数是group,我的消息一直没有被确认。