Skip to the content.

Redis基础

黑马程序员 Redis 入门到实战教程,全面透析 redis 底层原理+redis分布式锁+企业解决方案+redis实战_哔哩哔哩_bilibili

Redis 介绍:诞生于 2009 年,全称是 Remote Dictionary Server,远程字典服务器,是一个基于内存的键值型 NoSQL 数据库。

特点

- SQL NoSQL
数据结构 结构化(Structured) 非结构化
数据关联 关联的(Relational) 无关联的
查询方式 SQL 查询 非 SQL
事务特性 ACID BASE
存储方式 磁盘 内存
扩展性 垂直 水平
使用场景 1) 数据结构固定
2) 相关业务对数据安全性、一致性要求较高
1) 数据结构不固定
2) 对一致性、安全性要求不高
3) 对性能要求高

NoSQL 的类型有如下几种

Redis 的特点

键值存储数据库

列存储数据库

文档型数据库

图形(Graph)数据库

安装

windows

  1. 官网:https://redis.io
  2. 中文网:http://www.redis.net.cn/
  3. 解压直接可以使用: redis.windows.conf:配置文件 redis-cli.exe:redis 的客户端 redis-server.exe:redis 服务器端

docker

在 docker 中安装 redis。

docker 中进入后台运行的 redis:docker exec -it ae7c /bin/bash。然后再输入 redis-cli 启动交互模式。

# 拉取 redis 镜像
docker pull redis
# 运行 redis 容器
docker run --name myredis -d -p6379:6379 redis
# 执行容器中的 redis-cli 可以直接命令行操作 redis
docker exec -it myredis redis-cli

docker 启动 redis 并配置 redis.conf,配置数据存储目录

sudo docker run -p 6379:6379 # 端口映射
--name redis # 设置容器名
-v /home/payphone/software/redis-6/redis.conf:/etc/redis/redis.conf # 配置文件路径映射
-v /home/payphone/software/redis-6:/data # 数据存储路径映射
-d redis 
redis-server /etc/redis/redis.conf # 启动 redis-server 配置规则按 redis.conf 设置 
--appendonly yes # 开启 appendonly

# 无注释版本
sudo docker run -p 6379:6379 --name redis -v /home/payphone/software/redis-6/redis.conf:/etc/redis/redis.conf  -v /home/payphone/software/redis-6:/data -d redis redis-server /etc/redis/redis.conf --appendonly yes

docker 启动的 redis 可能连接不上,需要把 redis.conf 中的 bind 127.0.0.1 注释掉,再把 87 行的 protected-mode yes 改成 no。

# bind 127.0.0.1
protected-mode no

WSL

也可以直接在 windows 的 wsl 子系统中安装 redis,也很方便。

apt-get install redis
# 运行客户端
redis-cli

源码编译

数据结构

Redis 是一个 key-value 的数据库,key 一般是 String 类型,不过 value 的类型多种多样。

五大基本类型

特殊类型

命令

Redis 的命令可以在官网查询到,也可以通过在 Redis 客户端输入 help @命令类型help @generic 查询通用命令的使用方式。

通用命令

通用指令是部分数据类型的,都可以使用的指令,常见的有:

通过 help [command] 可以查看一个命令的具体用法,例如:

String类型

String 类型,也就是字符串类型,是 Redis 中最简单的存储类型。其 value 是字符串,不过根据字符串的格式不同,又可以分为 3 类:

不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。

key value
msg hello world
num 10
score 92.6

Redis 的 String 类型是动态字符串,是可以修改的字符串,采用预分配冗余空间的方式来减少内存的频繁分配。

具体来说 String 类型内部为当前字符串分配的实际空间 capacity 一般要高于实际字符串长度 len。当字符串长度小于 1MB 时,扩容都是加倍现有的空间。如果字符串长度超过 1MB,扩容时一次只会多扩 1MB 的空间。字符串最大长度为 512MB。

String 常见命令 说明
SET 添加或者修改已经存在的一个 String 类型的键值对
GET 根据 key 获取 String 类型的 value
MSET 批量添加多个 String 类型的键值对,M 开头的都是批量操作
MGET 根据多个 key 获取多个 String 类型的 value
INCR 让一个整型的 key 自增 1
INCRBY 让一个整型的 key 自增并指定步长,例如:incrby num 2 让 num 值自增 2
INCRBYFLOAT 让一个浮点类型的数字自增并指定步长
SETNX 添加一个 String 类型的键值对,前提是这个 key 不存在,否则不执行
(返回 0 表示存在了,未添加成功)
SETEX 添加一个 String 类型的键值对,并且指定有效期
# 存储:set key value
# set key value
127.0.0.1:6379> set username zhangsan
OK

# 获取
# get key
127.0.0.1:6379> get username
"zhangsan"

# 删除
# del key
127.0.0.1:6379> del age
(integer) 1

# 查看是否存在
127.0.0.1:6379> exists username

存储结构

用字符数组实现的,不过有两种存储方式,长度短时用 embstr 存储,长度超过 44 字节时用 raw 形式存储。具体的存储结构形式看原理部分。

Hash类型

Hash 类型,也叫散列,其 value 是一个无序字典,类似于 Java 中的 HashMap 结构。

String 结构是将对象序列化为 JSON 字符串后存储,当需要修改对象某个字段时很不方便,而 Hash 结构可以将对象中的每个字段独立存储,可以针对单个字段做 CRUD。

Hash 的常见命令 说明
HSET key field value 添加或者修改 hash 类型 key 的 field 的值
HGET key field 获取一个 hash 类型 key 的 field 的值
HMSET 批量添加多个 hash 类型 key 的 field 的值
HMGET 批量获取多个 hash 类型 key 的 field 的值
HGETALL 获取一个 hash 类型的 key 中的所有的 field 和 value
HKEYS 获取一个 hash 类型的 key 中的所有的 field
HVALS 获取一个 hash 类型的 key 中的所有的 value
HINCRBY 让一个 hash 类型 key 的字段值自增并指定步长
HSETNX 添加一个 hash 类型的 key 的 field 值,前提是这个 field 不存在,否则不执行
# hset key field value
127.0.0.1:6379> hset myhash username lisi
(integer) 1
127.0.0.1:6379> hset myhash password 123
(integer) 1

# hget key field: 获取指定的field对应的值
127.0.0.1:6379> hget myhash username
"lisi"

# hgetall key:获取所有的field和value
127.0.0.1:6379> hgetall myhash
1) "username"
2) "lisi"
3) "password"
4) "123"

127.0.0.1:6379> hget myhash username
"lisi"

# hdel key field
127.0.0.1:6379> hdel myhash username
(integer) 1

存储结构

Hash 类型采用的数组+链表的形式存储的。但是 Redis 的字典的值只能是字符串。

Java 的 Hash Map 在字典很大时,rehash 是个耗时的操作,需要一次性全部 rehash。Redis 为了追求高性能,不能堵塞服务,所以采用了渐进式 rehash 策略。渐进式 rehash 会在 rehash 的同时,保留新旧两个 hash 结构,查询时会同时查询两个 hash 结构,然后在后续的定时任务以及hash操作指令中,循序渐进地将旧 hash 的内容一点点地迁移到新的 hash 结构中。当搬迁完成了,就会使用新的 hash 结构取而代之。

当 hash 移除了最后一个元素之后,该数据结构被自动删除,内存被回收。

hash 可以对用户结构中的每个字段单独存储。这样当我们需要获取用户信息时可以进行部分获取。但是 hash 结构的存储消耗要高于单个字符串。

List类型

Redis 中的 List 类型与 Java 中的 LinkedList 类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。

特征也与 LinkedList 类似

常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

List 的常见命令 说明
LPUSH key element … 向列表左侧插入一个或多个元素
LPOP key 移除并返回列表左侧的第一个元素,没有则返回 nil
RPUSH key element … 向列表右侧插入一个或多个元素
RPOP key 移除并返回列表右侧的第一个元素
LRANGE key star end 返回一段角标范围内的所有元素
BLPOP 和 BRPOP 与 LPOP 和 RPOP 类似,只不过在没有元素时会等待指定的时间,而不是直接返回 nil。
有点类似于阻塞队列。Timeout 时间是以秒为单位
LTRIM key star end 保留区间内的值,区间之外的则统统删除。

可以用 List 结构模拟 stack、queue 和 BlockingQueue。

# lpush key value: 将元素加入列表左表   把value加入名为key的链表中
# rpush key value:将元素加入列表右边
127.0.0.1:6379> lpush myList a
(integer) 1
127.0.0.1:6379> lpush myList b
(integer) 2
127.0.0.1:6379> rpush myList c
(integer) 3
# lrange key start end :范围获取
127.0.0.1:6379> lrange myList 0 -1
1) "b"
2) "a"
3) "c"

存储结构

列表元素较少的情况下,会使用一块连续的内存存储,这个结构是 ziplist,即压缩列表。它将所有的元素彼此紧挨着一起存储,分配的是一块连续的内存。当数据量比较多的时候才会改成 quicklist。因为普通的链表需要的附加指针空间太大,会浪费空间,还会加重内存的碎片化,比如某普通链表里存的只是 int 类型的数据,结构上还需要两个额外的指针 prev 和 next 。所以 Redis 将链表和 ziplist 结合起来组成了quicklist,也就是将多个 ziplist 使用双向指针串起来使用。

flowchart LR
ziplist1<--> ziplist2 <--> ziplist3 <--> ziplist4

Set类型

Redis 的 Set 结构与 Java 中的 HashSet 类似,可以看做是一个 value 为 null 的 HashMap。因为也是一个 hash 表,因此具备与 HashSet 类似的特征。

Set 常见命令 说明
SADD key member … 向 set 中添加一个或多个元素
SREM key member … 移除 set 中的指定元素
SCARD key 返回 set 中元素的个数
SISMEMBER key member 判断一个元素是否存在于 set 中
SMEMBERS 获取 set 中的所有元素
lSINTER key1 key2 … 求 key1 与 key2 的交集 S1,S2 的交(B、C)
lSDIFF key1 key2 … 求 key1 与 key2 的差集,S1,S2 的差(A)
lSUNION key1 key2 .. 求 key1 和 key2 的并集,S1,S2 的并(A、B、C、D)
127.0.0.1:6379> sadd myset a
(integer) 1
127.0.0.1:6379> sadd myset a
(integer) 0

# 获取:smembers key ==> 获取 set 集合中所有元素
127.0.0.1:6379> smembers myset
1) "a"

# srem key value ==> 删除 set 集合中的某个元素	
127.0.0.1:6379> srem myset a
(integer) 

练习

将下列数据用 Redis 的 Set 集合来存储

利用 Set 的命令实现下列功能

SortedSet类型

Redis 的 SortedSet 是一个可排序的 set 集合,与 Java 中的 TreeSet 有些类似,但底层数据结构却差别很大。SortedSet 中的每一个元素都带有一个 score 属性,可以基于 score 属性对元素排序,底层的实现是一个跳表(SkipList,用来排序的)+ hash 表。

SortedSet 具备下列特性

因为 SortedSet 的可排序特性,经常被用来实现排行榜这样的功能。

SortedSet 的常见命令 默认都是升序排名
ZADD key score membe 添加一个或多个元素到 sorted set ,如果已经存在则更新其 score 值
ZREM key member 删除 sorted set 中的一个指定元素
ZSCORE key member 获取 sorted set 中的指定元素的 score 值
ZRANK key member 获取 sorted set 中的指定元素的排名
ZCARD key 获取 sorted set 中的元素个数
ZCOUNT key min max 统计 score 值在给定范围内的所有元素的个数
ZINCRBY key increment member 让 sorted set 中的指定元素自增,步长为指定的 increment 值,比如自增 increment(自增 1)
ZRANGE key min max 按照 score 排序后,获取指定排名范围内的元素 (zrange age 0 9 ,取排名前 10 的元素)
ZRANGEBYSCORE key min max 按照 score 排序后,获取指定 score 范围内的元素
ZDIFF、ZINTER、ZUNION 求差集、交集、并集

注意:所有的排名默认都是升序,如果要降序则在命令的 Z 后面添加 REV 即可:ZREVRange 就是降序了

# 存储:zadd key score value  把 value 加入到键值为 key 的集合中,
# 并规定分数 score,根据 score 进行排序【从小到大】
127.0.0.1:6379> zadd mysort 60 zhangsan
(integer) 1
127.0.0.1:6379> zadd mysort 50 lisi
(integer) 1
127.0.0.1:6379> zadd mysort 80 wangwu
(integer) 1
# 获取:zrange key start end [withscores]
127.0.0.1:6379> zrange mysort 0 -1
1) "lisi"
2) "zhangsan"
3) "wangwu"

127.0.0.1:6379> zrange mysort 0 -1 withscores
1) "zhangsan"
2) "60"
3) "wangwu"
4) "80"
5) "lisi"
6) "500"
# 删除:zrem key value
127.0.0.1:6379> zrem mysort lisi
(integer) 1

存储结构

SortedSet 也可称为 zset,其内部的排序功能是通过“跳跃列表”数据结构来实现的。用 SkipList 实现排序,用 HT 来实现根据 key 快速查找 value。

练习

将班级的下列学生得分存入 Redis 的 SortedSet 中:

并实现下列功能:

通用知识

容器型数据结构的通用规则

list、set、hash、zset 这四种数据结构是容器型数据结构,它们共享下面两条通用规则。

  1. create if not exists:如果容器不存在,那就创建一个,再进行操作。比如 rpush 操作刚开始是没有列表的,Redis 就会自动创建一个,然后再 rpush 进去新元素。
  2. drop if no elements:如果容器里的元素没有了,那么立即删除容器,释放内存。这意味着 lpop 操作到最后一个元素,列表就消失了。

过期时间

Redis 所有的数据结构都可以设置过期时间,时间到了,Redis 会自动删除相应的对象(但是该对象的内存不一定会立马归还给操作系统)。需要注意的是,过期是以对象为单位的,比如一个 hash 结构的过期是整个 hash 对象的过期,而不是其中的某个子 key 的过期。

有一个需要特别注意的地方,如果一个字符串已经设置了过期时间,然后你调用 set 方法修改了它,它的过期时间会消失。

内存回收

Redis 并不总是立即将空闲内存归还给操作系统。

如果当前 Redis 占用的内存有 10G,删除 1GB 的 key 后,再去观察 Redis 内存的占用情况会发现内存变化不会太大。因为 OS 是以页为单位回收内存(内存默认是段页式存储?)的,这个页面上只要还有一个 key 在使用就不会被回收。虽然删除了 1GB 的 key,但是这些 key 分散在了很多页面中,如果每个页面都有其他 key 存在,内存就不会被回收!

内存分配

内存分配需要考虑使用何种算法划分内存页,如何处理内存碎片。Redis 则是直接采用现成的内存分配库。可以使用 jemalloc (facebook)、tcmalloc (google)。jemalloc 的性能比 tcmalloc 稍好,因此 Redis 默认用的 jemalloc。

分布式锁

分布在不同机器上的应用需要操作同一个资源,如修改用户状态,需要多个步骤,如先读取再修改,如果不施加访问条件,多个机器同时尝试修改就会出现并发问题。这是可以使用 Redis 的 setnx 来实现分布式锁(理论上,自己写个服务,部署在一台机器上也可以实现分布式锁的功能)。

使用分布式锁

分布式锁本质上要实现的目标就是在 Redis 里面占一个“坑”,当别的进程也要来占坑时,发现那里已经有一根“大萝卜”了,就只好放弃或者稍后再试。

占坑一般使用 setnx(set if not exists) 指令,只允许被一个客户端占坑。先来先占,用完了,再调用 del 指令释放“坑”。

但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。我们可以在拿到锁之后,再给锁加上一个过期时间,比如 5s,这样即使中间出现异常也可以保证 5s 之后锁会自动释放。

但是,如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是人为造成的,就会导致 expire 得不到执行,也会造成死锁。

flowchart LR
set-->|竞争|setnx--x|过期|expire

Redis 2.8 加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,解决了该问题。

# 不存在才设置,设置过期时间为 10s
set key value nx ex 10

# 获取剩余的过期时间
ttl key

# 毫秒级别的获取
pttl key

超时问题

Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行得太长,以至于超出了锁的超时限制,就会出现问题。因为这时候第一个线程持有的锁过期了,临界区的逻辑还没有执行完,而同时第二个线程就提前重新持有了这把锁,导致临界区代码不能得到严格串行执行。

为了避免这个问题,Redis 分布式锁不要用于较长时间的任务。如果真的偶尔出现了问题,造成的数据小错乱可能需要人工介入解决。

稍微安全一点的方案是将 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其他线程释放,除非这个锁是因为过期了而被服务器自动释放的。

但是由于查找 value 和删除 key 不是原子性的,需要使用 lua 脚本保证连续多个指令的原子性执行。

但是如果真的超时了,当前线程的逻辑没有执行完,其他线程也会乘虚而入。

可重入锁

可参考 Java 的 ReentrantLock 来编写基于 Redis 的可重入分布式锁。也可采用 Redssion 这个三方库。

延时队列

如果在某个场景我们需要用到延迟队列,且场景比较简单,对消息的可靠性要求也不好高,此时可以用 Redis 的延迟队列。

Redis 的消息队列不是专业的消息队列,它没有非常多的高级特性,没有 ack 保证,如果对消息的可靠性有着极高要求,那么它就不适合使用。

异步消息队列

Redis 的 list(列表)数据结构常用来作为异步消息队列使用,用 rpush 和 lpush 操作入队列, lpop 和 rpop 操作出队列。但是如果队列为空了,消费者仍会不断执行 lpop/rpop,这种空轮询会拉高客户端的 CPU 消耗,也会拉高 Redis 的 QPS,降低 Redis 的性能。

上述问题可以用线程定期 sleep 来解决,但是这种方式不是最佳的。比较合理的是利用 list 的阻塞读取 blpop/brpop。有数据则读取,无数据则阻塞住。但是,如果线程一直阻塞在那里,Redis 的客户端连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候 blpop/brpop 会抛出异常。因此,在编写阻塞获取这类方法的时候要小写,如果捕获到异常需要重试,指定时间范围内重试失败后再放弃。

延迟队列

先前提到了分布式锁请求加锁失败的问题。遇到加锁失败,可以用以下三种策略

直接抛出异常

这种方式适合用户发起请求出现异常的情况。返回给用户错误提示,让他稍后再试。

sleep 一会

sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞得比较频繁或者队列里消息比较多,sleep 可能并不合适。如果因为个别死锁的 key 导致加锁不成功,线程会彻底堵死,导致后续消息永远得不到及时处理。

延迟队列

这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。

Redis 实现延迟队列

Redis 可以使用 SortedSet 来模拟延时队列。消息本身作为 key,到期处理时间作为 score。然后多线程轮询 SortedSet 获取到期的任务进行处理。使用多线程处理可以保证服务的可用性,避免某些线程挂调无法及时处理。

多线程争用一个延迟队列的话需要考虑并发安全问题。Redis 处理命令是单线程的,因此是线程安全的。可能出现并发问题的是多个线程拿到了同一个待处理的消息,要确保只有一个线程消费。

def loop():
    while True:
        values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
        if not values:
            time.sleep(1) # 没有数据就休眠一秒
            continue
		value = values[0]
        success = redis.zrem("delay-queue",value)
        if success:
            msg = json.loads(value) # 反序列化为对象
            hande_msg(msg) # 消费消息的函数

消费前先尝试移除,如果移除成功,则说明自己是第一个删除的人,有权执行,正常执行。如果移除失败,说明有人先一步移除了,就不消费消息。

同一个任务可能会被多个消费端获取后再使用 zrem 进行争抢,那些没抢到的都白取了一次任务。可以考虑使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样就不会出现这种浪费了。

位图

位图,bitmap,就是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。由于采用了 Bit 为单位来存储数据,因此在存储空间方面,可以大大节省。典型的场景有:记录用户一年的签到记录,签到了是 1,没签到是 0。

基本用法

setbit key offset value

时间复杂度:O(1),offset 从 0 开始计数。

> setbit name 0 1
0
> get name
"\x80" 

如果对应位的字节是不可打印字符,redis-cli 会显示该字符的十六进制形式。

统计和查找

Redis 提供了位图统计指令 bitcount 和位图查找指令 bitpos。bitcount 用来统计指定位置范围内 1 的个数,bitpos 用来查找指定范围内出现的第一个 0 或 1。

可以通过 bitcount 统计用户一共签到了多少天,通过 bitpos 指令查找用户从哪一天开始第一次签到。这两个命令的范围参数 [start, end] 是字节索引,而非 bit 位。因此,如果我们想统计某个月内用户签到了几天,必须将这个月所覆盖的字节内容全部取出来,然后在内存里进行统计。

# E 的 ASCII 69, 二进制为 0100,0101
# G 的 ASCII 71, 二进制为 0100,0111

# 设置某个 bit 位为 1
> setbit name 0 1

# 直接设置整个字符串
> set name EEGG
OK
> bitcount name
(integer) 14
> bitcount name 0 0 # 第 0 个字符为 1 的 bit 的个数
(integer) 3
> bitcount name 0 1 # 0~1 字符(第 0 个和第 1 个)字符的 bit 的个数
(integer) 6
#====================================
> bitpos name 0 # 第一个 0 位
(integer) 0
> bitpos name 1 # 第一个 1 位
(integer) 1
> bitpos name 1 2 2 # 从第三个字符算起,第一个 1 位,都是从 0 开始计数的
(integer) 17

bitfield

