抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-AMQP协议

AMQP 历史

​ 消息队列(Message Queue)起源于一位来自 MIT 的硬件设计教育工作者 Vivek Ranadivé 设想了一种通用软件总线,就像主板上的总线那样,供其他应用程序接入。Vivek在1983年成立了 Teknekron,高盛等公司作为第一批用户再金融交易中采用了 Teknekron的软件,同时还诞生了第一代消息队列软件:Teknekron 的 The Information Bus(TIB)。

​ Teknekron 的 TIB 允许应用开发者建立一系列规则去描述消息内容,只要消息按照这些规则发布出去,任何消费者应用都可以订阅感兴趣的内容,信息的生产者和消费者完全解耦,并且可以再传输过程中灵活混合。这个特性引起了电信特别是新闻机构的注意。1994年路透社收购了 Teknekron 。

​ 由于消息队列再金融交易中应用的反响,BIM 在1990年也开始研发自己的消息队列软件(BIM MQ),并且逐步演化成 WebSphere MQ 并统治着商业消息队列平台市场。同时微软开发了Microsoft Message Queue(MSMQ)。然而这些商业MQ问题在供应商壁垒,各个厂商的 MQ 之间无法互通。为了解决这个问题,Java Message Service(JMS)在2001年诞生了,试图通过提供公共 Java API的方式隐藏MQ各个供应商提供的实际接口,从而跨越壁垒和解决互通问题,但是由于使用单独的标准化接口来胶合众多不同的接口使应用程序反而变得更加脆弱。

​ 2004年 JPMorgan Chase 和 iMatix 公司一起合作开发 Advanced Message Queuing Protocol (AMQP,高级消息队列协议),从一开始就设计成为开放标准,任何人都可以执行这一标准,针对该标准任何人都可以和任何 AMQP 供应商提供的 MQ 服务器进行交互。

AMQP 协议

​ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

​ AMQP协议这种降低耦合的机制是基于与上层产品,语言无关的协议。是一种二进制协议,提供客户端应用与消息中间件之间多通道、协商、异步、安全、中立和高效地交互。

​ AMQP 是可扩展的,如自定义 exchange 类型、声明 exchange 和 queue 的时候附加属性、服务端特定拓展、新的 AMQP 方法、利用插件扩展等等。

AMQP协议基本介绍

目前Rabbitmq最新的版本默认支持的是AMQP 0-9-1,该协议总共包含了3部分:

Module Layer

​ 位于协议的最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自定义的业务逻辑。例如,客户端可以是使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。

Session Layer

​ 位于中间层,主要负责将客户端的命令发送给服务端,在将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性的同步机制和错误处理。

Transport Layer

​ 传输层,基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时可以处理信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。

​ 传输层可以被任意传输替换,只要不改变应用可见的功能层相关协议,也可以使用相同的传输层,同时使用不同的高级协议

AMQP 传输层设计驱动基于如下要求:

  1. 使用二进制数据流压缩和解压,提高效率;

  2. 可以处理任意大小的消息,且不做任何限制;

  3. 单个连接支持多个通信通道;

  4. 客户端和服务端基于长链接实现,且无特殊限制;

  5. 允许异步指令基于管道通信;

  6. 易扩展,基于新的需求和变化支持扩展;

  7. 新版本向下兼容老版本;

  8. 基于断言模型,异常可以快速定位修复;

  9. 对编程语言保持中立;

  10. 适应代码发展演变;

AMQP 通用组件

AMQ Model 架构

​ AMQ 作为中间层服务,把消息生产和消费分隔开来,当消息生产者出现异常,不影响消费者对消息的消费,当消费者异常时,生产者生产的消息可以存放到服务的内存或者磁盘,不会影响到消费的速率,同时,消息也可以基于路由的规则可以投递到指定的消费者消费。

​ AMQ 基于模块化通过 Exchange 和 Message Queue 两个组建组合实现消息路由分发。

模型重要组件
Message Queue

能够将发送过来的消息进行存储,同时将消息转发给消费者;

Exchange 和 Message Queue之间存在绑定关系,消息到了 Exchange 后基于路由策略可以将消息投递到已绑定且符合路由策略的 Message Queue。

​ 消息队列会将消息存储到内存或者磁盘中,并将这些消息按照一定顺序转发给一个或者多个消费者,每个消息队列都是独立隔离的,相互不影响。

​ 消息队列具有不同的属性:私有,共享,持久化,临时,客户端定义 或者服务端定义等,可以基于实际需求选择对应的类型,以 RabbitMQ 队列特性为例:

​ 共享持久化消息队列:将发送的消息存储到磁盘,然后将消息转发给订阅该队列的所有消费者;

​ 私有临时消息队列:RabbitMQ 支持 rpc 调用,再调用过程中消费者都会临时生成一个消息队列,只有当前消费者可见,且由服务端生成,调用完就会销毁队列。

Exchange

基于消息生产者和路由规则可以将消息投递到指定的 Message Queue;

​ 交换机收到生产者投递的消息,基于路由规则及队列绑定关系匹配到投递对应的交换机或者队列进行分发,交换机不存储消息,只做转发。

​ AMQP定义了许多标准交换类型,基本涵盖了消息传递所需的路由类型,一般 AMQP 服务器都会提供默认的交换机基于交换机类型命名,AMQP 的应用程序也可以创建自己的交换机用于绑定指定的消息队列发布消息。

消息的流转过程

消息生命周期

​ 消息主要由属性及消息内容组成,生产者创建消息时可以给消息设置属性及消息内容,同时也可以标记路由信息在消息上,可以将消息发送到指定交换机。

​ 当消息到达交换机时,交换机会基于路由规则判断消息能否转发,如果不能转发会丢弃消息同时反馈给生产者。

​ 交换机基于路由规则可以将消息投递到一个或者多个消息队列,服务器通过复制或者计数器的方式将消息保存到不同队列中,每个队列中的消息内容是相同的,但是操作是隔离的,相互不影响。

​ 当消息到达消息队列后,消息队列会基于 AMQP 协议投递给消费者,如果无法投递给消费者或者没有消费者,消息将在内存或者磁盘中存储,等待消费者。

​ 当消息队列可以将消息传递给消费者时,消息将从其内部缓冲区中删除。 删除操作可能立刻执行也可以再消费者确认消息消费后再执行,删除策略消费者可以选择。

​ 生产消息投递确认和消费消息消费确认可以作为两个事务,然后提交或者回滚事务。

AMQP 指令架构

协议指令(类和方法)

​ 作为消息中间件传统的 API 定义的操作非常复杂,为了解决这个问题 AMQP 基于传统 API 的功能,定义方法来对应实现 API 的操作每个方法只完成一件事,通过方法之间的组合来实现完整的功能,所以AMQP 形成了一个非常庞大的指令集,但是指令集中的方法都是便于理解的。

​ AMQP 指令集中指令,基于对应的特定功能域被划分为不同的类,其中有一些类作为特定类的支持类,属于可选的。

有如下两个场景:

  • 同步请求:一边等待对方发送请求,一边等待对方发送回复。适用于对性能要求不高的场景。

  • 异步请求:发送请求后不等待回复,使用场景对性能要求比较高的场景。

​ 为了简化指令处理,我们给每个同步请求定义不同的回复指令,也就是说同一个回复指令不可能返回给2个不同的请求。这也意味着发送同步请求的发送方可以接受和处理回复的指令,知道获得有效的同步回复指令为止。这种方式可以将 AMQP 与传统的 RPC 协议区分开来。

​ 一条指令可以被定义为同步请求,同步回复(针对特定请求)或者异步回复,但是每种指令真正再被定义是在客户端(即服务器到客户端)或者服务端(即户端到服务器)。

AMQP 映射到中间层 API

​ AMQP 映射到中间层 API,这个映射过程并不是所有方法和参数完全映射,因为有部分方法或者参数对应用程序没有意义。同时映射规则也是固定的,基于已定的一些规则,所有方法按照这个规则映射,不需要人工干预。

例如:队列声明方法:

1
2
3
4
Queue.Declare 
queue=my.queue
auto-delete=TRUE
exclusive=FALSE

可以作为一条线性记录

1
2
3
4
+--------+---------+----------+-----------+-----------+ 
| Queue | Declare | my.queue | 1 | 0 |
+--------+---------+----------+-----------+-----------+
class method name auto-delete exclusive

也可以作为高级 API

1
queue_declare (session, "my.queue", TRUE, FALSE);

​ 对于大多数应用程序来说,中间层(指令层)隐藏再技术层面,应用程序实际使用的 API 功能对比中间层相对会较少。

AMQP 传输层架构

简要概述

​ AMQP 传输基于二进制协议,传输的信息被组织成各种类型的帧,帧携带协议方法和其他相关信息,所有的帧具有相同个格式:帧头,有效内容,帧尾。帧的有效内容格式取决于帧的类型。

