springboot + RabbitMQ 使用stomp与前端VUE通讯

springboot + RabbitMQ 使用stomp与前端VUE通讯

MQ开启stomp

在MQ安装目录的sbin下, 命令行顺序执行下列命令

	rabbitmq-plugins enable rabbitmq_web_stomp
	rabbitmq-plugins enable rabbitmq_web_stomp_examples

确认MQ开启stomp, 确保15672, 15674 端口正常
后端连接15672端口 , 前端stomp连接15674端口

开始配置

Maven

	<dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>

application.yml

spring:
	rabbitmq:
	    host: 127.0.0.1
	    port: 5672
	    username: admin
	    password: 123456
	    virtual-host: /
	    publisher-confirms: true
	    publisher-returns: true
	    # 连接超时 毫秒
	    connection-timeout: 60000
	    # 监听器
	    listener:
	      simple:
	        acknowledge-mode: manual
	        # 最小并发量
	        concurrency: 100
	        # 最大并发量
	        max-concurrency: 1000
	        prefetch: 50
	        retry:
	          # 重试是否可用
	          enabled: true
	      direct:
	        acknowledge-mode: manual
	    # 消息模板
	    template:
	      retry:
	        enabled: true

定义交换机

public class Constants {

    // 催办交换机
    public static final String PRESS_EXCHANGE = "press_exchange";
}

连接MQ配置文件


import com.mis.mq.constants.Constants;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 读取RabbitMQ配置文件
 */
@Slf4j
@Configuration
public class MqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean
    public Queue directQueue() {
        // 队列名字, 持久化
        return new Queue(Constants.PRESS_EXCHANGE, true);
    }


    public Connection getConn() {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null;
        try {
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);
            connection = factory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            log.error("获取MQ连接失败...");
        }
        return connection;
    }
}

MQ推送工具类


import com.mis.mq.config.MqConfig;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ Channel消息Stomp推送工具类
 */
@Slf4j
@Component
public class SendUtil {

    private static SendUtil sendUtil;
    private MqConfig config;

    public SendUtil(MqConfig config) {
        this.config = config;
    }

    @PostConstruct
    public void init(){
        sendUtil = this;
    }


    /**
     *  推送stomp数据 默认消息持久化
     * @param exchange 交换机
     * @param type  交换机类型 如 : BuiltinExchangeType.DIRECT
     * @param routerKey 路由key
     * @param object 数据
     */
    public static boolean pushMsg(String exchange, BuiltinExchangeType type, String routerKey, Object object) {
        Connection connection = sendUtil.config.getConn();
        Channel channel = null;
        try {
            assert connection != null;
            channel = connection.createChannel();
            channel.exchangeDeclare(exchange, type, true);
            channel.basicPublish(exchange, routerKey,null, object.toString().getBytes());
            return true;
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return false;
    }

}


推送消息 以用户ID作为routerKey

SendUtil.pushMsg(Constants.PRESS_EXCHANGE, BuiltinExchangeType.DIRECT, 接收人ID, JSON数据)

VUE连接

import Stomp from 'stompjs'

let client = Stomp.client('ws://127.0.01:15674/ws')
// 账号密码
var headers = {
  'login': 'admin',
  'passcode': '123456'
}
client.connect(headers, (frame) => {
  // exchange 固定  press_exchange是后台定义的交换机的名称  userId 当前登录用户的ID
  state.client.subscribe('/exchange/press_exchange/' + userId, (frame1) => {
    // 获取消息
    const body = JSON.parse(frame1.body)
    // 获取消息后的操作
  }, (frame) => {
    console.log('通讯连接失败,将无法收到信息!')
  })
}, (frame) => {
  console.log('通讯连接失败,将无法收到信息!')
})

// 断开连接
client.disconnect()


原文:https://blog.csdn.net/qq_36476972/article/details/108536838
作者: · Coisini