bitfield,无需使用管道便可一次进行多个位的操作。bitfield 有三个子指令,分别是 get、set、incrby,它们都可以对指定位片段进行读写,但是最多只能处理 64 个连续的位,如果超过 64 位,就得使用多个子指令, bitfield 可以一次执行多个子指令。

# 从第 0 位开始取 4 位,结果是无符号。
> BITFIELD name get u4 0
1) (integer) 4
# 从第 8 位开始取 8 位,结果是无符号。正好和 E 的 ASCII 对上了
> BITFIELD name get u8 8
1) (integer) 69

一次执行多个子指令

# 从第 0 位开始取 4 位,结果是无符号。
# 从第 0 位开始取 3 位,结果是无符号。
> bitfield name get u4 0 get u3 0
1) (integer) 4
2) (integer) 2

incrby,对指定范围的位进行自增操作。

# 从第三个位开始,对接下来的 4 位无符号数 +1
> bitfield name incrby u4 2 1
1) (integer) 2

HyperLogLog

HyperLogLog 提供了不精确的去重计数方案,标准误差是 0.81%,经可以满足常用的 UV 统计需求。

基本用法

HyperLogLog 提供了两个指令 pfadd 和 pfcount,一个是增加计数,一个是获取计数。pfadd 和 set 集合的 sadd 的用法是一样的,来一个用户 ID,就将用户 ID 放进去就是。pfcount 和 scard 的用法是一样的,直接获取计数值。

除了计数外还提供了合并多个数据的指令,pfmerge。

> pfadd uv user1
(integer) 1
> pfadd uv user2
(integer) 1
> pfadd uv user3
(integer) 1
> pfadd uv user4
(integer) 1
> pfcount uv
(integer) 4

用 Python 脚本模拟大量用户的 UV 请求。

# python 简单方便,因此用 python 测试
import redis

def test_uv():
    client = redis.StrictRedis()
    for item in range(100000):
        client.pfadd("uv", "user%d" % item)
    total = client.pfcount("uv")
    # 99725
    print(total)

if __name__ == '__main__':
    test_uv()

执行了 10w 次,但是最后的结果是 9.9725w,误差不大。

pfmerge 将多个 key 的数据进行合并

> pfadd u1 a b c
(integer) 1
> pfadd u2 a1 b1 c1
(integer) 1
> pfmerge new_u u1 u2
OK
> pfcount new_u
(integer) 6

基本原理

HyperLogLog 实际上不会存储每个元素的值,它使用的是概率算法,通过存储元素的 hash 值的第一个 1 的位置,来计算元素数量。

(28条消息) HyperLogLog 使用及其算法原理详细讲解_李子捌的博客-CSDN博客_hyperloglog

pf 的内存占用

Redis 的 HyperLogLog 实现中用的是 16384 个桶,也就是 $2^{14}$,每个桶的 maxbits 需要 6 个 bit 来存储,最大可以表示 maxbits=63,于是总共占用内存就是 $2^{14}*6/8$”,算出来的结果即是 12KB。

不过 Redis 对 HyperLogLog 的存储进行了优化,在计数比较小时,它的存储空间采用稀疏矩阵存储,空间占用很小,仅仅在计数慢慢变大、稀疏矩阵占用空间渐渐超过了阈值时,才会一次性转变成稠密矩阵,才会占用 12KB 的空间。

布隆过滤器

用于在大量数据下的去重。它在起到去重作用的同时,在空间上还能节省 90% 以上。Redis 4.0 的时候开始提供布隆过滤器的插件。

基本用法

从 docker 中拉取带有布隆过滤器插件的 redis

docker pull redislabs/rebloom
docker run -p6379:6379 redislabs/rebloom
redis-cli

布隆过滤器有两个基本指令,bf.add 和 bf.exists。

基本原理

布隆过滤器的原理也和概率相关:每个布隆过滤器对应到 Redis 的数据结构里面就是一个大型的位数组和几个不一样的无偏 hash 函数。无偏就是能够把元素的 hash 值算得比较均匀,让元素被 hash 映射到位数组中的位置比较随机。

判断数据是否存在时也是根据数据计算出这个数据应该在那几个 bit 位上,然后查找对应的几个 bit 是否为 1,都为 1 说明存在了,有不为 1 的说明不存在。

空间占用估计

布隆过滤器的空间占用计算的公式如下 \(k=0.7*(1/n) -- 约等于\\ f=0.6185^{1/n}\)

从上述表述可以看出,位数组相对越长 $\frac{1}{n}$,错误率 f 越低。

如果元素的内容长,如十几甚至是上百字节,除此之外还要一个指针指向元素,这个指针又会占 4 个字节或 8 个字节。而布隆过滤器在错误率为 0.1% 时,仅仅需要 2 个字节,空间优势非常明显。

count=900000000
error=0.001
functions=9.96
size=1579568.94kb

https://krisives.github.io/bloomcalculator

当实际元素超出预计元素时,错误率的变化可用该公式进行计算. \(f=(1-0.5^t)^k \ \ \ k \ 为 \ hash \ 函数的最佳数量,\ t \ 为倍数比\) 错误率为 0.1% 时,倍数比为 2 时,错误率会升至 5%。

限流

5种常见限流算法! - Java填坑笔记 - 博客园 (cnblogs.com)

你对限流了解多少? (qq.com)

当系统处理能力有限时,可以限制请求的数量,阻止执行计划以外的请求。简单的单机限流可以用信号量、guava 库。如果是分布式场景下限定了某个用户在指定时间内只能点击多少次,可以用 Redis 来做限流。

固定窗口限流算法

首先维护一个计数器,将单位时间段当做一个窗口,计数器记录这个窗口接收请求的次数。

具体做法:设置一个 key-value,value 为请求的次数,并设置 ttl。第一次请求没有数据时就初始化,后面请求时,如果发现 key 的有效期内容请求次数超过了限定的次数,则不再允许调用对应的方法。这是最简单粗暴的方法,限流的方式不够平稳。

滑动窗口限流算法

滑动窗口限流解决固定窗口临界值的问题。它将单位时间周期分为 n 个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。如限定了 1s 可以执行 10 次,那么 0~100ms 的时候可以执行一次,100~200ms 的时候可以执行一次。这种限流方式更为平稳。滑动窗口算法虽然解决了固定窗口的临界问题,但是一旦到达限流后,请求都会直接暴力被拒绝。

思考下,利用 Redis 应该怎么做呢?如何维护这个时间窗口呢?既要可以维护时间窗口,又能知道对应窗口内执行了多少次,用什么数据结构合适?SortedSet!

SortedSet 记录时间戳,获取的时候获取 [now-period, now] 范围内的数据(zcount key now-period, now)没超过阈值就放行,同时记得删除时间窗口之前的记录。

import time
import redis

client = redis.StrictRedis()

def is_action_allowed(user_id, action_key, period, max_count):
    key = "times: %s:%s" % (user_id, action_key)
    now_ts = int(time.time() * 1000)
    with client.pipeline() as pipe:
        # value 和 score 都用时间戳
        pipe.zadd(key, now_ts, now_ts)
        # 移除时间窗口之前的行为记录
        pipe.zremrangebyscore(key, 0, now_ts - period * 1000)
        # 获取窗口内的行为数量
        pipe.zcard(key)
        # 设置 zset 过期时间,避免冷用户持续占用内存
        pipe.expire(key, period+1)
        # 批量执行
        _,_,current_count <= pipe.execute()
	return current_count<=max_count

for item in range(20):
    print(is_action_allowed("test", "reply", 60 ,10))

漏桶算法

漏桶算法面对限流,就更加的柔性,不存在直接的粗暴拒绝。可以往漏桶中以任意速率流入水,以固定的速率流出水。只有当水超过桶的容量时,请求才会被溢出,也就是被丢弃。

在正常流量的时候,系统按照固定的速率处理请求,是我们想要的。但是面对突发流量的时候,漏桶算法还是循规蹈矩地处理请求,这就不是我们想看到的啦。流量变突发时,我们肯定希望系统尽量快点处理请求,提升用户体验。

Redis 4.0 提供了一个限流 Redis 模块,它叫 Redis-Cell。该模块也使用了漏斗算法,并提供了原子的限流指令。

分布式限流 redis-cell - 简书 (jianshu.com)

令牌桶算法

面对突发流量的时候,我们可以使用令牌桶算法限流。

如果令牌发放的策略正确,这个系统即不会被拖垮,也能提高机器的利用率。Guava 的 RateLimiter 限流组件,就是基于令牌桶算法实现的。

Java 的 Semaphore 可以实现该功能,利用定时器定期向 Semaphore 中投放许可证即可。Redis 的话,可以用 String 类型的结构 lock-time,一个线程专门用来定期为 time++,其他请求执行时利用 lua 对 lock 做出判断和自减操作,自减成功则可执行,失败则说明限流了。

GeoHash

GeoHash 算法是业界比较通用的地理位置距离排序算法,Redis 也使用 GeoHash 算法。GeoHash 算法将二维的经纬度数据映射到一维的整数,这样所有的元素都将挂载到一条线上,距离靠近的二维坐标映射到一维后的点之间距离也会很接近。当我们想要计算“附近的人”时,首先将目标位置映射到这条线上,然后在这个一维的线上获取附近的点即可。

注意事项

在一个地图应用中,车的数据、餐馆的数据、人的数据可能会有几百万条甚至几千万条,如果使用 Redis 的 Geo 数据结构,它们将被全部放在一个 zset 集合中。在 Redis 的集群环境中,集合可能会从一个节点迁移到另一个节点,如果单个 key 的数据过大,会对集群的迁移工作造成较大的影响,在集群环境中单个 key 对应的数据量不宜超过 1MB,否则会导致集群迁移出现卡顿现象,影响线上服务的正常运行。

所以,建议 Geo 的数据使用单独的 Redis 实例部署,不使用集群环境。

如果数据量过亿个,甚至更大,就需要对 Geo 数据进行拆分,按国家拆分、按省拆分、按市拆分,在人口特大城市甚至可以按区拆分。这样就可以显著降低单个 zset 集合的大小。

Scan

在平时线上 Redis 维护工作中,有时候需要从 Redis 实例的成千上万个 key 中找出特定前缀的 key 列表来手动处理数据,可能是修改它的值,也可能是删除 key。这里就有一个问题,如何从海量的 key 中找出满足特定前缀的 key 列表?

Redis 提供了一个简单粗暴的指令 keys 用来列出所有满足特定正则字符串规则的 key。但是 keys 不能分页。而且 keys 算法是遍历算法,复杂度是 O(n),如果实例中有千万级以上的 key,这个指令就会导致 Redis 服务卡顿,所有读写 Redis 的其他指令都会被延后甚至会超时报错。

Redis 在 2.8 版本中加入了指令——scan,解决了上述的问题。

特点

基本用法

先向 Redis 中插入大量的数据进行测试。仍然是使用 Python 脚本。

import redis

client = redis.StrictRedis()
for item in range(200):
    client.set("key%d" % item,item)

利用 scan 获取 key,语法为 scan ooo match kkk count xxx,如 scan 0 match key1* count 10,从 0 开始,查找符合 key1* 匹配规则的 key,查找 10 个(槽位,最后扫描出的元素数量可能少于 10)。

> scan 0 match key1* count 10
1) "48"
2) 1) "key157"
   2) "key199"
   3) "key148"
   4) "key193"
   5) "key125"
   6) "key181"
> scan 48 match key1* count 10
1) "120"
2) 1) "key111"
   2) "key131"
   3) "key177"
   4) "key129"
   5) "key192"
   6) "key182"

当游标值为零时,意味着遍历结束了。

槽位

在 Redis 中所有的 key 都存储在一个很大的哈希表中。该哈希表由数组和链表组成。数组的大小总是 $2^n,n ≥0$,每扩容一次数组,大小空间加倍。

scan 指令返回的游标就是第一维数组的位置索引(这个位置索引也称为槽 slot)。如果不考虑字典的扩容缩容,直接按数组下标挨个遍历就行了。

limit 参数就表示需要遍历的槽位数,之所以返回的结果可能多可能少,是因为不是所有的槽位上都有元素/链表,有些槽位可能是空的,有些槽位上挂接的链表上的元素可能会有多个。每一次遍历都会将 limit 数量的槽位上挂接的所有链表元素进行模式匹配过滤后,一次性返回给客户端。

遍历顺序

这块书里写的很模糊,后面查到合适的资料再进行补充。

big key 扫描

有时候会因为业务人员使用不当,在 Redis 实例中形成了很大的对象,比如一个很大的 hash 或一个很大的 zset,都是可能出现的。这样的对象给 Redis 的集群数据迁移带来了很大的问题。

如果发现 Redis 的内存大起大落,这极有可能是因为大 key 导致的,这时候就需要先定位出具体是哪个 key,再进一步定位出具体的业务来源,然后改进相关业务代码设计。

如果 key 非常多,直接使用 keys 拿到所有的 key 再逐个遍历的话,会给 Redis 带来极大的压力,导致卡顿。为了避免给线上 Redis 带来卡顿,就要用到 scan 指令,对于扫描出来的每一个 key,使用 type 指令获得 key 的类型,然后使用相应数据结构的 size 或者 len 方法来得到它的大小,对于每一种类型,将大小排名的前若干名作为扫描结果展示出来。

上面这样的过程需要编写脚本,比较烦琐,Redis 官方在 redis-cli 指令中提供了这样的扫描功能,我们可以直接拿来使用。

redis-cli -h 127.0.0.1 -p 7001 --bigkeys

如果担心这个指令会大幅抬升 Redis 的 ops 导致线上报警,还可以增加一个休眠参数。

redis-cli -h 127.0.0.1 -p 7001 --bigkeys -i 0.1

命令的执行结果

payphone@cv:/home/redis-6.2.1$ ./bin/redis-cli -h 127.0.0.1 -p 6379 --bigkeys

# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).

[00.00%] Biggest string found so far '"key157"' with 3 bytes

-------- summary -------

Sampled 200 keys in the keyspace!
Total key length in bytes is 1090 (avg len 5.45)

Biggest string found '"key157"' has 3 bytes

0 lists with 0 items (00.00% of keys, avg size 0.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
200 strings with 490 bytes (100.00% of keys, avg size 2.45)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
0 sets with 0 members (00.00% of keys, avg size 0.00)
0 zsets with 0 members (00.00% of keys, avg size 0.00)

设计KEY

Redis 没有类似 MySQL 中的 Table 的概念,我们该如何区分不同类型的 key 呢?例如,需要存储用户、商品信息到 redis,有一个用户 id 是1,有一个商品 id 恰好也是 1。

可以采用 项目名:业务名:类型:id 这种多个单词形成层级结构,单词间用 ‘:’ 隔开的方式设计 key。

在 redis 内部,最后会以层级关系显示这些数据。

如果 value 是一个 Java 对象,如一个 User 对象,则可以将对象序列化为 JSON 字符串后存储。

key value
user:1 {“id”:1, “name”: “Jack”, “age”: 21}
product:1 {“id”:1, “name”: “小米11”, “price”: 4999}

Redis 的 key 的格式==>[项目名]:[业务名]:[类型]:[id]

持久化

Redis 持久化机制有两种方式:RDB 和 AOF,而持久化策略有四种。

RDB:默认方式,不需要进行配置,默认就使用这种机制。在一定的间隔时间中,检测 key 的变化情况,然后持久化数据。

编辑 redis.windwos.conf 文件

after 900 sec (15 min) if at least 1 key changed

save 900 1

after 300 sec (5 min) if at least 10 keys changed

save 300 10

after 60 sec if at least 10000 keys changed

save 60 10000

重新启动 redis 服务器,并指定配置文件名称

D:\redis\windows-64\redis-2.8.9>redis-server.exe redis.windows.conf	

AOF:日志记录的方式,可以记录每一条命令的操作。可以每一次命令操作后,持久化数据。

编辑 redis.windwos.conf 文件

# appendonly no(关闭aof) --> appendonly yes (开启aof)
appendfsync always		# 每一次操作都进行持久化
appendfsync everysec	# 每隔一秒进行一次持久化
appendfsync no			# 不进行持久化

Java客户端

在 Redis 官网中提供了各种语言的客户端,地址:https://redis.io/clients

Jedis

①引入依赖

<!-- jedis 客户端-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

②建立连接

jedis = new Jedis("192.168.160.44", 6379);

③使用 Jedis

jedis.set("name", "hahah");

④释放资源

jedis.close();

完整代码

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import redis.clients.jedis.Jedis;

@SpringBootTest
public class TestJedis {
    private Jedis jedis;

    @Before
    public void setUp() {
        jedis = new Jedis("192.168.160.44", 6379);
    }

    @Test
    public void test() {
        jedis.set("name", "hahah");
    }

    @Test
    public void connection(){ // 测试连接
        jedis.set("111","1");
        // 设置超时时间
        jedis.expire("1111",30);
        jedis.close();
        Client client = jedis.getClient();
    }

    @Test
    public void saveString() throws JsonProcessingException { // 存储String
        Person p1 = new Person("kkl1", "18", "nan", 88, "1997-11-11");
        Person p2 = new Person("kkl2", "18", "nan", 88, "1997-11-11");
        Person p3 = new Person("kkl3", "18", "nan", 88, "1997-11-11");
        ArrayList<Person> list = new ArrayList<>();
        list.add(p1);
        list.add(p2);
        list.add(p3);
        ObjectMapper obj = new ObjectMapper();
        String json = obj.writeValueAsString(list);
        jedis.set("data",json);
        jedis.expire("data",300);
        String data = jedis.get("data");
        System.out.println(data);
        jedis.close();
    }

    @Test
    public void saveList(){ // 操作List
        jedis.lpush("mylist","a");
        jedis.lpush("mylist","aa");
        jedis.lpush("mylist","aaa");
        jedis.lpush("mylist","aaaa");
        List<String> mylist = jedis.lrange("mylist", 0, -1);
        mylist.stream().forEach(System.out::println);
        jedis.close();
    }

    @Test
    public void saveSet(){ // 操作set,不包括重复元素
        jedis.sadd("setdemo","1","2","3","4");
        jedis.sadd("setdemo","1","5","7","1");
        Set<String> setdemo = jedis.smembers("setdemo");
        setdemo.stream().forEach(System.out::println);
        jedis.srem("setdemo","1","2","3","4","5");
        jedis.close();
    }
    
    @After
    public void close() {
        jedis.close();
    }
}

Jedis 连接池

Jedis 本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此推荐使用 Jedis 连接池代替 Jedis 的直连方式。而 JedisPool 连接池是 Redis 自带的。

// Jedis 连接池
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtils {
    private static final JedisPool jedisPool;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(10);
        config.setMinIdle(5);
        config.setMaxTotal(20);
        config.setMaxWaitMillis(1000);
        jedisPool = new JedisPool(config, "192.168.2.240");
    }

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    public static void close(Jedis jedis){
        jedis.close(); // 有连接池则归还到池中,没有则直接关闭
    }
}
@SpringBootTest
public class TestJedis {
    private Jedis jedis;

    @Before
    public void setUp() {
        jedis = JedisUtils.getJedis();
    }

    @Test
    public void test() {
        jedis.set("name", "hahah");
        String name = jedis.get("name");
        System.out.println(name);
    }

    @After
    public void close() {
        jedis.close();
    }
}

这里解释下为什么说 Jedis 是线程不安全的。

jedis非线程安全 - 简书 (jianshu.com)

简单说就是,Jedis 共享了 Socket。Jedis 类中有 RedisInputStream 和 RedisOutputStream 两个属性,而发送命令和获取返回值都是使用这两个成员变量,这很容易引发多线程问题。Jedis 执行命令前需要创建一个 conneciton 连接:

public void connect() {
    // 如果 socket 连接关闭了的话,创建一个新的连接。
    if (!this.isConnected()) {
        try {
            this.socket = this.jedisSocketFactory.createSocket();
            this.outputStream = new RedisOutputStream(this.socket.getOutputStream());
            this.inputStream = new RedisInputStream(this.socket.getInputStream());
        } catch (IOException var2) {
            this.broken = true;
            throw new JedisConnectionException("Failed connecting to " + this.jedisSocketFactory.getDescription(), var2);
        }
    }
}

可能两个线程出现这种情况:

sequenceDiagram
participant T1 as 线程1
participant T2 as 线程2
participant S as Socket
T1->>S:进入了 if ( !this.isConnected() ) 花括号里
T1-->>T2:线程上下文切换,线程2开始执行
T2->>S:也进入了 if ( !this.isConnected() ) 花括号里
T2-->>T1:线程上下文切换,线程1继续执行
T1->>S:创建了 socket,准备执行InputStream方法,接收redis的响应了
T1-->>T2:线程上下文切换,线程2开始执行
T2->>S:创建了socket,但是还没有初始化连接
T2-->>T1:线程上下文切换,线程1继续执行
T1-->S:线程1用未connection的连接获取InputStream出错

SpringDataRedis

SpringDataRedis 介绍

SpringData 是 Spring 中数据操作的模块,包含对各种数据库的集成,其中对 Redis 的集成模块就叫做 SpringDataRedis

官网地址:https://spring.io/projects/spring-data-redis

快速入门

SpringDataRedis 中提供了 RedisTemplate 工具类,其中封装了各种对 Redis 的操作。并且将不同数据类型的操作 API 封装到了不同的类型中:

API 返回值类型 说明
redisTemplate.opsForValue() ValueOperations 操作 String 类型数据
redisTemplate.opsForHash() HashOperations 操作 Hash 类型数据
redisTemplate.opsForList() ListOperations 操作 List 类型数据
redisTemplate.opsForSet() SetOperations 操作 Set 类型数据
redisTemplate.opsForZSet() ZSetOperations 操作 SortedSet 类型数据
redisTemplate ——— 通用的命令

