javax.websocket的整合使用

javax.websocket是JavaEE提供的一套websocket接口规范。具体由Servlet容器来提供实现。这里演示一个 比较完整的入门应用。

javax.websocket的核心组件

核心组件,要是都给讲明白。能得好几大篇幅,这里仅仅只做简单的介绍。 :roll_eyes:

ServerApplicationConfig

websocket服务的开发,有2种方式

  1. 注解,
  2. 实现Endpoint接口的方法。

该接口负责把所有的websocket的服务(包括注解和接口实现)注册到系统。需要自己提供实现类。系统启动后,会自动调动完成注册。

ServerEndpointConfigurator

websocket握手的处理类,可以读取到握手的请求头,也可以修改响应头。它还能获取到HttpSession

Encoder/Decoder

负责入站/出站消息的编解码

Session

连接对象,可以给客户端发送数据(同步,异步),断开,添加消息处理器等等。

Endpoint

抽象类,服务端的端点接口。提供了基本的事件方法。

void onOpen(Session session, EndpointConfig config);
void onClose(Session session, CloseReason closeReason) 
void onError(Session session, Throwable thr)

MessageHandler

消息处理器接口,它有俩子接口

Whole 处理普通消息

 void onMessage(T message);

Partial 处理分片消息

void onMessage(T partialMessage, boolean last);

Maven依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>io.javaweb</groupId>
	<artifactId>websocket</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>

	<dependencies>
		<!-- websocket api -->
		<dependency>
			<groupId>javax.websocket</groupId>
			<artifactId>javax.websocket-api</artifactId>
			<version>1.1</version>
			<scope>provided</scope>
		</dependency>

		<!-- Gson -->
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.8.6</version>
		</dependency>
		
		<!-- logback -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.30</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-access</artifactId>
			<version>1.2.3</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.1</version>
				<configuration>
					<compilerArgs>
						<arg>-parameters</arg>
					</compilerArgs>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<!-- jetty插件 -->
			<plugin>
				<groupId>org.eclipse.jetty</groupId>
				<artifactId>jetty-maven-plugin</artifactId>
				<version>9.4.31.v20200723</version>
				<configuration>
					<httpConnector>
						<port>80</port>
					</httpConnector>
					<webApp>
						<contextPath>/</contextPath>
					</webApp>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

ServerApplicationConfigImpl


import java.util.Collections;
import java.util.Set;

import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;


public class ServerApplicationConfigImpl implements ServerApplicationConfig  {

	/**
	 * 实现了Endpoint的端点
	 */
	@Override
	public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> endpointClasses) {
		// 系统中没有Endpoint实现,返回空集合
		return Collections.emptySet(); 
	}

	/**
	 * 标识了@ServerEndpoint注解的端点
	 */
	@Override
	public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
		return scanned;
	}
}

ServerEndpointConfigurator

可以读取到websocket握手的请求头,也可以修改响应头。

package io.javaweb.websocket;

import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;

public class ServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
	
	@Override
	public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
		super.modifyHandshake(sec, request, response);
	}
}

JsonEncoder

把响应给客户端的数据,编码为Json

package io.javaweb.websocket.decoder;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

import com.google.gson.Gson;


public class JsonEncoder implements Encoder.Text<Object> {

	private Gson gson;
	
	@Override
	public void init(EndpointConfig config) {
		this.gson =  new Gson();
	}

	@Override
	public void destroy() {
		
	}

	@Override
	public String encode(Object object) throws EncodeException {
		return this.gson.toJson(object);
	}
}

JsonDecoder

把客户端发送的数据,解码为JsonElement对象

package io.javaweb.websocket.encoder;

import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

public class JsonDecoder implements Decoder.Text<JsonElement> {


	@Override
	public void init(EndpointConfig config) {
	}

	@Override
	public void destroy() {
	}

	@Override
	public JsonElement decode(String s) throws DecodeException {
		return JsonParser.parseString(s);
	}

	@Override
	public boolean willDecode(String s) {
		return true;
	}
}

TestChannel

package io.javaweb.websocket.channel;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.JsonElement;

import io.javaweb.websocket.ServerEndpointConfigurator;
import io.javaweb.websocket.decoder.JsonEncoder;
import io.javaweb.websocket.encoder.JsonDecoder;

@ServerEndpoint(value = "/test/{userId}", 
	configurator = ServerEndpointConfigurator.class,	// 握手处理器
	decoders = JsonDecoder.class,		// 消息解码器
	encoders = JsonEncoder.class		// 消息编码器
)
public class TestChannel {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(TestChannel.class);

	private Session session;
	
	@OnMessage(maxMessageSize = 1024) // 单个消息最大体积 1Kb
	public void onMessage(JsonElement message){
		
		LOGGER.info("收到消息:{}", message);
		
		// 把收到的消息异步响应给客户端
		this.session.getAsyncRemote().sendObject(message);
	}

	@OnOpen
	public void onOpen(@PathParam("userId") Long userId		// restfull风格的参数
						, Session session, EndpointConfig endpointConfig){
		
		// 查询参数
		Map<String, List<String>> parameter = session.getRequestParameterMap();
		String token = parameter.get("token").get(0);
		
		LOGGER.info("新的链接:userId={}, token={}, sessionId={}", userId, token, session.getId());
		
		this.session = session;
		this.session.setMaxIdleTimeout(0); // 一直空闲不超时
	}

	@OnClose
	public void onClose(CloseReason closeReason){
		LOGGER.info("链接断开:{}", closeReason);
	}

	@OnError
	public void onError(Throwable throwable) throws IOException {
		LOGGER.error("系统异常:{}", throwable.getMessage());
		throwable.printStackTrace();
	}

}

启动测试

启动Jetty插件

mvn jetty:run

客户端实现

const webSocket = new WebSocket('ws://localhost/test/1000?token=123456');
webSocket.onmessage = (e) => {
	console.log(`收到消息:${e.data}`);
}
webSocket.onopen = (e) => {
	console.log('链接打开');
	webSocket.send(JSON.stringify({message: "Hello Websocket"}));
}
webSocket.onerror = (e) => {
	console.log('链接异常');
}
webSocket.onclose = (e) => {
	console.log(`链接断开:code=${e.code}, reason=${e.reason}`);
}

日志

客户端

链接打开
index.html:13 收到消息:{"message":"Hello Websocket"}

服务端

11:12:12.618 [qtp277601240-12] INFO  i.j.websocket.channel.TestChannel - 新的链接:userId=1000, token=123456, sessionId=1
11:12:12.762 [qtp277601240-12] INFO  i.j.websocket.channel.TestChannel - 收到消息:{"message":"Hello Websocket"}