redis streams队列多线程问题

CloseWorkerMessageListerner

package cn.chinacancer.queen.consumer;

import cn.chinacancer.queen.config.Config;
import cn.chinacancer.queen.mapper.compute.*;
import cn.chinacancer.queen.util.AliyunUtil;
import cn.chinacancer.queen.util.TaskUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class CloseWorkerMessageListerner implements StreamListener<String, MapRecord<String, String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(CloseWorkerMessageListerner.class);

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Autowired
    Config config;

    @Autowired
    @Qualifier("computeTransactionManager")
    DataSourceTransactionManager dataSourceTransactionManager;

    @Autowired
    TaskMapper taskMapper;

    @Autowired
    TaskCmdParamMapper taskCmdParamMapper;

    @Autowired
    PipelineMapper pipelineMapper;

    @Autowired
    TaskFileMapper taskFileMapper;

    @Autowired
    TaskMetaMapper taskMetaMapper;

    @Autowired
    PendingTaskMapper pendingTaskMapper;

    @Autowired
    TaskUtil taskUtil;

    @Autowired
    AliyunUtil aliyunUtil;

    /**
     * Handles one "close worker" record delivered by the stream listener container.
     * <p>
     * The record is deleted from the stream immediately after receipt so it is not
     * re-delivered, then the (currently simulated) business work runs.
     * NOTE(review): this method runs on the container's polling thread unless it is
     * made {@code @Async}; a long sleep here blocks further message delivery.
     *
     * @param message the stream record; its value map carries the message payload
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        RecordId messageId = message.getId();
        Map<String, String> body = message.getValue();
        logger.debug("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);
        // Remove the processed record from the stream so it cannot be consumed again.
        this.stringRedisTemplate.opsForStream().delete(config.closeWorkerStreamKey, messageId);

        // Use the logger instead of System.out so output goes through the logging config.
        logger.info("Close WorkerMessage OnMessage");

        try {
            this.sleepSomeTime(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so the owning
            // thread pool can observe the interruption and shut down cleanly.
            Thread.currentThread().interrupt();
            logger.warn("close-worker message handling interrupted", e);
        }
        logger.info("Close WorkerMessage OnMessage end");
    }

    // Simulates slow business processing (placeholder for the real work).
    private void sleepSomeTime(int sleepTime) throws InterruptedException {
        Thread.sleep(sleepTime);
    }
}

CloseWorkerConsumerRunner

package cn.chinacancer.queen.consumer.runner;

import cn.chinacancer.queen.config.Config;
import cn.chinacancer.queen.consumer.CloseWorkerMessageListerner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

import java.time.Duration;
import java.util.Collections;

@Component
public class CloseWorkerConsumerRunner implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(CloseWorkerConsumerRunner.class);

    @Autowired
    Config config;

    @Autowired
    RedisConnectionFactory redisConnectionFactory;

    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    CloseWorkerMessageListerner streamMessageListener;

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

    // Kept as a field so destroy() can shut the polling pool down with the container.
    private ThreadPoolTaskExecutor pollTaskExecutor;

    /**
     * Ensures the stream and consumer group exist, then starts a listener container
     * that polls the "close worker" stream and dispatches records to the listener.
     */
    @Override
    public void run(ApplicationArguments args)  {

        try {
            // Create the stream lazily: XADD auto-creates it, then the probe record is removed.
            if (!Boolean.TRUE.equals(stringRedisTemplate.hasKey(config.closeWorkerStreamKey))) {
                RecordId recordId = stringRedisTemplate.opsForStream().add(config.closeWorkerStreamKey, Collections.singletonMap("_up", "up"));
                stringRedisTemplate.opsForStream().delete(config.closeWorkerStreamKey, recordId);
            }

            // Create the consumer group; throws BUSYGROUP if it already exists.
            stringRedisTemplate.opsForStream().createGroup(config.closeWorkerStreamKey, config.closeWorkerStreamGroup);
        } catch (Exception e) {
            // Expected on restart (group already exists) — log instead of swallowing silently,
            // so genuine connection/config failures are still visible.
            logger.warn("stream/consumer-group initialization skipped: {}", e.getMessage());
        }

        // NOTE(review): this executor only runs the container's *polling* task — message
        // handling happens on the polling thread unless the listener itself is async.
        // A single-thread pool would suffice here.
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(0);
        // Set the prefix BEFORE initializing; the original code initialized first
        // (afterPropertiesSet()), set the prefix, then initialized AGAIN — the first
        // executor was discarded and the prefix never applied to it.
        taskExecutor.setThreadNamePrefix("tentech");
        // initialize() once is enough; afterPropertiesSet() is an alias for it.
        taskExecutor.initialize();
        this.pollTaskExecutor = taskExecutor;

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .batchSize(10)
                .executor(taskExecutor)
                .errorHandler(new ErrorHandler() {
                    @Override
                    public void handleError(Throwable throwable) {
                        // Surface polling errors instead of dropping them silently.
                        logger.error("stream polling error", throwable);
                    }
                })
                .pollTimeout(Duration.ofMillis(100))
                .serializer(new StringRedisSerializer())
                .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
                .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

        // Auto-ack group read starting from the group's last-consumed offset.
        streamMessageListenerContainer.receiveAutoAck(Consumer.from(config.closeWorkerStreamGroup, config.closeWorkerStreamUser),
                StreamOffset.create(config.closeWorkerStreamKey, ReadOffset.lastConsumed()), this.streamMessageListener);
        this.streamMessageListenerContainer = streamMessageListenerContainer;

        this.streamMessageListenerContainer.start();
    }

    /** Stops polling and releases the polling thread pool on context shutdown. */
    @Override
    public void destroy() throws Exception {
        // Guard against run() having failed before the container was assigned.
        if (this.streamMessageListenerContainer != null) {
            this.streamMessageListenerContainer.stop();
        }
        if (this.pollTaskExecutor != null) {
            this.pollTaskExecutor.shutdown();
        }
    }
}