①引入依赖,redis 启动的坐标和连接池依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

②redis 相关配置文件

spring:
  redis:
    host: 192.168.160.44
    port: 6379
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

③注入 RedisTemplate

@Autowired // 也可以用 @Resource JSR 的标准注解,不过 @Resource 默认安装名称进行注入的
RedisTemplate redisTemplate;

④测试

@SpringBootTest(classes = HmDianPingApplication.class)
@SuppressWarnings("all")
@RunWith(SpringRunner.class) // JUnit4 需要加这句
public class RedisTemplateTest {
    @Autowired
    RedisTemplate redisTemplate;

    @Test
    public void test() {
        // 插入一条string类型数据,如果采用默认的序列化机制的话,
        // name 会被当成一个 Java 对象进行序列化,
        // 默认采用 JDK 序列化,序列化为字节形式。
        redisTemplate.opsForValue().set("name", "hello-world");
        redisTemplate.opsForValue().setIfAbsent("name", "hello");
    }
}

序列化方式

RedisTemplate 可以接收任意 Object 作为 key 和 value 写入 Redis,只不过写入前会把 Object 序列化为字节形式,默认是采用 JDK 序列化(RedisTemplate#defaultSerializer 默认初始化为 JDK 序列化),得到的结果是这样的

设置 RedisTemplate 的序列化方式,存取的时候都会自动转 JSON。

@Configuration
@SuppressWarnings("all")
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        // 设置 key 的序列化方式
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }
}

// redisTemplate.OpsForValue("name",new User(1,"123")); 会自动序列化成字符串

但是在存数据的时候会存入 “@class” 这个属性,表明是什么类型的数据,会占据一些内存空间,如果想节省内存空间就不能用 JSON 序列化器。

{
    "@class":"com.ex.User",
	"name":"Jack"
}

直接使用 Spring 提供的 StringRedisTemplate,先将 key 和 value 统一转成 String 字符串,再存储,这样就不会存储 “@class” 属性,节省内存空间。

@SpringBootTest
@RunWith(SpringRunner.class) // JUnit4 需要加这句
public class RedisTemplateTest {
    @Resource // Resource 按名字注入,变量的名称要为 stringRedisTemplate
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/get")
    public String get() throws JsonProcessingException {
        // Spring 默认的 JSON 处理工具。
        // 还是不用 fastjson,一堆 bug。
        ObjectMapper objectMapper = new ObjectMapper();
        String jack = objectMapper.writeValueAsString(new User("Jack"));
        // 
        stringRedisTemplate.opsForValue().set("user:1000", jack);
        String s = stringRedisTemplate.opsForValue().get("user:1000");
		User user = objectMapper.readValue(s, User.class);
        return user.toString();
    }
}

// ====================== user ======================
public class User {
    public String name;

    public User(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" + "name='" + name + '\'' + '}';
    }
}

踩坑,在使用 JSON 转换工具时,需要为非 public 字段设置 set/get 方法,不然会报错。

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: 
	No serializer found for class

Spring 默认提供的 StringRedisTemplate 类,它的 key 和 value 的序列化方式默认就是 String 方式。

public class StringRedisTemplate extends RedisTemplate<String, String> {

	public StringRedisTemplate() {
		setKeySerializer(RedisSerializer.string());
		setValueSerializer(RedisSerializer.string());
		setHashKeySerializer(RedisSerializer.string());
		setHashValueSerializer(RedisSerializer.string());
	}

	public StringRedisTemplate(RedisConnectionFactory connectionFactory) {
		this();
		setConnectionFactory(connectionFactory);
		afterPropertiesSet();
	}

	protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
		return new DefaultStringRedisConnection(connection);
	}
}

序列化方案

方案一

方案二

场景实战

黑马程序员Redis入门到实战教程,全面透析redis底层原理+redis分布式锁+企业解决方案+redis实战_哔哩哔哩_bilibili

内容概述

学会利用 Redis 解决下面这些场景的问题。

导入 SQL 文件 comments.sql,SQL 中包含如下的表

这是一个前后端分离的项目,用 ngnix 进行方向代理,访问 tomcat 集群。Redis 和 MySQL 也有相应的集群。

项目的访问路径是:http://localhost:8081/shop-type/list,如果可以看到数据则证明运行没有问题。

PS:需要修改 application.yaml 文件中的 MySQL、Redis 的地址信息。资料中提供了一个 nginx,复制到任意目录,在控制台输入 start nginx.exe 启动 nginx。然后访问 http://127.0.0.1:8080,即可看到页面。

短信登录

短信登录功能的代码是一步一步迭代,改进的。仔细分析这个迭代改进的过程。

基于Session实现登录

具体的流程:发送验证码,回传验证码给用户,用户填写验证码登录时判断用户是否注册,注册了则直接登录,没注册则先注册然后再跳转到首页。

另一个需要考虑的是,我们需要在一些操作上判断用户是否登录。

显然,用过滤器的方式更好。我们可以把拦截器中获取到的用户信息传递到 Controller 里去,便于后期使用,且要保证使用时的线程安全,这里可以用 ThreadLocal 确保线程之间安全的使用自己线程中的数据。

说明:一条完整的 HTTP 请求对应一个线程,每一个进入 tomcat 的请求都是由一个独立的线程来处理的,如果单例对象在多线程下共享数据,如用同一个 ArrayList 存储,会出现并发安全问题;ThreadLocal 可以保证不同线程之间用的是自己的数据。

使用 ThreadLocal 保存用户信息的必要性:后面不少操作代码都需要获取到用户的信息,然后一个 HTTP 请求是在一个线程中完成的,因此这里先用 ThreadLocal 缓存用户信息,避免后面频繁的从 Session 中取出数据,后面改成 Redis 的话,可以避免频繁的从 Redis 中取出数据。而且,在最后请求处理完毕后,需要将用户从 ThreadLocal 中移除,拦截器的 afterCompletion 恰好是请求处理完毕之后执行

拦截器的两个方法

graph LR
subgraph 拦截器执行流程
	拦截器--拦截所有请求-->用户是否登录--登录-->存入ThreadLocal,&nbsp存UserDTO中存在的字段信息而非User-->放行
	用户是否登录--未登录-->提示未登录,无访问权限
end
graph LR
subgraph 发送验证码执行流程
	发送短信验证码&nbsp/user/code-->调用&nbspsendCode&nbsp方法,具体逻辑自己思考-->返回结果.
end
subgraph 登录执行流程
	发送数据到&nbsp/user/login-->做好数据校验,判断好登录还是注册-->存储用户信息,返回结果.
end

集群的session共享问题

session 共享问题:多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 Tomcat 服务时会出现无法找到 session 数据的问题。Tomcat 后面虽然提供了 session 拷贝的解决办法,但是太浪费内存空间了,而且数据拷贝是有延迟的。

session 的替代方案应该满足

这里,可以使用 Redis 充当缓存,解决 Session 共享问题。也可以采用 JWT 作为解决方案。

采用 JWT 的话,用户登录后返回给前端 JWT,每次访问页面时携带 JWT,后端解析 JWT 获取用户信息,如果可以正常获取到用户信息说明用户已经登录了。Spring Boot2 系列教程(三十七)Spring Security 整合 JWT - 腾讯云开发者社区-腾讯云 (tencent.com)

采用 Redis 的话,用户登陆后以随机 token 为 key 将用户信息存入 Redis,后面前端访问后端时,携带 token,后端依据 token 从 Redis 中查询数据判断用户是否登录。

如果是 Redis+JWT 的组合就有些不三不四的。如非必要,二者选其一即可。

基于Redis实现共享session登录

创建 session 时会自动创建 session id,写到用户浏览器的 cookie 中,每发一次请求就会带着 cookie(session id),这样就可以根据 session id 找到 session,从 session 中取数据。而 session id 客户端自己会维护好。使用 Redis 替代 session 的话,也需要我们自己从 Redis 中查询出是否有该用户的信息(成功登录),需要让客户端维护一个 Redis 的 key 用于查找数据。

存入 Redis 的话需要思考用什么数据类型来保存,用什么作为 key。

前端 token 的格式是 authorization=xxx ,后端可以通过获取 authorization 字段的信息得到 token。

graph LR
subgraph 发送验证
	验证码数据存入&nbspRedis&nbsp并设置有效期-->key:LOGIN_CODE_KEY+phone
end
graph LR
subgraph 登录
	Redis&nbsp中获取验证码-->从数据库中查询用户
	从数据库中查询用户-->|存在|用户信息转&nbspHash&nbsp存入&nbspRedis-->key&nbsp的设计:随机字符串
	从数据库中查询用户-->|不存在|返回错误信息,提示用户不存在
end
// 用户登录 controller
public Result login(){
    // 用户信息转 Hash,可以用 hutool 这个工具类
    // 将 UserDTO 转为 Hash 存储
    String token = UUID.randomUUID().toString(true);
    UserDTO userDTO= BeanUtil.copyProperties(user, UserDTO.class);
    // 对象转 map 时要确保值都是 string 的。
    Map<String, Object> userMap = BeanUtil.beanToMap(user, new HashMap<>(),
                                                     CopyOptions.create().
                                                     setIgnoreNullValue(true).
                                                     setFieldValueEditor((key, value) -> value.toString()));
    // Map<Sring, Object> userMap = BeanUtil.beanToMap(userDTO);
    String loginKey = "login:token:"+token; // 重构到 RedisConstants 中
    stringRedisTemplate.opsForHash().putAll(loginKey, userMap);
    // 设置时间的有效期
    stringRedisTemplate.expire(loginKey, 30,TimeUnit.MINUTES);
    return Result.ok(token);
}

注意,拦截器类是我们手动 new 出来放到 IOC 里面的,所以拦截器里注入 StringRedisTemplate 需要用构造函数手动给对象赋值。然后添加自定义的拦截器又是在 Configurate 类中做的, StringRedisTemplate 可以通过依赖注入实例化。

public class LoginInterceptor implements HandlerInterceptor{
    
    private StringRedisTemplate stringRedisTemplate;
    
    public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    public boolean preHandle(xxx){
        // some code...
        // map 转对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // some code...
    }
}

@Configuration
public class MvcConfig implements WebMvcConfigurer{
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    public void addInterceptors(InterceptorRegistry registry){
        registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(xxx);
    }
}

为什么选择用 Hash 存储用户数据而非 String?

保存登录的用户信息,可以用 String 结构,以 JSON 字符串来保存,比较直观。

key value
shop:user:1 {name: “Jack”, age: 21}
shop:user:2 {name: “Pose”, age: 18}

哈希结构可以将对象中的每个字段独立存储,可以针对单个字段做 CRUD,并且内存占用更少

为什么要用 ThreadLocal 暂存用户信息?

用户每次操作界面时,都需要重新计算用户 token 在 Redis 中的有效期。在拦截器中每次从 Redis 中获取用户数据,如果存在,则重新设置有效期,然后存入 ThreadLocal。后面的操作如果还要用到用户信息,就直接从 ThreadLocal 中取,无需再次请求 Redis(一个用户的请求可能涉及若干操作,而这些若干操作都是在一个线程中完成的,如果这若干操作中都需要用到用户信息,此时可以直接从 ThreadLocal 中取,无需多次访问 Redis,减小了网络开销,内存消耗也不大,用户请求结束后就会从 ThreadLocal 中移除数据)。

总结

Redis 代替 session 需要考虑的问题

登录拦截器的优化,先前的拦截器拦截的只是需要做登录校验的路径,有些路径不需要检验但是也需要刷新 token,所有拦截器部分需要做一个优化。

拦截器的执行顺序有个先后关系,可以通过 Order 来设置。Order 的值越小,执行计划等级越高。

// 负责判断是否登录
public class LoginInterceptor implements HandlerInterceptor{ 
    public boolean preHandle(xxx){
        // some code...
        // map 转 对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // some code...
    }
}

// 负责刷新 token
public class RefreshTokenInterceptor implements HandlerInterceptor{
    
    private StringRedisTemplate stringRedisTemplate;
    
    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    public boolean preHandle(xxx){
        // some code...
        // map 转 对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // some code...
    }
}

@Configuration
public class MvcConfig implements WebMvcConfigurer{
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    public void addInterceptors(InterceptorRegistry registry){
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
            .excludePathPatterns(xxx).order(0); // 数字越小,越先执行。
    }
}

商品查询缓存

缓存介绍

缓存是数据交换的缓冲区(称作 Cache [ kæʃ ] ),是存贮数据的临时地方,一般读写性能较高。

graph LR
浏览器缓存-->应用层缓存-->数据库缓存-->CPU缓存
缓存的作用 缓存的成本
降低后端负载 数据一致性成本
提高读写效率,降低响应时间 代码维护成本
- 运维成本

添加Redis缓存

查询商铺缓存

key 设计为 cache:shop:+id,因为店铺的信息都是需要用到的,因此这里用 String。

@RestController
@RequestMapping("/shop")
public class ShopController{
    // some code
    @GetMapping("/{id}")
    public Result queryShopById(@PathVariable("id") Long id) {
        return Result.ok(shopService.getById(id));
    }
}

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    public IShopService shopService;

    public Result queryShopById(Long id) {
        // 1. 从 Redis 查询商铺缓存
        // 2. 判断是否存在
        // 3. 存在,直接返回
        JSONUtil.toBean(xxx,Shop.class);
        // 4. 不存在,根据 id 查询数据库
        // 5. 不存在,返回错误
        // 6. 存在,写入 Redis
        // 7. 返回
    }
}

修改 ShopTypeController 中的 queryTypeList 方法,添加查询缓存

key 和 value 的选取,value 可以用 String,也可以用 List。

缓存更新策略

- 内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用 Redis 的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 给缓存数据添加 TTL 时间,到期后自动删除缓存。下次查询时更新缓存。 编写业务逻辑,在修改数据库的同时,更新缓存。
一致性 一般
维护成本

业务场景

主动更新策略

操作缓存和数据库时有三个问题需要考虑

删除缓存还是更新缓存? 如何保证缓存与数据库的操作的同时成功或失败? 先操作缓存还是先操作数据库?
更新缓存:每次更新数据库都更新缓存,无效写操作较多 单体系统,将缓存与数据库操作放在一个事务 先删除缓存,再操作数据库❌
删除缓存:更新数据库时让缓存失效,查询时再更新缓存 分布式系统,利用 TCC 等分布式事务方案 先操作数据库,再删除缓存✔️

缓存更新策略的最佳实践方案

1️⃣低一致性需求:使用 Redis 自带的内存淘汰机制。

2️⃣高一致性需求:主动更新,并以超时剔除作为兜底方案。

读操作 写操作
缓存命中则直接返回 先写数据库,然后再删除缓存
缓存未命中则查询数据库,并写入缓存,设定超时时间 要确保数据库与缓存操作的原子性

给查询商铺的缓存添加超时剔除和主动更新的策略

修改 ShopController 中的业务逻辑,满足下面的需求:

①根据 id 查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

②根据 id 修改店铺时,先修改数据库,再删除缓存,这个操作需要加入事务,确保操作的一致性(update 方法上加 @Transactional)

思考下,分布式系统下如何保证事务的一致性?TCC 分布式事务框架。

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

常见的解决方案有两种

解决方案 优点 缺点
缓存空对象 优点:实现简单,维护方便 缺点:额外的内存消耗;可能造成短期的不一致
布隆过滤 优点:内存占用较少,没有多余 key 缺点:实现复杂;存在误判可能

缓存穿透产生的原因是什么?

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

缓存雪崩

缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。

常见的解决方案有:

1️⃣给不同的 Key 的 TTL 添加随机值,比如原先设置的 20,那么我们再加一个随机的 1~10 的值

2️⃣利用 Redis 集群提高服务的可用性(很严重),尽可能的避免宕机。宕机的话,可以用 redis 的哨兵机制,监控服务。搭建 Redis 服务形成主从,如果有机器宕机(如主宕机了,会随机从一个从机器里选一个替代主)

3️⃣给缓存业务添加降级限流策略

4️⃣给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点 Key 问题(少部分 key 失效),就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。确保重建缓存的时候只有一个访问可以重建就行,其他访问获取旧的数据或等待缓存重建成功然后再从缓存中查询数据。

假定有多个线程(1,2,3,4)查询到了一个失效的缓存,线程 1 发现缓存失效了,开始走重建缓存的流程。此时缓存还未重建完毕,线程 2,3,4 也发现缓存失效了,也走了重建缓存的流程(没有必要)。

常见的解决方案有:互斥锁和逻辑过期。利用互斥锁,让其中一个线程重建缓存即了。

互斥锁需要互相等待,1000 个线程来了,一个负责重建,其他的只能等待。如果重建时间长的话,系统性能会很差。可以为数据设置一个逻辑过期(就是数据永不过期,设置一个逻辑过期时间),解决等待的问题。为了避免多个线程重建缓存,此处也需要获取锁,只让一个线程可以重建缓存。

graph LR
subgraph 逻辑过期
	重建缓存-->获取互斥锁
	获取互斥锁-->|成功|开启新线程重建缓存
	获取互斥锁-->|失败|直接返回过期数据
end
解决方案 优点 缺点
互斥锁 没有额外的内存消耗
保证一致性
实现简单
线程需要等待,性能受影响
可能有死锁风险
逻辑过期 线程无需等待,性能较好 不保证一致性
有额外的内存消耗
实现复杂

基于互斥锁解决缓存击穿问题

需求:修改根据 id 查询商铺的业务,基于互斥锁(redis 的 setnx)方式来解决缓存击穿问题。如果只是一个单机的系统,可以直接使用语言内置的锁来解决缓存击穿。而 Redis String 类型的 setnx 可以实现这种类似的功能。setnx 只有在 key 不存在是才可以写(返回 1),key 存在的话无法写(返回 -1)。一般都会为 setnx 设置一个有效期。

private boolean tryLock(String key){
    Boolean flag = stringRedisTemplate
                        .opsForValue()
        				// 设置有效期避免死锁。如果重建失败,其他线程拿到锁发现没有重建
        				// 成功会继续重建缓存
                        .setIfAbsent(key,"1",10,TimeUnit.SECONDS);
    return flag == null?false:true;
}

private void unlock(String key){
    stringRedisTemplate.delete(key);
}

获取锁成功后,应该再次检测 redis 缓存是否存在,做 doubleCheck,如果存在则无需重建缓存。

public ShopQueryWithMutex(Long id){
    String key = CACHE_SHOP_KEY+id;
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 判断是否存在
    if(StrUtil.isNotBlank(shopJson)){
        return JSONUtil.toBean(shopJson, Shop.class)
    }
    // 判断命中是否是空值
    if(shopJson !=null){
        // 返回一个错误信息
        return null;
    }
    // some code
    // 不存在,查询数据库
    Shop shop = getById(id);
    // 不存在,返回错误,写入空值
    if(shop == null){
        stringRedisTemplate.opsForValue().set(key,null);
        return null;
    }
}

使用 JMeter 进行高并发压力测试。

基于逻辑过期方式解决缓存击穿问题

需求:修改根据 id 查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题。

原先的 POJO 对象(Shop)没有逻辑过期这个字段,但是我们又不能破坏 Shop 类的结构,此时我们可以创建一个 ShopDTO,专门用来数据传输的,在这个类里加上逻辑过期字段。还有一种更通用的方式,定义一个 RedisData,存储传输的对象和逻辑过期时间。

public class RedisData{
    private LocalDateTime expireTime;
    private Object data;
}

缓存工具封装

基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:

1️⃣将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间(针对普通 key 的)

2️⃣根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题(针对普通 key 的)

3️⃣将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题(针对热点 key 的)

4️⃣根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题(针对热点 key 的)

在更新缓存的时候,不知道数据库查询的具体操作,采用 lambda 由用户传入具体的操作。

@Slf4j
@Component
/*
 * 缓存雪崩,设置过期时间时加上一个随机数,避免大量 key 同时过期
 * 缓存穿透,查询的数据 Redis 和数据库中都没有,可以通过缓存 null 值来解决。(占用内存,数据不一致)
 * 缓存击穿,热点 key 失效。大量用户查询数据,导致大量用户尝试重建缓存,可以采用互斥锁的方式重建缓存;也可以采用逻辑过期的方式。
*/
public class CacheClient{
    private final StringRedisTemplate stringRedisTemplate;
    
    public CacheClient(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    public void set(String key, Object value, Long time, TimeUnit unit){
        // cn.hutool.json 工具类中的方法 toJsoonStr
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }
   
    // 设置逻辑过期时间
    public void setWithLogicalExpore(String key, Object value, Long time, TimeUnit unit){
        // cn.hutool.json 工具类中的方法 toJsonStr
        RedisData redisData = new RedisData(value, LocalDateTime.now().plusSeonds(unit.toSeconds(time)));
		stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }
    
    // 缓存穿透,通过缓存空值来解决。需要自己判断那些数据会发生缓存穿透问题,然后查询数据的时候使用该方法。(比如,常见的存入 redis 的数据都可以使用这个来进行查询)
    public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, 
                                          Function<ID,R> dbFallback, Long Time, TimeUnit unit){
        String key = keyPrefix + id;
        // 1. 查询缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2. 判断是否存在,存在直接返回
        if(StrUtil.isNotBlank(json)){
            // 3.数据不为空,说明存在,直接返回
            return JSONUtil.toBean(json, type);
        }
        // 4. 如果命中的是空值'',说明发生了缓存穿透,直接返回错误信息(如果发现数据库中没有会向 redis 中存入一个空值)
        if(json != null){
            return null;
        }
        // 5. 如果数据还未缓存到 Redis, 则查询数据库。
        R r = dbFallback.apply(id);
        
        // 6. 不存在,发生错误,并向 Redis 中写入空值
        if(r == null){
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }
        // 7.存在,写入数据库
        this.set(key, r, time, unit);
        return r;
    }
    
