如何在Springboot中使用Redis5的Stream

springboot用2.2.7就解决了…编译运行无报错

org.springframework.boot
spring-boot-starter-parent
2.2.7.RELEASE

1 个赞

这个问题怎么解决呢

org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 2 second(s)
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:70)
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41)
	at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)
	at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42)
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270)
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:471)
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:379)
	at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:529)
	at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:239)
	at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:305)
	at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:300)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
	at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:234)
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236)
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138)
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 2 second(s)
	at io.lettuce.core.ExceptionFactory.createTimeoutException(ExceptionFactory.java:51)
	at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:114)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:69)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy100.xreadgroup(Unknown Source)
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:377)
	... 14 more
1 个赞

Command timed out 这是命令执行超时了吧???检查链接。

设置配置文件spring.redis.timeout=5000
目前已解决

1 个赞

有没有办法让redis断线超时后自动重连的参数,不然的话redis断开连接后就无法获取stream信息了

据我所知。。好像没。

StreamMessageListenerContainerOptions.executor(this.threadPoolTaskExecutor) 这里的线程池我可以设置多线程嘛?我自己测试好像没用哎,大神

经过测试,springboot项目中,当消费端没有ack后,并没有重新消费,而是继续向下消费。这个问题怎么解决呢? 我试过改变读取策略为“0-0”,每次重启都会从头读没有ack的消息,但是也是只会消费一次就一直往下继续。

stream并不会自动消费未确认的消息,你需要自己通过XPENDING指令,来获取到指定消费组,消费者下未确认的消息,手动消费后,再手动确认。

StreamOperations中获取未确认消息的api

PendingMessagesSummary pending(K key, String group);
PendingMessages pending(K key, Consumer consumer)
PendingMessages pending(K key, String group, Range<?> range, long count)
PendingMessages pending(K key, String group, Range<?> range, long count)

我们现在是把它当成 mq 来用的,如果没有自动重试功能,有没有对应的listener来监听PEL 呢?或者有别的解决方案,来处理这种情况呢?(我们的问题是:存在某条信息没有消费成功当然也就没有被ack,目前他就一直在PEL里面,每次都只能手动去处理)

可以新启动一个线程,不断轮询未确认的消息。这是我目前想到的方法了。

好吧,谢谢了。 :+1: :+1: :+1:

有个问题再请教一下,如果想一次监听10条消息,怎么设置呢?

streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
		.builder()
		// 一次性最多拉取多少条消息
		.batchSize(10)

这个你有测试过吗?我测试的时候还是只能一条一条取。

StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", "KevinBlandy")).withStreamKey("mystream");
RecordId recordId = this.stringRedisTemplate.opsForStream().add(stringRecord);

推送消息出现乱码:

{

“name”: “��\u0000\u0005t\u0000\u000bKevinBlandy”
}

看不出来为什么乱码。。这也没中文啊?

我在控制台xrange出来的结果是这样的:
10) 1) “1601172903265-0”
2) 1) “name”
2) “��”
add的代码:

public RecordId add(Record<K, ?> record) {

	Assert.notNull(record, "Record must not be null");

	MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);

	ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());

	return execute(connection -> connection.xAdd(binaryRecord), true);
}

{
“name”: “��\u0000\u0005t\u0000\f测试一下”
}
用中文也能出来,但是前面一直多这一块