[关闭]
@gzm1997 2018-08-08T09:49:50.000000Z 字数 4433 阅读 2231

使用rabbitMQ作为缓存队列

rabbitMQ go


高并发请求情况

我们在服务端开发过程中 往往会遇到需要处理高并发请求 并且将post过来的数据存进数据库的情况 这种情况一般会对我们的两部分造成很大压力

  1. nginx等服务器
  2. 数据库连接

因为当并发量很大的时候 nginx等服务器会因为负载均衡受不了那么大的压力会崩掉 一般这样的话

用Unix socket或者tcp socket的方式增多几个web app实例以供nginx进行轮训 或者配置一下nginx的最大连接数 这样一般可以有效解决nginx服务器的问题

但是数据库的操作这边是无论如何也是需要一点时间的 如果并发量很大 那么久比较难解决 一般来说mongodb是可以通过在公有的连接池里面拷贝出一个新的session 但是这样的效果也是有限的 而比较有效的方法是

使用消息队列暂时存储来不及处理的数据 等服务端缓过来之后再冲消息队列里面获取数据 进行数据库操作


rabbitMQ

rabbitMQ就是一个缓存队列 下面是一个缓存队列的基本结构

image_1ckc8btt31eftpo66tvgja139e9.png-73kB

交换区 bindingKey跟queue构成channel

1.生产者
第一部分是生产者并,生产者不知道到底有什么队列可以存东西 只是一个单纯的搬运工 将打上了bindingKey的信息转发给第二部分exchange交换区

2.交换区
第二部分是交换区 交换区位于生产者和队列之间 所有的消息都是由交换区转发给队列的 转发给队列之间需要进行QueueBind队列绑定 类似下面

  1. err = ch.QueueBind(
  2. //用来接收信息的queue的名字
  3. q.Name,
  4. //bingdingKey是什么
  5. b.String(),
  6. //交换区是什么
  7. EXCHANGE,
  8. false,
  9. nil)

简而言之 队列绑定就是当接收方声明一个队列作为接收队列的时候 用来说明 打上了什么样的bingdingKey的信息应该从交换区那边转发给给我这个队列里面 然后随后我自己会从这个队列里面拿出这些信息

3.队列
第三部分是队列 就是作为存储的空间 所有有待被接收方接收的信息都存储在这些队列里面 当一个信息被接收方接收成功之后 队列会删除这个信息以释放内存 如果接收方没有成功接收 那么这个信息会在队列里面重新排队 如果刚好有空闲的队列 那么这个信息会接着很快被处理 但是这里涉及到一个问题

队列是怎样知道接收方成功接收了信息呢?

详情如下

但是一旦发送方这边挂了 怎么办呢?

详情如下

公平分发 有时候有些种情况是当exchange类型是默认类型的话 需要分发信息12345 奇数的信息需要处理的时间更久一点 偶数处理时间比较短 那么还是轮流来分配对于第一个消费者来书是不公平的

如何实现公平分发

4.消费者
第四部分是消费者 消费者是可以多个的 消息队列的一个很大的好处是平行化工作 设想一下有一种情况是生产者不断生产工作 发送到队列里面 多个消费者从队列里面获取工作信息进行工作 这样的话可以大到平行化工作的效果 最大使用服务端这边的资源


exchange类型

exchange的集中类型

各种类型的交换区的使用方法有点啰嗦 这里就不讲了 可以去看官方文档 文档比较多例子 通俗易懂
rabbitMQ go文档

但是后面我会拿我这次在用户关系图谱这个项目上使用的rabbitMQ作为例子 我使用了direct类型的exchange

rabbitMQ在服务端上使用


队列怎么知道接收方成功接收

这里跟计算机网络里面的ACK是类似的 当接收方成功接收到了一个信息 会为这个信息给队列发送一个ACK信号说明我成功接收到了这个信息 在官方文档的hello world里面是设置了默认的自动发送ACK信号
微信图片_20180808161057.png-13kB

这个ACK机制其实是为了预防接收方挂了 因为一旦接收方不小心挂了 那么发送方这边就接收不到ACK信号 从而重新给那个信息排队

你也可以设置autoAck为false 然后自己在接收方那边使用

d.Ack(false)

来进行发送ACK信号


发送方不小心挂了怎么办

这就设置可靠性的设置了

首先需要设置队列是可靠的 durable可靠性这个选项是需要在服务端和客户端两边都需要设置的
微信图片_20180808161610.png-9.2kB

然后设置我们的信息是persistent持久的
微信图片_20180808161855.png-9.2kB

上面两个设置可以有效防止发送方这边挂了的情况 这样即使发送方不小心崩溃了 在重启之后也会重新发送还没发送成功的信息


公平分发

有时候有些种情况是当exchange类型是默认类型的话 需要分发信息12345 奇数的信息需要处理的时间更久一点 偶数处理时间比较短 那么还是轮流来分配对于第一个消费者来书是不公平的

  1. err = ch.Qos(
  2. 1, // prefetch count
  3. 0, // prefetch size
  4. false, // global
  5. )
  6. failOnError(err, "Failed to set QoS")