关于这里的线程池,我设置了多个线程,但最后执行还是串行的。有大神知道怎样才能多线程消费吗?

1,你的executor是怎么配置的
2,看日志,确定消息的消费是线程池中的线程执行的
3,你怎么认为是“串行”执行的

实际执行的时候用到的是线程池的线程,但是只有一个线程

同时并发的消费几条消息,也是同一个线程在执行业务?

是的,我同时发了10条信息,只能一条一条的执行,而且线程名字都是tentech1

那玄学了啊。。你这线程池不会只有一个线程吧?

但是我设置的是最小5个最大10个

跟这个没关系,具体什么原因我一下也不知道。

这个跟redis设置有关吗?

应该没吧。。redis负责推送消息到客户端,客户端怎么消费跟redis没关系。

:pensive:一脸懵逼,网上也看不到redis多线程的案例

你可以把完整的关键代码贴出来,不要截图。我抽时间试一下。看看能不能找到问题。

@Component
public class CloseWorkerMessageListerner implements StreamListener<String, MapRecord<String, String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(CloseWorkerMessageListerner.class);

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    /**
     * Receives one stream record and simulates 5s of work.
     * NOTE(review): without @Async this runs on the container's single polling
     * thread, which is why messages appear to be consumed serially.
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        RecordId messageId = message.getId();
        Map<String, String> body = message.getValue();
        logger.debug("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);

        System.out.println("Close WorkerMessage OnMessage");

        try {
            // Simulated slow processing — blocks the polling thread for 5 seconds.
            this.sleepSomeTime(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Close WorkerMessage OnMessage end");
    }

    // Placeholder for real business work.
    private void sleepSomeTime(int sleepTime) throws InterruptedException {
        Thread.sleep(sleepTime);
    }
}

@Component
public class CloseWorkerConsumerRunner implements ApplicationRunner, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(CloseWorkerConsumerRunner.class);

@Autowired
Config config;

@Autowired
RedisConnectionFactory redisConnectionFactory;

@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Autowired
CloseWorkerMessageListerner streamMessageListener;

@Autowired
StringRedisTemplate stringRedisTemplate;

private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

// Starts a stream listener container after application startup.
@Override
public void run(ApplicationArguments args)  {

    try{
        // Check whether the stream exists; create it if it does not.
        if (!Boolean.TRUE.equals(stringRedisTemplate.hasKey(config.closeWorkerStreamKey))) {
            // Sending a message to the stream auto-creates it.
            RecordId recordId = stringRedisTemplate.opsForStream().add(config.closeWorkerStreamKey, Collections.singletonMap("_up", "up"));

            // Delete the probe record used for creation.
            stringRedisTemplate.opsForStream().delete(config.closeWorkerStreamKey, recordId);
        }

        // Create the consumer group (throws if it already exists — swallowed here).
        stringRedisTemplate.opsForStream().createGroup(config.closeWorkerStreamKey, config.closeWorkerStreamGroup);
    }catch (Exception e){

    }
    // NOTE(review): this executor is handed to the container as its POLLING executor;
    // it does not parallelize message handling, which explains the serial consumption.
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.setQueueCapacity(0);
    taskExecutor.afterPropertiesSet();
    // NOTE(review): prefix is set after the first initialization and the pool is
    // then initialized a second time below — the first pool is discarded.
    taskExecutor.setThreadNamePrefix("tentech");
    taskExecutor.initialize();

    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
            .builder()
            .batchSize(10)
            .executor(taskExecutor)
            .errorHandler(new ErrorHandler() {
                @Override
                public void handleError(Throwable throwable) {

                }
            })
            .pollTimeout(Duration.ofMillis(100))
            .serializer(new StringRedisSerializer())
            .build();

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
            .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

    // Group read with auto-ack, starting from the group's last-consumed offset.
    streamMessageListenerContainer.receiveAutoAck(Consumer.from(config.closeWorkerStreamGroup, config.closeWorkerStreamUser),
            StreamOffset.create(config.closeWorkerStreamKey, ReadOffset.lastConsumed()), this.streamMessageListener);
    this.streamMessageListenerContainer = streamMessageListenerContainer;

    this.streamMessageListenerContainer.start();
}

// Stops the container when the application context shuts down.
@Override
public void destroy() throws Exception {
    this.streamMessageListenerContainer.stop();
}

}

大佬有空看看嘛

我也不知道这个是 怎么回事。

嗨……怪咱们没仔细看代码注释,我看了一下:

// Quoted from Spring Data Redis (StreamMessageListenerContainerOptionsBuilder):
// this executor only runs the stream *polling* Task — it does not fan message
// handling out across multiple threads.
/**
 * Configure a {@link Executor} to run stream polling {@link Task}s.
 *
 * @param executor must not be null.
 * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
 */