    // 使用逻辑时间过期解决缓存击穿问题。对于根据业务预判成功热点数据,可以统一采用该方法进行数据的查询。
    public <ID,R> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, 
                                           Function<ID, R> dbFallBack, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2. 判断是否存在,不存在直接返回
        if(StrUtil.isBlank(json)){
            // 3.不存在,直接返回
            return null;
        }
        // 4.命中,把 json 反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5. 判断是否逻辑过期
        if(expireTime.isAfter(LocalDateTime.now())){
            // 未过期
            return r;
        }
        // 过期,缓存重建
        // 6. 获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean getLock = tryLock(lockKey);
        if(getLock){
            // 成功拿到锁则开启线程重建缓存,这个缓存重建是异步的,所以返回的是 r 而非 r1.
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try{
                    // 查询数据
                    R r1 = dbFallBack.apply(id);
                    // 写入 redis
                    this.setWithLogicalExpore(key, r1, time, unit);
                }cache(Exception e){
                    throw new RuntimeException(e);
                }finally{
                    unlock(lockKey);
                }
            });
        }
        return r;
    }
}

优惠券秒杀

全局唯一ID

问题

每个店铺都可以发布优惠券。当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增 ID 就存在一些问题。

全局 ID 生成器

全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性。

为了增加 ID 的安全性,我们不直接使用 Redis 自增的数值,而是拼接一些其它信息。

graph
符号位&nbsp1bit
时间戳&nbsp31bit
序列化&nbsp32bit

ID 的组成部分

全局唯一 ID 生成策略

Redis 自增 ID 策略

案例:生成全局唯一 ID

public class RedisIdWorker{
    private static final int COUNT_BIT = 32;
    public long nextId(String keyPrefix){
        // 1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = new.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列号
        // 2.1 获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyy:MM:dd"));
        // 2.2 自增长(不会存在空指针的,如果key不存在会自动给你重建一个,然后+1返回1给你)
        Long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);

        // 拼接返回
         // 符号位 时间戳 序列化
        // 用位运算。timestamp 左移 32 位,然后在 | 上自增的位。count 的自增可以保证 id 的唯一性。
        return (timestamp<<COUNT_BIT)|count
    }
}

优惠券秒杀下单

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

VoucherController 中提供了一个接口用于添加秒杀优惠券:VoucherController#addSeckillVoucher

秒杀下单的接口

- 说明
请求方式 POST
请求路径 /voucher-order/seckill/{id}
请求参数 id,优惠券 id
返回值 订单 id

下单时需要判断两点:

注意:涉及到两张表的操作,因此需要加上事务。在添加事务的时候注意事务是否会失效。

超卖问题

问题

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁。

类型 说明
乐观锁 认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。如果没有修改则认为是安全的,
自己才更新数据。如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
悲观锁 认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如 Synchronized、Lock 都属于悲观锁

乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:版本号法和 CAS。

悲观锁

加重量级锁,如果觉得并发粒度不够,可以采用分段锁的思想。

方案 优点 缺点
悲观锁,添加同步锁,让线程串行执行 简单粗暴 性能一般
乐观锁,不加锁,在更新时判断是否有其它线程在修改 性能好 存在成功率低的问题

乐观锁解决超卖问题

# 修改下 sql 语句, 失败率会非常高
update voucherxxx set stock = stock-1 where voucher_id = k and stock = queryStock;

# 再次修改 sql 语句
update voucherxxx set stock = stock-1 where voucher_id = k and stock > 0;

一人一单

问题

修改秒杀业务,要求同一个优惠券,一个用户只能下一单。可以直接用联合主键完成,也可以代码里写逻辑。

查询订单和扣减库存这块会出现并发问题,因为查询和创建不是原子性的。在加锁的时候需要注意锁的范围和事务的范围。同时,需要注意用什么充当锁。此处可以采用用户的 id 充当锁,因为我们的目的是一人一单,避免有人用脚本刷单,对单个用户的一人一单操作加锁即可。(涉及到了 String 的 intern 方法)

单价情况下一人一单

事务是由 Spring 进行管理的。在释放锁后,才会开始提交事务。函数执行完后,Spring 提交事务,此时锁以及被释放了,其他线程已经可以进来,而且事务尚未提交,其他用户查到的是未更新的数据,存在很大的问题。应该是提交事务后,再释放锁。

@Transactional
public Result createVoucherOrder(Long voucherId){
    Long userId = UserHolder.getUser().getId();
    // toString 返回的是一个 new String(xxx)
    synchronized(userId.toString().intern()){
        // 业务代码
    }
    // 会出现,锁已经释放了,但是事务还没有提交。数据没有刷新到数据库中,导致其他事务查询到的还是未更新的数据,导致数据不一致。
    // 为了避免这种问题,需要加大锁的范围
}

加大锁的范围,需要注意的是,Spring 的事务是通过代理对象实现的,直接调用 createVoucherOrder 是不会走代理对象的,所以我们需要改成用代理对象调用,这样就是走的代理方法了。

@Transactional
public Result createVoucherOrder(Long voucherId){
    Long userId = UserHolder.getUser().getId();
    // 业务代码,操作数据库。
}

// 加大锁的范围,让锁锁住事务 Spring 事务失效的几中情况
public boolean KK(){
    // some code
    Long userId = UserHolder.getUser().getId();
    // Spring 的事务是通过代理对象实现的,
    // 而 KK 方法上面没有加事务注解,因此 KK 方法不会走代理方法增强
    // 在 KK 方法中直接调用 createVoucherOrder 是不会走代理对象的
    // 所以我们需要改成用代理对象调用
    synchronized(userId.toString().intern()){
    	IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
        return proxy.createVoucherOrder(userId); // createVoucherOrder 是接口 IVoucherOrderService 中的方法
    }
}

// 在 SpringBoot 启动类上加上注解
@EnableAspectJAutoProxy(exposeProxy = true) // 暴露代理对象。
@MapperScan("xx.xxx.xxx")
@SpringBootApplication
public class XXApplication{
    // some code
}

2023-1-9 温习 Spring 的时候温习到了事务相关的内容,对上述内容又做了一次实验,确实是会导致事务失效的。测试代码如下。

@Service
public class UserService {
    @Autowired
    UserDao userDao;
	
    // 调用 testTX 方法,testAopContext 不会对事务进行回滚!
    public void testTX() {
        synchronized (this) {
            testAopContext();
        }
    }

    @Transactional
    public void testAopContext() {
        String update = "update tb_u set age = ? where id = ?";
        userDao.update(update, 30, 1);
        int i = 1 / 0;
        userDao.update(update, 30, 2);
    }
}

分布式下的一人一单

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。假定下面两个线程访问的是不同的服务器:

模拟集群模型:

1.将服务启动两份,端口分别为 8081 和 8082

2.修改 nginx 的 conf 目录下的 nginx.conf 文件,配置反向代理和负载均衡。

3.jmeter 并发访问,出现并发安全问题,并未实现一人一单。

分布式锁可以解决上述问题。当然也可以直接用数据库的联合主键解决一人一单的问题。对比下使用联合主键和分布式锁的性能差距。

分布式锁

分布式锁是满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种

MySQL Redis Zookeeper
互斥 利用 mysql 本身的互斥锁机制 利用 setnx 这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

基于Redis的分布式锁

在处理一人一单业务的时候,在分布式情况下需要使用分布式锁来确保一人一单。这里使用 Redis 定义一个分布式锁,再创建订单时先看能不能拿到锁,能拿到锁就创建订单,不能就返回错误。在代码中,我们为了实现分布式锁时定义了两个基本方法:

方法 说明
获取锁方法 ①互斥,确保只能有一个线程获取锁
setnx lock thread1,并通过 expire lock 10 添加锁过期时间,避免服务器宕机引起死锁
③非阻塞:尝试一次,成功返回 true,失败返回 false
释放锁方法 ①手动释放
②超时释放:获取锁时添加一个超时时间 del key
public interface ILock{
    // 尝试获取锁,timeoutsec 表示锁的持有时间,过期后自动释放。
    boolean tryLock(long timeoutsec){};
    void unlock();
}
public class SimpleRedisLock implements ILock{
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    
    private static final String KEY_PREFIX = "lock:";
    
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate){
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public boolean tryLock(long timeout){
        long threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name, threadId+"", timeout, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }
    
    @Override
    public void unlock(){
        stringRedisTemplate.opsForValue().delete(KEY_PREFIX+name);
    }
}
// 使用锁的代码示例:
new SimpleRedisLock("order:"+userId, stringRedisTemplate);

上述的方案存在一个问题,如果两个线程争抢锁,线程 1 抢到了,但是由于执行业务的时间太长了,致使锁超时释放。此时线程 2 拿到了锁执行业务。在线程 2 执行业务的时候,线程 1 业务执行完毕了,释放了锁(释放了其他线程加的锁),线程 3 在线程 2 为完成业务,且锁未超时的情况下拿到了锁。

问题辨析:锁超时了,其他线程拿到了锁合理吗?如果是缓存重建这种情况,锁超时了还未成功重建缓存,其他线程在超时的情况下拿到锁是没什么大问题的。

为了解决这个问题,我们需要在释放锁之前判断一下,是不是自己加的锁,是自己加的锁才要释放。

改进Redis的分布式锁

需求:修改之前的分布式锁实现,满足:

// 修改分布式锁释放锁的代码
public void unlock(){
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取、判断、删除不是原子性的,会存在并发问题。
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    if(threadId.eqquals(id)){
        // 一致则释放锁
        stringRedisTemplate.opsForValue().delete(KEY_PREFIX+name);
    }
    // 不一致则不释放
}

但是这种方案仍然存在问题

判断锁标识和释放锁不是原子性的,会有并发问题。线程 1 判断锁标识,发现一致,在释放锁时发送了阻塞(如 GC)。在阻塞过程中,其他服务器的线程 2 获取到了锁,并开始执行业务。在线程 2 执行业务时,线程 1 把线程 2 的锁释放了。如果有其他线程也来抢锁,是可以拿到锁的。

Lua 脚本

前面的核心问题在与,判断锁和释放锁不是原子性的操作,而 Lua 脚本可以解决这种问题。

基本语法

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html 这里重点介绍 Redis 提供的调用函数,语法如下:

# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行 set name jack,则脚本是这样

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行 set name Rose,再执行 get name,则脚本如下

# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用 Redis 命令来调用脚本,调用脚本的常见命令如下

例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下

如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数。

# keys 数组只有 1 个元素 name,arg 数组有一个元素 rose
eval "return redis.call('set', KEYS[1],ARGV[1])" 1 name rose

# keys 数组无元素(0),arg 数组有一个元素 arg
eval "return argv[1]" 0 arg1

Lua 改进分布式锁

释放锁的业务流程如下

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

需求:基于 Lua 脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate 调用 Lua 脚本的 API 如下

/**
 script:脚本
 keys:对应 KEYG
 args:对应 ARGV
*/
public <T> execute(RedisScript<T> script, List<K> kyes, Object... args){
    return scriptExecutor.execute(script, keys, args);
}

假定,我们将 lua 脚本放在 resources 根目录下

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
}

public static unlock(){
    stringRedisTemplate.execute(
        UNLOCK_SCRIPT,
        Collections.singletionList(KEY_PREFIX+name),
        ID_PREFIX+Thread.currentThread().getId()
    );
}

基于 Redis 的分布式锁实现思路:

特性:

基于 setnx 实现的分布式锁存在下面的问题:

Redisson

介绍&配置

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

官网地址: https://redisson.org GitHub 地址: https://github.com/redisson/redisson

为了避免 Redission 里的配置把 SpringBoot 里的覆盖了,这里就采用引入 redisson,自己配置 Redission 的方式。

引入依赖

<dependency>
	<groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

配置客户端

@Configuration
public class RedisConfig{
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 
        config.useSingleServer()
            .setAddress("redis://192.168.1.101:6379")
            .setPassword("123");
        return Redission.create(config);
    }
}

使用 Redisson 分布式锁

@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    // 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    // 判断释放获取成功
    if(isLock){
        try {
            System.out.println("执行业务");
        }finally {
            // 释放锁
            lock.unlock();
        }
    }
}

原理

可重入锁原理

与 Java 其他的可重入锁的原理很类似。内部的数据类型如下。

key value  
  field value
lock thread1 1

有人加锁时判断是是持有锁的线程,是则锁的重入次数(value 值)+1,不是则加锁失败。释放锁也是类似的,value – 后为 0 就释放锁。

释放锁的 lua 脚本

local key = KEYS[1]; 
-- 锁的key
local threadId = ARGV[1]; 
-- 线程唯一标识
local releaseTime = ARGV[2]; 
-- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
    return nil; 
    -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0 
if (count > 0) then
    -- 大于0说明不能释放锁,重置有效期然后返回
    redis.call('EXPIRE', key, releaseTime);
    return nil;
else  -- 等于0说明可以释放锁,直接删除
    redis.call('DEL', key);
    return nil;
end;

源码位于 RedissonLock。如果有人释放锁了会发布一个消息,而那些等待锁的会订阅这个消息。如果在指定时间还没获取到锁就取消订阅。如果在指定时间内发现有人释放了锁,则尝试获取锁。这样不断的尝试。巧妙的运用了发布订阅模式,避免无休止的盲等。

Redisson 分布式锁原理

主从一致问题

Redisson 分布式锁主从一致问题。如果是单节点的 Redis 发送了故障,会导致所有依赖于 Redis 的业务都出现问题。一般都会搭建主从节点。主节点负责写操作,从节点负责读操作。锁数据同步需要时间。假定在主节点要同步锁数据的时候,主节点宕机了,所有的从节点都没有同步到锁数据,访问新的主节点时

graph LR
subgraph 设置锁,主从节点正常
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspMaster-->|主从同步|Redis&nbspSlave1
Redis&nbspMaster-->|主从同步|Redis&nbspSlave2
end
graph LR
subgraph 设置锁,主节点宕机
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspMaster宕机
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspSlave2升级为主节点
Redis&nbspSlave2升级为主节点-->|主从同步|Redis&nbspSlave1
end

这样,A 线程设置好了锁,其他线程尝试获取锁时,因为原先的主节点宕机了,而且锁没有同步过去,其他线程向新的主节点设置锁时可以设置成功。这样就不是一个线程持有锁了,而是两个了。Redission 的 multiLock 可以避免这种问题,只要 Redis 集群设置成这种模式,每个节点都可以读写。

graph LR
subgraph 设置锁,主从节点正常
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspNode1,lock=thread1
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspNode2,lock=thread1
Java应用-->|获取锁,set&nbsplock&nbspthread1&nbspnx&nbspex&nbsp10|Redis&nbspNode3,lock=thread1
end

如果觉得这样的可用性不高,那么可以为这些节点再设置从节点。

类型 原理 缺陷
不可重入 Redis 分布式锁 利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标示 不可重入、无法重试、锁超时失效
可重入的 Redis 分布式锁 利用 hash 结构,记录线程标示和重入次数;利用 watchDog 延续锁时间;利用信号量控制锁重试等待 redis 宕机引起锁失效问题
Redisson 的 multiLock 多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功 运维成本高、实现复杂

Redis优化秒杀

Redis 中预先缓存库存,下单前先查库存,库存够才允许下单。下单前再在 Redis 中做订单校验判断是否下单过。(有个业务问题,校验成功后 Redis 直接预减库存,生成订单信息加入 MQ。MQ 的消费端再慢慢从 MQ 中取出数据进行消费,修改数据库中的存货。如果用户取消订单,则 Redis 中库存 +1)

原始架构
Redis优化后

key 的设计 stock:vid:7,用 String 即可。一人一单则采用 zset 中存储用户 id,确保唯一,key 的设计为 order:vid:7

改进秒杀业务,提高并发性能

秒杀优化思路

消息队列实现异步秒杀

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括 3 个角色。

flowchart LR
生产者-->1(Message Queue)-->消费者

使用消息队列完成异步秒杀

生产者:判断秒杀时间和库存;校验一人一单;都通过后发送优惠券 id 和用户 id 到消息队列。

消费者:接受消息后完成下单。

Redis 提供了三种不同的方式来实现消息队列

基于List的消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而 Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现。不过要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会返回null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。

基于 List 的消息队列有哪些优缺点?

优点:利用 Redis 存储,不受限于 JVM 内存上限;基于 Redis 的持久化机制,数据安全性有保证;可以满足消息有序性。

缺点:无法避免消息丢失;只支持单消费者。

基于PubSub的消息队列

PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

flowchart LR
生产者-->|publish order.queue msg1|1(Message Queue)-->|subscribe order.queue|消费者1
1(Message Queue)-->|subscribe order.queue|消费者2

基于 PubSub 的消息队列

优点:采用发布订阅模型,支持多生产、多消费

缺点:不支持数据持久化;无法避免消息丢失;消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令

127.0.0.1:6379> help xadd

  XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
  summary: Appends a new entry to a stream
  since: 5.0.0
  group: stream
# 创建名为 users 的队列,并向其中发送一个消息,内容是 {name=jack, age=21}, 并且使用 Redis 自动生成 id
127.0.0.1:6379> xadd users * name jack age 21
"1674809034254-0"

读取消息的方式之一:XREAD

127.0.0.1:6379> help xread

  XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  summary: Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.
  since: 5.0.0
  group: stream

例如,使用 XREAD 读取第一个消息

127.0.0.1:6379> xread count 1 streams users 0
1) 1) "users"
   2) 1) 1) "1674809034254-0"
         2) 1) "name"
            2) "jack"
            3) "age"
            4) "21"

XREAD 阻塞方式,读取最新的消息 ($ 表示读取最新的消息)

127.0.0.1:6379> xread count 1 block 1000 streams users $
(nil)
(1.02s)

在业务开发中,我们可以循环的调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下。

while(true){
    //尝试读取队列中的消息,最多阻塞2秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    if(msg == null){
        continue;
    }
    //处理消息
    handleMessage(msg);
}

当我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

Stream 消息队列的 XREAD 命令特点

基于Stream的消息队列-消费者组

消费者组(Consumer Group),将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

创建消费者组

XGROUP CREATE key groupName ID [MKSTREAM]

其他常见命令

# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

消费者监听消息的基本思路

while(true){
    //尝试监听队列, 使用阻塞模式,最长等待 2000 毫秒
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >"); 
    if(msg == null){ // null 说明没有消息,继续下一次
        continue;
    }
    try {
        //外理消息,完成后一定要 ACK
        handleMessage(msg);
    } catch(Exception e){
        while(true){
            Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if(msg == null){ // null 说明没有异常消息,所有消息确认,结束循环
                break;
            }
            try {
                //说明有异常消息,再次处理
                handleMessage(msg);
            } catch(Fxception H){
                // 再次出现异常,记录日志,继续循环
                continue; 
            }
        }
    }
}

Stream 消息队列 XREADGROUP 命令特点

Redis 消息队列总结

- List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

案例练习

基于 Redis 的 Stream 结构作为消息队列,实现异步秒杀下单

  1. 创建一个 Stream 类型的消息队列,名为 stream.orders
  2. 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
  3. 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单

达人探店

这部分主要练习业务。

发布笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个。

