在SpringBoot中使用Redis的zset统计在线用户信息

统计在线用户的数量,是应用很常见的需求了。如果需要精准的统计到用户是在线,离线状态,我想只有客户端和服务器通过保持一个TCP长连接来实现。如果应用本身并非一个IM应用的话,这种方式成本极高。

现在的应用都趋向于使用心跳包来标识用户是否在线。用户登录后,每隔一段时间,往服务器推送一个消息,表示当前用户在线。服务器则可以定义一个时间差,例如:5分钟内收到过客户端心跳消息,视为在线用户

在线用户统计的实现

基于数据库实现

最简单的办法,就是在用户表,添加一个最后心跳包的日期时间字段 last_active。服务器收到心跳后,每次都去更新这个字段为当前的最新时间。

如果要查询最近5分钟活跃的用户数量,就可以简单的通过一句SQL完成。

SELECT COUNT(1) AS `online_user_count` FROM `user` WHERE `last_active` BETWEEN  '2020-12-22 13:00:00' AND '020-12-22 13:05:00';

弊端也是显而易见,为了提高检索效率,不得不为last_active字段添加索引,而因为心跳的更新,会导致频繁的重新维护索引树,效率极其低下。

基于Redis实现

这是比较理想的一种实现方式了,Redis基于内存进行读写,性能自然比关系型数据库好得多,而且它所提供的Zset可以很方便的构建出一个在线用户的统计服务。

Redis的Zset

这里不会涉及太多redis的东西,简单说明以下zset。它是一个有序的set集合,集合中的每个元素由2个东西组成

  • member 既然是集合,那么它便是集合中的元素,并且不能重复
  • score 既然是有序的,它就是用于排序的权重字段

Zset的部分操作

添加元素

ZADD key score member [score member ...]

一次性添加一个或者多个元素到集合,如果member已经存在则会使用当前score 进行覆盖

统计所有的元素数量

ZCARD key

统计score值在min和max之间元素数量

ZCOUNT key min max

删除score值在min和max之间的元素

ZREMRANGEBYSCORE key min max

一个示例

我打算,用一个zset存储我内心中编程语言的评分排名,这个key叫做lang

添加信息,返回新添加的元素个数

> zadd lang 999 php 10 java 9 go 8 python 7 javascript
"5"

查看添加的数量

> zcard lang
"5"

查看评分在8 - 10之间的元素个数,有3个

> zcount lang 8 10
"3"

删除评分在8 - 1000的元素,返回删除的个数

> ZREMRANGEBYSCORE lang 8 1000
"4"

在线用户服务的实现

知道了zset后,就可以实现一个在线用户的统计服务了。

实现思路

客户端每隔5分钟发送一个心跳到服务器,服务器根据会话获取到用户的ID,作为zsetmember
存入zsetscore便是当前收到心跳的时间戳,当同一个用户第二次发送心跳的时候,就会更新他对应的score值,由于更新是在内存,这个速度相当快。

zadd users 1608616915109 10000

需要统计出在线用户的数量,本质上就是需要统计出,最近5分钟有发送心跳的用户,通过zcount可以很轻松的统计出来。通过程序获取到当前的时间戳,作为maxScore,时间戳减去5分钟后作为minScore

zcount users 1608616615109 1608616915109 

因为某些用户可能长时间没有登录过了,可以通过ZREMRANGEBYSCORE进行清理。通过程序获取到当前的时间戳,减去5分钟后作为maxScore,使用0, 作为minScore,表示清理所有超过5分钟没有发送过心跳包的用户。

ZREMRANGEBYSCORE users 0 1608616615109 

实现代码

package io.springcloud.helper;

import java.time.Duration;
import java.time.Instant;

import javax.annotation.Resource;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import io.springcloud.constant.RedisKeys;

/**
 * 
 * 用户在线统计
 * @author KevinBlandy
 *
 */
@Component
public class OnlineUserStatisticsHelper {

	@Resource
	private StringRedisTemplate stringRedisTemplate;

	/**
	 * 添加在线用户信息
	 * 
	 * @param userId
	 * @return
	 */
	public Boolean online(Integer userId) {
		return this.stringRedisTemplate.opsForZSet().add(RedisKeys.ONLIE_USERS, userId.toString(),
				Instant.now().toEpochMilli());
	}