public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor) {

	Assert.notNull(executor, "Executor must not be null!");

	this.executor = executor;
	return this;
}

第一行注释的翻译:配置{@link Executor}以运行流轮询{@link Task}。

这个配置的是,用于轮询消费的线程池,轮询消费也就只用到了一个线程。你需要多线程消费,要自己通过 @Async来完成。

多线程消费

消费线程池的配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolTaskExecutorConfiguration {

	/**
	 * Worker pool that executes the {@code @Async onMessage(...)} consumer logic.
	 * Lifecycle is managed by Spring via initMethod/destroyMethod.
	 *
	 * @return a 10-thread task executor for stream message consumption
	 */
	@Bean(initMethod = "initialize", destroyMethod = "destroy")
	public ThreadPoolTaskExecutor threadPoolTaskExecutor () {
		ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
		threadPoolTaskExecutor.setCorePoolSize(10);
		// Fixed typo: was "tream-consumer-" (missing leading 's').
		threadPoolTaskExecutor.setThreadNamePrefix("stream-consumer-");
		return threadPoolTaskExecutor;
	}
}

消费服务

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {

	static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);


	@Autowired
	private RedisConnectionFactory redisConnectionFactory;

	@Autowired
	private StreamMessageListener streamMessageListener;

	@Autowired
	private StringRedisTemplate stringRedisTemplate;

	private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

	/**
	 * Initializes the stream and consumer group, then starts a single-threaded
	 * polling container. Concurrency comes from the listener's {@code @Async}
	 * method, not from this polling executor.
	 */
	@Override
	public void run(ApplicationArguments args) throws Exception {

		String channelName = "channel";
		String groupName = "group";
		String consumerName = "consumer-1";

		// 初始化stream — use the declared variables instead of repeating literals,
		// and guard against a null Boolean from hasKey (avoids unboxing NPE).
		if (!Boolean.TRUE.equals(this.stringRedisTemplate.hasKey(channelName))) {
			RecordId recordId = this.stringRedisTemplate.opsForStream().add(channelName, Collections.singletonMap("tryCreat", ""));
			LOGGER.info("stream {} 初始化:{}", channelName, recordId);
		}

		// 初始化消费组 — BUSYGROUP on restart is expected; log and continue.
		try {
			String result = this.stringRedisTemplate.opsForStream().createGroup(channelName, groupName);
			LOGGER.info("消费组 {} 创建:{}", groupName, result);
		} catch (Exception e) {
			LOGGER.error("消费组创建失败:{}", e.getMessage());
		}

		// One polling thread is sufficient — it only fetches batches and dispatches them.
		StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
				.builder()
				.batchSize(2)
				.executor(Executors.newSingleThreadExecutor())
				.errorHandler(new ErrorHandler() {
					@Override
					public void handleError(Throwable t) {
						// Log through SLF4J rather than printStackTrace().
						LOGGER.error("stream polling error", t);
					}
				})
				.pollTimeout(Duration.ZERO)
				.serializer(new StringRedisSerializer())
				.build();

		// 创建 Container
		StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
				.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

		// 群组消费:开始接收消息,设置为手动消费 (manual ack happens in the listener)
		streamMessageListenerContainer.receive(Consumer.from(groupName, consumerName),
				StreamOffset.create(channelName, ReadOffset.lastConsumed()), this.streamMessageListener);


		this.streamMessageListenerContainer = streamMessageListenerContainer;

		this.streamMessageListenerContainer.start();

	}

	/** Stops polling when the application context shuts down. */
	@Override
	public void destroy() throws Exception {
		// Guard against run() having failed before the container was assigned.
		if (this.streamMessageListenerContainer != null) {
			this.streamMessageListenerContainer.stop();
		}
	}
}

