发布于 2015-07-09 09:00:51 | 236 次阅读 | 评论: 0 | 来源: 网友投递
emqttc Erlang 的 MQTT 客户端
emqttc 是一个 Erlang 的 MQTT 客户端,支持 MQTT V3.1/V3.1.1 协议规范。支持并行连接和连接自动恢复。要求 Erlang R17+.
emqttd 0.9.0-alpha版本正式发布,该版本改进内容包括:
MQTT连接会话(Session)管理
每个MQTT客户端连接,不管是否持久的(Persistent),都启动一个连接会话进程。由该会话进程管理:
客户端的全部订阅(Subscription)。
由服务器发动到客户端的,已发送未确认的Qos1/2消息。
从客户端发送到服务端,为接收到PUBREL消息的QoS2消息。
客户端离线时,保存离线的Qos1/2消息。
可选设置,保存离线的QoS0消息。
MQTT客户端连接可以resume在其他集群节点上的会话(Session)。
消息队列(Message Queue)和飞行窗口(Inflight Window)
每个MQTT会话创建一个简单的内存消息队列,和一个正在处理消息的飞行窗口。
设计如下:
|<----------------- Max Len ----------------->| ----------------------------------------------- IN -> | Pending Messages | Inflight Window | -> Out ----------------------------------------------- |<--- Win Size --->|
飞行窗口(Inflight Window)保存当前正在发送未确认的Qos1/2消息。窗口值越大,吞吐越高;窗口值越小,消息顺序越严格。
当客户端离线或者飞行窗口(Inflight Window)满时,消息缓存到队列。
如果消息队列满,先丢弃Qos0消息,或者丢弃最早进入队列的消息。
Name | Type | Description |
---|---|---|
client.connected | foreach | Run when client connected successfully |
client.subscribe | foldl | Run when client subscribe topics |
client.unsubscribe | foldl | Run when client unsubscribe topics |
message.publish | foldl | Run when message is published |
message.acked | foldl | Run when message is acked |
client.disconnected | foreach | Run when client is disconnnected |
每一条QoS1/2消息,分配一个全局唯一的、时间序列的消息ID,用于端到端的消息处理。
PktId <-- --> MsgId <-- --> MsgId <-- --> PktId |<--- Qos --->|<---PubSub--->|<-- Qos -->|
全局唯一消息ID结构:
-------------------------------------------------------- | Timestamp | NodeID + PID | Sequence | |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->| --------------------------------------------------------
64bits时间戳: erlang:system_time if Erlang >= R18, otherwise os:timestamp
Erlang节点ID: 编码为2字节
Erlang进程PID: 编码为4字节
进程内部序列号: 2字节的进程内部序列号
4.4章节的消息发布重传 - Message delivery retry(#166)
4.6章节的消息发布顺序保证 - Message ordering(#167)
emqttd_alarm模块可以发布JSON格式告警消息到'$SYS/brokers/+/alarms/#'的系统Topic
合并emqtt, emqttd应用,调整代码结构,以便其他Erlang项目嵌入emqttd消息服务器。