[关闭]
@dungan 2020-08-11T06:55:13.000000Z 字数 21458 阅读 175

Redis

Redis 进阶

安全

任何未授权的用户访问 redis 都应该被阻止,因此如果想提高 redis 的安全性就需要做一些如下安全措施。

不要暴露 redis 的公网 ip

bind 127.0.0.1

任何连接都需要密码认证

requirepass yourpassword

禁用远程客户端使用一些高危命令

  1. rename-command FLUSHALL ""
  2. rename-command CONFIG ""
  3. rename-command EVAL ""

不要以 root 权限来运行 redis

为 Redis 服务创建单独的用户和家目录,并且配置禁止登陆(nologin shell)。

redis-cli

redis-cli 有两种模式

  • 命令行参数模式:命令作为 redis-cli 的参数发送,执行并打印在标准输出上。
  • 交互模式:在命令行界面中用户输入命令并获得回复。

redis-cli 搭配一些选项能够让我们更有效的使用redis,下面对这些功能做一些介绍 !

命令行参数模式

连接远程 redis 服务

  1. #连接远程服务
  2. redis-cli -h remote_host -p port -a password
  3. #关闭远程服务
  4. redis-cli -h remote_host -p port shutdown

检测服务是否运行

  1. redis-cli -a passwd ping

输出重定向

  1. redis-cli MONITOR >> /tmp/command.txt

-x 读取标准输入作为key的值

  1. # stdin作为最后一个参数, 这种方式需要 -x 选项
  2. $ redis-cli -x set num < /tmp/num.txt
  3. $ redis-cli get num
  4. "100\n"

从文件中批量插入数据

  1. # 第一种:通过管道将包含 redis 命令的文件传给 redis-cli
  2. $ cat /tmp/commands.txt
  3. set foo 100
  4. incr foo
  5. append foo xxx
  6. get foo
  7. $ cat /tmp/commands.txt | redis-cli
  8. OK
  9. (integer) 101
  10. (integer) 6
  11. "101xxx"
  12. # 第二种:pipe mode 模式, --pipe 参数来启用pipe协议,
  13. # 它不仅仅能减少返回结果的输出,还能更快的执行指令
  14. server> cat d1.txt | redis-cli --pipe
  15. All data transferred. Waiting for the last reply...
  16. Last reply received from server.
  17. errors: 0, replies: 7
  18. #甚至还可以使用 --pipe 导入到远程主机
  19. server> cat d1.txt | redis-cli -p 6380 -h 192.168.1.166 --pipe
  20. All data transferred. Waiting for the last reply...
  21. Last reply received from server.
  22. errors: 0, replies: 7

读取文件时有可能由于换行符导致失败,如果出错,可以使用 unix2dos commands.txt 转码下。

选择某个数据库

  1. # -n 1 代表修改 index=1 的数据库中的数据
  2. redis-cli -n 1 set name tcl

重复执行命令

  1. # -r 表示重复的次数,-i 表示延迟的时间(seconds),-1(数字1)表示一直重复
  2. [root@VM_0_17_centos redis]# redis-cli -r 5 -i 1 incr foo
  3. ...
  4. (integer) 9
  5. (integer) 10
  6. [root@VM_0_17_centos redis]# redis-cli -r -1 -i 1 incr foo
  7. (integer) 11
  8. (integer) 12
  9. (integer) 13
  10. ...

导出为CSV格式

  1. # --csv 只能导出单个key的数据,不能导出整个数据库
  2. [root@VM_0_17_centos redis]# redis-cli lpush mylist a b c d
  3. (integer) 4
  4. [root@VM_0_17_centos redis]# redis-cli --csv lrange mylist 0 -1
  5. "d","c","b","a"

--eval 执行 lua 脚本

  1. redis-cli --eval script.lua key1 key2,arg1 arg2 arg3

交互模式

使用 connect 命令连接到其他实例

  1. 127.0.0.1:6379> connect host port

重复命令

  1. 127.0.0.1:6379> 5 incr num

查看命令帮助

  1. # help @<category> 显示给定类别的所有命令,help <command> 显示某个命令的帮助
  2. 127.0.0.1:6379> help @list
  3. ...
  4. LREM key count value
  5. summary: Remove elements from a list
  6. since: 1.0.0
  7. LTRIM key start stop
  8. summary: Trim a list to the specified range
  9. since: 1.0.0
  10. ...
  11. 127.0.0.1:6379> help LTRIM
  12. LTRIM key start stop
  13. summary: Trim a list to the specified range
  14. since: 1.0.0
  15. group: list

特殊选项模式
--stat 实时监控Redis实例

  1. # --stat 可以让我们轻松了解内存使用情况,连接的客户端等情况, 此外 -i 可以控制输出的间隔秒数
  2. [root@VM_0_17_centos redis]# redis-cli --stat -i 2
  3. ------- data ------ --------------------- load -------------------- - child -
  4. keys mem clients blocked requests connections
  5. 2651309 214.86M 1 0 7704088 (+0) 26
  6. 2651309 214.86M 1 0 7704089 (+1) 26
  7. 2651309 214.86M 1 0 7704090 (+1) 26
  8. ...