消费者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


@Component
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>>{
	
	static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class);
	
	@Autowired
	StringRedisTemplate stringRedisTemplate;
	
	/**
	 * Consumes one stream record. {@code @Async} moves execution off the
	 * container's polling thread onto the consumer thread pool, which is what
	 * allows messages to be processed concurrently (requires {@code @EnableAsync}).
	 *
	 * @param message the record to process; acknowledged manually below
	 */
	@Override
	@Async
	public void onMessage(MapRecord<String, String, String> message) {
		
		// Parameterized logging avoids string concatenation when the level is disabled.
		LOGGER.info("MessageId: {}", message.getId());
		LOGGER.info("Stream: {}", message.getStream());
		LOGGER.info("Body: {}", message.getValue());
		
		// Manual XACK — the container was registered with receive(...), not receiveAutoAck(...).
		this.stringRedisTemplate.opsForStream().acknowledge("group", message);
	}
}

客户端

private static final Logger LOGGER = LoggerFactory.getLogger(JpaApplicationTest.class);

@Autowired
private StringRedisTemplate stringRedisTemplate;

// Pushes 100 records onto the "channel" stream to exercise concurrent consumption.
@Test
public void test () {
	for (int  x = 0 ;x < 100;x ++) {
		// XADD channel * message=<x> — returns the server-generated RecordId.
		RecordId recordId = this.stringRedisTemplate.opsForStream().add(MapRecord.create("channel", Collections.singletonMap("message", x + "")));			
		LOGGER.info("message push={}", recordId);
	}
}

在启动类(主函数所在类)上添加以下注解,开启 Spring 的异步执行支持

@EnableAsync(proxyTargetClass = true)

消费日志

2020-08-21 17:54:24.619  INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener     : Stream: channel
2020-08-21 17:54:24.619  INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener     : Body: {message=768}
2020-08-21 17:54:24.634  INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener     : MessageId: 1598003652317-0
2020-08-21 17:54:24.634  INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener     : Stream: channel
2020-08-21 17:54:24.634  INFO 4128 --- [eam-consumer-10] i.s.jpa.runner.StreamMessageListener     : Body: {message=769}
2020-08-21 17:54:24.635  INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener     : MessageId: 1598003652326-0
2020-08-21 17:54:24.635  INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener     : Stream: channel
2020-08-21 17:54:24.635  INFO 4128 --- [ream-consumer-6] i.s.jpa.runner.StreamMessageListener     : Body: {message=770}
2020-08-21 17:54:24.642  INFO 4128 --- [ream-consumer-5] i.s.jpa.runner.StreamMessageListener     : MessageId: 1598003652334-0

可以看到是不同的线程在消费不同的消息

如果这样实现的话,线程池就会被塞满,然后拒绝新任务,就会丢弃消息了。

不会丢消息,看线程池的拒绝策略。线程池的拒绝策略可以设置为:CallerRun。没有可执行线程的时候,由轮询线程来执行消费逻辑。轮询线程阻塞后,不会去拉取新的消息。所以消息不会丢的。