DROP TABLE IF EXISTS `tb_blog`;
CREATE TABLE `tb_blog`  (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
  `shop_id` bigint(20) NOT NULL COMMENT '商户id',
  `user_id` bigint(20) UNSIGNED NOT NULL COMMENT '用户id',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '标题',
  `images` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '探店的照片,最多9张,多张以\",\"隔开',
  `content` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '探店的文字描述',
  `liked` int(8) UNSIGNED NULL DEFAULT 0 COMMENT '点赞数量',
  `comments` int(8) UNSIGNED NULL DEFAULT NULL COMMENT '评论数量',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 23 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

需要注意部分是文件上传的设置。文件上传的路径要设置好,此处需要设置为 Nginx 目录

public class SystemConstants{
    // 设置为 nginx 的目录
    public static final String IMAGE_UPLOAD_DIR = "D:\\xx\\nginx-1.18.0\\html\\xx\imgs\\";
    public static final String USER_NICK_NAME_PREFIX = "user_";
    public static final int DEFAULT_PAGE_SIZE = 5;
    public static final int MAX_PAGE_SIZE = 10;
}

点赞

需求如下:

实现步骤:

点赞排行榜

点赞排行榜/游戏排行榜 etc…

需求:按照点赞时间先后排序,返回 Top5 的用户

  List Set SortedSet
排序方式 按添加顺序排序 无法排序 根据 score 值排序
唯一性 不唯一 唯一 唯一
查找方式 按索引查找或首尾查找 根据元素查找 根据元素查找

好友关注

关注和取关

关注是 User 之间的关系,是博主与粉丝的关系,在数据库中可以用一张 tb_follow 表来标示

DROP TABLE IF EXISTS `tb_follow`;
CREATE TABLE `tb_follow`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` bigint(20) UNSIGNED NOT NULL COMMENT '用户id',
  `follow_user_id` bigint(20) UNSIGNED NOT NULL COMMENT '关联的用户id',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

共同关注

利用 Redis 中恰当的数据结构(如 Set 集合),实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友。求 Redis 中两个 set 的交集即可。

关注推送

关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。传统模式是用户寻找内容,而 Feed 模式是内容匹配用户。

graph LR
用户-->|寻找|内容
graph LR
内容-->|匹配|用户

Feed流的模式

Redis实现feed流 - 腾讯云开发者社区-腾讯云 (tencent.com)

Feed 流产品有两种常见模式

本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。该模式的实现方案有三种:

这么一看,微博的功能实现起来确实复杂。

  拉模式 推模式 推拉结合
写比例
读比例
用户读取延迟
实现难度 复杂 简单 很复杂
使用场景 很少使用 用户量少、没有大 V(用户量在千万以下) 过千万的用户量,有大 V

案例

基于推模式实现关注推送功能

①修改探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱

②收件箱满足可以根据时间戳排序(List、Sorted),必须用 Redis 的数据结构实现,可以采用 Sorted。

③查询收件箱数据时,可以实现分页查询

Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。可以采用滚动分页的模式:每次记录当前查询的最后一条记录,下一页就从最后一条记录开始查询。

根据上面的描述可梳理好逻辑

部分代码

/**
保存笔记
查询笔记作者所有粉丝
推送笔记 id 给粉丝
返回 id
主要是推送的代码
*/
for(Follow follow : follows){
    Long userId = follow.getUserId();
    String key = "feed:"+userId;
    stringRedisTemplate
        .opsForZSet()
        .add(key,blog.getId().toString(),System.currentTimeMillis());
}

拉取代码逻辑

/**
zreverangebyscore z1 5 0 withscores limit 1 3
滚动分页查询参数
max:当前时间戳 | 上一次查询的最小时间戳
min:0
offset:0 | 在上一次的结果中,与最小值一样的元素个数
count:3
*/
public Result queryBlogOfFollow(Long max,Integer offset){
    Long userId = xxx;
    String key = "feed:"+userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
        .reverseRangeByScoreWithScores(key,0,max,offset,2);
    //...
    // 解析数据,注意集合中的最后一个才是最小时间
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int os = 1;
    for(ZSetOperations.TypedTule<String> tuple : typedTuples){
        ids.add(Long.valueOf(tuple.getValue()));
        long time = tuple.getScore().longValue();
        // 统计下时间戳最小的相同的个数,作为下一次查询的 offset,然后返回给前端
        // 下次前端请求数据的时候,申请 offset。
        if(time == minTime){
            os++;
        }else{
            minTime = time;
            os = 1;
        }
    }
    // 根据 id 查询 blogs,ids 是支持存储的
    // 实战篇-10 19:51
    // 返回数据
}

附件商户

GEO数据结构

GEO 就是 Geolocation 的简写形式,代表地理坐标。Redis 在 3.2 版本中加入了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:

命令 说明
GEOADD 添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST 计算指定的两个点之间的距离并返回
GEOHASH 将指定 member 的坐标转为 hash 字符串形式并返回
GEOPOS 返回指定 member 的坐标
GEORADIUS 指定圆心、半径,找到该圆内包含的所有 member,并按照与圆心之间的距离排序后返回。6.2 以后已废弃
GEOSEARCH 在指定范围内搜索 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2. 新功能
GEOSEARCHSTORE 与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 key。 6.2. 新功能

练习

添加下面几条数据:

计算北京西站到北京站的距离

搜索天安门( 116.397904 39.909005 )附近 10km 内的所有火车站,并按照距离升序排序

# 添加数据
geoadd g1 116.378248 39.865275 bjn 116.42803 39.903738  116.322287 39.893729 bjz bjx

# 计算距离,默认是米,后面接 km 的话就是千米
geodist g1 bjn bjx

# xx key 搜索方式 经纬度 根据半径来搜 10 km 带上距离
geosearch g1 fromlonlat 116.397904 39.909005 byradius 10 km withdist # 默认是升序的

附件商户搜索

假定:在首页中点击某个频道,即可看到频道下的商户。

我们可以按照商户类型做分组,类型相同的商户作为同一组,以 typeId 为 key 存入同一个 GEO 集合中即可。

key value score
shop:geo:food xxx xxx
  xxx xxx
shop:geo:tax qqq www

SpringDataRedis 的 2.3.9 版本并不支持 Redis 6.2 提供的 GEOSEARCH 命令,因此我们需要提示其版本,修改自己的 POM 文件,内容如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.6.2</version>
</dependency>
<dependency>
    <artifactId>lettuce-core</artifactId>
    <groupId>io.lettuce</groupId>
    <version>6.1.6.RELEASE</version>
</dependency>

用户签到

BitMap用法

假如我们用一张表来存储用户签到信息,其结构应该如下。

DROP TABLE IF EXISTS `tb_sign`;
CREATE TABLE `tb_sign`  (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` bigint(20) UNSIGNED NOT NULL COMMENT '用户id',
  `year` year NOT NULL COMMENT '签到的年',
  `month` tinyint(2) NOT NULL COMMENT '签到的月',
  `date` date NOT NULL COMMENT '签到的日期',
  `is_backup` tinyint(1) UNSIGNED NULL DEFAULT NULL COMMENT '是否补签',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

假如有 1000 万用户,平均每人每年签到次数为 10 次,则这张表一年的数据量为 1 亿条

每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共 22 字节的内存,一个月则最多需要 600 多字节

我们按月来统计用户签到信息,签到记录为 1,未签到则记录为 0

把每一个 bit 位对应当月的每一天,形成了映射关系。用 0 和 1 标示业务状态,这种思路就称为位图(BitMap)

Redis 中是利用 string 类型数据结构实现 BitMap,因此最大上限是 512M,转换为 bit 则是 $2^{32}$个 bit 位

BitMap 的操作命令 说明
SETBIT 向指定位置(offset)存入一个 0 或 1
GETBIT 获取指定位置(offset)的 bit 值
BITCOUNT 统计 BitMap 中值为 1 的 bit 位的数量
BITFIELD 操作(查询、修改、自增)BitMap 中 bit 数组中的指定位置(offset)的值
BITFIELD_RO 获取 BitMap 中 bit 数组,并以十进制形式返回
BITOP 将多个 BitMap 的结果做位运算(与 、或、异或)
BITPOS 查找 bit 数组中指定范围内第一个 0 或 1 出现的位置

签到功能

需求:实现签到接口,将当前用户当天签到信息保存到 Redis 中。因为 BitMap 底层是基于 String 数据结构,因此其操作也都封装在字符串相关操作中了。

签到统计

什么叫连续签到天数?

从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。我们可以用 redis 的 bitmap 来实现连续签到功能。

需求:用 redis 命令实现一个月签到的功能。

 # 初始化签到数(设置一个数据,所有bit位初始化为0)
 setbit count 0 0
 
 # 签到(第一天就将offset=0的bit位设置为1)
 setbit count 0 1
 
 # 查看你第一天(offset=0)是否签到
 getbit count 0
 
 # 统计签到的总天数
 bitcount count
 
 # 统计指定范围内的签到天数(0字节到~0字节的签到天数,就是统计了8个bit位)
 bitcount count 0 0

如何得到本月到今天为止的所有签到数据?

BITFIELD key GET u[dayOfMonth] 0

如何从后向前遍历每个 bit 位?

与 1 做与运算,就能得到最后一个 bit 位。随后右移 1 位,下一个 bit 位就成为了最后一个 bit 位。

UV统计

HyperLogLog用法

首先我们搞懂两个概念:

UV 统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到 Redis 中,数据量会非常恐怖。

Hyperloglog(HLL) 是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理可以参考:https://juejin.cn/post/6844903785744056333#heading-0

Redis 中的 HLL 是基于 string 结构实现的,单个 HLL 的内存永远小于 16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于 0.81% 的误差。不过对于 UV 统计来说,这完全可以忽略。

127.0.0.1:6379> help pfadd

  PFADD key element [element ...]
  summary: Adds the specified elements to the specified HyperLogLog.
  since: 2.8.9
  group: hyperloglog

127.0.0.1:6379> help pfcount

  PFCOUNT key [key ...]
  summary: Return the approximated cardinality of the set(s) observed by the HyperLogLog at key(s).
  since: 2.8.9
  group: hyperloglog

127.0.0.1:6379> help pfmerge

  PFMERGE destkey sourcekey [sourcekey ...]
  summary: Merge N different HyperLogLogs into a single one.
  since: 2.8.9
  group: hyperloglog

实现UV统计

我们直接利用单元测试,向 HyperLogLog 中添加 100 万条数据,看看内存占用和统计效果如何。

@Test
void testHyperLogLog() {
    // 准备数组,装用户数据
    String[] users = new String[1000];
    // 数组角标
    int index = 0;
    for (int i = 1; i <= 1000000; i++) {
        // 赋值
        users[index++] = "user_" + i;
        // 每1000条发送一次
        if (i % 1000 == 0) {
            index = 0;
            stringRedisTemplate.opsForHyperLogLog().add("hll1", users);
        }
    }
    // 统计数量
    Long size = stringRedisTemplate.opsForHyperLogLog().size("hll1");
    System.out.println("size = " + size); // 997593
}

HyperLogLog 的作用

Pipeline 导入数据

如果要导入大量数据到 Redis 中,可以有多种方式:

分布式缓存

单点 Redis 的问题

水平扩展和垂直扩展

Redis的持久化

RDB持久化

RDB 全称 Redis Database Backup file(Redis 数据备份文件),也被叫做 Redis 数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当 Redis 实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为 RDB 文件,默认是保存在当前运行目录。

执行时机

RDB 持久化在四种情况下会执行:

1)save 命令

执行下面的命令,可以立即执行一次 RDB:

127.0.0.1:6379> save # 由 Redis 主进程来执行 RDB, 会阻塞所有命令
OK

save 命令会导致主进程执行 RDB,这个过程中其它所有命令都会被阻塞。适合用在 Redis 即将停止时,比如在数据迁移时可能用到。

2)bgsave 命令

下面的命令可以异步执行 RDB:

127.0.0.1:6379> bgsave	# 开启子进程执行 RDB, 避免主进程受到影响
Background saving started

这个命令执行后会开启独立进程完成 RDB,主进程可以持续处理用户请求,不受影响。

3)停机时

Redis 停机时会执行一次 save 命令,实现 RDB 持久化。

4)触发 RDB 条件

Redis 内部有触发 RDB 的机制,可以在 redis.conf 文件中找到,格式如下:

# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1  
save 300 10  
save 60 10000 

RDB 的其它配置也可以在 redis.conf 文件中设置:

# 是否压缩 ,建议不开启,压缩也会消耗 cpu,磁盘的话不值钱
rdbcompression yes

# RDB文件名称
dbfilename dump.rdb  

# 文件保存的路径目录
dir ./ 

RDB 的频率不要太高,频率太高会一直处于写入数据的状态,影响性能,一般用默认的就好。

RDB原理

bgsave 开始时会 fork 主进程得到子进程,子进程共享主进程的内存数据。完成 fork 后读取内存数据并写入 RDB 文件。注意:fork 这个操作过程要拷贝页表是阻塞的。

fork 采用的是 copy-on-write 技术:

Linux 中,所有的进程都没办法直接操作物理内存而是由操作系统给每个进程分配一个虚拟内存,主进程操作虚拟内存,操作系统维护一个虚拟内存与物理内存直接的映射关系(页表)。fork 主进程实际上是 fork 页表(页表中保存了物理内存与虚拟内存的映射关系)的过程,让子进程和主进程拥有一样的映射关系。这样就实现了子进程和主进程一样的内存共享。这样就无需拷贝内存中的数据,直接实现数据共享。

但这样会有一个问题,就是一个读一个写,会有并发问题。如果子进程在拷贝数据的时候,主进程还在写怎么办?fork 底层会采用 copy-on-write 的技术。源数据只读,如果需要修改就复制一份数据,在复制的数据中进行修改,如果每次修改要复制的数据很大,那么这样的开销是很大的。JVM 垃圾回收算法的标记如果标记在对象上,修改对象的 GC 标记的频繁复制对象对内存的压力特别大,具体可看《垃圾回收的算法与实现》与写时复制技术不兼容。JVM 笔记#标记清除(后面好像是等持久化结束后,在写入源数据。MySQL 也有一个类似的操作,查下 MySQL 的笔记)

小结

RDB 方式 bgsave 的基本流程?

RDB 会在什么时候执行?save 60 1000 代表什么含义?

RDB 的缺点?

AOF持久化

AOF 全称为 Append Only File(追加文件)。Redis 处理的每一个写命令都会记录在 AOF 文件,可以看做是命令日志文件。

AOF配置

AOF 默认是关闭的,需要修改 redis.conf 配置文件来开启 AOF

# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"

AOF 的命令记录的频率也可以通过 redis.conf 文件来配

# 表示每执行一次写命令,立即记录到AOF文件,Redis 主进程完成磁盘写入操作。
appendfsync always 
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案,子进程完成磁盘写入操作
appendfsync everysec 
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no

用配置文件的方式启动 Redis

sudo ./redis-server ../redis.conf

向 Redis 中写入数据可以发现,每次执行的命令都被写入到了 aof 文件中

cat appendonly.aof
*2
$6
SELECT
$1
0
*3
$3
set
$4
name
$3
ljw
*3
$3
set
$4
name
$3
kkx

三种策略对比

配置项 刷盘时机 优点 缺点
Always 同步刷盘 可靠性高,几乎不丢数据 性能影响大
everysec 每秒刷盘 性能适中 最多丢失 1 秒数据
no 操作系统控制 性能最好 可靠性较差,可能丢失大量数据

AOF文件重写

因为是记录命令,AOF 文件会比 RDB 文件大的多。而且 AOF 会记录对同一个 key 的多次写操作,但只有最后一次写操作才有意义。通过执行 bgrewriteaof 命令,可以让 AOF 文件执行重写功能,用最少的命令达到相同效果。

如图,AOF 原本有三个命令,但是 set num 123 和 set num 666 都是对 num 的操作,第二次会覆盖第一次的值,因此第一个命令记录下来没有意义。

所以重写命令后,AOF 文件内容就是:mset name jack num 666。实际测试,AOF 文件重写后内容被压缩了,命令也被简化了~

Redis 也会在触发阈值时自动去重写 AOF 文件。阈值也可以在 redis.conf 中配置:

# AOF文件比上次文件增长超过 100%(翻了一倍)则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积超过 64mb 就触发重写 
auto-aof-rewrite-min-size 64mb 

混合RDB和AOF

Redis 4.0 中提出了一个混合使用 AOF 日志和内存快照的方法。内存快照以一定的频率执行,在两次快照之间,使用 AOF 日志记录这期间的所有命令操作。这样,不用频繁执行快照,避免了频繁 fork 对主线程的影响。且,AOF 日志也只用记录两次快照间的操作,无需记录所有操作了,不会出现文件过大的情况,也可以避免重写开销。如下图所示,T1 和 T2 时刻的修改,用 AOF 日志记录,等到第二次做全量快照时,就可以清空 AOF 日志,因为此时的修改都已经记录到快照中了,恢复时就不再用日志了。

RDB与AOF对比

RDB 和 AOF 各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。

- RDB AOF
持久化方式 定时对整个内存做快照 记录每一次执行的命令
数据完整性 不完整,两次备份之间会丢失 相对完整,取决于刷盘策略
文件大小 会有压缩,文件体积小 记录命令,文件体积大
宕机恢复速度 很快
数据恢复优先级 低,因为数据完整性不如 AOF 高,数据完整性高
系统资源占用 高,大量的 CPU 和内存消耗 低,主要是磁盘 IO 资源,但是 AOF 重写时会占用大量 CPU 和内存资源
使用场景 可以容忍数分钟的数据丢失,追求更快的启动速度 对数据安全性要求较高场景

Redis主从

搭建主从架构

单节点 Redis 的并发能力是有上限的,要进一步提高 Redis 的并发能力,就需要搭建主从集群,实现读写分离。

多个从结点承担读的请求,Redis 读取数据的能力可以得到极大的提升。

将 redis.conf 文件复制多份,分别启动多个 Redis。此时这样启动的还只是一个一个孤立的 Redis。

开启主从关系

要配置主从关系可以使用 replicaof 或 slaveof(5.0 以前的命令)

有临时和永久两种模式

此处采用临时模式配置主从关系。

1️⃣用不同配置文件开启多个 redis 服务;

# 在 6370 端口开启一个 redis 实例
sudo ./bin/redis-server redis.conf
# 连接 6739 端口的 redis 实例
./bin/redis-cli -p 6379

# 在 7000 端口开启一个 redis 实例
sudo ./bin/redis-server 7000.conf
# 连接 7000 端口的 redis 实例
./bin/redis-cli -p 7000

2️⃣配置主从节点(只要指定从节点即可)

# 在 7000 redis-cli 中执行命令,将 7000 端口的 redis 指定为
# 6379 端口的从节点。
127.0.0.1:7000> SLAVEOF  172.26.26.72 6379
OK

3️⃣查看从节点的 key,然后在主节点中写入一些数据,看是否会同步到从节点中

# 主节点和从节点此时数据都为空
127.0.0.1:6379> keys *
(empty array)

127.0.0.1:7000> keys *
(empty array)

# 主节点中写数据
127.0.0.1:6379> set master somevalue
OK

# 从节点同步到了主节点的数据
127.0.0.1:7000> keys *
1) "master"

4️⃣查看集群状态信息 info replication

127.0.0.1:7000> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:3
master_sync_in_progress:0
slave_repl_offset:423
slave_priority:100
slave_read_only:1
connected_slaves:0
master_failover_state:no-failover
master_replid:b6811140f497c5bbe6f0ddd27c46b39a091fb7ac
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:423
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:423

主从同步原理

全量同步

主从第一次建立连接时,会执行全量同步,将 master 节点的所有数据都拷贝给 slave 节点,流程:

这里有一个问题,master 如何得知 salve 是第一次来连接呢??

有几个概念,可以作为判断依据:

因此 slave 做数据同步,必须向 master 声明自己的 replication id 和 offset,master 才可以判断到底需要同步哪些数据。

因为 slave 原本也是一个 master,有自己的 replid 和 offset,当第一次变成 slave,与 master 建立连接时,发送的 replid 和 offset 是自己的 replid 和 offset。

master 判断发现 slave 发送来的 replid 与自己的不一致,说明这是一个全新的 slave,就知道要做全量同步了。

master 会将自己的 replid 和 offset 都发送给这个 slave,slave 保存这些信息。以后 slave 的 replid 就与 master 一致了。

因此,master 判断一个节点是否是第一次同步的依据,就是看 replid 是否一致

完整流程描述

增量同步

全量同步需要先做 RDB,然后将 RDB 文件通过网络传输个 slave,成本太高了。因此除了第一次做全量同步,其它大多数时候 slave 与 master 都是做增量同步

增量同步就是只更新 slave 与 master 存在差异的部分数据。如图:

那么 master 怎么知道 slave 与自己的数据差异在哪里呢?简单来说是根据 master 和 slave 的 offset 的差值来判断的,如果 master 和 slave 的 offset 不一样,则说明主从需要进行同步。如果 master 的 offset 覆盖了未同步的数据,就得进行全增量同步了。具体原理请看 “repl_backlog 原理”

repl_backlog原理

master 怎么知道 slave 与自己的数据差异在哪里呢?这就要靠全量同步时的 repl_baklog 文件了。

这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从 0 开始读写,这样数组头部的数据就会被覆盖。

repl_baklog 中会记录 Redis 处理过的命令日志及 offset,包括 master 当前的 offset,和 slave 已经拷贝到的 offset(红色部分是尚未同步的内容)。

slave 与 master 的 offset 之间的差异,就是 slave 需要增量拷贝的数据了。随着不断有数据写入,master 的 offset 逐渐变大, slave 也不断的拷贝,追赶 master 的 offset。

直到数组被填满。

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到 slave 的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。

但是,如果 slave 出现网络阻塞,导致 master 的 offset 远远超过了 slave 的 offset。

如果 master 继续写入新数据,其 offset 就会覆盖旧的数据,直到将 slave 现在的 offset 也覆盖。

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果 slave 恢复,需要同步,却发现自己的 offset 都没有了,无法完成增量同步了。只能做全量同步。

repl_baklog 大小有上限,写满后会覆盖最早的数据。如果 slave 断开时间过久,导致尚未备份的数据被覆盖,则无法基于 log 做增量同步,智能再次全量同步。

主从同步优化

主从同步可以保证主从数据的一致性,非常重要。可以从以下几个方面来优化 Redis 主从集群(如尽可能的避免全量同步,少做磁盘 IO)。

上面两个都是在提高全量同步的性能,下面两点是从减少全量同步出发的。

小结

简述全量同步和增量同步区别?

什么时候执行全量同步?

什么时候执行增量同步?

实际使用是全量同步+增量同步一起使用。

Redis哨兵

slave 节点宕机恢复后可以找 master 节点同步数据,那 master 节点宕机该如何处理?

Redis 提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。而哨兵是用于监控整个集群做故障恢复的。

哨兵原理

集群的结构和作用

哨兵的作用如下:

服务状态监控

Sentinel 基于心跳机制监测服务状态,每隔 1 秒向集群的每个实例发送 ping 命令:

故障恢复原理

一旦发现 master 故障,sentinel 需要在 slave 中选择一个作为新的 master,选择依据是这样的:

当选出一个新的 master 后,该如何实现切换呢?流程如下:

小结

Sentinel 的三个作用是什么?

Sentinel 如何判断一个 redis 实例是否健康?

故障转移步骤有哪些?

搭建哨兵集群

集群结构