​ 假设再一个可靠的面向流的网络传输层(例如:TCP / IP)

​ 再一个 Socket 连接中,可以有多个独立的线程访问,这种情况就是上文中提到的 Channel(通道),每个帧都有一个属于自己的通道号码,再同一个连接中所有的帧混合在一起,不同的通道共享连接,但是针对每个通道自身的帧都是按照严格的顺序运行。

​ 由于帧的有效内容都是由帧头和帧尾包装,所以对应帧数据的解析是相当简单便捷的,同时基于协议规范生成帧数据也是非常容易。

数据类型

AMQP 使用的数据类型如下:

  • Integers(数值范围1-8, 8个字节):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。
  • Bits(统一为8个字节):用于表示开/关值。
  • Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
  • Long strings:用于保存二进制数据块。
  • Field tables:包含键值对,字段值一般为字符串,整数等。
协议协商

​ AMQP 客户端和服务器存在协商协议。这意味着当客户端连接时,服务端会提出一些客户端可以接受或者修改的选项,如果双方达成一致,连接继续,基于协商协议,可以设定好一些先决条件。

在AMQP中,协商协议的一些具体方面:

  • 实际的协议和版本。服务器可以在同一端口上监听多个协议。

  • 加密参数和双方的身份验证。

  • 最大帧大小,通道数量和其他操作限制。

AMQP消息属性详解

​ 在消息队列中,事件通过消息总线发布到消费者应用程序,每个事件都执行自己特有的任务。但是,如果没有一个标准化的消息格式,我们就难以预测特定的消息类型如何被序列化以及这些消息包含了哪些具体数据。

​ 为了提高消息格式的复用性,AMQP协议的Basic.Properties数据结构提供了一种标准的消息格式,通过AMQP协议发布到RabbitMQ的每条消息都包含这一结构,这使得消费者应用程序可以进行自动反序列化消息,在处理消息之前验证消息的来源以及类型等等。

Basic.Properties的属性

  • content-type 指定消息类型(mime-types)便于序列化/反序列化
  • content-encoding 消息体使用某种特殊的方式进行压缩或者编码
  • message-id 和 correlation-id 唯一标识消息和消息响应,用于实现消息跟踪
  • timestamp 减少消息大小,描述消息创建时间
  • expiration 表明消息过期
  • delivery-mode 在RabbitMQ中表明将消息写入磁盘或者内存队列
  • app-id 和 user-id 帮助追踪出现问题的消息发布者应用程序
  • type 定义发布者和消费者之间的契约
  • reply-to 实现响应消息的路由
  • headers 映射表定义字有格式的属性、实现RabbitMQ路由

​ 本文中对于“契约”的定义:一种确定消息格式和内容的规范。通常用来描述API、对象和系统的预定义规范。契约规范中通常包含有关发送和接收消息的精确信息,例如数据类型、格式以及各种需要遵守的条件。

content-type

​ 通过RabbitMQ发布的消息,我们很容易对它进行复用。例如:最初的消费者应用程序是使用Python编写的,但不久之后,使用PHP、JAVA和C语言编写的程序同样成为的消息的消费者。

​ 当消息格式中没有对消息体内容的描述时,应用程序会倾向使用一种隐式契约,这种隐式契约天生容易出错(例如,Python程序使用pickle序列化的数据无法在其他程序中反序列化),所以你的应用程序非常可能出现问题。

​ 通过指定消息类型,程序员和消费者应用程序不需要猜测如何反序列化消息体中的数据,甚至根本就不需要执行反序列化操作。如果你在消费者代码中使用了一个框架,在消费者代码处理消息之前,通过框架对其进行预处理,消息体可以自动地被反序列化并加载到你所使用的编程语言的本地数据结构中。从而降低消费者应用程序代码的复杂性

注意

​ 应该尽量使用标准的序列化格式例如JSON、Msgpack或XML。这些格式允许使用任何编程语言编写任意的消费者应用程序,因为数据是以这些格式进行自我描述的,所以编写潜在的消费者应用程序会很容易。并且在程序外部对消息解码也更简单。

​ 通过cintent-type属性指定序列化格式,可以更好地支持未来的消费者应用程序—–当消费者可以自动识别它们所支持的消息格式并选择性地处理消息时,那就不必担心在使用新的序列化格式并将其路由到相同的队列时会发生什么情况。

gzip和 content-encoding