--bigkeys 扫描大键

  1. [root@VM_0_17_centos redis]# redis-cli --bigkeys
  2. # Scanning the entire keyspace to find biggest keys as well as
  3. # average sizes per key type. You can use -i 0.1 to sleep 0.1 sec
  4. # per 100 SCAN commands (not usually needed).
  5. [00.00%] Biggest string found so far 'lru:1588927' with 5 bytes
  6. [19.23%] Biggest hash found so far 'user' with 2 fields
  7. [37.72%] Sampled 1000000 keys so far
  8. [75.43%] Sampled 2000000 keys so far
  9. [93.35%] Biggest list found so far 'mylist' with 4 items
  10. -------- summary -------
  11. Sampled 2651309 keys in the keyspace!
  12. Total key length in bytes is 28105191 (avg len 10.60)
  13. Biggest string found 'lru:1588927' has 5 bytes
  14. Biggest list found 'mylist' has 4 items
  15. Biggest hash found 'user' has 2 fields
  16. 2651307 strings with 13256521 bytes (100.00% of keys, avg size 5.00)
  17. 1 lists with 4 items (00.00% of keys, avg size 4.00)
  18. 0 sets with 0 members (00.00% of keys, avg size 0.00)
  19. 1 hashs with 2 fields (00.00% of keys, avg size 2.00)
  20. 0 zsets with 0 members (00.00% of keys, avg size 0.00)
  21. 0 streams with 0 entries (00.00% of keys, avg size 0.00)

打印结构有两部分,上面部分展示了具体的 key 的信息。summary 部分清晰的展示了不同数据结构 size 所占百分比,以及该数据结构其含有几个成员等信息!

--scan 获取所有的 key

  1. [root@VM_0_17_centos redis]# redis-cli --scan
  2. user
  3. list
  4. arr
  5. name
  6. # 还可以像 keys 一样使用模糊匹配功能
  7. [root@VM_0_17_centos redis]# redis-cli --scan --pattern "*r"
  8. user
  9. arr

--scan 和 --bigkeys 底层使用的是 scan 命令,因此不会像 KEYS * 一样阻塞服务器 !

监控Redis实例的延迟

监控redis延迟的主要有两个选项,--latency 和 --latency-history。

  • --latency : 每秒发送 100 次 ping 命令到 redis 实例,实时展示数据在终端,数据中的时间单位是毫秒。
  • --latency-history : 统计一段时间内的延迟,与--latency类似,但每隔15秒才刷新一下终端数据,当然你可以使用选项 -i 控制间隔几秒。
  1. [root@VM_0_17_centos redis]# redis-cli --latency
  2. min: 0, max: 1, avg: 0.11 (360 samples)
  3. [root@VM_0_17_centos redis]# redis-cli --latency-history -i 5
  4. min: 0, max: 1, avg: 0.13 (486 samples) -- 5.01 seconds range
  5. min: 0, max: 1, avg: 0.11 (488 samples) -- 5.00 seconds range
  6. min: 0, max: 1, avg: 0.13 (489 samples) -- 5.01 seconds range
  7. min: 0, max: 1, avg: 0.12 (489 samples) -- 5.00 seconds range
  8. ...

--rdb 获取指定redis实例的rdb文件,保存到本地

  1. [root@VM_0_17_centos redis]# redis-cli --rdb /tmp/dump.rdb
  2. SYNC sent to master, writing 280 bytes to '/tmp/dump.rdb'
  3. Transfer finished with success.

注意:在脚本或cron任务中使用此选项时,请确保检查命令的返回值,如果它不为零,则意味出错了。

发布订阅

发布订阅(pub/sub)是一种消息通信模式,主要的目的是解耦消息发布者和消息订阅者之间的
耦合。

redis 中的订阅者可以通过 subscribe/psubscribe 订阅一个或多个频道,发布者可以通过 publish 往这个频道上发送消息。

命令 描述
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道
PUBLISH channel message 将信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道
SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息
UNSUBSCRIBE [channel [channel ...]] 退订给定的频道
PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态

这里开三个会话窗口,我们做个示例

  1. # client1 订阅 tv1
  2. 127.0.0.1:6379> SUBSCRIBE tv1
  3. Reading messages... (press Ctrl-C to quit)
  4. 1) "subscribe"
  5. 2) "tv1"
  6. 3) (integer) 1
  1. # client2 订阅 tv1 和 tv2
  2. 127.0.0.1:6379> SUBSCRIBE tv1 tv2
  3. Reading messages... (press Ctrl-C to quit)
  4. 1) "subscribe"
  5. 2) "tv1"
  6. 3) (integer) 1
  7. 1) "subscribe"
  8. 2) "tv2"
  9. 3) (integer) 2

这样 client1 和 client2 就成功订阅了,紧接着在会话3中向这个两个频道发送消息

  1. # client3 向 tv1 和 tv2 这个频道发送消息
  2. 127.0.0.1:6379> PUBLISH tv1 "from tv1"
  3. (integer) 2
  4. 127.0.0.1:6379> PUBLISH tv2 "from tv2"
  5. (integer) 1
  6. 127.0.0.1:6379>

消息发送成功后,你会发现client1 和 cleint2 成功收到了订阅消息

  1. # client1
  2. 127.0.0.1:6379> SUBSCRIBE tv1
  3. Reading messages... (press Ctrl-C to quit)
  4. 1) "subscribe"
  5. 2) "tv1"
  6. 3) (integer) 1
  7. 1) "message"
  8. 2) "tv1"
  9. 3) "from tv1"
  10. # client2
  11. 127.0.0.1:6379> SUBSCRIBE tv1 tv2
  12. Reading messages... (press Ctrl-C to quit)
  13. 1) "subscribe"
  14. 2) "tv1"
  15. 3) (integer) 1
  16. 1) "subscribe"
  17. 2) "tv2"
  18. 3) (integer) 2
  19. 1) "message"
  20. 2) "tv1"
  21. 3) "from tv1"
  22. 1) "message"
  23. 2) "tv2"
  24. 3) "from tv2"

持久化

虽然 redis 是一个内存数据库,但不同于 memcache 的是,redis 能够定时将内存数据写入到磁盘,这称之为 持久化

redis 支持两种持久化方案

