1、下载 emq 源码
emq 使用 3.0 以上版本(emq 2.X 版本称为 emqttd,3.X 版本称为 emqx),并且自己编译源码(方便做扩展性修改)。git 地址:https://github.com/emqx/emqx-rel.git。
emq 的编译依赖于 Erlang
环境(emqx 依赖于 Erlang R21.2+ 版本,emqttd 依赖于 Erlang R20+ 版本)。Erlang 安装: http://www.erlang.org/。
源码下载完成后,进入 emqx-rel 主文件夹,然后直接 make 编译。编译完成后的文件结构如下:
由于 emq 文件结构过于复杂,因此只介绍需要用到的部分。可执行的命令以及配置文件位于_rel/emqx 中,插件位于 deps 中。
2、emq 配置、运行和部署
进入 emqx-rel/_rel/emqx/bin
文件夹下,用 emqx start
命令即可运行 emq,用 emqx stop
课停止 emq, emqx console
进入控制台运行模式,在此模式下可看见诸多类似设备连接、服务器报错等信息。
emq 的配置文件是 emqx-rel/_rel/emqx/etc/emqx.conf
,下面介绍配置文件中比较重要的部分。
node.name=emqx@192.168.4.31
服务器的节点名,需要 emqx@host
的格式,注意 host 中最好使用本机真实 ip,尤其是在集群配置中,否则会出现预期之外的错误。并且,emq 启动之后不要修改此配置,确保 emq 开启和关闭的时候此配置必须相同,否则会出现无法关闭 emqx 的错误(就算一开始写的 127.0.0.1
,然后启动之后发现错了,也要先关闭 emqx 再修改配置!)。
listener.tcp.external = 0.0.0.0:1885
: emq 对外监听的 tcp 端口,默认 1883。
listener.ssl.external = 8883
: emq 监听的 ssl 端口,默认 8883。
listener.ssl.external.keyfile = etc/certs/key.pem
: 服务器的私钥(关于 ssl 认证的具体介绍见另一篇文档)。
listener.ssl.external.certfile = etc/certs/cert.pem
: 服务器的公钥。
listener.ssl.external.cacertfile = etc/certs/cacert.pem
: ca 证书。
listener.ssl.external.verify = verify_peer
: 开启双向认证。
listener.ssl.external.fail_if_no_peer_cert = true
: 如果客户端证书错误,则禁止连接。
如果开启单向认证,只需要配置服务器公钥和私钥,如果开启双向认证,则需要配置 ca、开启双向认证、并禁止客户端无证书连接。
另外,在 emqx-rel/_rel/emqx/etc/acl.conf
中,有一处配置需要注意。
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
%%{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.
这里对 emq 客户端的权限作了一些配置,对某些 topic 禁止客户端订阅。我们需要将第三行的配置注释掉,否则客户端无法订阅 $SYS/#
的 topic(具体作用见下文)。
3、emq 集群部署
emq 提供了多种去中心化的集群部署方案:
在 emqx-rel/_rel/emqx/etc/emqx.conf
中可以配置集群的创建方式,在此介绍一下手动创建集群的方法。
cluster.discovery = manual
: 开启手动创建集群模式。
启动两台 emqx
节点(物理节点,非逻辑节点),假设两个节点分别为 emqx@node1
和 emqx@node2
,在任意一台节点(以 node1
为例)上,执行命令:emqx-rel/_rel/emqx/bin/emqx_ctl clsuter join emqx@node2
节点退出集群:mqx-rel/_rel/emqx/bin/emqx_ctl cluster leave
4、emq 的管理和监控
emq
提供了方便用户管理监控的控制台、api 及系统消息。控制台可直接访问 http://{host}:18083
,默认用户名 admin
,密码 public
。
此处是插件功能和 emq 提供的 HTTP API 列表,可直接在控制台中进行配置和插件的开关,emq 支持插件的热加载。
除此之外,emq 提供了一些用于监控 broker 状态的系统主题 $SYS。其中,我们的上下线通知中用到了下列两个 topic:
但是此处的上线通知流程存在问题,emq broker 会在客户端尝试连接(鉴权)时进行上线通知,而不是连接成功之后通知。还有一种解决方案是用插件进行通知,但这样会使用 http 同步调用,代价较大,因此暂时还是用系统消息进行异步通知。
5、插件的编写
插件位于 emqx-rel/deps 中,其中有若干已经由官方写好的插件,可以直接修改配置文件或直接在控制台中进行配置。
如果需要自己写插件,emq 也提供了插件模板,在 emqx-rel/deps/emqx_plugin_template
中。打开 emqx-rel/deps/emqx_plugin_template/src/emqx_plugin_template.erl
,其中可以看到 emq 提供了一些钩子:
emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).
当达成这些条件事,会触发钩子函数,用户可以自己定义钩子函数,例如**目前鉴权所用到的:
on_client_authenticate(Credentials = #{client_id := ClientId, password := Password, username := Username}, _Env) ->
io:format("Client(~s) authenticate, Password:~p ~n, Username:~p ~n", [ClientId, Password, Username]),
inets:start(),
{ok, {{Version, 200, ReasonPhrase}, Headers, Body}} = httpc:request(string:join(["http://www.example.com/api/mqtt/authenticate/login?username=", binary_to_list(base64:encode(binary_to_list(Username))), "&password=", binary_to_list(Password), "&clientId=", binary_to_list(base64:encode(binary_to_list(ClientId)))], "")),
if
Body == "0" ->
{stop, Credentials#{auth_result => success}};
true ->
{stop, Credentials#{auth_result => fail}}
end.
插件编写完成后,需要到主目录 emqx-rel 中重新 make。注意,make
之后配置文件会重新生成,因此 make 之前最好备份一下配置文件。然后启动 emq,到控制台中开启插件即可。
6、管理监控 API
emq 提供了一些 api,可以获取服务器的一些信息。
首先在 App 中新增用户
然后通过 http://host:8080/{api}
访问,访问需要 http basic authentication
,用刚刚创建的用户可以访问。
7、emq 使用过程中可能遇到的问题及注意事项
1)emqx start
指令后,卡住不动,没有出现 start successfully
的字样,可能是端口被其他进程占用。emq 会占用的端口如下:
这些端口都可以在配置文件中自己修改,但是只要有一个冲突就无法正常启动。
2)emqx stop
时,显示 emqx@xxx no response
字样,检查配置文件中的节点名是否正确。
3)在开启集群模式时,如果要关闭一个节点,最好先手动让该节点退出集群,否则下一次启动该节点时可能出现无法加入集群的情况(emq 会记住集群信息,重启时会自动恢复之前的集群状态,但这个过程经常会出现一些问题,所以最好的办法就是手动退出集群,然后重启之后再手动加入集群)。
4)emq 重启之后,会自动启动上一次启动过的插件,但是配置会回复初始化(除非修改配置文件)。
5)emq 的启动进程为 beam.smp
,守护进程为 epmd
,但是 Erlang 的进程结构比较奇怪,所以直接 kill
掉上述进程之后,emq 可能还在运行中。所以最好执行 emqx stop
指令来停止 emq。
8、部署架构
9、后记
在官方文档中有更为全面的说明和指南,本文只列出了部分实际用到的比较重要的功能。如果后续有扩展或本文中不详细之处,可参加官方文档:https://developer.emqx.io/docs/broker/v3/cn/getstarted.html
另附:插件编写指南 https://www.cnblogs.com/wunaozai/p/8067621.html
官方文档对于部分内容也并不十分详细,本文对实际中遇到的问题和注意事项作了部分补充。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于