其实这样设置一下就可以了 这样表明队列是不会一次性地给一个消费者分发超过一个的消息 直到消费者处理完并且发送了前一个信息的ack信号之后再分发 或者将这个信息发送给下一个不是很忙的消费者


rabbitMQ在服务端上使用

项目部分代码地址
用户关系图谱路由server

其中rabbitMQ主要使在cache模块
进行rabbit的初始化
一定要注意如果你是在init函数中进行rabbit的初始化的话一定不要在init函数了里面使用defer进行关闭连接跟channel 因为这样的话后面要使用rabbit资源就会报错显示连接或者channel断开 rabbit资源的释放需要在其后关闭 最好需要关闭

  1. func init() {
  2. //加载rabbitMQ在配置文件中的配置
  3. rabbitUser := beego.AppConfig.String("rabbitUser")
  4. rabbitPsw := beego.AppConfig.String("rabbitPsw")
  5. rabbitIp := beego.AppConfig.String("rabbitIp")
  6. rabbitPort, _ := beego.AppConfig.Int("rabbitPort")
  7. dbUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", rabbitUser, rabbitPsw, rabbitIp, rabbitPort)
  8. var err error
  9. //建立链接
  10. if conn, err = amqp.Dial(dbUrl); err != nil {
  11. panic(err)
  12. }
  13. //声明一个channel
  14. if ch, err = conn.Channel(); err != nil {
  15. panic(err)
  16. }
  17. //声明一个类型为direct的交换区
  18. err = ch.ExchangeDeclare(
  19. EXCHANGE,
  20. "direct",
  21. true,
  22. false,
  23. false,
  24. false,
  25. nil,
  26. )
  27. if err != nil {
  28. panic(nil)
  29. }
  30. }

发布新消息
发布信息

  1. func PublishMsg(json []byte, bingdingKey BindingKey) error {
  2. err := ch.Publish(
  3. //指定我们要使用的direct类型的交换区
  4. EXCHANGE,
  5. //根据bingdingKey进行转发
  6. bingdingKey.String(),
  7. false,
  8. false,
  9. amqp.Publishing{
  10. //发送的信息时候持久的 即使发送方突然挂了 重启之后还会继续发
  11. DeliveryMode: amqp.Persistent,
  12. //发送的类型是json对象
  13. ContentType: "application/json",
  14. Body: json,
  15. })
  16. return err
  17. }

接受信息
接受信息
接收方因为用户关系图谱这个项目的逻辑比较复杂 所以写的说起来有点麻烦

  1. func GetMsg() {
  2. //在这里释放rabbit的资源
  3. defer conn.Close()
  4. defer ch.Close()
  5. forever := make(chan bool)
  6. for _, bingdingKey := range AllBindingKeys {
  7. go func(b BindingKey) {
  8. fmt.Println("queue for", b)
  9. //声明一个匿名队列
  10. q, err := ch.QueueDeclare(
  11. "",
  12. true,
  13. false,
  14. true,
  15. false,
  16. nil,
  17. )
  18. if err != nil {
  19. panic(err)
  20. }
  21. //进行队列绑定
  22. err = ch.QueueBind(
  23. q.Name,
  24. b.String(),
  25. EXCHANGE,
  26. false,
  27. nil)
  28. if err != nil {
  29. panic(err)
  30. }
  31. //声明一个消费者 从这个匿名队列里面读取信息
  32. msgs, err := ch.Consume(
  33. q.Name, // queue
  34. "", // consumer
  35. false, // auto ack
  36. false, // exclusive
  37. false, // no local
  38. false, // no wait
  39. nil, // args
  40. )
  41. if err != nil {
  42. panic(err)
  43. }
  44. store := session.GetGraph()
  45. switch b {
  46. case CreateGroupShareLink:
  47. for d := range msgs {
  48. cgsl := models.CreateGroupShareLink{}
  49. if err := json.Unmarshal(d.Body, &cgsl); err == nil {
  50. fmt.Println("add CreateGroupShareLink", cgsl)
  51. fmt.Println(cgsl.AddCreateGroupShareLinkToCayley(store))
  52. } else {
  53. panic(err)
  54. }
  55. //发送ack信号 代表已经接受成功
  56. d.Ack(false)
  57. }
  58. //省略部分 太长省略
  59. }
  60. }(bingdingKey)
  61. }
  62. <- forever
  63. }

释放rabbit资源

如果没有释放rabbit的资源 而且设置了channel和queue为可靠以及持久性的话 一般会导致一个bug

receiveed unexpected response

这是因为有可能在你挑食的时候突然关闭 而你却没有在代码里面主动close掉connection和channel 那么下一次运行的时候 接收方会重新发送一次一个信息 这导致发送方不知道这是什么ACK的信息 从而导致这个报错 一般来说

  1. //在这里释放rabbit的资源
  2. defer conn.Close()
  3. defer ch.Close()

可以有效解决这个问题 但是还是会偶尔出现的 出现的话就重新开几次 两三次之后缓存被清理掉就没有这个报错了

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注