RDB : 将某一时刻的所有数据写入硬盘里面,RDB是一个紧凑的单一文件,方便传输,非常适用于灾难恢复,在恢复大数据集时要比 AOF 快。
但是由于保存的是快照数据,因此其备份的数据有可能不完整,数据量大时 fork 出的子进程导致 cpu 使用率升高,有可能导致停顿。

AOF: 记录每次对服务器的写命令,备份的数据要比 RDB 完整, 重启的时候会优先载入AOF文件来恢复原始的数据。
但是 AOF 文件的体积通常要大于 RDB 文件的体积,并且根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB。

RDB

RDB 文件的备份的选项是 save,save 用来实现满足 N 秒内数据集至少有 M 个改动 才写入 RDB 文件。
你还可以通过调用 SAVE 或者 BGSAVE 命令,手动让 Redis 进行数据集保存操作。

save 命令会阻塞所有 redis 客户端直到快照生成完毕,save 命令不常用,通常只会在对于持久化耗费时长无所谓的情况下才使用 save。

bgsave 命令对 RDB 文件创建是通过 fork 出的子进程来实现,当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。

配置文件中的 save 选项一旦满足条件会自动触发一次 bgsave;当使用 sync 命令在两个 redis 服务器间复制数据时,如果主服务器近期没有进行过 bgsave,那么主服务器就会执行一次 bgsave。

由于 redis 会自动触发 bgsave,而在数据量特别大,内存又不足时,子进程有可能导致系统长时间的停顿,如果出现这种情况可以考虑关闭自动备份,改用手动调用 save 或 bgsave 命令,这样就能人为的控制多久记录一次快照数据。

  1. save 900 1 # 900 秒内有一个操作
  2. save 300 10 # 300 秒内有10个操作
  3. save 60 10000

AOF

AOF 不同于 RDB 的在于它不会像 RDB 一样会有丢失大量数据的情况,会将被执行的写命令追加到 AOF 文件末尾,redis 只要重新执行一遍 AOF 中的所有写命令就可以恢复所有数据集。

可以在配置文件中开启 AOF :

appendonly yes

选项 appendfsync 选项提供备份到 AOF 文件功能,该选项有三种备份策略:

  • no : 不对 AOF 文件进行任何显式的同步操作,而是交由操作系统来决定在何时对 AOF 文件进行同步,性能最好,持久化没保证。
  • always : 每次有新命令追加到 AOF 文件时就执行一次 fsync,非常慢,但是保证完全的持久化,这种策略会对硬盘进行大量写入。
  • everysec : 每秒 fsync 一次,足够快(和使用 RDB 持久化差不多),即使有故障,最多也就丢失 1 秒钟的数据,推荐这种方式,可以兼顾速度和安全性。
AOF 文件压缩/重写

由于 AOF 文件保存了所有的写命令,这就是导致 AOF 文件体积很大,里面可能会存在一些重复的命令,例如你 set name tcl 了10次这样的冗余操作,而使用 BGREWRITEAOF 重构 AOF 文件后就会去除重复的命令,这样就只需要的最少命令就能重建当前数据集。

因此 redis 提供了对 AOF 文件压缩的解决方案,通过命令 BGREWRITEAOF,用来压缩 AOF 文件,同时在配置文件也提供了两个选项 auto-aof-rewrite-percentage(本次的文件体积是上次文件体积的多少百分比后压缩文件) 和 auto-aof-rewrite-min-size(文件体积达到多少后压缩文件) 来自动执行 BGREWRITEAOF。

确保内存中的数据已被保存到硬盘

info 命令的输出结果中的 aof_pending_bio_fsync 属性如果为 0,就表示服务器已经将所有数据保存到硬盘里面了。

定时备份 RDB 和 AOF

对于RDB 和 AOF 可以设定crontab进行定时备份: cp /usr/local/redis/var/dump.rdb /dump/redis/dump.$(date +%Y%m%d%H%M).rdb

数据恢复

在 redis 实例重启时,先使用 RDB 文件重新构建内存,再使用 AOF 重放近期的操作指令来实现完整恢复重启之前的状态。

复制

redis 的复制和关系型数据的复制一样,从主服务器(master) 向多个从服务器(slave) 发送数据副本,进而实现合理的分配流量,提升系统性能,同时也保护了数据的安全,而这里的这里数据副本就是持久化生成的 RDB 或 AOF 文件。

所以整个复制的核心就在于,服务器间是如果发送 RDB 和 AOF 文件的。

redis 的复制过程

当从服务器通过 slaveof 连接主服务器的时候,会触发主服务器执行 bgsave,这样主服务器就生成了 RDB 快照文件并发送至从服务器。

redis 的主从关系设置非常简单,你只需要配置 SLAVEOF host port 就可以将一个 redis 服务器变为某个 host 的从服务器。

大致步骤如下:

主服务器 从服务器
1) 等待命令进入
2) 连接主服务器,发送 sync 命令
3) 开始执行bgsave,并在缓冲区记录 bgsave 之后执行的所有写命令
4) 根据配置选项决定给客户端返回现有的数据,还是直接返回错误
5) bgsave 执行完毕后,向从服务器发送快照文件,并在发送期间继续使用缓冲区记录收到的写命令
6) 删除旧数据,替换成主服务器发来的快照数据
7) 快照文件发送完成后,开始向从服务器发送存储在缓冲区的写命令
8) 完成快照文件的解析并恢复数据,像平常一样开始接命令请求
9) 缓冲区存储的命令发送完毕后,从现在开始,每执行一个写命令,就向从服务器发送相同的写命令
10) 执行主服务器发送来的所有缓冲区的写命令,并从现在开始,接收并执行主服务器传来的每个写命令

实际使用过程中,最好让主服务器只使用 50%-65% 的内存,留下 30%-45% 的内存以便于创建的子进程执行 bgsave和缓冲区命令写入。

确保从服务器真正收到了主服务器的数据