	/**
	 * 获取一定时间内,在线的用户数量
	 * 
	 * @param duration
	 * @return
	 */
	public Long count(Duration duration) {
		Instant now = Instant.now();
		return this.stringRedisTemplate.opsForZSet().count(RedisKeys.ONLIE_USERS,
				now.minus(duration).toEpochMilli(),
				now.toEpochMilli());
	}

	/**
	 * 获取所有在线过的用户数量,不论时间
	 * 
	 * @return
	 */
	public Long count() {
		return this.stringRedisTemplate.opsForZSet().zCard(RedisKeys.ONLIE_USERS);
	}

	/**
	 * 清除超过一定时间没在线的用户数据
	 * 
	 * @param duration
	 * @return
	 */
	public Long clear(Duration duration) {
		return this.stringRedisTemplate.opsForZSet().removeRangeByScore(RedisKeys.ONLIE_USERS, 0,
				Instant.now().minus(duration).toEpochMilli());
	}
	
	/**
	 * 获取指定用户最后一次在线的时间差
	 * @param userId
	 * @return
	 */
	public Duration last(Integer userId) {
		Double result = this.stringRedisTemplate.opsForZSet().score(RedisKeys.ONLIE_USERS, userId.toString());
		return result == null ? null : Duration.ofMillis(result.longValue());
	}
}

使用示例

@Resource
private OnlineUserStatsService onlineUserStatsService;

@Test
public void test() {
	
	// ID为1的用户发送了心跳包
	boolean result = this.onlineUserStatsService.online(1);
	System.out.println("online=" + result);
	
	// 获取5分钟内,发送过心跳包的用户数量,也就是在线用户的数量
	Long count = this.onlineUserStatsService.count(Duration.ofMinutes(5));
	System.out.println("oneline count=" + count);
	
	// 获取所有发送过心跳包的用户数量
	count = this.onlineUserStatsService.count();
	System.out.println("all count=" + count);
	
	// 清除超过1天都没发送过心跳包的用户
	Long clear = this.onlineUserStatsService.clear(Duration.ofDays(1));
	System.out.println("clear=" + clear);
}

内存消耗分析

可以通过 http://www.redis.cn/redis_memory/ 预算Redis的内存消耗

我对Redis的内存分配并不熟悉,只是按照自己的想法去填写了一些数据,所以我在这里理解的东西,可能是错误的。但是我想这并不耽误证明 - 在这种场景使用Zset对内存消耗极低的事实

设想onlie_users需要存储1亿个用户的状态信息,每个元素scoremember需要10个字节存储,那么一共大约需要20G内存。20G的内存对于现在的服务器来说,并不是大问题。

最后

  • 心跳协议不一定非要HTTP,如果客户端支持的话UDP就很适合,可以节约一些系统开销。
  • zset的key,不一定非要用String,可以修改序列化方式,以固定的字节的形式存储用户ID,在用户ID过大的时候,可以节约一些存储空间。
String userId = "10010";
System.out.println(userId.getBytes().length); // 以字符串形式存储 => 需要5个字节

byte[] bin = ByteBuffer.allocate(4).putInt(Integer.valueOf(userId)).array();
System.out.println(bin.length);					// 序列化为字节形式存储 => 需要4个字节

System.out.println(ByteBuffer.wrap(bin).getInt());	// 反序列化为ID => 10010
2 个赞

Redis提供的有 Hyperloglog基数统计算法
(1)、什么是基数?(网页UV一个人访问多次算作1次)
基数(不重复的元素)可以接受误差。Hyperloglog用来做基数统计的算法,每个 HyperLogLog 键只需要花费 12 KB 内存
就可以计算接近 2^64 个不同元素的基数。但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身
所以HyperLogLog 不能像集合那样,返回输入的各个元素。(优点:占用内存特别小。)

(2、)语法
PFADD key element [element …] 添加指定元素到 HyperLogLog 中。
PFCOUNT key [key …] 返回给定 HyperLogLog 的基数估算值。
PFMERGE destkey sourcekey [sourcekey …] 将多个 HyperLogLog 合并为一个 HyperLogLog

例:
    PFADD runoobkey "redis"
    PFCOUNT runoobkey
    PFMERGE 自定义集合3 集合1 集合2

实时在线统计和统计uv不是一个概念。

哦,我鲁莽了