springboot activemq 同时支持topic和queue模式

springboot activemq 同时支持topic和queue模式

pom.xml配置

<dependencies>
    <!-- Spring MVC / embedded web server -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Lombok, used for the @Slf4j logger in the listener class -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Test starter; the JUnit 4 vintage engine is excluded -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- fastjson: raised from 1.2.60 to 1.2.83 because earlier 1.2.x releases
         are affected by autoType deserialization vulnerabilities -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    <!-- ActiveMQ starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- JMS connection pool used by ActiveMQ -->
    <dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
    </dependency>

</dependencies>

application.properties配置

# Whether to use the in-memory mode (MQ features work without installing a broker); defaults to true
spring.activemq.in-memory=false
# Whether the ActiveMQ connection pool is enabled
spring.activemq.pool.enabled=true
# Maximum number of connections in the ActiveMQ connection pool
spring.activemq.pool.max-connections=5
# Idle timeout for pooled connections; defaults to 30 seconds
spring.activemq.pool.idle-timeout=30000
# Broker address
spring.activemq.broker-url=tcp://localhost:61616
# Broker login user. NOTE(review): the original note said a blank value defaults to admin; presumably a blank value only means no credentials are sent - confirm against the broker's authentication settings
spring.activemq.user=
# Broker login password. NOTE(review): same caveat as for the user property - confirm before relying on a default
spring.activemq.password=

新建配置类ActivemqConfig

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

/**
 * ActiveMQ / JMS configuration that registers a JSON message converter and
 * separate listener container factories for queue and topic destinations.
 *
 * @author zhangxu
 * @date 2019/12/20 10:43
 */
@Configuration
public class ActivemqConfig {

    /**
     * Registers a Jackson-based converter so that payloads are written to the
     * broker as JSON text messages, with the payload type carried in the
     * {@code _type} message property.
     *
     * @return the configured message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Listener container factory set up through Spring Boot's configurer.
     * The original author added it to work around messages being received
     * but not consumed.
     *
     * @param connectionFactory JMS connection factory handed to the configurer
     * @param configurer        Spring Boot configurer applied to the new factory
     * @return the configured listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory listenerFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(listenerFactory, connectionFactory);
        return listenerFactory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) destinations.
     *
     * @param connectionFactory JMS connection factory used by the listeners
     * @return a factory with the pub-sub domain switched on
     */
    @Bean
    public JmsListenerContainerFactory<?> topicModel(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory topicFactory = newListenerFactory(connectionFactory);
        // Topics require the publish/subscribe domain to be enabled explicitly.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /**
     * Listener container factory for queue destinations.
     *
     * @param connectionFactory JMS connection factory used by the listeners
     * @return a factory whose pub-sub flag is left untouched
     */
    @Bean
    public JmsListenerContainerFactory<?> queueModel(ConnectionFactory connectionFactory) {
        return newListenerFactory(connectionFactory);
    }

    /**
     * Creates a bare listener container factory bound to the given connection factory.
     */
    private static DefaultJmsListenerContainerFactory newListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory listenerFactory = new DefaultJmsListenerContainerFactory();
        listenerFactory.setConnectionFactory(connectionFactory);
        return listenerFactory;
    }

}

新建测试类 TestMq

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Sample component that receives messages from one ActiveMQ queue and one
 * ActiveMQ topic and logs each payload at debug level.
 *
 * @author zhangxu
 * @date 2019/12/26 10:39
 */
@Slf4j
@Component
public class TestMq {

    /** Name of the queue destination being listened to. */
    private static final String QUEUE_DESTINATION = "a.queue";

    /** Name of the topic destination being listened to. */
    private static final String TOPIC_DESTINATION = "a.topic";

    /** Bean name of the listener container factory used for queues. */
    private static final String QUEUE_FACTORY = "queueModel";

    /** Bean name of the listener container factory used for topics. */
    private static final String TOPIC_FACTORY = "topicModel";

    /**
     * Handles messages published to the topic.
     *
     * @param text message payload
     */
    @JmsListener(destination = TOPIC_DESTINATION, containerFactory = TOPIC_FACTORY)
    public void topic(String text) {
        log.debug("获取到了topic消息:{}", text);
    }

    /**
     * Handles messages delivered to the queue.
     *
     * @param text message payload
     */
    @JmsListener(destination = QUEUE_DESTINATION, containerFactory = QUEUE_FACTORY)
    public void queue(String text) {
        log.debug("获取到了queue消息:{}", text);
    }

}

开始测试

使用浏览器打开activemq管理页 http://localhost:8161

发送一条queue消息

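此时控制台打印(监听方法使用 log.debug 输出,需将该类所在包的日志级别设为 debug,例如在 application.properties 中配置 logging.level.<包名>=debug,否则在默认的 info 级别下看不到输出)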

再次发送一条topic消息

此时控制台输出


原文:https://zhangjava.coding.me/springboot-activemq-同时支持topic和queue模式/