为了检验数据是否成功发送,只需要在在主服务器写入一些测试数据,如果从服务器能读取到这些数据,说明整个复制过程式成功的。

需要注意的是不能设置两个 redis 服务器互为主从关系

互为主从关系的两个 redis 实例只会持续占用大量资源并不断尝试与对方通信,这就很有可能导致客户端请求会得到不一致的数据。

尽管两个 redis 服务器不能互为主从,但是从服务器可以拥有自己的从服务器,这样就能构造成一个链条,称之为 主从链

那为什么从服务器要拥有自己的从服务器呢?

这是因为随着负载不断上升,主服务器可能无法快速的将数据同步到所有从服务器,因此大家就想了个办法,主服务器只把数据同步给它下面的某几台从服务器作为中转,再让这几台从服务器将数据同步到各自下面的从服务器,这样沿着整个主从链就将副本同步给了所有从服务器。

由于这几台中转服务器扮演了主服务器的角色,因此为了成功转发 master 发来的缓冲区的写命令,需要设置一下: appendonly yesappendfsync everysec

故障处理

复制的整个过程我们现在很清楚了,但如果主服务器出现故障奔溃了怎么办?RDB 或 AOF 文件有损毁怎么办? 这就涉及到了主服务器的故障处理。

对于检验和修复损坏的 RDB 和 AOF 文件,redis 提供了两个工具

redis-check-aof [--fix] : 使用该工具时如果设了 --fix 参数,它会扫描指定的 AOF 文件,当发现第一个错误命令的时候,会删除该错误命令之后的所有命令。
redis-check-dump : 该工具并不能像 redis-check-aof 一样修复出错的快照文件(因为快照保存的是数据而非命令),只能对快照文件做一些hash校验来判断其是否完整。

对于主服务器出现故障的情况,只需要替换故障主服务器就行了

例如有三台服务器 A(master),B(slave),C(slave),A 出现故障后,首先向 B 发送一个 SAVE 命令,让 B 创建快照文件,接着将快照文件发送给 C,然后让 B 作为 C 的 从服务器就行了,这本质上是高可用的一种解决方案。

这里以 redis 为例,说明一组概念:

读请求发给从服务器,写请求发给主服务器叫做 读写分离

在主从架构的前提下,主服务器挂了以后自动将从服务器的角色升级为主服务器叫做 高可用

在多个从服务器间合理分发请求的行为叫做 负载均衡。当然为了稳定可以将每个节点进行高可用部署,即包含成对的主从关系。

事务

Redis 提供的事务和关系数据库的事务不可同日而语,其并不支持完整的 ACID 事务。

由于 redis 是单线程的,能保证一个 client 发起的事务中的命令可以连续的执行, 而中间不会插入其他 client 的命令,也就是说在执行完事务内所有命令前是不可能再去同时执行其他 client 的请求的。

需要的注意的是:

提交事务之前,所有的命令都会先放到一个队列中,当执行 exec 后,才会顺序执行队列中的所有命令,并将所有命令的结果一起返回给客户端。

并且 redis 的事务是非原子的:

redis 只能保证一个事务中的命令连续执行,但是并不能保证事务的一致性,redis 的事务是非原子的,也就是说是如果事务中的一个命令失败了,并不回滚其他命令,这点和 mysql 事务不一样。

redis 事务相关的命令

  • MULTI : 开始一个事务。
  • EXEC : 提交事务。
  • DISCARD: 取消事务。
  • WATCH key [key ...] : 监视一个或多个 key,当 exec 时候如果监视的 key 从调用 watch 后发生过变化 ,则整个事务会失败。
  • UNWATCH : 取消 WATCH 命令对所有 key 的监视; exec,discard,unwatch 都会yichu移除对 key 的监视。

watch 本质上是为了防止数据被篡改而提供的一种乐观锁机制,事务只能在所有被监视键都没有被修改的前提下执行, 如果这个前提不能满足的话,事务就不会被执行。

示例1:在提交事务之前,所有的命令会被放到队列中

  1. 127.0.0.1:6379> set name tcl
  2. OK
  3. 127.0.0.1:6379> MULTI
  4. OK
  5. 127.0.0.1:6379> SET name tcl_m1
  6. QUEUED
  7. 127.0.0.1:6379> SET name tcl_m2
  8. QUEUED
  9. 127.0.0.1:6379> EXEC
  10. 1) OK
  11. 2) OK
  12. 127.0.0.1:6379> GET name
  13. "tcl_m2"
  14. 127.0.0.1:6379>

实例二:取消事务

  1. 127.0.0.1:6379> GET age
  2. "20"
  3. 127.0.0.1:6379> MULTI
  4. OK
  5. 127.0.0.1:6379> SET age 21
  6. QUEUED
  7. 127.0.0.1:6379> SET age 22
  8. QUEUED
  9. 127.0.0.1:6379> DISCARD
  10. OK
  11. 127.0.0.1:6379> GET age
  12. "20"
  13. 127.0.0.1:6379>

示例三:非原子的事务

  1. 127.0.0.1:6379> MGET name age
  2. 1) "tcl_m2"
  3. 2) "20"
  4. 127.0.0.1:6379> MULTI
  5. OK
  6. 127.0.0.1:6379> INCR name
  7. QUEUED
  8. 127.0.0.1:6379> INCR age
  9. QUEUED
  10. 127.0.0.1:6379> EXEC
  11. 1) (error) ERR value is not an integer or out of range
  12. 2) (integer) 21
  13. 127.0.0.1:6379> MGET name age
  14. 1) "tcl_m2"
  15. 2) "21"
  16. 127.0.0.1:6379>

可以看到即使第一条命令失败了,但第二条命令依然执行成功,说明 redis 的事务是非原子的。