默认情况下,通过AMQP发送的消息并不会被压缩。

​ 在处理像XML这种过于繁杂的标记语言,甚至在消息数量较大的场景下处理像JSON或YAML等轻量级数据格式时,你的发布者可以在发布消息之前压缩消息,并在收到消息时进行解压缩,就像我们使用gzip在服务器上压缩网页然后在浏览器端实时解压这些网页之后再进行展示一样。

​ 通过与content-type属性相结合,content-encoding属性使消费者应用程序能够基于一种明确的契约与发布者进行交互。你可以编写扩展性更强的代码,确保代码不会由于消息格式变更而导致意外错误。例如:在应用程序的生命周期中,你可能发现bzip2压缩更加适合你的消息内容。如果你在编写消费者应用程序来检查content-encoding属性,则可以拒绝那些不能解码的消息,并把它们留在队列中供其它支持这种解码方式的消费者去消费。

message-id和correlation-id

​ 在AMQP规范中,message-id和correlation-id是“应用级别”的属性,并没有提供正式的行为定义。这就意味着你可以利用它们实现任何目的,这两个字段允许最多255个字节的UTF-8编码数据,并以未压缩的方式存储在Basic.Properties数据结构中。

message-id

​ 某些消息类型(如登录事件)并不需要与其关联的唯一标识,但是订单类型的消息可能需要具备这个唯一标识。当消息流对系统中的各个组件进行耦合时,message-id属性使得消息能够在消息头中携带数据从而唯一地识别该消息。

correlation-id

​ 在AMQP规范中没有关于correlation-id的正式定义,但是通过指定该消息的correlation-id为另一条消息关联消息的message-id,可以指定该消息是另一个消息的响应。另一种用法是使用它来传递关联消息的事务ID或其他类似。

timtstamp属性

​ 与message-id和correlation-id一样,timsstamp属性也是“应用级别”的属性。通过timestamp属性来指定消息的创建事件,消费者可以评估消息投递过程的性能、决定是否处理消息、丢弃消息、甚至对应用程序发布警报消息。

注意:时间戳没有上下文,因此建议在所有消息中使用统一的时区。

expiration

消息自动过期

​ 如果消息没有被消费,expiration概述RabbitMQ何时应该丢弃消息。expiration属性在AMQP的规范定义中比较奇怪:“用于实现,但没有正式的行为”。这意味着RabbitMQ可以提供任何它认为合理的实现方式。同时,expiration的格式是一个短字符串,最多允许255个字符,而代表时间单位的另一个属性timstamp则是一个整数值。

​ 由于规范中没有给出明确说明,当使用不同的消息代理服务器甚至同一消息代理服务器的不同版本时,expiration可能会有不同的含义。想要利用expiretion属性来实现RabbitMQ消息的自动过期,必须把一个UNIX时间戳存储为字符串。

​ 使用expiration属性时,如果把一个已经过期的消息发布到服务器,那么这条消息不会被路由到任何队列,而是直接被丢弃

注意:RabbitMQ3.0以上才支持expiration属性。

delvery-mode

使用delvery-mode平衡速度和安全性

​ delivery-mode属性是一个字节字段,像消息代理服务器表明在将消息投递到任何正在等待的消费者之前,你希望先将它持久化到磁盘上。delivery-mode属性有两个可能的值:1 代表非持久化消息,2 代表持久化消息。

消息的持久性与队列的持久性(durable)

  • 队列的持久性(durable)告诉RabbitMQ该队列在重新启动RabbitMQ服务器或集群之后是否仍然有效。
  • 只有消息的delivery-mode为2时,才会向RabbitMQ指定消息是否应该被持久化。
  • 一个队列可能包含持久化和为持久化的消息

app-id和user-id

使用app-id和user-id验证消息来源

​ app-id和user-id属性提供了关于消息的另一层信息,并且有很多潜在的用途

app-id

​ app-id属性在AMQP规范中定义为“短字符串”,最多允许255个UTF8字符,如果应用程序采用的时以带版本的API为中心的设计,那么在生成消息时可以使用app-id传递特定的版本号,在处理消息之前检查app-id允许应用程序丢弃那些来源不明或者不受支持的消息

​ app-id的另一个属性是收集统计数据。例如,如果你使用消息来传递登录事件,则可以将app-id设置为触发登录事件的平台和应用程序版本。在一个需要同时支持web端、桌面端和移动端应用的环境中,如果希望跟踪并统计各个平台的登录数据,使用这种方式我们甚至不需要检查消息体。