先按之前的方式搭建好主从集群,然后继续搭建一个三节点形成的 Sentinel 集群,来监管之前的 Redis 主从集群。

三个 sentinel 实例信息如下

节点 IP PORT
s1 192.168.150.101 27001
s2 192.168.150.101 27002
s3 192.168.150.101 27003

准备实例和配置

要在同一台虚拟机开启 3 个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

我们创建三个文件夹,名字分别叫 s1、s2、s3:

# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3

然后我们在 s1 目录创建一个 sentinel.conf 文件,添加下面的内容:

port 27001
sentinel announce-ip 172.26.26.72
sentinel monitor mymaster 172.26.26.72 27001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"

配置说明

然后将 s1/sentinel.conf 文件拷贝到 s2、s3 两个目录中(在 /tmp 目录执行下列命令)

# 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改 s2、s3 两个文件夹内的配置文件,将端口分别修改为 27002、27003

sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf

启动

为了方便查看日志,我们打开 3 个 ssh 窗口,分别启动 3 个 redis 实例,启动命令

# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf

测试

如果哨兵选举的时候一直失败,可能是没开发所有主从服务器和哨兵节点的端口 systemctl stop firewalld.service,如果是 ubuntu 系统则执行 sudo ufw disable 关闭防火墙。

尝试让 master 节点 7001 宕机,查看 sentinel 日志:

查看 7003 的日志:

查看 7002 的日志:

RedisTemplate

在 Sentinel 集群监管下的 Redis 主从集群,其节点会因为自动故障转移而发生变化,Redis 的客户端必须感知这种变化,及时更新连接信息。Spring 的 RedisTemplate 底层利用 lettuce 实现了节点的感知和自动切换。

导入Demo工程

redis-demo 这个文件夹

引入依赖

在项目的 pom 文件中引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置Redis地址

在配置文件 application.yml 中指定 redis 的 sentinel 相关信息:

spring:
  redis:
    sentinel:
      master: mymaster
      nodes:
        - 192.168.150.101:27001
        - 192.168.150.101:27002
        - 192.168.150.101:27003

配置读写分离

在项目的启动类中,添加一个新的 bean:

@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
    return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这个 bean 中配置的就是读写策略,包括四种:

Redis分片集群

搭建分片集群

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决

使用分片集群可以解决上述问题,如图

分片集群特征

集群结构

片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含 3 个 master 节点,每个 master 包含一个 slave 节点,结构如下

这里我们会在同一台虚拟机中开启 6 个 redis 实例,模拟分片集群,信息如下

IP PORT 角色
192.168.150.101 7001 master
192.168.150.101 7002 master
192.168.150.101 7003 master
192.168.150.101 8001 slave
192.168.150.101 8002 slave
192.168.150.101 8003 slave

准备实例和配置

删除之前的 7001、7002、7003 这几个目录,重新创建出 7001、7002、7003、8001、8002、8003 目录:

# 进入/tmp目录
cd /tmp
# 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003

在 /tmp 下准备一个新的 redis.conf 文件,内容如下:

port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让redis后台运行
daemonize yes
# 注册的实例ip
replica-announce-ip 192.168.150.101
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log

将这个文件拷贝到每个目录下:

# 进入/tmp目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf

修改每个目录下的 redis.conf,将其中的 6379 修改为与所在目录一致:

# 进入/tmp目录
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf

启动

因为已经配置了后台启动模式,所以可以直接启动服务:

# 进入/tmp目录
cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过 ps 查看状态:

ps -ef | grep redis

发现服务都已经正常启动:

[ root@localhost tmp]# ps -ef | grep redis
root	2362	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 7001	[cluster]
root	2368	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 7002	[cluster]
root	2374	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 7003	[cluster]
root	2380	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 8001	[cluster]
root	2386	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 8002	[cluster]
root	2392	1	0 09:21 ?	00:00:00 redis-server 0.0.0.0: 8003	[cluster]

如果要关闭所有进程,可以执行命令:

ps -ef | grep redis | awk '{print $2}' | xargs kill

或者(推荐这种方式):

printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown

创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。

我们需要执行命令来创建集群,在 Redis5.0 之前创建集群比较麻烦,5.0 之后集群管理命令都集成到了 redis-cli 中。

1)Redis5.0 之前

Redis5.0 之前集群命令都是用 redis 安装包下的 src/redis-trib.rb 来实现的。因为 redis-trib.rb 是有 ruby 语言编写的所以需要安装 ruby 环境。

# 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群:

# 进入 redis 的 src 目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

2)Redis5.0 以后

我们使用的是 Redis6.2.4 版本,集群管理以及集成到了 redis-cli 中,格式如下:

redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

命令说明:

运行后的样子:

这里输入 yes,则集群开始创建:

通过命令可以查看集群状态:

redis-cli -p 7001 cluster nodes

测试

尝试连接 7001 节点,存储一个数据:

# 连接
redis-cli -p 7001
# 存储数据
set num 123
# 读取数据
get num
# 再次存储
set a 1

结果悲剧了:

集群操作时,需要给 redis-cli 加上 -c 参数才可以:

redis-cli -c -p 7001

这次可以了:

散列插槽

插槽原理

Redis 会把每一个 master 节点映射到 0~16383 共 16384 个插槽(hash slot)上,查看集群信息时就能看到:

数据 key 不是与节点绑定,而是与插槽绑定。redis 会根据 key 的有效部分计算插槽值。简单说就是根据 key 的哈希映射判断,这个 key 存储在哪里。

key 的有效部分分两种情况:

例如:key 是 num,那么就根据 num 计算,如果是 {itcast} num,则根据 itcast 计算。计算方式是利用 CRC16 算法得到一个 hash 值,然后对 16384 取余,得到的结果就是 slot 值。

如图,在 7001 这个节点执行 set a 1 时,对 a 做 hash 运算,对 16384 取余,得到的结果是 15495,因此要存储到 103 节点。

到了 7003 后,执行 get num 时,对 num 做 hash 运算,对 16384 取余,得到的结果是 2765 插槽的范围不在 7003 内,而是在 7001 的插槽范围内,因此需要切换到 7001 节点。

为什么 Redis 的 key 要和插槽绑定而不是直接和 Redis 实例绑定呢?因为 Redis 的实例可能会宕机,key 直接和实例绑定的话,宕机了 key 就没有对应的实例了。如果和插槽绑定的话,插槽对应的实例是可以进行更替(更方便)的,数据跟着插槽走,永远都可以找到插槽的位置。

小结

Redis 如何判断某个 key 应该在哪个实例?

如何将同一类数据固定的保存在同一个 Redis 实例?

集群伸缩

redis-cli –cluster 提供了很多操作集群的命令,可以通过下面方式查看:

添加节点的命令

需求分析

需求:向集群中添加一个新的 master 节点,并向其中存储 num = 10

这里需要两个新的功能:

创建 Redis 实例

创建一个文件夹:

mkdir 7004

拷贝配置文件:

cp redis.conf /7004

修改配置文件:

sed /s/6379/7004/g 7004/redis.conf

启动

redis-server 7004/redis.conf

添加新节点到 redis

执行命令:

redis-cli --cluster add-node  192.168.150.101:7004 192.168.150.101:7001

通过命令查看集群状态:

redis-cli -p 7001 cluster nodes

如图,7004 加入了集群,并且默认是一个 master 节点:

但是,可以看到 7004 节点的插槽数量为 0,因此没有任何数据可以存储到 7004 上

转移插槽

我们要将 num 存储到 7004 节点,因此需要先看看 num 的插槽是多少:

如上图所示,num 的插槽为 2765。

我们可以将 0~3000 的插槽从 7001 转移到 7004,命令格式如下:

具体命令如下:

建立连接:

得到下面的反馈:

询问要移动多少个插槽,我们计划是 3000 个:

新的问题来了:

那个 node 来接收这些插槽??

显然是 7004,那么 7004 节点的 id 是多少呢?

复制这个 id,然后拷贝到刚才的控制台后:

这里询问,你的插槽是从哪里移动过来的?

这里我们要从 7001 获取,因此填写 7001 的 id:

填完后,点击 done,这样插槽转移就准备好了:

确认要转移吗?输入 yes:

然后,通过命令查看结果:

可以看到:

目的达成。

故障转移

集群初识状态是这样的:

其中 7001、7002、7003 都是 master,我们计划让 7002 宕机。

自动故障转移

当集群中有一个master宕机会发生什么呢?比如直接停止一个 redis 实例,例如 7002:

redis-cli -p 7002 shutdown

1)首先是该实例与其它实例失去连接

2)然后是疑似宕机:

3)最后是确定下线,自动提升一个 slave 为新的 master:

4)当 7002 再次启动,就会变为一个 slave 节点了:

手动故障转移

利用 cluster failover 命令可以手动让集群中的某个 master 宕机,切换到执行 cluster failover 命令的这个 slave 节点,实现无感知的数据迁移。其流程如下:

这种 failover 命令可以指定三种模式:

案例需求:在 7002 这个 slave 节点执行手动故障转移,重新夺回 master 地位。

步骤如下:

1)利用 redis-cli 连接 7002 这个节点

2)执行 cluster failover 命令

payphone@cv:/home/redis-6.2.1$ redis-cli -p 7002
127.0.0.1:7002> CLUSTER FAILOVER
OK

效果:

RedisTemplate访问分片集群

RedisTemplate 底层同样基于 lettuce 实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:

1)引入 redis 的 starter 依赖

2)配置分片集群地址

3)配置读写分离

与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:

spring:
  redis:
    cluster:
      nodes:
        - 192.168.150.101:7001
        - 192.168.150.101:7002
        - 192.168.150.101:7003
        - 192.168.150.101:8001
        - 192.168.150.101:8002
        - 192.168.150.101:8003

最佳实践

键值设计

优雅的Key设计

Redis 的 key 虽然可以自定义,但最好遵循下面的几个最佳实践约定:

例如:登录业务,保存用户信息,key 可以这样设计 login:user:10

优点

# 示例 如果是 4.0 版本以下的 redis embstr 的长度限制是 39 字节
set name 123
type num # string
object encoding num # "int" 类型

set name Jack
object encoding name # "embstr"
type name # string

set name aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa # 44 字节
object encoding name # "embstr"

set name aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa # 45 字节
object encoding name # "raw"

慎用BigKey

BigKey 通常以 Key 的大小和 key 中的成员的数量来综合判断的,如

推荐值

memory usage name # 衡量 key 占用的字节大小,不推荐使用,耗费 CPU,实践上预估一下就行。
# 字符串看长度,集合看大小,大致估计

BigKey 的危害

如何发现 BigKey

删除 BigKey

BigKey 内存占用较多,删除这些 key 也需要耗费很长的时间,导致 Redis 主线程阻塞,引发一系列问题。

恰当的数据类型

BigKey 往往都是业务设计不恰当导致的,选择更合适的数据类型,避免 BigKey。

比如,存储一个 User 对象,我们有三种存储方式

每一次存储 key,value 的时候,在 redis 内部是有很多元信息要保存的,原本一个 key 可以解决的,你用多个 key,元信息的内存消耗就上来了。

假如有 hash 类型的 key,其中有 100 万对 field 和 value,field 是自增 id,这个 key 存在什么问题?如何优化?

方案一:修改 hash entry 的数量上限

config get hash-max-ziplist-entries # 获取配置的最大值
config set hash-max-ziplist-entries 1000 # 设置最大值为 1000

方案二:拆分为 string 类型

方案三:把一个大的 hash 拆分为小的 hash,分开存储(数据分片)

hash、set、hashset 这些的 key 有内存优化(ziplist),推荐使用。临时存储信息,占用存储空间小的用 string 还是挺合适的,比如短信验证码。

总结

key 的最佳实践

value 的最佳实践

批处理优化

Pipeline

大数据量的导入。

单个命令的执行流程

一次命令的响应时间 = 1 次往返的网络传输耗时 + 1 次 Redis 执行命令耗时

N 条命令批量执行

N 次命令的响应时间 = 1 次往返的网络传输耗时 + N 次 Redis 执行命令耗时

批处理方案

较少网络耗时,使用 mset、hmset 这些命令,如利用 mset 批量插入 10 万条数据,每次插入 1k 条。但是,不要在一次批处理中传输太多命令,否则单次命令占用带宽过多,会导致网络阻塞。

MSET 这些命令虽然可以进行批处理操作,但是只能操作部分数据类型,因此如果有对复杂数据类型的批处理需求,可以使用 Pipeline。

void test(){
    Pipeline pipeline = jedis.pipelined();
    for(int i=1; i<= 100000; i++){
        pipeline.set("test:key_"+i,"value_"+i);
        // pipelien.zaddXX
        if(i%1000 == 0){
            // 每放入 1000 条命令,批量执行
            pipeline.sync();
        }
    }
}

M 操作比 Pipeline 快,因为 M 操作是 Redis 内置的操作,Redis 会把 M 操作的多组 key 和 value 作为一个原子性操作,一次性执行完;而 Pipeline 是把所有命令一起发过去,但未必是一起执行。

总结

批处理方案

注意事项

集群下的批处理

如 MSET 或 Pipeline 这样的批处理需要在一次请求中携带多条命令,而此时如果 Redis 是一个集群,那批处理命令的多个 key 必须落在一个插槽中,否则会导致执行失败。

- 串行命令 串行 slot 并行 slot hash_tag
实现思路 for 循环遍历,依次执行每个命令 在客户端计算每个 key 的 slot,将 slot 一致分为一组,每组都利用 Pipelien 批处理。串行执行各组命令。 在客户端计算每个 key 的 slot,将 slot 一致分为一组,每组都利用 Pipelien 批处理。并行执行各组命令。 将所有 key 设置相同的 hash_tag,则所有 key 的 slot 一定相同。
耗时 N 次网络耗时+N 次命令耗时 m 次网络耗时 + N 次命令耗时
m = key 的 slot 个数
1 次网络耗时+N 次命令耗时 1 次网络耗时+N 次命令耗时
优点 实现简单 耗时较短 耗时非常短 耗时非常短,实现简单
缺点 耗时非常久 实现稍复杂
slot 越多,耗时越久
实现复杂 容易出现数据倾斜

原理

后面再看 《Reids 深度历险》《Redis 设计与实现》补充。

数据结构

动态字符串

我们都知道 Redis 中保存的 Key 是字符串,value 往往是字符串或者字符串的集合。可见字符串是 Redis 中最常用的一种数据结构。不过 Redis 没有直接使用 C 语言中的字符串,因为 C 语言字符串存在很多问题:

例如,我们执行命令:set name kkx,那么 Redis 将在底层创建两个 SDS,其中一个是包含 “name” 的 SDS,另一个是包含 “kkx” 的 SDS。

Redis 是 C 语言实现的,其中 SDS 是一个结构体,源码如下:

例如,一个包含字符串 “name” 的 sds 结构如下:

SDS 之所以叫做动态字符串,是因为它具备动态扩容的能力,例如一个内容为 “hi” 的 SDS:

假如我们要给 SDS 追加一段字符串 “,Amy”,这里首先会申请新内存空间:

如果新字符串小于 1M,则新空间为扩展后字符串长度的两倍 +1;

如果新字符串大于 1M,则新空间为扩展后字符串长度 +1M+1。称为内存预分配。

优点

intset

IntSet 是 Redis 中 set 集合的一种实现方式,基于整数数组来实现,并且具备长度可变、有序等特征。其结构如下:

typedef struct intset {
    uint32_t encoding; /*编码方式,支持存放16位、32位、 64位整数*/
    uint32_t length; /*元素个数*/
    int8_t contents[]; /*整数数组,保存集合数据*/
} intset;

int8_t 中存储的不是元素。因为 Redis 的数组并未使用 C 语言相关的定义,所有的增删改查操作都是自己定义的,所有数组中内容的定义 int8_t 并不是元素的实际类型,元素的编码类型实际上是 encoding 属性来决定的。其中的 encoding 包含三种模式,表示存储的整数大小不同:

/* Note that these encodings are ordered, so:
* INTSET ENC INT16 < INTSET ENC INT32 < INTSET ENC INT64. */
#define INTSET_ENC_INT16 (sizeof(int16_t)) /* 2字节整数,范围类似java的short*/
#define INTSET_ENC_INT32 (sizeof(int32_t)) /* 4字节整数,范围类似java的int */
#define INTSET_ENC_INT64 (sizeof(int64_t)) /* 8字节整数,范围类似java的long */

为了方便查找,Redis 会将 intset 中所有的整数按照升序依次保存在 contents 数组中,结构如图:

现在,数组中每个数字都在 int16_t 的范围内,因此采用的编码方式是 INTSET_ENC_INT16,每部分占用的字节大小为:

统一编码格式是为了方便基于数组角标进行寻址。

我们向该其中添加一个数字:50000,这个数字超出了 int16_t 的范围,intset 会自动升级编码方式到合适的大小。以当前案例来说流程如下:

源码如下

intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);// 获取当前值编码
    uint32_t pos; //要插入的位置
    if (success) *success = 1; 
    //判断编码是不是超过了 当前intset的编码
    if (valenc > intrev32ifbe(is->encoding)) {
        //超出编码,需要升级
        return intsetUpgradeAndAdd(is,value);
    } else {
        //在当前intset中查找值与value-样的元素的角标pos
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0; //如果找到了,则无需插入,直接结束并返回失败
            return is;
        }
        //数组扩容
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        //移动数组中pos之后的元素到pos+1,给新元素腾出空间
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
    //插入新元素
    _intsetSet(is,pos,value);
    //元素长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+ 1);
    return is;
}

static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    //获取当前intset编码
    uint8_t curenc = intrev32ifbe(is->encoding);
    //获取新编码
    uint8_t newenc = _intsetValueEncoding(value);
    //获取元素个数
    int length = intrev32ifbe(is->length);
    //判断新元素是大于0还是小于0,小于0插入队首、大于0插入队尾
    intprepend=value<0?1:0;
    //重置编码为新编码
    is->encoding = intrev32ifbe(newenc); 
    //重置数组大小
    is = intsetResize(is,intrev32ifbe(is->length)+ 1);
    //倒序遍历,逐个搬运元素到新的位置,_intsetGetEncoded按照旧编码方 式查找元素
    while(length--) // intsetSet按照新编码方式插入新元素
        _intsetSet(is,length+prepend, intsetGetEncoded(is,length,curenc));
    /*插入新元素,prepend决定是队首还是队尾*/
    if (prepend)
        _intsetSet(is,0,value); 
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    //修改数组长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

总结

Intset 可以看做是特殊的整数数组,具备一些特点:

Dict

Redis 是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过 Dict 来实现的。而 Dict 由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)

typedef struct dictht {
    // entry数组
    //数组中保存的是指向entry的指针
    dictEntry **table;
    //哈希表大小
    unsigned long size;
    //哈希表大小的掩码,总等于size - 1
    unsigned long sizemask;
    // entry个数
    unsigned long used;
} dictht; 

typedef struct dictEntry {
    void *key;//键
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    }v; //值
    //下一个Entry的指针
    struct dictEntry *next;
} dictEntry;

当我们向 Dict 添加键值对时,Redis 首先根据 key 计算出 hash 值(h),然后利用 h & sizemask 来计算元素应该存储到数组中的哪个索引位置。我们存储 k1=v1,假设 k1 的哈希值 h=1,则 1&3=1,因此 k1=v1 要存储到数组角标 1 位置。

为什么 h & sizemask 可以取到余数呢?因为 size 是 2 的 n 次方,sizemask = size-1,用位运算可以取到余数,比如 size = 4,则 sizemask = 1,要求 4 放在那个索引位置,则 4 & 3 = (0100 & 0011 = 0000) ,应该放在索引为 0 的位置,可以拿到余数的部分。

Dict 由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)

字典结构体

typedef struct dict {
    dictType *type; // dict类型,内置不同的hash函数
    void *privdata; // 私有数据,在做特殊hash运算时用
    dictht ht[2]; //一个Dict包含两个哈希表,其中-个是当前数据,另一个般是 空,rehash时使用
    long rehashidx; // rehash的进度,- 1表示未进行
    int16_t pauserehash; // rehash是否暂停,1则暂停,0则继续
} dict;

哈希表结构体

typedef struct dictht {
    // entry数组
    //数组中保存的是指向entry的指针
    dictEntry **table;
    //哈希表大小
    unsigned long size;
    //哈希表大小的掩码,总等于size - 1
    unsigned long sizemask;
    // entry个数
    unsigned long used;
} dictht;

哈希节点结构体

typedef struct dictEntry {
    void *key; //键
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    }v; //值
    //下一个Entry的指针
    struct dictEntry *next;
} dictEntry;

Dict 的扩容

static int _dictExpandlfNeeded(dict *d){
    //如果正在rehash,则返回ok
    if (dictlsRehashing(d)) return DICT_OK;
    //如果哈希表为空,则初始化哈希表为默认大小: 4
    if (d->ht[O].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
    //负载因子(used/size) 达到1以上,粗当前没有进行bgrewrite等子进程操作
    //或者负载因子超过5,则进行dictExpand,也就是扩容
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize || d->ht[0].used / d->ht[O].size > dict_force_resize_ratio)){
        //扩容大小为used + 1,底层会对扩容大小做判断,实际上找的是第一个大于等于used+1的2^n
        return dictExpand(d, d-> ht[0].used + 1); 
    }
    return DICT OK;
}

Dict 除了扩容以外,每次删除元素时,也会对负载因子做检查,当 LoadFactor < 0.1 时,会做哈希表收缩:

//t_ hash.c # hashTypeDeleted()
if (dictDelete((dict*)o->ptr, field) == C_OK) {
    deleted = 1;
    //删除成功后,检查是否需要重置Dict大小,如果需要则调用dictResize重置
    /* Always check if the dictionary needs a resize after a delete. */
    if (htNeedsResize(o->ptr)) dictResize(o->ptr); 
}

// server.c文件
int htNeedsResize(dict *dict) {
    long long size, used;
    //哈希表大小
    size = dictSlots(dict);
    // entry数量
    used =
        dictSize(dict);
    // size > 4 (哈希表初识大小)組负载因子低于0.1
    return (size > DICT HT INITIAL SIZE && (used*100/size <
                                            HASHTABLE MIN_ FIL));
}

int dictResize(dict *d){
    unsigned long minimal;
    //如果正在做bgsave或bgrewriteof或rehash,则返回错误
    if (!dict_can_resize || dictlsRehashing(d))
        return DICT_ERR;
    //获取used,也就是entry个数
    minimal = d->ht[0].used;
    //如果used小于4,则重置为4
    if (minimal < DICT_HT_INITIAL_SIZE)
        minimal = DICT_HT_INITIAL_SIZE;
    //重置大小为minimal,实是第一个大于等 于minimal的2^n
    return dictExpand(d, minimal);
}

Dict 的 rehash

不管是扩容还是收缩,必定会创建新的哈希表,导致哈希表的 size 和 sizemask 变化,而 key 的查询与 sizemask 有关。因此必须对哈希表中的每一个 key 重新计算索引,插入新的哈希表,这个过程称为 rehash。过程是这样的:

整个过程可以描述成:

rehash 是在做增删改的时候判断的,增删往往是由主线程来操作的,如果 hash 中有大量元素,一次性 rehash 所有元素的话,会导致主线程阻塞。因此 Dict 的 rehash 并不是一次性完成的,而是渐进式的。

总结

Dict 的结构:

Dict 的伸缩:

ZipList

ZipList 是一种特殊的“双端链表” ,由一系列特殊编码的连续内存块组成。可以在任意一端进行压入/弹出操作, 并且该操作的时间复杂度为 O(1)。

属性 类型 长度 用途
zlbytes uint32_t 4 字节 记录整个压缩列表占用的内存字节数
zltail uint32_t 4 字节 记录压缩列表表尾节点距离压缩列表的起始地址有多少字节,通过这个偏移量,可以确定表尾节点的地址。
zllen uint16_t 2 字节 记录了压缩列表包含的节点数量。 最大值为 UINT16_MAX (65534),如果超过这个值,此处会记录为 65535,
但节点的真实数量需要遍历整个压缩列表才能计算得出。
entry 列表节点 不定 压缩列表包含的各个节点,节点的长度由节点保存的内容决定。
zlend uint8_t 1 字节 特殊值 0xFF (十进制 255 ),用于标记压缩列表的末端。

不是说是连续的内存空间吗?为什么 entry 还不固定?这是为了节省内存空间,因为存入压缩链表的不同数据,占用的内存空间不同,如存数字 1 和数字 200 万,entry 会进行动态调整,以便节省内存。

ZipListEntry

ZipList 中的 Entry 并不像普通链表那样记录前后节点的指针,因为记录两个指针要占用 16 个字节,浪费内存。而是采用了下面的结构:

这样 ZipList 既可以通过 previous_entry_length 知道上一个结点的位置,也可以通过计算出当前 entry 的长度(previous_entry_length + encoding 记录的长度信息)得到下一个结点的位置,但是这样就无法随机存取了。注意:ZipList 中所有存储长度的数值均采用小端字节序,即低位字节在前,高位字节在后。例如:数值 0x1234,采用小端字节序后实际存储值为:0x3412

Encoding 编码

ZipListEntry 中的 encoding 编码分为字符串和整数两种,如果 encoding 是以 “00”、“01” 或者 “10” 开头,则证明 content 是字符串。编码长度是指 encoding 这个属性的长度,这个属性中记录了“编码的格式”(字符串/整数)和“内容的大小”

编码 编码长度 字符串大小
|00pppppp| 1 bytes <= 63 bytes
|01pppppp|qqqqqqqq| 2 bytes <= 16383 bytes
|10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| 5 bytes <= 4294967295 bytes

例如,我们要保存字符串:“ab” 和 “bc”

ZipListEntry 中的 encoding 编码分为字符串和整数两种:

整数:如果 encoding 是以 “11” 开始,则证明 content 是整数,且 encoding 固定只占用 1 个字节

编码 编码长度 整数类型
11000000 1 int16_t(2 bytes)
11010000 1 int32_t(4 bytes)
11100000 1 int64_t(8 bytes)
11110000 1 24 位有符整数(3 bytes)
11111110 1 8 位有符整数(1 bytes)
1111xxxx 1 直接在 xxxx 位置保存数值,范围从 0001~1101,减 1 后结果为实际值

ZipList的连锁更新问题

ZipList 的每个 Entry 都包含 previous_entry_length 来记录上一个节点的大小,长度是 1 个或 5 个字节,在更新 ZipList 中的数据时,如果记录的内容长度从 254 字节以内变成 254 字节以上,那么 previous_entry_length 长度会变为 5 字节,那么该 entry 会增加四个字节,后面的又要记录该 entry 的长度,就要更新,后的也要更新,导致连锁更新。

现在,假设我们有 N 个连续的、长度为 250~253 字节之间的 entry,因此 entry 的 previous_entry_length 属性用 1 个字节即可表示,如图所示:

ZipList 这种特殊情况下产生的连续多次空间扩展操作称之为连锁更新(Cascade Update)。新增、删除都可能导致连锁更新的发生。

ZipList 特性总结

QuickList

问题 1:ZipList 虽然节省内存,但申请内存必须是连续空间,如果 ZipList 内存占用较多,那申请内存的效率可能会很低。因为 OS 的内存是碎片化,很难找到一个大内存(如果是空闲链表法一类的内存分配算法,需要遍历链表查找合适的内存块),如果没有的话 OS 就需要进行内存紧凑,内存紧凑的开销是很大的。

答:为了缓解这个问题,我们必须限制 ZipList 的长度和 entry 大小。

问题 2:但是我们要存储大量数据,超出了 ZipList 最佳的上限该怎么办?

答:可以利用数据分片的思想,创建多个 ZipList 来分片存储数据。

问题 3:数据拆分后比较分散,不方便管理和查找,这多个 ZipList 如何建立联系?

答:Redis 在 3.2 版本引入了新的数据结构 QuickList,它是一个双端链表,只不过链表中的每个节点都是一个 ZipList。

为了避免 QuickList 中的每个 ZipList 中 entry 过多,Redis 提供了一个配置项:list-max-ziplist-size 来限制。

其默认值为 -2:

127.0.0.1:6379> config get list-max-ziplist-size
1) "list-max-ziplist-size"
2) "-2"

除了控制 ZipList 的大小,QuickList 还可以对节点的 ZipList 做压缩。通过配置项 list-compress-depth 来控制。因为链表一般都是从首尾访问较多,所以首尾是不压缩的。list-compress-depth 可以控制首尾不压缩的节点个数:

list-compress-depth 默认值为 0,如果中间的节点访问的频率较低,可以开启节点压缩,节省内存空间。

以下是 QuickList 的和 QuickListNode 的结构体:

typedef struct quicklist {
    //头节点指针
    quicklistNode *head;
    //尾节点指针
    quicklistNode *tail;
    //所有ziplist的entry的数量
    unsigned long count;
    // ziplists总数量
    unsigned long len;
    // ziplist的entry上限,默认值-2
    int fill: QL_FILL_BITS;
    //首尾不压缩的节点数量
    unsigned int compress : QL_COMP_BITS;
    //内存重分配时的书签数量及数组,一般用不到
    unsigned int bookmark_count: QL_BM_BITS;
    quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistNode {
    //前一个节点指针
    struct quicklistNode *prev;
    //下一个节点指针
    struct quicklistNode *next;
    //当前节点的ZipList指针
    unsigned char *zl;
    //当前节点的ZipList的字节大小
    unsigned int sZ;
    //当前节点的ZipList的entry个数
    unsigned int count: 16;
    //编码方式: 1, ZipList; 2,Izf压缩模式
    unsigned int encoding : 2;
    //数据容器类型(预留) : 1,期; 2, ZipList
    unsigned int container : 2;
    //是否被解压缩。1:则说明被解压了,将来要重新压缩
    unsigned int recompress : 1;
    unsigned int attempted_compress : 1; //测试用
    unsigned int extra: 10; /*预留字段*/
} quicklistNode;

我们接下来用一段流程图来描述当前的这个结构

QuickList 特点总结

SkipList

如果需要从中间随机查询数据,那么查询性能就比较差了。而 SkipList 可以解决这个问题。SkipList(跳表)首先是链表,但与传统链表相比有几点差异:

SkipList 的结构体定义如下

// t zset.c
typedef struct zskiplist {
    // 头尾节点指针
    struct zskiplistNode *header, *tai;
    //节点数量
    unsigned long length;
    //最大的索引层级,默认是1
    int level;
} zskiplist;

typedef struct zskiplistNode {
    sds ele; //节点存储的值
    double score://节点分数,排序、查找用
    struct zskiplistNode *backward; // 前一个节点指针
        struct zskiplistLevel {
            struct zskiplistNode *forward;//下一一个节点指针
            unsigned long span; //索引跨度
        } level[]; //多级索引数组
} zskiplistNode;

SkipList 特点总结

RedisObject

Redis 中的任意数据类型的键和值都会被封装为一个 RedisObject,也叫做 Redis 对象,源码如下:

占用字节数:4+4+24=32 bit = 4 字节,refcount 占 4 字节,prt 一般占 8 字节;redisObject 一共占 16 字节(不包含指针指向的字节)。我们从对象头占用的字节数来分析下 string 的开销。如果用 set,hash 这种,用一个对象头就可以存储很多键值对了;用 string 则需要很多额外的对象头信息,内存占用较大。

什么是 redisObject

从 Redis 的使用者的角度来看,⼀个 Redis 节点包含多个 database(非 cluster 模式下默认是 16 个,cluster 模式下只能是 1 个),而一个 database 维护了从 key space 到 object space 的映射关系。这个映射关系的 key 是 string 类型,⽽ value 可以是多种数据类型,比如:string, list, hash、set、sorted set 等。我们可以看到,key 的类型固定是 string,而 value 可能的类型是多个。而从 Redis 内部实现的⾓度来看,database 内的这个映射关系是用⼀个 dict 来维护的。dict 的 key 固定用⼀种数据结构来表达就够了,这就是动态字符串 sds。而 value 则比较复杂,为了在同⼀个 dict 内能够存储不同类型的 value,这就需要⼀个通⽤的数据结构,这个通用的数据结构就是 robj,全名是 redisObject。

Redis 的编码方式,Redis 中会根据存储的数据类型不同,选择不同的编码方式,共包含 11 种不同类型:

编号 编码方式 说明
0 OBJ_ENCODING_RAW raw 编码动态字符串
1 OBJ_ENCODING_INT long 类型的整数的字符串
2 OBJ_ENCODING_HT hash 表(字典 dict)
3 OBJ_ENCODING_ZIPMAP 已废弃
4 OBJ_ENCODING_LINKEDLIST 双端链表
5 OBJ_ENCODING_ZIPLIST 压缩列表
6 OBJ_ENCODING_INTSET 整数集合
7 OBJ_ENCODING_SKIPLIST 跳表
8 OBJ_ENCODING_EMBSTR embstr 的动态字符串
9 OBJ_ENCODING_QUICKLIST 快速列表
10 OBJ_ENCODING_STREAM Stream 流

五种数据结构,Redis 中会根据存储的数据类型不同,选择不同的编码方式。每种数据类型的使用的编码方式如下。

数据类型 编码方式
OBJ_STRING int、embstr、raw
OBJ_LIST LinkedList 和 ZipList (3.2 以前)、QuickList (3.2 以后)
OBJ_SET intset、HT(哈希表)
OBJ_ZSET ZipList、HT、SkipList
OBJ_HASH ZipList、HT

String

String 是 Redis 中最常见的数据存储类型:

String 底层实现方式采用的是动态字符串 sds 或者 long。String 的内部存储结构⼀般是 sds(Simple Dynamic String,可以动态扩展内存),但是如果⼀个 String 类型的 value 的值是数字,那么 Redis 内部会把它转成long类型来存储,从而减少内存的使用。

44 字节,变成连续编码。连续编码的优势在于,只需申请一次内存,效率更高。内存分配涉及到用户态和内核态的切换,少一次分配就少一次切换。

len - 8 bit;alloc - 8 bit;flag - char,占一个字节;尾部标记 ‘/0’ 占一个字节;共 16+1+1+1+44+1=64 字节;Redis 底层默认采用的内存分配的算法是 jemalloc(可选的分配器还有:glibe、tcmalloc)会以 $2^n$ 分配内存,而 64 恰好是 $2^6$ 因此也不会尝试内存碎片,因此以 44 作为限制。在 Redis 中使用 String 类型时,尽量不要让字符串超过 44 字节。

String 类型数据的三种存储方式小结

String 在 Redis 中是用⼀个 robj 来表示的。用来表示 String 的 robj 可能编码成 3 种内部表示。

其中前两种编码使用的是 sds 来存储,最后⼀种 OBJ_ENCODING_INT 编码直接把 string 存成了 long 型。在对 string 进行 incr, decr 等操作的时候,如果它内部是 OBJ_ENCODING_INT 编码,那么可以直接行加减操作;如果它内部是 OBJ_ENCODING_RAW 或 OBJ_ENCODING_EMBSTR 编码,那么 Redis 会先试图把 sds 存储的字符串转成 long 型,如果能转成功,再进行加减操作。对⼀个内部表示成 long 型的 string 执行 append, setbit, getrange 这些命令,针对的仍然是 string 的值(即十进制表示的字符串),而不是针对内部表⽰的 long 型进行操作。比如字符串 ”32”,如果按照字符数组来解释,它包含两个字符,它们的 ASCII 码分别是 0x33 和 0x32。当我们执行命令 setbit key 7 0 的时候,相当于把字符 0x33 变成了 0x32,这样字符串的值就变成了 ”22”。而如果将字符串 ”32” 按照内部的 64 位 long 型来解释,那么它是 0x0000000000000020,在这个基础上执行 setbit 位操作,结果就完全不对了。因此,在这些命令的实现中,会把 long 型先转成字符串再进行相应的操作。

List

Redis 的 List 类型可以从首、尾操作列表中的元素。

127.0.0.1:6379> lpush l1 e3 e2 e1	# 从 head 写入
(integer) 3
127.0.0.1:6379> rpush l1 e4 e5 e6	# 从 tail 写入
(integer) 6
127.0.0.1:6379> lrange l1 0 6		# 范围获取
1) "e1"
2) "e2"
3) "e3"
4) "e4"
5) "e5"
6) "e6"
127.0.0.1:6379> lpop l1 1			# 从 head 取
1) "e1"
127.0.0.1:6379> rpop l1 1			# 从 tail 取
1) "e6"

哪一个数据结构能满足上述特征?

Redis 的 List 结构类似一个双端链表,可以从首、尾操作列表中的元素:

Set

Set 是 Redis 中的单列集合,满足下列特点:

127.0.0.1:6379> sadd s1 m1 m2 m3
(integer) 3
127.0.0.1:6379> sadd s2 m2 m3 m4
(integer) 3
127.0.0.1:6379> sismember s1 m1	# 判断元素是否存在
(integer) 1
127.0.0.1:6379> sinter s1 s2	# 求交集
1) "m2"
2) "m3"

可以看出,Set 对查询元素的效率要求非常高,思考一下,什么样的数据结构可以满足?HashTable,也就是 Redis 中的 Dict,不过 Dict 是双列集合(可以存键、值对);跳表也可以实现快速查找的功能,但是跳表有个排序的功能,Set 并不需要。

Set 是 Redis 中的集合,不一定确保元素有序,可以满足元素唯一、查询效率要求极高。

robj *setTypeCreate(sds value) {
    //判断value是否是数值类型long long
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        //如果是数值类型,则采用IntSet编码
        return createlntsetObject();
    //否则采用默认编码,也就是HT
    return createSetObject();
}

robj *createlntsetObject(void) {
    //初始化INTSET并申请内存空间
    intset *is = intsetNew();
    //创建RedisObject
    robj *o = createObject(OBJ SET,is);
    //指定编码为INTSET
    o->encoding = OBJ_ENCODING_INTSET;
    return O;
}

robj *createSetObject(void) {
    //初始化Dict类型,并申请内存
    dict *d = dictCreate(&setDictType,NULL);
    //创建RedisObject
    robj *o = createObject(OBJ_SET,d);
    //设置encoding为HT
    o->encoding = OBJ_ENCODING_HT;
    return o;
}

结构如下

ZSET

ZSet 也就是 SortedSet,其中每一个元素都需要指定一个 score 值和 member 值:

因此,zset 底层数据结构必须满足键值存储、键必须唯一、可排序这几个需求。之前学习的哪种编码结构可以满足?

// zset结构
typedef struct zset {
    // Dict指针
    dict *dict;
    // SkipList指针
    zskiplist *zsl;
} zset; 

robj *createZsetObject(void) {
    zset *zs = zmalloc(sizeof(*zs)); 
    robj *o;
    //创建Dict
    zs->dict = dictCreate(&zsetDictType, NULL);
    //创建SkipList
    zs->zsl = zslCreate();
    o = createObject(OBJ_ZSET, zs);
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}

ZSET 结构示意图:采用了 dict + skiplist 的模式,如果只是查找元素,直接通过 dict 进行查找,如果需要排序的信息,就先从 skiplist 中查找指定范围内的元素,然后在用 key 从 dict 中查找对应的 value。

当元素数量不多时,HT 和 SkipList 的优势不明显,而且更耗内存。因此,当元素数量不多时查询速度也不会慢,此时 zset 还会采用 ZipList 结构来节省内存,不过需要同时满足两个条件:

ziplist 本身没有排序功能,而且没有键值对的概念,因此需要有 zset 通过编码实现:

// zadd添加元素时,先根据key找到zset,不存在则创建新的zset
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ ZSET)) goto cleanup;
//判断是否存在
if (zobj == NUL){// zset不存在
    if (server.zset_max_ziplist_entries == 0 ||
        server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
    { // zset_max_ziplist_entries 设置为 0 就是禁用了 ZipList,
        // 或者 value 大小超过了 zset_max_ziplist_value,采用 HT + SkipList
        zobj = createZsetObject();
    }else {//否则,采用ZipList
        zobj = createZsetZiplistObject();
    }
    dbAdd(c->db,key,zobj);
}
//..
zsetAdd(zobi, score, ele, flaas, &retflaas, &newscore);
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore){
    /*判断编码方式*/
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {//是ZipList编码
        unsigned char *eptr; 
        //判断当前元素是否经存在,已经存在则更新score即可
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            //...略
            return 1;
        } else if (!xx) {
            //元素不存在,需要新增,则判断ziplist长度有没有超、元素的大小有没有超
            if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries
                || sdslen(ele) > server.zset_max_ziplist_value
                || !ziplistSafeToAdd(zobj->ptr, sdslen(elel)))
            {//如果超出,则需要转为SkipList编码
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj-> ptr,ele,score);
                if (newscore) *newscore = score;
                *out_ flags |= ZADD_OUT_ADDED;
                return 1;
            }
        } else {
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    }
    //本身就是SKIPLIST编码,无需转换
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        // ..略
    } else{
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

Hash

Hash 结构与 Redis 中的 Zset 非常类似:

区别如下:

因此,Hash 底层采用的编码与 Zset 基本一致,只需要把排序有关的 SkipList 去掉即可。

当满足上面两个条件其中之⼀的时候,Redis 就使用 dict 字典来实现 hash。Redis 的 hash 之所以这样设计,是因为当 ziplist 变得很大的时候,它有如下几个缺点:

总之,ziplist 本来就设计为各个数据项挨在⼀起组成连续的内存空间,这种结构并不擅长做修改操作。⼀旦数据发⽣改动,就会引发内存 realloc,可能导致内存拷贝。

网络模型

用户空间和内核态空间

为了避免用户应用导致冲突/内核崩溃,用户应用与内核是分离的:


补充知识:寻址空间和 Linux 权限分级~

寻址空间

什么是寻址空间呢?我们的应用程序也好,还是内核空间也好,都是没有办法直接去物理内存的,而是通过分配一些虚拟内存映射到物理内存中,我们的内核和应用程序去访问虚拟内存的时候,就需要一个虚拟地址,这个地址是一个无符号的整数,比如一个 32 位的操作系统,他的带宽就是 32,他的虚拟地址就是 2 的 32 次方,也就是说他寻址的范围就是 0~2 的 32 次方, 这片寻址空间对应的就是 2 的 32 个字节,就是 4GB,这个 4GB,会有 3 个 GB 分给用户空间,会有 1GB 给内核系统

Linux 权限分级

Linux 权限分成两个等级,0 和 3,用户空间只能执行受限的命令(Ring3),而且不能直接调用系统资源,必须通过内核提供的接口来访问内核空间可以执行特权命令(Ring0),调用一切系统资源,所以一般情况下,用户的操作是运行在用户空间,而内核运行的数据是在内核空间的,而有的情况下,一个应用程序需要去调用一些特权资源,去调用一些内核空间的操作,所以此时他俩需要在用户态和内核态之间进行切换。比如:

针对这个操作:我们的用户在写读数据时,会去向内核态申请,想要读取内核的数据,而内核数据要去等待驱动程序从硬件上读取数据,当从磁盘上加载到数据之后,内核会将数据写入到内核的缓冲区中,然后再将数据拷贝到用户态的 buffer 中,然后再返回给应用程序,整体而言,速度慢,就是这个原因,为了加速,我们希望 read 也好,还是 wait for data 也最好都不要等待,或者时间尽量的短。

阻塞IO

在《UNIX 网络编程》一书中,总结归纳了 5 种 IO 模型

应用程序想要去读取数据,他是无法直接去读取磁盘数据的,他需要先到内核里边去等待内核操作硬件拿到数据,这个过程就是1,是需要等待的,等到内核从磁盘上把数据加载出来之后,再把这个数据写给用户的缓存区,这个过程是 2,如果是阻塞 IO,那么整个过程中,用户从发起读请求开始,一直到读取到数据,都是一个阻塞状态。具体流程如下图:

用户去读取数据时,会去先发起 recvform 一个命令,去尝试从内核上加载数据,如果内核没有数据,那么用户就会等待,此时内核会去从硬件上读取数据,内核读取数据之后,会把数据拷贝到用户态,并且返回 ok,整个过程,都是阻塞等待的,这就是阻塞 IO

阶段一

阶段二

可以看到,阻塞 IO 模型中,用户进程在两个阶段都是阻塞状态。

非阻塞IO

顾名思义,非阻塞 IO 的 recvfrom 操作会立即返回结果而不是阻塞用户进程。

阶段一

阶段二

可以看到,非阻塞 IO 模型中,用户进程在第一个阶段是非阻塞,第二个阶段是阻塞状态。虽然是非阻塞,但性能并没有得到提高。而且忙等机制会导致 CPU 空转,CPU 使用率暴增。

IO多路复用

无论是阻塞 IO 还是非阻塞 IO,用户应用在一阶段都需要调用 recvfrom 来获取数据,差别在于无数据时的处理方案:

所以怎么看起来以上两种方式性能都不好。而且,在单线程情况下,只能依次处理 IO 事件,如果正在处理的 IO 事件恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有 IO 事件都必须等待,性能自然会很差。就比如服务员给顾客点餐,分两步

要提高效率有几种办法?①多线程;②不排队,数据就绪了,用户应用就去读取数据。用户进程如何知道内核中数据是否就绪?多路复用模型就是解决这个问题的。

文件描述符(File Descriptor)简称 FD,是一个从 0 开始的无符号整数,用来关联 Linux 中的一个文件。在 Linux 中,一切皆文件,例如常规文件、视频、硬件设备等,当然也包括网络套接字(Socket)。

IO 多路复用:利用单个线程同时监听多个 FD,并在某个 FD 可读、可写时得到通知,从而避免无效等待,充分利用 CPU 资源。而 IO 多路复用常见的方式有三种:select、poll、epoll

select 方式的 IO 多路复用

阶段一

阶段二

当用户去读取数据的时候,不再去直接调用 recvfrom 了,而是调用 select 的函数,select 函数会将需要监听的数据交给内核,由内核去检查这些数据是否就绪了,如果说这个数据就绪了,就会通知应用程序数据就绪,然后来读取数据,再从内核中把数据拷贝给用户态,完成数据处理,如果 N 多个 FD 一个都没处理完,此时就进行等待。用 IO 复用模式,可以确保去读数据的时候,数据是一定存在的,他的效率比原来的阻塞 IO 和非阻塞 IO 性能都要高

其中 select 和 poll 相当于是当被监听的数据准备好之后,他会把你监听的 FD 整个数据都发给你,你需要到整个 FD 中去找,哪些是处理好了的,需要通过遍历的方式,所以性能也并不是那么好。而 epoll,则相当于内核准备好了之后,他会把准备好的数据,直接发给你,省去了遍历的动作。

select方式

select 是 Linux 最早是由的 I/O 多路复用技术。简单说,就是我们把需要处理的数据封装成 FD,然后在用户态时创建一个 fd 的集合(这个集合的大小是要监听的那个 FD 的最大值+1,但是大小整体是有限制的 ),这个集合的长度大小是有限制的,同时在这个集合中,标明出来我们要控制哪些数据。比如要监听的数据,是 1,2,5 三个数据,此时会执行 select 函数,然后将整个 fd 发给内核态,内核态会去遍历用户态传递过来的数据,如果发现这里边都数据都没有就绪,就休眠,直到有数据准备好时,就会被唤醒,唤醒之后,再次遍历一遍,看看谁准备好了,然后再将处理掉没有准备好的数据,最后再将这个 FD 集合写回到用户态中去,此时用户态就知道了,奥,有人准备好了,但是对于用户态而言,并不知道谁处理好了,所以用户态也需要去进行遍历,然后找到对应准备好数据的节点,再去发起读请求,我们会发现,这种模式下他虽然比阻塞 IO 和非阻塞 IO 好,但是依然有些麻烦的事情, 比如说频繁的传递 fd 集合,频繁的去遍历 FD 等问题。

select 模式存在的问题

poll模式

(21条消息) poll函数详解_青季的博客-CSDN博客_poll函数

poll 模式对 select 模式做了简单改进,但性能提升不明显,部分关键代码如下。

// pollfd 中的事件类型
#define POLLIN //可读事件
#define POLLOUT //可写事件
#define POLLERR //错误事件
#define POLLNVAL // fd未打开

// pollfd结构
struct pollfd {
    int fd; /*要监听的fd*/
    short int events; /* 要监听的事件类型:读、写、异常*/
    short int revents;/* 实际发生的事件类型*/
};

// poll函数
int poll(
    struct pollfd *fds // pollfd数组, 可以自定义大小
    nfds_ t nfds //数组元素个数
    int timeout //超时时间
);

IO 流程

与 select 对比

epoll函数

epoll 模式是对 select 和 poll 的改进,它提供了三个函数:

第一个是:eventpoll 的函数,他内部包含两个东西:一个是红黑树记录的事要监听的 FD;一个是链表记录的是就绪的 FD。紧接着调用 epoll_ctl 操作,将要监听的数据添加到红黑树上去,并且给每个 fd 设置一个监听函数,这个函数会在 fd 数据就绪时触发,就是准备好了,现在就把 fd 把数据添加到 list_head 中去。

struct eventpoll {
    //...
    struct rb_root rbr;//一颗红黑树,记录要监听的FD
    struct list_head rdlist;// 一个链表,记录就绪的FD
    //...
};

// 1.创建-一个epoll实例,内部是event poll,返回对应的句柄epfd
int epoll_create(int size);

// 2.将- -个FD添加到epoll的红黑树中,并设置ep_ .poll callback
// callback触发时,就把对应的FD加入到rdlist这个就绪列表中
int epoll_ctl(
    int epfd, // epoll实例的句柄
    int op, // 要执行的操作,包括: ADD、MOD、DEL
    int fd,//要监听的FD
    struct epoll_event *event //要监听的事件类型:读、写、异常等
);

// 3.检查rdlist列表是否为空,不为空则返回就绪的FD的数量
int epoll wait(
    int epfd,
    // epoll实例的句柄
    struct epoll event *events, //空event数组,用于接收就绪的FD
    int maxevents, // events数组的最大长度
    int timeout // 超时时间,-1用不超时; 0不阻塞;大于0为阻塞时间
)

调用 epoll_wait 函数就去等待,在用户态创建一个空的 events 数组,当就绪之后,我们的回调函数会把数据添加到 list_head 中去,当调用这个函数的时候,会去检查 list_head,当然这个过程需要参考配置的等待时间,可以等一定时间,也可以一直等, 如果在此过程中,检查到了 list_head 中有数据会将数据添加到链表中,此时将数据放入到 events 数组中(内核空态可以操作用户态的 events 数组,在内核态下直接将就绪的事件写入 event 数组,避免就绪事件来回拷贝的开销),并且返回对应的操作的数量,用户态的此时收到响应后,从 events 中拿到对应准备好的数据的节点,再去调用方法去拿数据。

select 模式存在的三个问题

poll 模式的问题

epoll 模式中如何解决这些问题的?

epoll中的ET和LT

当 FD 有数据可读时,我们调用 epoll_wait(或者 select、poll)可以得到通知。但是事件通知的模式有两种。

举个栗子

如果我们采用 LT 模式,因为 FD 中仍有 1kb 数据,则第5️⃣步依然会返回结果,并且得到通知

如果我们采用 ET 模式,因为第3️⃣步已经消费了 FD 可读事件,第5️⃣步 FD 状态没有变化,因此 epoll_wait 不会返回,数据无法读取,客户端响应超时。

如果我们想要使用 ET 模式,在读取方式上做一些改变可以解决它数据无法读取的问题。

LT 模式的缺点

总结

基于epoll的服务器端流程

服务器启动以后,服务端会去调用 epoll_create,创建一个 epoll 实例,epoll 实例中包含两个数据

1️⃣红黑树(为空):rb_root 用来去记录需要被监听的 FD

2️⃣链表(为空):list_head,用来存放已经就绪的 FD

创建好了之后,会去调用 epoll_ctl 函数,此函数会会将需要监听的数据添加到 rb_root 中去,并且对当前这些存在于红黑树的节点设置回调函数,当这些被监听的数据一旦准备完成,就会被调用,而调用的结果就是将红黑树的 fd 添加到 list_head 中去(但是此时并没有完成)

3️⃣当第二步完成后,就会调用 epoll_wait 函数,这个函数会去校验是否有数据准备完毕(因为数据一旦准备就绪,就会被回调函数添加到 list_head 中),在等待了一段时间后(可以进行配置),如果等够了超时时间,则返回没有数据,如果有,则进一步判断当前是什么事件,如果是建立连接时间,则调用 accept() 接受客户端 socket,拿到建立连接的 socket,然后建立起来连接,如果是其他事件,则把数据进行写出

信号驱动

信号驱动 IO 是与内核建立 SIGIO 的信号关联并设置回调,当内核有 FD 就绪时,会发出 SIGIO 信号通知用户,期间用户应用可以执行其它业务,无需阻塞等待。

阶段一:

阶段二:

当有大量 IO 操作时,信号较多,SIGIO 处理函数不能及时处理可能导致信号队列溢出,而且内核空间与用户空间的频繁信号交互性能也较低。

异步IO

这种方式,不仅仅是用户态在试图读取数据后,不阻塞,而且当内核的数据准备完成后,也不会阻塞。

他会由内核将所有数据处理完成后,由内核将数据写入到用户态中,然后才算完成,所以性能极高,不会有任何阻塞,全部都由内核完成,可以看到,异步 IO 模型中,用户进程在两个阶段都是非阻塞状态。

对比

Redis的单线程

Redis到底是单线程还是多线程?

在 Redis 版本迭代过程中,在两个重要的时间节点上引入了多线程的支持:

因此,对于 Redis 的核心网络模型,在 Redis 6.0 之前确实都是单线程。是利用 epoll(Linux 系统)这样的 IO 多路复用技术在事件循环中不断处理客户端情况。

为什么 Redis 要选择单线程?

单线程和多线程网络模型变更

当我们的客户端想要去连接我们服务器,会去先到 IO 多路复用模型去进行排队,会有一个连接应答处理器,他会去接受读请求,然后又把读请求注册到具体模型中去,此时这些建立起来的连接,如果是客户端请求处理器去进行执行命令时,他会去把数据读取出来,然后把数据放入到 client 中, clinet 去解析当前的命令转化为 redis 认识的命令,接下来就开始处理这些命令,从 redis 中的 command 中找到这些命令,然后就真正的去操作对应的数据了,当数据操作完成后,会去找到命令回复处理器,再由他将数据写出。

通信协议

Redis 是一个 CS 架构的软件,通信一般分两步(不包括 pipeline 和 PubSub):

因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。而在 Redis 自定义了一个通信协议称为 RESP(Redis Serialization Protocol)协议:

但目前,默认使用的依然是 RESP2 协议,也是我们要学习的协议版本(以下简称 RESP)。

在 RESP 中,通过首字节的字符来区分不同数据类型,常用的数据类型包括 5 种:

单行字符串:首字节是 ‘+’ ,后面跟上单行字符串,以 CRLF( “\r\n” )结尾。例如返回 “OK”: “+OK\r\n”

错误(Errors):首字节是 ‘-’ ,与单行字符串格式一样,只是字符串是异常信息,例如:”-Error message\r\n”

数值:首字节是 ‘:’ ,后面跟上数字格式的字符串,以 CRLF 结尾。例如:”:10\r\n”

多行字符串:首字节是 ‘$’ ,表示二进制安全的字符串,最大支持 512MB:

如果大小为 0,则代表空字符串:”$0\r\n\r\n”

如果大小为 -1,则代表不存在:”$-1\r\n”

数组:首字节是 ‘*’,后面跟上数组元素个数,再跟上元素,元素数据类型不限:

基于Socket自定义Redis的客户端

Redis 支持 TCP 通信,因此我们可以使用 Socket 来模拟客户端,与 Redis 服务端建立连接。

public class Main {

    static Socket s;
    static PrintWriter writer;
    static BufferedReader reader;

    public static void main(String[] args) {
        try {
            // 1.建立连接
            String host = "192.168.150.101";
            int port = 6379;
            s = new Socket(host, port);
            // 2.获取输出流、输入流
            writer = new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8));
            reader = new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));

            // 3.发出请求
            // 3.1.获取授权 auth 123321
            sendRequest("auth", "123321");
            Object obj = handleResponse();
            System.out.println("obj = " + obj);

            // 3.2.set name 虎哥
            sendRequest("set", "name", "虎哥");
            // 4.解析响应
            obj = handleResponse();
            System.out.println("obj = " + obj);

            // 3.2.set name 虎哥
            sendRequest("get", "name");
            // 4.解析响应
            obj = handleResponse();
            System.out.println("obj = " + obj);

            // 3.2.set name 虎哥
            sendRequest("mget", "name", "num", "msg");
            // 4.解析响应
            obj = handleResponse();
            System.out.println("obj = " + obj);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 5.释放连接
            try {
                if (reader != null) reader.close();
                if (writer != null) writer.close();
                if (s != null) s.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static Object handleResponse() throws IOException {
        // 读取首字节
        int prefix = reader.read();
        // 判断数据类型标示
        switch (prefix) {
            case '+': // 单行字符串,直接读一行
                return reader.readLine();
            case '-': // 异常,也读一行
                throw new RuntimeException(reader.readLine());
            case ':': // 数字
                return Long.parseLong(reader.readLine());
            case '$': // 多行字符串
                // 先读长度
                int len = Integer.parseInt(reader.readLine());
                if (len == -1) {
                    return null;
                }
                if (len == 0) {
                    return "";
                }
                // 再读数据,读len个字节。我们假设没有特殊字符,所以读一行(简化)
                return reader.readLine();
            case '*':
                return readBulkString();
            default:
                throw new RuntimeException("错误的数据格式!");
        }
    }

    private static Object readBulkString() throws IOException {
        // 获取数组大小
        int len = Integer.parseInt(reader.readLine());
        if (len <= 0) {
            return null;
        }
        // 定义集合,接收多个元素
        List<Object> list = new ArrayList<>(len);
        // 遍历,依次读取每个元素
        for (int i = 0; i < len; i++) {
            list.add(handleResponse());
        }
        return list;
    }

    // set name payphone
    private static void sendRequest(String ... args) {
        writer.println("*" + args.length);
        for (String arg : args) {
            writer.println("$" + arg.getBytes(StandardCharsets.UTF_8).length);
            writer.println(arg);
        }
        writer.flush();
    }
}

内存回收

Redis 内存回收主要有两方面的内容

注意:Redis 的过期策略和内存淘汰策略不是一回事,不要弄混淆了。

必要性

Redis 之所以性能强,最主要的原因就是基于内存存储。然而单节点的 Redis 其内存大小不宜过大,会影响持久化或主从同步性能。我们可以通过修改配置文件来设置 Redis 的最大内存:

# 格式
# maxmemory <bytes>
# 例如
maxmemory 1gb

当内存使用达到上限时,就无法存储更多数据了。为了解决这个问题,Redis 提供了一些策略实现内存回收。在学习 Redis 缓存的我们可以通过 expire 命令给 Redis 的 key 设置 TTL(存活时间)

127.0.0.1:6379> set name jack
OK
127.0.0.1:6379> expire name 5
(integer) 1
127.0.0.1:6379> get name
"jack"
127.0.0.1:6379> get name
(nil)

可以发现,当 key 的 TTL 到期以后,再次访问 name 返回的是 nil,说明这个 key 已经不存在了,对应的内存也得到释放。从而起到内存回收的目的。这里有两个问题需要我们思考:

过期key处理

这里需要先谈一下 Redis 的底层结构。Redis 本身是一个典型的 key-value 内存存储数据库,因此所有的 key、value 都保存在之前学习过的 Dict 结构中。不过在其 database 结构体中,有两个 Dict:一个用来记录 key-value;另一个用来记录 key-TTL。

typedef struct redisDb {
    dict *dict;	/* 存放所有key及value的地方,也被称为keyspace*/
    dict *expires;	/*存放每-个key及其对应的TTL存活时间,只包含设置了TTL的key*/
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;	/* Blocked keys that received a PUSH */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id;	/* Database ID,0~15 */
    long long avg_ttl; /* 记录平均TTL时长*/ 
    unsigned long expires_cursor; /* expire检查时在dict中抽样的索引位置. */
    list *defrag_later; /* 等待碎片整理的key列表*/
} redisDb;

下图为 Redis database 结构体的示意图。

知道了 Redis 如何存储时间了,那么 Redis 是如何处理过期的 key 呢?Redis 对过期 key 的处理策略有:惰性删除、周期删除(这种处理策略有两种模式,分别是 SLOW 模式和 FAST 模式)

惰性删除

惰性删除:顾明思议并不是在 TTL 到期后就立刻删除,而是在访问一个 key 的时候,检查该 key 的存活时间,如果已经过期才执行删除。

// 查找一个key执行写操作
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
    //检查key是否过期
    expirelfNeeded(db,key); 
    return lookupKey(db,key.flags);
}

//查找一 个key执行读操作
robj tlookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;
    //检查key是否过期
    if (expireifNeeded(db,key)== 1) {
        // 略...
    }
    return NULL;
}

int expirelfNeeded(redisDb *db, robj *key) {
    //判断是否过期,如果未过期直接结束并返回0
    if (!keylsExpired(db,key)) return 0; 
    // 略...
	//删除过期key
	deleteExpiredKeyAndPropagate(db,key);
    return 1;
}

周期删除

周期删除:顾明思议是通过一个定时任务,周期性的抽样部分过期的 key,然后执行删除。执行周期有两种。

SLOW 模式规则

FAST 模式规则(过期 key 比例小于 10% 不执行 )

从节点过期策略

从节点不会进行过期扫描,从节点对过期的处理是被动的。主节点在 key 到期时,会在 AOF 文件里增加一条 del 指令,同步到所有的从节点,从节点通过执行这条 del 指令来删除过期 key。

因为指令同步是异步进行的,所以如果主节点过期的 key 的 del 指令没有及时同步到从节点,就会出现主从数据不一致,主节点没有的数据在从节点中依旧存在。如果在含有主从集群的 Redis 中设置分布式锁的 id,获取数据时主节点数据过期而从节点没来得及同步就不出现过期的分布式锁。

总结

RedisKey 的 TTL 记录方式:在 RedisDB 中通过一个 Dict 记录每个 Key 的 TTL 时间

过期 key 的删除策略

定期清理的两种模式

内存淘汰策略

内存淘汰:就是当 Redis 内存使用达到设置的上限时,主动挑选部分 key 删除以释放更多内存的流程。执行 Redis 命令之前会检查一下内存够不够,不够则进行清理。Redis 会在处理客户端命令的方法 processCommand() 中尝试做内存淘汰:

int processCommand(client *c) {
    //如果服务器设置了server.maxmemory属性,并且并未有执行lua脚本
    if (server.maxmemory && !server.lua_timedout) {
        //尝试进行内存淘汰performEvictions
        int out of memory = (performEvictions() == EVICT FAIL);
        // ..
        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(C, shared.oomerr);
            return C_OK;
        }
        // ....
    }
}

Redis 支持 8 种不同策略来选择要删除的 key

比较容易混淆的有两个

要想使用 LRU 和 LFU 算法进行 key 的淘汰,我们需要得到 key 最后一次访问的时间/访问的频率,这又从何得知呢?这就需要我们去了解 Redis 如何去统计 key 最近访问的时间和频率了。这涉及到了 Redis 的 RedisObject 结构。Redis 的数据都会被封装为 RedisObject 结构。

typedef struct redisObject {
    unsigned type:4; // 对象类型
    unsigned encoding:4; // 编码方式
    unsigned lru:LRU_BITS; // LRU:以秒为单位记录最近一次访问时间,长度24bit
    						//LFU:高16位以分钟为单位记录最近一次访问时间,低8位记录逻辑访问次数
    int refcount;
    //引用计数,计数为0则可以回收
    void *ptr; 
    //数据指针,指向实数据
} robj;

LFU 的访问次数之所以叫做逻辑访问次数,是因为并不是每次 key 被访问都计数,而是通过运算

大对象的删除

del 指令会直接释放对象的内存,大部分情况下这个命令非常快,但是如果被删除的是一个很大的对象,如包含了上千万个元素的 hash,那么这个删除操作会比较慢,导致 Redis 卡顿,降低 QPS。这时可以使用 unlink key 指令来删除大 key。

unlink 指令会切断 key 和 Redis 数据库的关系,然后开启一个后台线程异步回收内存。