所以最好不要使用 redis 提供的事务来保证数据的一致性,你应该做的是在代码逻辑层面控制数据的一致性。

示例四:watch 提供了乐观锁的机制,这里我们开两个回话窗口。

  1. # 会话 1
  2. 127.0.0.1:6379> GET age
  3. "21"
  4. 127.0.0.1:6379> WATCH age
  5. OK
  6. 127.0.0.1:6379> MULTI
  7. OK
  8. 127.0.0.1:6379>

当我们在会话1中监视了 age 后,我们紧接着在会话2中修改 age

  1. # 会话 2
  2. 127.0.0.1:6379> SET age 22
  3. OK
  4. 127.0.0.1:6379>

紧接着回到会话1提交未完成的事务

  1. 127.0.0.1:6379> SET age 23
  2. QUEUED
  3. 127.0.0.1:6379> EXEC
  4. (nil)
  5. 127.0.0.1:6379> GET age
  6. "22"
  7. 127.0.0.1:6379>

可以看到当在会话1中watch了 age 施加了乐观锁后,紧接着在会话2中修改了 age,再在会话1中修改 age 发现返回了 nil,这是因为加锁期间会话2篡改了 age,导致整个事务失败了。

Redis 事务的意义

因为 redis 事务中的命令的是通过管道的方式提交的,而管道能极大的提升命令的执行性能,当然 multi 和 exec 也会消耗资源并会导致其他命令被延迟执行,我们称其为事务型管道。

管道 pipeline

redis 和 http 服务器一样都是一种请求-应答的模式,客户端发送一条命令,服务端返回一条命令结果,整个过程伴随着 tcp 报文的传输和网络延迟等,如果命令越多,则耗时越多。

因此 redis 提供了管道用来批量发送命令到服务端,这样就能明显的缩短客户端-服务端的通信往返时间,毕竟一次发十条命令和发十次命令还是有区别的

需要注意到是 :

用管道方式打包命令发送,会导致 redis 必须在处理完所有命令前先缓存起所有命令的处理结果。
因此打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。

分布式锁

锁是什么?

琐是为了解决多端对同一资源争抢进而篡改数据而设计的,常表现于某一个进程对某数据加锁,然后修改数据,最后释放琐,redis 中可以使用 watch 命令来对数据行施加乐观锁。

redis 的分布式锁又是什么?

上文的讲到的锁,一般用于同一机器上的多个线程或多个进行,而分布式锁则是由不同的机器上的 redis 客户端进行加锁和释放的,通过 setnx 命令可以实现分布式锁。

分布式锁实现

分布式锁的概念现在清楚了,但是对于并发场景下,如果不能正确使用锁则会引起一些问题,例如:

  • 持有锁的进程因为操作时间太长导致锁被释放,但进程本身并不知道。
  • 持有锁的进程奔溃导致锁无法释放。
  • 当一个进程持有的锁过期后,导致多个进程同时获取了锁,而每个进程都以为自己是唯一获取锁的进程。

接下来带着这些问题,redis 来学习下分布式锁的实现原理。

加锁

我们知道 setnx 只会在 key 不存在的情况下设置成功,一旦这个 key 设置成功,也就说明加锁成功。如果获取锁失败,则会在指定的时间内重试,直至达到超时间。

  1. /**
  2. * @param $redis
  3. * @param $lock_name 锁的名称
  4. * @param int $timeout 尝试加锁的超时时间
  5. * @return bool|string
  6. */
  7. function set_lock($redis, $lock_name, $timeout=10)
  8. {
  9. $lock_value = uniqid();
  10. $end = time() + $timeout;
  11. while (time() < $end)
  12. {
  13. if($redis->setnx("lock:{$lock_name}", $lock_value))
  14. {
  15. return $lock_value;
  16. }
  17. sleep(0.001);
  18. }
  19. return false;
  20. }

可以看到加锁的原理其实就是弄一个之前不存在的标志位,来标识数据行正在被某个客户端操作。

释放琐

使用 watch 命令确保代表锁的键没有被修改过,接着检查当前值是否和加锁时的值相同,如果相同则删除该键,实现锁的释放。

  1. /**
  2. * @param $redis
  3. * @param $lock_name 锁的名称
  4. * @param $lock_value 锁的值
  5. * @return bool
  6. */
  7. function delete_lock($redis, $lock_name, $lock_value)
  8. {
  9. $lock_name = "lock:{$lock_name}";
  10. while (true)
  11. {
  12. $redis->watch($lock_name);
  13. if($redis->get($lock_name) != $lock_value)
  14. {
  15. return false;
  16. }
  17. $redis->multi();
  18. $redis->delete($lock_name);
  19. if($redis->exec() == false)
  20. {
  21. return false;
  22. }
  23. $redis->unwatch();
  24. }
  25. return true;
  26. }

带有超时时间的锁

由于加了琐的客户端有可能出现崩溃导致其持有的锁无法释放,因此我们就需要给锁设置一个过期时间,这样即使崩溃了,其他客户端就可以尝试获取已经被释放的锁。

  1. /**
  2. * @param $redis
  3. * @param $lock_name
  4. * @param $timeout 尝试加锁的超时时间
  5. * @param $expire 锁的超时时间
  6. * @return bool|string
  7. */
  8. function set_expire_lock($redis, $lock_name, $timeout, $expire)
  9. {
  10. $end = time() + $timeout;
  11. $lock_name = "lock:{$lock_name}";
  12. $lock_value = uniqid();
  13. while (time() < $end)
  14. {
  15. if($redis.setnx($lock_name, $lock_value))
  16. {
  17. $redis->expire($lock_name, $expire);
  18. return $lock_value;
  19. }
  20. // 客户端会在获取锁失败后,会检查锁的超时时间,并为未设置超时时间锁设置超时时间
  21. if(! $redis->ttf($lock_name))
  22. {
  23. $redis->expire($lock_name, $expire);
  24. return $redis->get($lock_name);
  25. }
  26. sleep(0.001);
  27. }
  28. return false;
  29. }