​ 如果一个新的消息发布者错误地使用了与现有发布者应用程序相同的Exchange和routing_key时,通过app-id可以更容易地追踪恶意消息的来源

user-id

​ 在需要验证用户身份时,可以使用user-id属性来标识已登录的用户。但大多数情况下,并不推荐这种做法。RabbitMQ会根据发布消息的RabbitMQ用户信息检擦每条已发布消息的user-id属性值,如果这两个值不匹配,那么该消息会被拒绝。

type

使用type属性获取明细

​ AMQP规范的0-9-1版本将type属性定义为“消息类型名称”,它用来描述消息中的内容。

​ 像JSON和XML这样的自描述格式被一些人认为太冗长了。它们可能在网络传输或者内存存储上带来不必要的开销,序列化和反序列化相较一些语言也比较慢。当消息体没有以自描述格式进行序列化(例如Apache Thrift、ProtoBuf这样的序列化格式),这些二进制编码的消息格式不是自描述的,需要依赖外部定义的文件来进行序列化和反序列化。这时,可以通过type属性指定记录类型或者外部定义文件,如果无法正确访问处理消息所需的.thrift或.proto文件,消费者就能够拒绝这些消息。

实现动态工作流

​ AMQP规范中,reply-to属性被指定用于应用程序,但是没有规定的行为,他还有一个附加说明:使用reply-to可以构架一个用来回复消息的私有响应队列。

​ 尽管在AMQP规范中没有说明私有响应队列的确切定义,但是该属性可以在最初发布消息的相同Exchange中携带特定的队列名称或者routing_key,这些队列名称或者routing_key可以用于回复消息。

消息头

使用消息头定义头属性

​ headers是一个键值对映射表,允许用户自定义任何的key/value。键可以是ASCII或者Unicode字符串,最大长度为255个字符。值可以是任何有效的AMQP值类型。

​ headers属性允许添加任何你想要的数据到消息头中。除此之外,它还具有另一个独特的功能:RabbitMQ可以根据headers表中填充值来进行消息的路由,而不需要依赖于routing_key

优先级属性

优先级属性

​ 截至3.5.0版本,RabbitMQ已经按照AMQP规范实现了priority字段,它的取值范围是一个介于0~9之间的整数,用于指定队列中消息的优先级。

​ 如果首先发布一条优先级为9的消息,随后再发布一条优先级为0的消息,则新连接的消费者将会先接收到优先级为0的消息

注意:RabbitMQ将priority字段实现为无符号字节,所以优先级可以是0到255之间的任意值,但最好将取值范围限制在0到9之间以保证规范性。

不能使用的属性 cluster-id/reserved

cluster-id属性是AMQP 0-8中定义的,但随后被删除,RabbitMQ从未实现过关于改属性的任何行为。

AMQP 0-9-1将cluster-id属性重新命名为reserved,并声明它必须为空,虽然RabbitMQ目前没有根据规范要求它是空的,但是最好规避这个属性。

总结

属性 类型 用途 使用建议或特殊用法
app-id short-string 应用程序 用于发布消息的应用程序
content-encoding short-string 应用程序 指定消息体是否以某种特殊方式编码,如zlib、deflate或Base64
content-type short-string 应用程序 使用mime-types指定消息体的类型
correlation-id short-string 应用程序 如果消息引用了某个其他消息或具有唯一标识的项目,那么correlation-id可以用来指定这种引用关系
delivery-mode octet RabbitMQ 值为1告诉RabbitMQ可以将消息保存在内存中;值为2表示它也应该被写入磁盘
expiration short-string RabbitMQ 用文本字符串表示的纪元时间或者UNIX时间戳,表示消息的过期时间
headers table 应用程序/RabbitMQ 一个自由格式的键值表,可以使用它来添加消息相关的附加元数据;RabbitMQ也可以基于它进行路由
message-id short-string 应用程序 唯一的标识符,例如在应用程序中可以使用uuid来标识消息
priority octet RabbitMQ 队列中标识消息的优先顺序
timestamp timestamp 应用程序 用文本字符串表示的纪元时间或者UNIX时间戳,表示消息的创建时间
type short-string 应用程序 一个文本字符串,用于表示应用程序中描述消息或有效负载的类型
user-id short-string 应用程序/RabbitMQ 一个自由格式的字符串,如果启用该属性,RabbitMQ会验证当前连接的用户,若不匹配则丢弃消息

评论