How to use Redis 5 Streams in Spring Boot

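About Stream

In one sentence: Stream is a new data type in Redis 5 that works as a message queue (MQ). It can produce and consume messages, and it supports consumer groups and message acknowledgement.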
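To make the terms concrete, here is a minimal in-memory sketch of the semantics just described: producing, group consumption, and acknowledgement. It is a toy model, not Redis and not the Spring API. The class `ToyStream` and its methods are hypothetical names, it models a single consumer group, and IDs are simplified to a plain counter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy in-memory model of one stream with one consumer group (hypothetical, not Redis). */
final class ToyStream {
    // Messages in insertion order: id -> body (field/value pairs).
    private final Map<Long, Map<String, String>> entries = new LinkedHashMap<>();
    // IDs delivered to the group but not yet acknowledged.
    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextId = 1;        // simplified ID: a plain counter
    private long lastDelivered = 0; // highest ID already handed to the group

    /** Roughly XADD: append a message and return its ID. */
    long add(Map<String, String> body) {
        long id = nextId++;
        entries.put(id, body);
        return id;
    }

    /** Roughly XREADGROUP with '>': deliver up to count messages never delivered before. */
    List<Long> readGroup(int count) {
        List<Long> delivered = new ArrayList<>();
        for (long id : entries.keySet()) {
            if (id > lastDelivered && delivered.size() < count) {
                delivered.add(id);
                pending.add(id); // stays pending until acknowledged
            }
        }
        if (!delivered.isEmpty()) {
            lastDelivered = delivered.get(delivered.size() - 1);
        }
        return delivered;
    }

    /** Roughly XACK: returns true if the ID was pending. */
    boolean ack(long id) {
        return pending.remove(id);
    }

    /** Roughly XPENDING: IDs delivered but not yet acknowledged. */
    List<Long> pending() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream();
        long first = stream.add(Collections.singletonMap("name", "KevinBlandy"));
        stream.add(Collections.singletonMap("hello", "world"));
        List<Long> delivered = stream.readGroup(10);
        stream.ack(first);
        // prints: delivered=[1, 2], pending=[2]
        System.out.println("delivered=" + delivered + ", pending=" + stream.pending());
    }
}
```

The point of the model is that a message read through a group is not gone once it is delivered: it stays in the pending list until it is acknowledged.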

Once you understand Stream, read on.

Spring Boot integration

You only need to integrate Redis.

pom.xml

Spring Boot 2 uses Lettuce as the default client.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
</dependency>

Configuration

spring:
  redis:
    database: 0
    host: 192.168.1.103
    port: 6379
    password: "123456"
    timeout: 2000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

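Message and message ID objects

These two objects are worth introducing first, because everything below works with them.

Creating message objects

Use the static methods of StreamRecords to create message instances.
A stream message body is made of field-value pairs; think of each pair as a key and a value.
Both keys and values can be defined as custom objects, bytes, or strings.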

ByteRecord rawBytes(Map<byte[], byte[]> raw) 

ByteBufferRecord rawBuffer(Map<ByteBuffer, ByteBuffer> raw) 

StringRecord string(Map<String, String> raw)

<S, K, V> MapRecord<S, K, V> mapBacked(Map<K, V> map)

<S, V> ObjectRecord<S, V> objectBacked(V value)

RecordBuilder<?> newRecord()  // Create a message through the builder

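RecordId represents the message ID

If you have read the post above, you will know that a message ID is unique and consists of two parts: a millisecond timestamp and a sequence number.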

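// ----------- Instance methods for reading ID properties
// Whether the ID is generated automatically by the system
boolean shouldBeAutoGenerated();
// Get the raw ID string
String getValue();
// Get the sequence-number part
long getSequence();
// Get the timestamp part
long getTimestamp();

// ----------- Static methods for creating an ID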
RecordId of(@Nullable String value)
RecordId of(long millisecondsTime, long sequenceNumber)
RecordId autoGenerate()
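
The two-part structure of an ID can be illustrated without any Redis dependency. The following is a minimal JDK-only sketch, assuming a complete ID of the form `<milliseconds>-<sequence>` whose parts fit in a signed `long`. The class `StreamIdParts` is a hypothetical helper, not part of Spring Data Redis. It also shows why IDs must be compared numerically rather than as plain strings.

```java
/** JDK-only sketch of how a stream ID splits into its two parts (hypothetical helper). */
final class StreamIdParts {
    final long timestamp; // milliseconds part, before the '-'
    final long sequence;  // sequence part, after the '-'

    private StreamIdParts(long timestamp, long sequence) {
        this.timestamp = timestamp;
        this.sequence = sequence;
    }

    /** Parses an ID such as "1583208428680-0". */
    static StreamIdParts parse(String id) {
        int dash = id.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("expected <milliseconds>-<sequence>: " + id);
        }
        return new StreamIdParts(
                Long.parseLong(id.substring(0, dash)),
                Long.parseLong(id.substring(dash + 1)));
    }

    /** Orders two IDs by timestamp first, then by sequence. */
    static int compare(String left, String right) {
        StreamIdParts l = parse(left);
        StreamIdParts r = parse(right);
        int byTime = Long.compare(l.timestamp, r.timestamp);
        return byTime != 0 ? byTime : Long.compare(l.sequence, r.sequence);
    }

    public static void main(String[] args) {
        StreamIdParts id = parse("1583208428680-0");
        System.out.println(id.timestamp + " / " + id.sequence);                 // prints: 1583208428680 / 0
        System.out.println(compare("1583208428680-0", "1583208571017-0") < 0); // prints: true
    }
}
```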

Pushing messages to a Stream

Using RedisTemplate

@Autowired
private StringRedisTemplate stringRedisTemplate;

public void test () {
	// Create the message record and specify the stream
	StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", "KevinBlandy")).withStreamKey("mystream");
	RecordId recordId = this.stringRedisTemplate.opsForStream().add(stringRecord);
	// Whether the ID is auto-generated
	boolean autoGenerated = recordId.shouldBeAutoGenerated();
	// ID value
	String value = recordId.getValue();
	// Sequence-number part
	long sequence = recordId.getSequence();
	// Timestamp part
	long timestamp = recordId.getTimestamp();
}

Using RedisConnection

@Autowired
private RedisConnectionFactory redisConnectionFactory;

public void test () {
	// Create the message record and specify the stream
	ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "KevinBlandy".getBytes())).withStreamKey("mystream".getBytes());
	// Obtain a connection
	RedisConnection redisConnection = this.redisConnectionFactory.getConnection();
	try {
		RecordId recordId = redisConnection.xAdd(byteRecord);
		// Whether the ID is auto-generated
		boolean autoGenerated = recordId.shouldBeAutoGenerated();
		// ID value
		String value = recordId.getValue();
		// Sequence-number part
		long sequence = recordId.getSequence();
		// Timestamp part
		long timestamp = recordId.getTimestamp();
	} finally {
		// Release the connection once the work is done
		redisConnection.close();
	}
}

Consuming messages from a Stream

Blocking consumption

StreamConsumerRunner

Use ApplicationRunner to initialize the listener after the application starts and begin consuming.

import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;


@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {

	static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);

	@Value("${redis.stream.consumer}")
	private String consumer;

	@Autowired
	RedisConnectionFactory redisConnectionFactory;

	@Autowired
	ThreadPoolTaskExecutor threadPoolTaskExecutor;
	
	@Autowired
	StreamMessageListener streamMessageListener;

	@Autowired
	StringRedisTemplate stringRedisTemplate;

	private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

	@Override
	public void run(ApplicationArguments args) throws Exception {
		
		// Create the options object
		StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
				.builder()
				// Maximum number of messages fetched per poll
				.batchSize(10)
				// Executor that runs the message polling
				.executor(this.threadPoolTaskExecutor)
				// Handler for errors raised while consuming messages
				.errorHandler(new ErrorHandler() {
					@Override
					public void handleError(Throwable t) {
						// throw new RuntimeException(t);
						t.printStackTrace();
					}
				})
				// Poll timeout; 0 means no timeout (an exception is thrown when a timeout occurs)
				.pollTimeout(Duration.ZERO)
				// Serializer
				.serializer(new StringRedisSerializer())
				.build();

		// Create the listener container from the options object
		StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
				.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
		
		// Start receiving through the listener container (manual acknowledgement mode)
		streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1"), 
				StreamOffset.create("mystream", ReadOffset.lastConsumed()), this.streamMessageListener);

		this.streamMessageListenerContainer = streamMessageListenerContainer;
		// Start listening
		this.streamMessageListenerContainer.start();

	}

	@Override
	public void destroy() throws Exception {
		this.streamMessageListenerContainer.stop();
	}
}

StreamMessageListener

Implement the functional interface StreamListener<K, V extends Record<K, ?>> to define how messages are consumed.

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;


@Component
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>>{
	
	static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class);
	
	@Autowired
	StringRedisTemplate stringRedisTemplate;
	
	@Override
	public void onMessage(MapRecord<String, String, String> message) {
		
		// Message ID
		RecordId messageId = message.getId();
		
		// Message body: field/value pairs
		Map<String, String> body = message.getValue();
		
		LOGGER.info("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);
		
		// Manually acknowledge the message via RedisTemplate; the first argument is the consumer group name
		this.stringRedisTemplate.opsForStream().acknowledge("group-1", message);
	}
}

Non-blocking consumption

This approach uses the read APIs of StreamOperations or RedisConnection to read messages on demand (random access).

Read-related APIs in StreamOperations

Obtain StreamOperations from RedisTemplate

StreamOperations<String, String, String> s = this.stringRedisTemplate.opsForStream();

StreamOperations read APIs

// Random-access range read
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range, Limit limit)


// Read by message ID or offset
List<MapRecord<K, HK, HV>> read(StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)

// Random-access reverse range read
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range)
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range, Limit limit)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit)

// Consumer information
XInfoConsumers consumers(K key, String group);
// Consumer group information
XInfoGroups groups(K key);
// Stream information
XInfoStream info(K key);

// Get the unacknowledged messages of a consumer group or a consumer
PendingMessagesSummary pending(K key, String group);
PendingMessages pending(K key, Consumer consumer)
PendingMessages pending(K key, String group, Range<?> range, long count)
PendingMessages pending(K key, Consumer consumer, Range<?> range, long count)
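
The difference between the range and reverseRange reads listed above can be shown with a small JDK-only sketch. This is a toy model, not the Spring API. The class `ToyRange` is hypothetical, IDs are simplified to plain `long` values, and both bounds are assumed to be inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of range reads over a stream ordered by ID (hypothetical, not Redis). */
final class ToyRange {
    // Entries sorted by ID, as a stream keeps them.
    private final NavigableMap<Long, String> entries = new TreeMap<>();

    void add(long id, String body) {
        entries.put(id, body);
    }

    /** Ascending read of IDs in [from, to], at most limit entries. */
    List<String> range(long from, long to, int limit) {
        return take(entries.subMap(from, true, to, true), limit);
    }

    /** Same bounds, but newest first. */
    List<String> reverseRange(long from, long to, int limit) {
        return take(entries.subMap(from, true, to, true).descendingMap(), limit);
    }

    // Copies at most limit bodies in the iteration order of the given view.
    private static List<String> take(Map<Long, String> view, int limit) {
        List<String> out = new ArrayList<>();
        for (String body : view.values()) {
            if (out.size() == limit) {
                break;
            }
            out.add(body);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyRange stream = new ToyRange();
        stream.add(1, "a");
        stream.add(2, "b");
        stream.add(3, "c");
        stream.add(4, "d");
        System.out.println(stream.range(2, 4, 2));        // prints: [b, c]
        System.out.println(stream.reverseRange(2, 4, 2)); // prints: [d, c]
    }
}
```

Unlike a group read, a range read in this model changes no state: nothing is marked as delivered or pending, so the same range can be read repeatedly.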

Test

First create the stream and the group from the Redis console.

127.0.0.1:6379> XADD mystream * hello world
"1583208428680-0"
127.0.0.1:6379> XGROUP CREATE mystream group-1 $
OK

After starting the application, produce a message to the stream from the console

127.0.0.1:6379> XADD mystream * name KevinBlandy
"1583208571017-0"

The application consumed the message successfully

2020-03-03 12:09:34.159  INFO 9344 --- [lTaskExecutor-1] i.s.c.r.stream.StreamMessageListener     : stream message。messageId=1583208571017-0, stream=mystream, body={name=KevinBlandy}

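Finally

Stream supports other operations as well, such as sending messages through RedisTemplate, inspecting messages that have not been ACKed, and re-consuming them. They are not all listed here. Once you understand Stream, you can work out how these APIs are used with a bit of guessing. It follows naturally and is not difficult.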

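How do I initialize Consumer.from("group-1", "consumer-1")? Written your way, it cannot find group-1.

The consumer group has to be created in advance, either from the Redis CLI or with a client.

About creating it with the client: how do I do that through the stream object of RedisTemplate? I can't seem to find any methods for group operations.

StreamOperations has methods that create a group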

String createGroup(K key, String group)
String createGroup(K key, ReadOffset readOffset, String group)
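
Since the group has to exist before the listener starts, one option is to create it at startup and tolerate the case where it already exists. The following is a minimal sketch under stated assumptions. `GroupBootstrap` and `GroupCreator` are hypothetical names, and `GroupCreator` stands in for a call such as `createGroup(key, group)`. The sketch relies on Redis reporting an existing group with an error whose text starts with `BUSYGROUP`. How that error is wrapped depends on the client, so the cause chain is searched.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch: create a consumer group once and tolerate "already exists" (hypothetical helper). */
final class GroupBootstrap {

    /** Hypothetical stand-in for a call such as StreamOperations.createGroup(key, group). */
    interface GroupCreator {
        String create(String key, String group);
    }

    /** Returns true if the group was created, false if it already existed. */
    static boolean ensureGroup(GroupCreator creator, String key, String group) {
        try {
            creator.create(key, group);
            return true;
        } catch (RuntimeException e) {
            if (mentionsBusyGroup(e)) {
                return false; // Redis reported BUSYGROUP: the group is already there
            }
            throw e; // anything else is a real failure
        }
    }

    /** Walks the cause chain looking for the BUSYGROUP error text. */
    private static boolean mentionsBusyGroup(Throwable error) {
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            String message = cause.getMessage();
            if (message != null && message.contains("BUSYGROUP")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In-memory fake that behaves like a server which already knows some groups.
        Set<String> groups = new HashSet<>();
        GroupCreator fake = (key, group) -> {
            if (!groups.add(key + "/" + group)) {
                throw new IllegalStateException("BUSYGROUP Consumer Group name already exists");
            }
            return "OK";
        };
        System.out.println(ensureGroup(fake, "mystream", "group-1")); // prints: true
        System.out.println(ensureGroup(fake, "mystream", "group-1")); // prints: false
    }
}
```

In the runner, a call like `GroupBootstrap.ensureGroup((key, group) -> stringRedisTemplate.opsForStream().createGroup(key, group), "mystream", "group-1")` could be placed before `receive(...)`, assuming the stream itself already exists.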

java.lang.NoSuchMethodError: org.springframework.data.redis.repository.configuration.RedisRepositoryConfigurationExtension.registerIfNotAlreadyRegistered
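What does this error mean?

It is probably a version problem; this method only exists from version 2.1 onward.

Error:(13, 56) java: package org.springframework.data.redis.connection.stream does not exist
Which Maven packages do I still need to add?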

spring-data-redis

I'm using spring-boot-starter-parent 2.1.11.RELEASE
and spring-data-redis 2.3.1.RELEASE.
I get the following error at startup. What is wrong?

Description:

An attempt was made to call a method that does not exist. The attempt was made from the following location:

    org.springframework.data.redis.repository.configuration.RedisRepositoryConfigurationExtension.createMappingConfigBeanDef(RedisRepositoryConfigurationExtension.java:168)

The following method did not exist:

    org.springframework.data.repository.config.RepositoryConfigurationSource.getRequiredAttribute(Ljava/lang/String;Ljava/lang/Class;)Ljava/lang/Object;

The method's class, org.springframework.data.repository.config.RepositoryConfigurationSource, is available from the following locations:

    jar:file:/C:/Users/Yau/.m2/repository/org/springframework/data/spring-data-commons/2.1.14.RELEASE/spring-data-commons-2.1.14.RELEASE.jar!/org/springframework/data/repository/config/RepositoryConfigurationSource.class

It was loaded from the following location:

    file:/C:/Users/Yau/.m2/repository/org/springframework/data/spring-data-commons/2.1.14.RELEASE/spring-data-commons-2.1.14.RELEASE.jar


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.data.repository.config.RepositoryConfigurationSource

Disconnected from the target VM, address: '127.0.0.1:49378', transport: 'socket'

Process finished with exit code 1

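Don't set the spring-data-redis version yourself.

Without setting the spring-data-redis version,
compiling and running reports the following error
Error:(13, 56) java: package org.springframework.data.redis.connection.stream does not exist
Could you post your pom.xml?

You don't need to set the version yourself. When a package does not exist, first check whether the dependencies resolve correctly.

The dependencies are fine. Compiling and running reports Error:(13, 56) java: package org.springframework.data.redis.connection.stream does not exist

That can't be right. The IDE would definitely flag a compile-time error in the editor.

I added the following dependencies. IDEA shows no errors in pom.xml, but compiling and running fails.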

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.25.Final</version>
		</dependency>
		

		<dependency>
			<groupId>commons-codec</groupId>
			<artifactId>commons-codec</artifactId>
			<version>1.11</version>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-io</artifactId>
			<version>1.3.2</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.20</version>
		</dependency>


		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.3.1</version>
		</dependency>

		<dependency>
			<groupId>tk.mybatis</groupId>
			<artifactId>mapper-spring-boot-starter</artifactId>
			<version>1.2.4</version>
		</dependency>

		<dependency>
			<groupId>com.github.pagehelper</groupId>
			<artifactId>pagehelper-spring-boot-starter</artifactId>
			<version>1.2.3</version>
		</dependency>


		<dependency>
		    <groupId>com.github.tobato</groupId>
		    <artifactId>fastdfs-client</artifactId>
		    <version>1.26.2</version>
		</dependency>
		
		<dependency>
		    <groupId>org.springframework</groupId>
		    <artifactId>spring-test</artifactId>
		</dependency>

		<dependency>
		    <groupId>com.google.zxing</groupId>
		    <artifactId>javase</artifactId>
		    <version>3.3.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.session</groupId>
			<artifactId>spring-session-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>io.lettuce</groupId>
			<artifactId>lettuce-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
		</dependency>
	</dependencies>

Redis needs only 3 dependencies

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>io.lettuce</groupId>
	<artifactId>lettuce-core</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
</dependency>

With only these 3 dependencies it still fails…
Error:(13, 56) java: package org.springframework.data.redis.connection.stream does not exist

I'm out of ideas.

OK :joy: thanks for the help