队列

队列从功能上大致可以划分为两种,用于执行耗时作业的任务队列用于应用程序间通信的消息队列,任务队列重在执行(例如 linux crontab),消息队列重在消息传递(例如 pub/sub)。

多说一句 任务队列消息队列RPC 这三者的区别 ,任务队列是逻辑模型,消息队列是通信模型,RPC是包含了通信模型的框架。

任务队列

任务队列可以分为两种:

  • 根据任务被插入队列的顺序来执行的顺序队列
    • 顺序队列又可以分 先进先出队列(FIFO)先进后出队列(LIFO)多任务队列,以及优先级队列(priority)。
  • 在某个特定时间执行的延迟队列

先进先出队列和先进后出队列

借助 redis 的 list 数据结构提供的api可以很方便的实现这两种队列,这里不再赘述。

多任务队列和优先级队列

多任务队列指的是一个队列能够处理多种不同类型的任务,通过为队列中的元素注册回调函数来实现多任务,元素的格式为 json 编码过的数组 [function_name, [arg1, arg2, arg3...]]。

优先级队列就更简单,因为 blpop,brpop 函数有个很重要的特性就是依次弹出各个列表的第一个元素,根据这个特性只要调整各个列表的顺序就可以实现一个优先级队列。

  1. /**
  2. * 注册元素和回调
  3. * @param $redis
  4. * @param $queue
  5. * @param $callback
  6. * @param $args
  7. * @return mixed
  8. */
  9. function set_multi_task($redis, $queue, $callback, $args)
  10. {
  11. return $redis->lpush("queue:{$queue}", json_encode([$callback, $args]));
  12. }
  13. /**
  14. * @param $redis
  15. * @param array $queues
  16. * @return bool
  17. */
  18. function run_multi_task($redis, Array $queues, $sleep=0)
  19. {
  20. while (true)
  21. {
  22. // 修改 $queues 中的队列顺序就可以调整优先级
  23. $item = $redis->blpop($queues, 30);
  24. if(! $item)
  25. {
  26. return true;
  27. }
  28. $item = json_decode($item, true);
  29. $callback = $item[1][0];
  30. $args = $item[1][1];
  31. // 通过回调实现了多任务
  32. $callback($args);
  33. sleep($sleep)
  34. }
  35. }

安全高可用的队列

上面介绍的任务队列都是基于 list 实现,但有个问题是,如果列表中你弹出一个元素但在接下来的逻辑中你未成功处理该元素,这就会导致数据不一致的问题,因为 redis 并没有像 mysql 一样提供那种保证数据一致性事务处理。

因此就需要实现一个安全高可用的队列,能够保证一旦出错能够回滚。

那如何实现一个安全高可用的队列?

可以将 blpop/brpop 改用 brpoplpush/rpoplpush 来取走数据,并同时将数据放进自己的队列中,然后在真正处理完之后再从列表中删除(使用LREM),这样如果发生了程序崩溃之类的情况,可以将子队列中尚未移除的元素重新放回原始队列中重新处理。

延迟队列

redis 中通过有序集合来实现延迟队列,将待执行任务添加到有序集合中,有序集合中中存储的每个延迟任务的格式为 [处理任务的回调函数, 回调参数, 队列的名字 用于加锁的唯一标识符] ,并将任务的执行时间设置为分值,之后从这个有序集合里面移除到期的任务,并将它添加到适当的任务队列中去。并使用上面介绍的分布式锁来保证从延迟队列移到任务队列时的安全性

  1. /**
  2. * 设置延迟任务
  3. * @param $redis
  4. * @param $queue
  5. * @param $callback
  6. * @param $args
  7. * @param int $delay
  8. * @return bool
  9. */
  10. function set_delay_task($redis, $queue, $callback, $args, $delay=0)
  11. {
  12. $identifier = uniqid();
  13. $item = json_encode([$callback, $args, $queue, $identifier]);
  14. // 如果 $delay 为0,说明该任务不需要延迟而是立马执行,因此将其推入到一个立即执行的队列中去
  15. if($delay > 0)
  16. {
  17. $redis->zadd('delay', $item, time() + $delay);
  18. }
  19. else
  20. {
  21. $redis->rpush("queue:{$queue}", $item);
  22. }
  23. return true;
  24. }
  25. /**
  26. * 执行延迟任务
  27. * @param $redis
  28. * @param null $delay_queue
  29. * @return mixed
  30. */
  31. function run_delay_task($redis, $delay_queue=null $sleep=3)
  32. {
  33. while (true)
  34. {
  35. $element = $redis->zrange($delay_queue, 0, 0, true);
  36. if(empty($element) || $element[0][1] > time())
  37. {
  38. // 如果队列中没有任何任务或者任务执行时间还未到,则在短暂等待后重试
  39. sleep($sleep);
  40. continue;
  41. }
  42. $item = json_decode($element[0][0], true);
  43. $callback = $item[0];
  44. $args = $item[1];
  45. $queue = $item[2];
  46. $lock_name = $item[3];
  47. //加锁
  48. $locked = set_lock($redis, $lock_name);
  49. if(! $locked)
  50. {
  51. continue;
  52. }
  53. if($redis->zrem($delay_queue, $item))
  54. {
  55. set_delay_task($redis, $queue, $callback, $args);
  56. }
  57. //释放琐
  58. return delete_lock($redis, $lock_name, $locked);
  59. }
  60. }

消息队列

消息队列用在多个应用程序间通信,通信的方式有消息推送消息拉取

