StreamNative 宣布开源 MoP:Apache Pulsar 支持原生 MQTT 协议


我们很高兴地宣布 StreamNative 开源了 “MoP”(MQTT on Pulsar)。MoP 将 MQTT 协议处理插件引入 Pulsar broker。这样一来,Apache Pulsar 就可以支持原生 MQTT 协议。

与 KoP 相似,MoP 是一种可插拔的协议处理插件。将 MoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 MQTT 应用程序和服务迁移到 Pulsar。

这样 MQTT 应用程序就可以利用 Pulsar 的特性,例如 Apache Pulsar 计算和存储分离的架构以及 Apache BookKeeper 保存事件流和 Pulsar 分层存储等特性。

什么是 Apache Pulsar

Apache Pulsar 是一个云原生的分布式消息传递和流数据平台,每天管理数千亿个事件。Pulsar 最初由 Yahoo 开发,于 2016 年底开源,并于 2018 年成为 Apache 软件基金会的顶级项目。Pulsar 将重要的 Yahoo 应用程序(例如 Yahoo Finance、Yahoo Mail 和 Flickr)连接到数据的整合消息传递平台。

GitHub:https://github.com/apache/pulsar 。

Pulsar 是一种多租户、高性能解决方案,用于在服务器之间传递消息。Pulsar 支持以下关键特性:

  • Pulsar 的单个实例原生支持多个集群,支持跨地域在集群间无缝复制消息
  • 极低的发布延迟和端到端延迟
  • 支持扩展到数百万个 Topic
  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递

目前,Apache Pulsar 已经广泛应用于多个领域。腾讯、Verizon Media、Splunk、中国移动、BIGO 等都在使用 Pulsar 来实现业务目标。

更多用户案例,可以参考:https://streamnative.io/blog 。

什么是 MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(pubish/subscribe)模式的"轻量级"通讯协议。

该协议构建于 TCP/IP 协议之上,由 IBM 在 1999 年发布。MQTT 最大优点在于,它可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,MQTT 在物联网、小型设备、移动应用等方面得到广泛应用。

为什么需要 MoP

Apache Pulsar 为队列和流工作负载提供统一的消息模型。Apache Pulsar 支持基于 protobuf 的二进制协议,以确保高性能和低延迟。protobuf 有利于实现 Pulsar 客户端:https://pulsar.apache.org/docs/en/client-libraries/ 。

而且,该项目也支持 Java,Go,Python 和 C ++ 语言以及社区提供的第三方客户端:https://pulsar.apache.org/docs/en/client-libraries/#thirdparty-clients 。

Pulsar 支持多租户,基于 Apache BookKeeper 构建持久化机制。因此,越来越多的公司正在探索使用 Pulsar 搭建其底层服务、转变业务架构。但是,想要采用 Pulsar 的统一消息协议,用户必须重写使用其他消息协议编写的现有应用程序。

为了解决这个问题,StreamNative 一直致力于开发新项目。今年,StreamNative 开源了KoP(Kafka-on-Pulsar)和AoP(AMQP-on-Pulsar)协议处理插件,方便将使用 Kafka 和 AMQP 协议的应用程序和服务迁移到 Pulsar。

  • KoP 是一种可插拔的协议处理插件。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar
  • AoP 是一种可插拔的协议处理插件。将 AoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 RabbitMQ 应用程序和服务迁移到 Pulsar

StreamNative 收到大量的用户请求,请求帮助他们从 MQTT 迁移到 Pulsar。同时,StreamNative 也意识到在 Pulsar 上原生支持 MQTT 消息传输协议的必要性。所以,StreamNative 开始致力于将通用协议处理插件框架引入到 Pulsar 中。该框架允许使用 MQTT 传输协议的开发人员使用 Pulsar。

MoP 架构

MoP 是一个可插拔的协议处理插件。通过使用 Pulsar 的 Topic、Cursor 等特性,实现在 Pulsar 上支持原生 MQTT 协议。

下图展示了 MoP 协议处理插件与 Pulsar 集群的结合。MQTT Proxy 服务和 MQTT 协议处理插件都与 Pulsar broker 一起运行。

MoP 概念

消息的服务质量

为了适应不同的场景需求,MQTT 协议支持以下三种QoS 等级:

  • QoS0:最多只发送一次消息,或者当网络传送受阻时,根本不发送消息。也不会保存发送的消息。

  • QoS1:至少发送一次消息。如果发送方没有收到确认包,则会再次发送加上 DUP 标志的消息,直到发送方收到确认包。

  • QoS2:只成功发送一次消息。消息必须存储在发送方和接收方的本地环境中,直到被妥善处理。该 QoS 等级最高的消息服务等级。

目前,MoP 协议处理插件只支持 QoS0 和 QoS1 级别的消息服务质量。计划在未来版本中支持 QoS2。

MoP Proxy

在 MoP 中,MoP Proxy 是一个可选组件,主要用来代理 MoP 的服务。MoP Proxy 支持将 MoP 扩展至多个节点,以实现横向扩展服务。MoP Proxy 主要用于正确地转发 MQTT Client 和 Pulsar Broker 之间传递的消息数据,因此 MQTT Client 只需连接到 MoP Proxy,发送并接收数据,而无需关注 Topic 的所属 Pulsar Broker。

MoP Proxy 可以感知 Topic 所属 Pulsar Broker 的变化。一旦所属 Pulsar Broker 发生变化,MoP Proxy 可以将 MQTT Client 的网络数据包发送至新的所属 Pulsar Broker。

下图说明了 MoP Proxy 的服务流程。

  1. MQTT 客户端建立与 MoP Proxy 的连接。
  2. MoP Proxy 向 Pulsar 集群发送查找请求,确定当前 Topic 的 owner broker 的 URL 地址。
  3. Pulsar 集群将 owner broker 的 URL 地址返回给 MoP Proxy。
  4. MoP Proxy 建立与 Topic 所在的 owner broker 的连接,并开始在 MQTT 客户端和 Topic 所在的 owner broker 之间传输数据。

目前,MoP Proxy 以插件的方式与 Pulsar broker 一起运行。用户可以通过修改配置来开启 Proxy。

有关详细信息,可以参考:https://github.com/streamnative/mop#how-to-use-proxy 。

开始使用 MoP

MoP 是一个开源项目,采用 Apache License V2。

下载 MoP 协议处理插件最新发布版本,开始使用 MoP 协议处理插件:https://github.com/streamnative/mop/releases/ 。

关于如何使用 MoP 协议处理插件,可以参考文档:https://github.com/streamnative/mop/blob/master/README.md 。

如果在使用中遇到任何问题,可以在 MoP 仓库中提交 issue,我们会在第一时间回应。同时,我们也欢迎你为 MoP 贡献特性:https://github.com/streamnative/mop/issues 。