Redis不常用操作指南(基于SpringBoot2.3.2代码实现)

1 前言

通常程序员在学习Redis的安装到使用是非常快速的,因为Redis的大部分操作都等同于Map的操作逻辑,只要了解大部分api结合百度能够快速的投入到CRUD的工作中去,所以今天来一期Redis不常用操作指南,当然这里的不常用更多是不会天天用,但是在项目中还是会有应用到的地方。

2 安装Redis

安装及原生命令

3 SpringBoot2整合Redis(有现成环境的可以略过)

3.1 相关依赖

<!-- 单元测试 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- 连接池 --》
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.17</version>
</dependency>

3.2 配置文件

spring:
  redis:
    host: 127.0.0.1   # 服务地址
    port: 6379        # 服务端口
    timeout: 10000    # 连接超时时间(毫秒)
    lettuce:
      pool:
        max-active: 8   # 最大连接数
        max-wait: -1ms  # 阻塞最大等待时长(负值代表没有限制)
        min-idle: 0     # 最小空闲连接
        max-idle: 8     # 最大空闲连接

3.3 配置代码(放在启动类中即可)

@Bean
public <T> RedisTemplate<String, T> redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, T> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);
    template.setKeySerializer(new StringRedisSerializer());
    template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
    return template;
}

添加此配置解决乱码问题

4 不常用操作

4.1 SCAN(必知必会)

通常我们使用scan是为了替换keys,keys命令执行时候会引发Redis锁,导致Redis操作大面积阻塞,所以Redis提供scan命令,不会阻塞主线程,支持按游标分批次返回数据,是比较理想的选择,缺点就是scan有可能返回重复数据,我们需要进行去重,这个在java里面使用Set接收返回值就ok了。

4.1.1 代码实现

package com.example.demo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Component
public class RedisScan {

    /**
     * 获取一批指定前缀的key eg: key:* 获取所有key:开头的key
     *
     * @param pattern key匹配正则
     * @param count   一次获取数目
     * @return
     */
    public Set<String> scan(String pattern, int count) {
        return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
            Set<String> keysTmp = new HashSet<>();
            try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
                    .match(pattern)
                    .count(count).build())) {
                while (cursor.hasNext()) {
                    keysTmp.add(new String(cursor.next(), StandardCharsets.UTF_8));
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
            return keysTmp;
        });
    }

    /**
     * 批量删除
     *
     * @param pattern key匹配正则
     * @param step    阶梯删除的数目
     */
    public void batchDelete(String pattern, int step) {
        while (scan(pattern, step).size() > 0) {
            Set<String> keys = scan(pattern, step);
            redisTemplate.delete(keys);
        }
    }

    /**
     * 模拟指定数量的数据
     *
     * @param count
     */
    public void mock(String keyPrefix, int count) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < count; i++) {
            map.put(keyPrefix + i, String.valueOf(i));
        }
        redisTemplate.opsForValue().multiSet(map);
    }

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisScan.class);

}

提供scan方法及批量删除方法,提供mock数据方法,已经经过测试

4.1.2 测试

package com.example.demo.redis;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.example.demo.ApplicationTests;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class RedisScanTest extends ApplicationTests {

    @Autowired
    RedisScan redisScan;

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisScanTest.class);

    @Test
    public void testMock() {
        TimeInterval timer = DateUtil.timer();
        redisScan.mock("mock:", 10000);
        LOGGER.info("耗时:{}ms", timer.interval());
    }

    @Test
    public void batchDeleteTest() {
        TimeInterval timer = DateUtil.timer();
        redisScan.batchDelete("mock:*", 1000);
        LOGGER.info("耗时:{}ms", timer.interval());
    }
}

4.2 BitMap(需要掌握)

Redis的普通数据类型也能够实现BitMap所处理的需求,BitMap并不是为了特定业务而生,而是为了节约内存

  • 1.基于最小的单位bit进行存储,所以非常省空间。
  • 2.设置时候时间复杂度O(1)、读取时候时间复杂度O(n),操作是非常快的。
  • 3.二进制数据的存储,进行相关计算的时候非常快。
  • 4.方便扩容

4.2.1 预设需求

某平台有1E个用户,需要标记每个用户的在线状态

4.2.2 代码实现

package com.example.demo.redis;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RedisBitMap {

    @Resource
    private RedisTemplate redisTemplate;

    private static final String ONLINE_STATUS = "online_status";

    /**
     * 设置在线状态
     *
     * @param userId
     * @param online
     */
    public void setOnlineStatus(int userId, boolean online) {
        redisTemplate.opsForValue().setBit(ONLINE_STATUS, userId, online);
    }

    /**
     * 统计在线人数
     *
     * @return
     */
    public Long getOnlineCount() {
        return (Long) redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(ONLINE_STATUS.getBytes()));
    }
}

4.2.3 测试

package com.example.demo.redis;