redis 中的消息推送是通过发布/订阅实现的,但发布订阅有个问题是由于客户端短线或者服务器宕机会导致消息丢失,因此使用 pub/sub 构造的消息队列是不安全的,因此才使用消息拉取。

消息拉取是为了保证消息的可靠性,消息拉取的本质是构造一个消息列表,客户端通过发送请求来获取最新消息,发送者和接收者使用套接字来通信。

消息拉取

接下来实现这么一个需求 :

有一个聊天微信群,即使用户退出登录后再次进入也一样能收到他的未读消息。

分析一下这个需求,由于群里每个人的读的消息是不一样,为了确保用户正确的读到自己的未读消息,因此需要做到:

  • 一个类型为 zset 的聊天群(chat:{id}),记录记录每个人目前未读的最小消息id,用户名作为元素,消息 id 作为分值;由于群消息必须有id,因此群消息还应该有个自增计数器。
  • 一个类型为 zset 的用户已读消息记录(usermsg:{chatid}),记录用户在每个聊天群里的读到的最大消息,元素为群id,已读消息数作为分数。
  • 一个类型为 zset 的有序集合用来记录聊天内容(chatmsg:{chatid}),元素为消息内容,分值为该消息在群里的自增id。
  • 用户还可以自由的加入或离开群组。

需求捋清楚以后,我们就可以设计出数据结构了

  1. -----聊天群的用户未读记录 key=chat:{id}--
  2. ---------------zset----------------------
  3. bob | 5
  4. james | 6
  5. ----------------------------------------
  6. -----聊天群里的消息内容 key=chatmsg:{chatid}---
  7. -------------zset-----------------------------
  8. bob | 5
  9. james | 6
  10. ----------------------------------------------
  11. ---bob 的消息记录 key=usermsg:{name}----
  12. ---------------zset--------------------
  13. chat1 | 5
  14. ---------------------------------------
  15. ---james 的消息记录 key=usermsg:{name}--
  16. --------------zset----------------------
  17. chat1 | 10
  18. ----------------------------------------
  19. ---聊天群中的消息id, key=msgid:{chatid}--
  20. ---------------string--------------------
  21. 0
  22. -----------------------------------------

代码实现如下

  1. /**
  2. * 创建聊天群
  3. * @param $redis
  4. * @param $sender
  5. * @param array $subscribers
  6. * @param $msg
  7. * @param $chat_id
  8. */
  9. function create_chat($redis, $sender, Array $subscribers, $msg, $chat_id)
  10. {
  11. $redis->zadd("chat:{$chat_id}", $subscribers, $unread=0);
  12. foreach ($subscribers as $user)
  13. {
  14. $redis->zadd("usermsg:{$user}", $chat_id, $msg_id = 0);
  15. }
  16. send_msg($redis, $chat_id, $sender, $msg);
  17. }
  18. /**
  19. * 向群里发送消息
  20. * @param $redis
  21. * @param $chat_id
  22. * @param $sender
  23. * @param $msg
  24. * @return mixed
  25. */
  26. function send_msg($redis, $chat_id, $sender, $msg)
  27. {
  28. $msg_id = $redis->incr("msgid:{$chat_id}");
  29. $data = [
  30. 'id' => $msg_id,
  31. 'time' => time(),
  32. 'sender' => $sender,
  33. 'msg' => $msg
  34. ];
  35. $redis->zadd("chatmsg:{$chat_id}", json_encode($data), $msg_id);
  36. return $chat_id;
  37. }
  38. /**
  39. * 用户获取未读消息
  40. * @param $redis
  41. * @param $user
  42. * @param $chatid
  43. * @return null
  44. */
  45. function read_msg($redis, $user, $chatid)
  46. {
  47. $usermsg = $redis->zrange("usermsg:{$user}", 0, -1, true);
  48. foreach ($usermsg as $val)
  49. {
  50. $chat_id = $val[0];
  51. $readed_msgid = $val[1];
  52. if($chatid != $chat_id)
  53. {
  54. continue;
  55. }
  56. //获取聊天群中最新一条消息
  57. $chat_info = $redis->zrangebyscore(
  58. "chatmsg:{$chat_id}",
  59. $readed_msgid + 1,
  60. '+inf',
  61. ['withscores' => TRUE]
  62. );
  63. //读取了最新消息后,更新用户在聊天群中状态,以及用户消息记录中的状态
  64. $len = count($chat_info);
  65. $max_msgid = $chat_info[$len-1][1];
  66. $redis->zadd("chat:{$chat_id}", $user, $max_msgid);
  67. $redis->zadd("usermsg:{$user}", $chat_id, $max_msgid);
  68. $allmsg[$chat_id] = $chat_info;
  69. }
  70. return $allmsg ?? null;
  71. }
  72. /**
  73. * 加入群组
  74. * @param $redis
  75. * @param $chat_id
  76. * @param $user
  77. */
  78. function join_chat($redis, $chat_id, $user)
  79. {
  80. // 进入群组读取最近一条消息后,更新群组状态和用户消息状态
  81. $msg_id = $redis->get("chatmsg:{$chat_id}");
  82. $redis->zadd("chat:{$chat_id}", $user, $msg_id);
  83. $redis->zadd("usermsg:{$user}", $chat_id, $msg_id);
  84. }
  85. /**
  86. * 退出群组
  87. * @param $redis
  88. * @param $chat_id
  89. * @param $user
  90. */
  91. function exit_chat($redis, $chat_id, $user)
  92. {
  93. $redis->zrem("chat:{$chat_id}", $user);
  94. $redis->zrem("usermsg:{$user}", $chat_id);
  95. }

内存淘汰策略

maxmemory 配置指令用于配置Redis存储数据时指定限制的内存大小。通过redis.conf可以设置该指令,或者之后使用CONFIG SET命令来进行运行时配置。

