多线程消费
消费线程池的配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Spring configuration that declares the thread pool used for consuming stream messages.
 */
@Configuration
public class ThreadPoolTaskExecutorConfiguration {

    /**
     * Creates a {@link ThreadPoolTaskExecutor} with a core pool size of 10 whose threads are
     * named {@code stream-consumer-N}.
     *
     * <p>NOTE(review): presumably this is the executor picked up by {@code @Async} methods in
     * the application; confirm against the async configuration.
     *
     * @return the configured executor; Spring calls {@code initialize} on startup and
     *         {@code destroy} on shutdown as declared on the {@code @Bean} annotation
     */
    @Bean(initMethod = "initialize", destroyMethod = "destroy")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        // Fixed typo: the prefix used to be "tream-consumer-".
        executor.setThreadNamePrefix("stream-consumer-");
        return executor;
    }
}
消费服务
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;
@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {
static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private StreamMessageListener streamMessageListener;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;
@Override
public void run(ApplicationArguments args) throws Exception {
String channelName = "channel";
String groupName = "group";
String consumerName = "consumer-1";
// 初始化stream
if(!this.stringRedisTemplate.hasKey("channel")) {
RecordId recordId = this.stringRedisTemplate.opsForStream().add(channelName, Collections.singletonMap("tryCreat", ""));
LOGGER.info("stream {} 初始化:{}", "channel", recordId);
}
// 初始化消费组
try {
String result = this.stringRedisTemplate.opsForStream().createGroup(channelName, "group");
LOGGER.info("消费组 {} 创建:{}", "group", result);
} catch (Exception e) {
LOGGER.error("消费组创建失败:{}", e.getMessage());
}
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
.builder()
.batchSize(2)
.executor(Executors.newSingleThreadExecutor())
.errorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
t.printStackTrace();
}
})
.pollTimeout(Duration.ZERO)
.serializer(new StringRedisSerializer())
.build();
// 创建 Container
StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
// 群组消费:开始接收消息,设置为手动消费
streamMessageListenerContainer.receive(Consumer.from(groupName, consumerName),
StreamOffset.create(channelName, ReadOffset.lastConsumed()), this.streamMessageListener);
this.streamMessageListenerContainer = streamMessageListenerContainer;
this.streamMessageListenerContainer.start();
}
@Override
public void destroy() throws Exception {
this.streamMessageListenerContainer.stop();
}
}
消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Stream listener that logs each received record and then acknowledges it for the
 * consumer group {@code group}.
 */
@Component
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>> {
    static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class);

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    /**
     * Logs the record's id, stream key and body, then acknowledges the record.
     *
     * <p>The method is annotated with {@code @Async}, so it only runs asynchronously when
     * async support is enabled in the application.
     *
     * @param message the record delivered by the listener container
     */
    @Override
    @Async
    public void onMessage(MapRecord<String, String, String> message) {
        // Parameterized logging: same rendered text as before, without eager concatenation.
        LOGGER.info("MessageId: {}", message.getId());
        LOGGER.info("Stream: {}", message.getStream());
        LOGGER.info("Body: {}", message.getValue());
        // Manual acknowledgement for the consumer group.
        this.stringRedisTemplate.opsForStream().acknowledge("group", message);
    }
}
客户端
// Logger for this test class.
private static final Logger LOGGER = LoggerFactory.getLogger(JpaApplicationTest.class);
// Redis template injected by Spring; the test uses it to append records to the stream.
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
 * Appends 100 records to the stream "channel", each carrying a single "message" field whose
 * value is the loop counter, and logs the id returned for every record.
 */
@Test
public void test () {
    final int total = 100;
    for (int i = 0; i < total; ++i) {
        // Build the record first, then push it.
        MapRecord<String, String, String> record =
                MapRecord.create("channel", Collections.singletonMap("message", Integer.toString(i)));
        RecordId pushedId = this.stringRedisTemplate.opsForStream().add(record);
        LOGGER.info("message push={}", pushedId);
    }
}
在启动类上开启异步支持
@EnableAsync(proxyTargetClass = true)
消费日志
2020-08-21 17:54:24.619 INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener : Stream: channel
2020-08-21 17:54:24.619 INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener : Body: {message=768}
2020-08-21 17:54:24.634 INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener : MessageId: 1598003652317-0
2020-08-21 17:54:24.634 INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener : Stream: channel
2020-08-21 17:54:24.634 INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener : Body: {message=769}
2020-08-21 17:54:24.635 INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener : MessageId: 1598003652326-0
2020-08-21 17:54:24.635 INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener : Stream: channel
2020-08-21 17:54:24.635 INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener : Body: {message=770}
2020-08-21 17:54:24.642 INFO 4128 --- [ream-consumer-5] i.s.jpa.runner.StreamMessageListener : MessageId: 1598003652334-0
可以看到,不同的消息是由线程池中不同的线程消费的(日志中的线程名只显示末尾 15 个字符,所以前缀看起来被截断了)