import com.example.demo.ApplicationTests;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class RedisBitMapTest extends ApplicationTests {

    @Autowired
    RedisBitMap redisBitMap;

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisBitMapTest.class);


    @Test
    public void setOnlineStatusTest() {
        // 10000个人
        for (int i = 0; i < 10000; i++) {
            // 设置偶数在线 奇数不在线
            redisBitMap.setOnlineStatus(i, i % 2 == 0);
        }
    }

    @Test
    public void onlineCountTest() {
        Long i = redisBitMap.getOnlineCount();
        LOGGER.info("oline count = {}", i);

    }
}

4.3 HyperLogLog(需要掌握)

HyperLogLog,你可以把它的功能理解成一个优化了存储空间的Set,适合大型网站进行用户行为统计的业务场景,如果你是小型网站,用Set就行了;HyperLogLog计数统计是有一定的误差的,误差最大在1%以下,所以HyperLogLog不适用于百分百精确统计的场景,网站访问量统计通常也能接收这样的误差。

4.3.1 优缺点

  • 基数不大,数据量不大就用不上,会有点大材小用浪费空间
  • 有局限性,就是只能统计基数数量,而没办法去知道具体的内容是什么

4.3.2 示例代码

package com.example.demo.redis;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.example.demo.ApplicationTests;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.Resource;

/**
 * RedisHyperLoglog对比Set
 *
 * @author 热黄油啤酒
 */
public class RedisHyperLogLogTest extends ApplicationTests {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Test
    public void hllTest() {
        TimeInterval timer = DateUtil.timer();
        String key = "pv_hll:20211220";
        // 模拟1000次操作
        for (int i = 1; i < 1000; i++) {
            redisTemplate.opsForHyperLogLog().add(key, String.valueOf(i));
        }
        Long size = redisTemplate.opsForHyperLogLog().size(key);
        LOGGER.info("size = {}, 耗时= {}ms", size, timer.interval());
        // 操作999次返回996
    }

    @Test
    public void setTest() {
        TimeInterval timer = DateUtil.timer();
        String key = "pv_set:20211220";
        // 模拟1000次操作
        for (int i = 1; i < 1000; i++) {
            redisTemplate.opsForSet().add(key, String.valueOf(i));
        }
        Long size = redisTemplate.opsForSet().size(key);
        LOGGER.info("size = {}, 耗时= {}ms", size, timer.interval());
        // 操作999次返回999
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisHyperLogLogTest.class);
}

熟练使用Bitmap及HyperLogLog,结合redis自增数等操作,你就具备网站行为统计业务的技术基础了,下一步就可以去了解PV UV IP之类的统计业务需求,来动手做一个BI系统(行为识别behavior identity)

4.4 地理信息GEO(了解即可)

主流的数据库都提供了地理信息支持,包含存储、分析、数据生成等等,对于涉及地理信息相关业务产品,Redis GEO操作具有操作简单,高性能的优势,但是Redis的GEO只提供了点位的支持,缺乏对线和面的支持

4.4.1 代码示例

package com.example.demo.redis;

import org.springframework.data.geo.*;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 地理信息操作
 */
@Component
public class RedisGeo {

    @Resource
    private RedisTemplate redisTemplate;

    private static final String CITY = "city";

    /**
     * 添加点位
     *
     * @param name 名称
     * @param x    经度
     * @param y    纬度
     */
    public void add(String name, double x, double y) {
        redisTemplate.opsForGeo().add(CITY, new Point(x, y), name);
    }


    /**
     * 距离(km)
     *
     * @param city1
     * @param city2
     * @return
     */
    public double distance(String city1, String city2) {
        Distance distance = redisTemplate.opsForGeo().distance(CITY, city1, city2, RedisGeoCommands.DistanceUnit.KILOMETERS);
        return distance.getValue();
    }

    /**
     * 周边城市
     *
     * @param city
     * @param distance
     * @return
     */
    public List<Map<String, Object>> circum(String city, double distance) {
        // 获取中心城市坐标
        List<Point> positions = redisTemplate.opsForGeo().position(CITY, city);
        List<Map<String, Object>> cityList = new ArrayList<>();
        if (CollectionUtils.isEmpty(positions)) {
            return cityList;
        }
        Point point = positions.stream().findFirst().get();
        Circle circle = new Circle(point, new Distance(distance, Metrics.KILOMETERS));
        RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(5);
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = redisTemplate.opsForGeo()
                .radius(CITY, circle, args);
        for (GeoResult<RedisGeoCommands.GeoLocation<String>> result : results) {
            RedisGeoCommands.GeoLocation<String> content = result.getContent();
            String name = content.getName();
            Point cityPoint = content.getPoint();
            Distance cityDistance = result.getDistance();
            // 为了展示这些api的使用,我将返回值包装成map
            Map<String, Object> cityMap = new HashMap<>();
            cityMap.put("name", name);
            cityMap.put("lng", cityPoint.getX());
            cityMap.put("lat", cityPoint.getY());
            cityMap.put("distance", cityDistance.getValue());
            cityList.add(cityMap);
        }
        return cityList;
    }


}