当 maxmemory 限制达到的时候,Redis会使用由 maxmemory-policy 配置指令来指定的策略淘汰内存中的数据:

测试与监控

info 输出的几个重要参数

  • total_connections_received : 代表 redis 的连接数,如果该值不断升高,则需要修改应用,采用连接池方式进行,因为频繁关闭再创建连接redis的开销很大。
  • latest_fork_usec : 代表上次生成 RDB 文件花费的时间,微妙。
  • info commandstats: 可以查看各个命令执行的次数,每个命令的花费总时间和平均时间,单位为毫秒。

debug 调试

  • debug segfault 模拟宕机。
  • debug sleep 30 模拟挂起。
  • debug populate num 快速产生大量测试 key。
  • debug reload 模拟 load RDB 文件的场景。
  • debug loadaof 模拟 load AOF 文件的场景。
  • used_memory:859192 代表占用的内存空间, used_memory_rss:7634944 代表实际使用的内存, mem_fragmentation_ratio:8.89 代表前两者者的比例,一般1.N为佳,如果此值过大,说明redis的内存的碎片化严重,需要重建下内存。

慢查询

慢查询用来展示命令执行时花费的时间(不包括IO操作,比如与客户端通信,发送回复等等),可以用两个参数配置慢查询日志

  • slowlog-log-slower-than : 设置执行时间超过多少微妙(1秒=1000毫秒=1000000微秒)将会被记录进慢查询日志。
  • slowlog-max-len : 设置慢查询日志的长度,最小值是0。
    • Redis使用了一个列表来存储慢查询日志,slowlog-max-len 就是列表的最大长度.一个新的命令满足慢查询条件时会被插入到这个列表中。 慢查询日志达到其最大长度时,将从日志列表中移除删除最旧的命令以腾出空间。

有了慢查询日志就可以使用相关命令来分析日志了。

slowlog get n : 返回最近的 n 条日志。

  1. redis 127.0.0.1:6379> slowlog get 2
  2. 1) 1) (integer) 14
  3. 2) (integer) 1309448221
  4. 3) (integer) 15
  5. 4) 1) "ping"
  6. 2) 1) (integer) 13
  7. 2) (integer) 1309448128
  8. 3) (integer) 30
  9. 4) 1) "slowlog"
  10. 2) "get"
  11. 3) "100"

每一个条目由四个字段组成:

1 每个慢查询条目的唯一的递增标识符(ID),条目ID只会在 Redis 服务重启才重置它。
2 记录进慢查询日志的时间(时间戳)。
3 命令执行花费的总时间,以微秒为单位。
4 组成该命令的参数的数组。

slowlog len : 获得慢查询日志的长度。

  1. 127.0.0.1:6379> SLOWLOG len
  2. (integer) 1

slowlog reset : 清空慢查询日志。

  1. 127.0.0.1:6379> SLOWLOG get 2
  2. 1) 1) (integer) 0
  3. 2) (integer) 1544580561
  4. 3) (integer) 29288
  5. 4) 1) "CONFIG"
  6. 2) "GET"
  7. 3) "dir"
  8. 5) "127.0.0.1:9471"
  9. 6) ""
  10. 127.0.0.1:6379>
  11. 127.0.0.1:6379>
  12. 127.0.0.1:6379> SLOWLOG reset
  13. OK
  14. 127.0.0.1:6379>
  15. 127.0.0.1:6379>
  16. 127.0.0.1:6379> SLOWLOG get 2
  17. (empty list or set)
  18. 127.0.0.1:6379>

基准测试

redis 自带了一个工具 redis-benchmark 来做基准测试(类似于 Apache ab 程序),你可以使用 redis-benchmark -h 来查看它的帮助信息。

  1. [root@VM_0_17_centos ~]# redis-benchmark -h
  2. Invalid option "-h" or option argument missing
  3. Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]
  4. -h <hostname> Server hostname (default 127.0.0.1)
  5. -p <port> Server port (default 6379)
  6. -s <socket> Server socket (overrides host and port)
  7. -a <password> Password for Redis Auth
  8. -c <clients> Number of parallel connections (default 50)
  9. -n <requests> Total number of requests (default 100000)
  10. -d <size> Data size of SET/GET value in bytes (default 3)
  11. --dbnum <db> SELECT the specified db number (default 0)
  12. -k <boolean> 1=keep alive 0=reconnect (default 1)
  13. -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD
  14. Using this option the benchmark will expand the string __rand_int__
  15. inside an argument with a 12 digits number in the specified range
  16. from 0 to keyspacelen-1. The substitution changes every time a command
  17. is executed. Default tests use this to hit random keys in the
  18. specified range.
  19. -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).
  20. -e If server replies with errors, show them on stdout.
  21. (no more than 1 error per second is displayed)
  22. -q Quiet. Just show query/sec values
  23. --csv Output in CSV format
  24. -l Loop. Run the tests forever
  25. -t <tests> Only run the comma separated list of tests. The test
  26. names are the same as the ones produced as output.
  27. -I Idle mode. Just open N idle connections and wait.

选项说明:

  • -c : 客户端数量。
  • -n : 同一时间发出的请求数(默认为100000)。
  • -d : SET/GET值的数据大小(以字节为单位)(默认为3)。
  • -q : 只输出耗费时间(秒)。
  • -t : 只测试跟在后面命令。
  • -r : 生成给定数量的随机 key。
    • 例如设置 10 万随机 key 连续 SET 100 万次 : redis-benchmark -t set -r 100000 -n 1000000
  • -d : 设置随机产生 key 的值大小(默认为3个字节)。
  • -P : 以管道的方式提交多条命令的测试,因为默认情况下都是在一个请求完成之后才发送下一个请求。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注