springboot activemq 同时支持topic和queue模式
pom.xml配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- fast json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- activemq连接池 -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
</dependencies>
application.properties配置
#是否启用内存模式(也就是不安装MQ,也可以使用MQ功能),默认为true
spring.activemq.in-memory=false
#ActiveMQ连接池是否启用
spring.activemq.pool.enabled=true
#ActiveMQ连接池最大连接数
spring.activemq.pool.max-connections=5
#ActiveMQ连接池连接空闲时间,默认为30秒
spring.activemq.pool.idle-timeout=30000
#服务地址
spring.activemq.broker-url=tcp://localhost:61616
#用户名(不写默认为admin)
spring.activemq.user=
#密码(不写默认为admin)
spring.activemq.password=
新建配置类ActivemqConfig
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.ConnectionFactory;
/**
* activemq配置
*
* @author zhangxu
* @date 2019/12/20 10:43
*/
@Configuration
public class ActivemqConfig {
/**
* 设置存入mq的数据格式为json
*
* @return
*/
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
/**
* 将springboot里面的消息加到jms监听工厂
* 解决接受消息之后消费不了问题
*
* @param connectionFactory
* @param configurer
* @return
*/
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* 支持topic模式
*
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> topicModel(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
/**
* 支持queue模式
*
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> queueModel(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
}
新建测试类 TestMq
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 测试接收mq消息
*
* @author zhangxu
* @date 2019/12/26 10:39
*/
@Slf4j
@Component
public class TestMq {
/**
* 监听queue
*
* @param text
*/
@JmsListener(destination = "a.queue", containerFactory = "queueModel")
public void queue(String text) {
log.debug("获取到了queue消息:{}", text);
}
/**
* 监听topic
*
* @param text
*/
@JmsListener(destination = "a.topic", containerFactory = "topicModel")
public void topic(String text) {
log.debug("获取到了topic消息:{}", text);
}
}
开始测试
使用浏览器打开activemq管理页 http://localhost:8161
发送一条queue消息
此时控制台打印
再次发送一条topic消息
此时控制台输出
原文:https://zhangjava.coding.me/springboot-activemq-同时支持topic和queue模式/