4.4.2 测试

package com.example.demo.redis;

import com.example.demo.ApplicationTests;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.Map;

public class RedisGeoTest extends ApplicationTests {
    @Autowired
    private RedisGeo redisGeo;

    @Test
    public void addTest() {
        // 添加一些城市点位
        redisGeo.add("北京", 116.405285, 39.904989);
        redisGeo.add("武汉", 114.311582, 30.598467);
        redisGeo.add("郑州", 113.631419, 34.753439);
        redisGeo.add("广州", 113.271431, 23.135336);
        redisGeo.add("南宁", 108.373451, 22.822607);
    }

    @Test
    public void distanceTest() {
        // 北京到武汉的距离
        double distance = redisGeo.distance("北京", "武汉");
        LOGGER.info("distance = {}km", distance);
    }

    @Test
    public void circumTest() {
        // 北京周边1000km的城市
        List<Map<String, Object>> circumCity = redisGeo.circum("北京", 1000);
        LOGGER.info("circum city = {}", circumCity);
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisGeoTest.class);
}

redis geo基本覆盖了主流的点位相关场景,比如附近的人,周边的店铺等等,我们熟悉这些api就可以解决这些需求了,没此类需求的也可当一个知识储备,如果你有更复杂的地理信息存储需求,可以参考我的MySQL地理信息处理文章-MySQL空间数据存储及函数 - 掘金 (juejin.cn)

4.5 消息队列(了解即可)

Redis新的版本提供了官方的消息队列支持(当然这是Redis作者无奈的选择,毕竟太多人用Redis的List来处理消息了),我们介绍基于List的消息队列实现及官方提供消息队列

4.5.1 基于List

生产消费
package com.example.demo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 基于Redis List的消息队列实现
 */
@Component
public class RedisQueueOfList {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueueOfList.class);

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private static final String TOPIC = "redis_queue";


    /**
     * 发送消息
     *
     * @param msg
     */
    public void send(String msg) {
        redisTemplate.opsForList().leftPush(TOPIC, msg);
    }

    @PostConstruct
    public void listener() {
        LOGGER.info("消费者已启动...");
        new Thread() {
            public void run() {
                while (true) {
                    String msg = redisTemplate.opsForList().rightPop(TOPIC, 1, TimeUnit.SECONDS);
                    if (msg == null) {
                        continue;
                    }
                    // 业务处理
                    LOGGER.info("queue msg = {}", msg);
                }
            }
        }.start();

    }
}
测试接口
package com.example.demo.redis.controller;

import com.example.demo.redis.RedisQueueOfList;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("redis-queue")
public class RedisQueueController {

    @GetMapping("/send")
    public void send(String msg) {
        queueOfList.send(msg);
    }

    @Autowired
    private RedisQueueOfList queueOfList;
}

基于Redis List的消息队列简单来说就是列表push, 消费者循环pop, 可以小规模使用,消息量的场景建议使用更加专业的消息队列中间件(kafka、rocketmq…)

4.5.2 Channel

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

生产及消费
package com.example.demo.redis;

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RedisChannel {

    /**
     * 发送消息
     *
     * @param msg
     */
    public void send(String msg) {
        redisTemplate.convertAndSend(CHANNEL, msg);
    }

    /**
     * 注册消息监听
     *
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(CHANNEL));
        return container;
    }

    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisChannelReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiver");
    }

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private static final String CHANNEL = "redis_channel";
}
消息处理
package com.example.demo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 消息接收处理
 */
@Component
public class RedisChannelReceiver {

    public void receiver(String msg) {
        // TODO 消息处理业务
        LOGGER.info("receiver msg = {}", msg);
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisChannelReceiver.class);
}
测试接口
package com.example.demo.redis.controller;

import com.example.demo.redis.RedisChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("redis-channel")
public class RedisChannelController {

    @GetMapping("send")
    public void send(String msg) {
        redisChannel.send(msg);
    }

    @Autowired
    private RedisChannel redisChannel;
}

5 结尾

Redis能做很多事情,但前提是对数据量有准确的预估,文中代码可能有不规范之处,主要了为了演示api的使用,大家择优食用,有收获的同学给个赞吧0^0

6 我的其它文章

Redis与本地缓存组合 - 掘金 (juejin.cn)

阿里需求挑战-十分钟内连续登录5次失败,需要等待30分钟才能登录【附图】 - 掘金 (juejin.cn)

一文搞懂用户登录验证流程(附图) - 掘金 (juejin.cn)

我的其它文章主要涉及业务解决方案,缓存、数据库操作,消息队列,SaaS平台等等,有兴趣的可以进我主页看看噢 我的主页


作者:热黄油啤酒
链接:Redis不常用操作指南(基于SpringBoot2.3.2代码实现) - 掘金