Event 框架介绍及开发
OpenNJet Event 框架介绍
OpenNJet 实现了Copilot: Ctrl, 在Ctrl 中加载不同的动态模块,负责对外提供HTTP API 接口, 各个API模块中会调用 SendMsg 模块提供的函数进行消息的发送,及设置回调函数(RR消息)。 SendMsg 模块是MQTT协议的客户端,连接到服务端(Copilot: broker)。动态配置相关的数据将发送到 /worker_a/# 主题。
OpenNJet 中提供的KV模块(静态编译)对外提供了注册函数, 业务模块在模块初始化时,设置对应主题的回调函数,当KV 模块收到相应主题的消息后,将触发回调。KV模块是MQTT协议的客户端,将连接到Copilot: broker 并订阅一些主题。
Privilege Agent 进程中的KV模块订阅了 /worker_a/# 主题, 因此Ctrl 发出的配置数据 /worker_a/{ins, dyn}/{module},Privilege Agent 进程的KV 模块收到消息后,将查询回调函数列表,如果对应的模块有配置回调,就触发业务模块的调用,获取业务模块返回后,将直接使用 rpc_get_handler 获取业务模块的全量配置数据,然后将消息广播到 /dyn/{module} 主题。由于Privilege Agent是消息发送方,不会收到本次发送的消息,而其它所有 worker 进程将收到该消息,并触发业务模块的回调,从而将全量配置同步到所有worker中。
MQTT 协议及工具安装
MQTT介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议。
几个关于MQTT的重要概念:
-
RETAIN 消息 当我们使用MQTT客户端发布消息( PUBLISH )时,如果将RETAIN标志位设置为 true ,那么MQTT服务器会将最近收到的一条RETAIN标志位为 true 的消息保存在服务器端(内存或文件)。 特别注意:MQTT服务器只会为每一个Topic保存最近收到的一条RETAIN标志位为true的消息!
-
RR (Request/Response) 消息 在 MQTT 5.0 中,请求方可以在请求消息中指定一个自己期望的响应主题 (Response Topic)。响应方根据请求内容采取适当的操作后,向请求中携带的响应主题发布响应消息。如果请求方订阅了该响应主题,那么就会收到响应。
-
主题的通配符
MQTT 主题支持以下两种通配符: +和#。
+ :表示单层通配符,例如 a/+ 匹配 a/x 或 a/y 。
# :表示多层通配符,例如 a/# 匹配 a/x 、 a/b/c/d 。
-
客户端订阅时的消费顺序
- 具有相同前缀的消息 能够保证不同层级间消息的发送顺序,较多层级的消息在较少层级消息之后 /prefix/topic /prefix/topic/level_1 /prefix/topic/level_1/level_2 /prefix/topic/level_1/level_2/level_3
- 不同前缀的消息 服务端根据 mqtt 客户端初始化时设置的 topic 顺序,发送消息给客户端
-
共享订阅 MQTT 5.0 引入了共享订阅特性,它使得 MQTT 服务端可以在使用特定订阅的客户端之间均衡地分配消息负载。这表示,当我们有两个客户端共享一个订阅时,那么每个匹配该订阅的消息都只会有一个副本投递给其中一个客户端。
使用共享订阅,需要在订阅时使用遵循以下命名规范的主题:
$share/{Share Name}/{Topic Filter}
其中
$share
是一个固定的前缀,以便服务端知道这是一个共享订阅主题。{Topic Filter}
则是我们实际想要订阅的主题。中间的{Share Name}
是一个由客户端指定的字符串,表示当前共享订阅使用的共享名。很多时候,{Share Name}
这个字段也会被叫作 Group Name 或者 Group ID。
工具安装
mosquitto.org 提供了MQTT 协议的一个实现,OpenNJet 使用的也是标准的mqtt 协议,因此可以使用mosquitto实现的命令行客户端做为测试及调试的工具。
下载mosquitto源码 https://mosquitto.org/files/source/mosquitto-2.0.18.tar.gz, 并编译
make WITH_CJSON=no
make install
安装后 /usr/local/bin/ 目录下将有接下来需要使用到的mosquitto_pub,mosquitto_rr,mosquitto_sub 可执行程序。
工具使用
默认njet 的配置中,broker将监听unix sock:/usr/local/njet/data/mosquitto.sock, 因此启动njet 后,可以用相应的命令行工具进行消息的收发测试。
-
发送消息 使用mosquitto_pub 命令发送消息, –unix 参数指定文件系统中的unix sock文件, -t 指定发送的主题, -m 指定消息报文, -r 表示是Retain消息
mosquitto_pub --unix /usr/local/njet/data/mosquitto.sock -t "/mytest/a" -m 'testmsg2' -r
-
订阅消息
使用mosquitto_sub 命令订阅消息, –unix 参数指定文件系统中的unix sock文件, -t 指定订阅的主题,-F 指定格式
mosquitto_sub --unix /usr/local/njet/data/mosquitto.sock -F "%t : %p" -t "/mytest/#"
- RR 消息
使用mosquitto_rr 命令发送rr 类型消息, –unix 参数指定文件系统中的unix sock文件, -t 指定订阅的主题, -m 指定消息报文,-e 指定响应的主题名
mosquitto_rr --unix /usr/local/njet/data/mosquitto.sock -t "/rpc/njt_http_kv_module" \
-m "get" -e myresp
各模块说明
Copilot Broker 说明
Copilot Broker 是MQTT的服务端,提供了消息接收,持久化存储及消息分发的功能,相关的配置在Copilot 章节做了简单说明。这边说明一些关键点,及如何查看broker输出的日志。
-
监听方式 MQTT 服务端支持监听TCP端口,及Unix Socket, 默认不配置的时候使用 Unix Socket 方式,socket 文件为 $prefix/data/mosquitto.sock, 由于KV 模块及SendMsg模块做为消息的客户端都需要连接到broker,不建议更改服务端的监听方式。如果更改broker监听方式为TCP端口方式,kv 模块及sendmsg 模块的配置文件也需要做相应修改。
-
调整日志级别 使用helper指令配置broker 时,不指定配置文件,或配置文件内容为空时,所有配置项将使用默认值。只有error 级别的日志会输出到 logs/mosquitto.log 文件中。
例如helper 指令中设置broker 使用 conf/mqtt.conf 作为配置文件。
helper broker modules/njt_helper_broker_module.so conf/mqtt.conf;
conf/mqtt.conf 配置文件中设置 debug 级别的日志也进行输出
log_type error log_type debug
查看日志文件,可以看到各个worker 已经连接到broker,并且订购了一些主题。相关主题的详细说明在接下来的KV模块说明中进行描述。KV 模块连接服务端时使用的client_id 是 ${node_name}w${pid}, 其中${node_name}是配置文件中配置指令node_name设置的字符串,没有配置时取默认值"def_n"。
SendMsg 模块说明
提供消息发送的功能,编译为动态模块,目前在 Copilot Ctrl 中加载该模块。
在控制面配置文件njet_ctl.conf 的 main block 中加载该模块:
load_module modules/njt_http_sendmsg_module.so;
配置
dyn_sendmsg_conf
Syntax: | dyn_sendmsg_conf path/off; |
---|---|
Default: | dyn_sendmsg_conf; |
Context: | http |
说明:开启或关闭sendmsg 模块功能, 默认开启。
dyn_sendmsg_rpc_timeout
Syntax: | dyn_sendmsg_rpc_timeout time; |
---|---|
Default: | 2s |
Context: | http |
说明:配置rpc 超时时间。
SendMsg模块配置文件说明
配置项 | 必须修改 | 配置说明 |
---|---|---|
broker_addr | 否 | MQ broker 地址. 默认:unix:$PREFIX/data/mosquitto.sock |
log_type`` | 否`` | 日志级别: debug, error, warning, notice, information,不同的日志级别需要单独配置一行。 默认:error |
keepalive | 否 | 给服务端发送PING命令的间隔时间. 默认:30 |
kv_store_dir | 否 | Key value store 持久化文件路径. 默认:$PREFIX/data |
protocol_version`` | 否 | 使用的协议版本,要支持 request response消息,必须填 5 . 默认:5 |
API
使用SemdMsg模块提供API,需include 头文件njt_http_sendmsg_module.h
消息发送
#define RPC_RC_OK 0
#define RPC_RC_TIMEOUT 1
struct njt_dyn_rpc_res_s
{
int session_id;
void *data;
int rc;
};
typedef struct njt_dyn_rpc_res_s njt_dyn_rpc_res_t;
typedef int (*rpc_msg_handler)(njt_dyn_rpc_res_t* res, njt_str_t *msg);
//发送 RR 消息,收到response 后,调用设置的回调函数rpc_msg_handler
int njt_dyn_rpc(njt_str_t *topic, njt_str_t *request, int retain_flag, int session_id, rpc_msg_handler handler, void *data);
//发送普通消息,retain_flag 表示消息是否进行持久化
int njt_dyn_sendmsg(njt_str_t *topic, njt_str_t *content, int retain_flag);
KV Store 键值操作
int njt_dyn_kv_get(njt_str_t *key, njt_str_t *value);
int njt_dyn_kv_set(njt_str_t *key, njt_str_t *value);
int njt_dyn_kv_del(njt_str_t *key);
KV 模块 说明
KV 模块是静态编译到NJet 可执行文件中的,提供消息回调的注册,消息发送及KV Store 键值设置功能。 默认方式开启,可以通过配置指令指定配置文件或关闭KV 功能。
配置
Syntax: | dyn_kv_conf path/off; |
---|---|
Default: | dyn_kv_conf; |
Context: | http |
说明:如果使用配置指令指定了配置文件,配置文件必须存在,可以是空文件,未配置的配置项将使用默认配置
KV配置文件说明
配置项 | 必须修改 | 配置说明 |
---|---|---|
broker_addr | 否 | MQ broker 地址。默认: unix:$PREFIX/data/mosquitto.sock |
log_type | 否 | 日志级别: debug, error, warning, notice, information,不同的日志级别需要单独配置一行。默认: error |
topic | 否 | Worker 进程默认订阅的主题:/ins/srv/# 用于动态server的指令式API /ins/loc/# 用于动态location的指令式API /ins/ssl/# 用于动态ssl的指令式API /dyn/# 用于动态 location 更新 /worker_n/# (n是worker 的id) 用于向指定worker 发送消息,一般是向 worker_0 发消息 $share/njet//rpc/# 用于发送rpc消息, 只有一个worker 会处理,用于获取配置Helper 进程没有订阅默认主题,需要单独配置。Privilege Agent进程配置的主题 / ins /{srv,loc, ssl /# , /worker_a/#, /dyn/#, |
keepalive | 否 | 给服务端发送PING命令的间隔时间。 默认:30 |
kv_store_dir | 否 | Key value store 持久化文件路径。 默认: $PREFIX/data |
protocol_version`` | 否 | 使用的协议版本,要支持 request response消息,必须填 5 。 默认:5 |
API
使用KV 提供API,需include 头文件njt_http_kv_module.h
消息回调注册
typedef int (*kv_change_handler)(njt_str_t *key, njt_str_t *value, void *data);
// u_char * 字符串在回调函数中使用malloc分配,由调用方负责释放, 返回的字符串长度需要通过len 变量进行设置
typedef u_char *(*kv_rpc_handler)(njt_str_t *topic, njt_str_t *request, int *len, void *data);
typedef enum {
NJT_KV_API_TYPE_DECLATIVE=0,
NJT_KV_API_TYPE_INSTRUCTIONAL
} njt_kv_api_type_e;
struct njt_kv_reg_handler_s {
njt_str_t *key;
kv_change_handler handler; // 非 rr 消息,全量配置更新的回调函数
kv_rpc_handler rpc_get_handler; // 获取全量配置
kv_rpc_handler rpc_put_handler; // rr 消息更新的回调函数
void *data;
njt_kv_api_type_e api_type;
};
typedef struct njt_kv_reg_handler_s njt_kv_reg_handler_t;
int njt_kv_reg_handler(njt_kv_reg_handler_t *handler_t);
消息发送
int njt_kv_sendmsg(njt_str_t *topic, njt_str_t *content, int retain_flag);
KV Store 键值操作
int njt_db_kv_get(njt_str_t *key, njt_str_t *value);
int njt_db_kv_set(njt_str_t *key, njt_str_t *value);
int njt_db_kv_del(njt_str_t *key);
注册代码示例
staticnjt_int_ttest_msg_init_worker(njt_cycle_t *cycle)//需要在init_process 阶段执行
{
njt_str_t topic_key = njt_string("topickey"); //注册时, /[dyn |ins]/topickey/... 主题的,只需要提供第二段的关键字 topickey
njt_kv_reg_handler_t h;
njt_memzero(&h, sizeof(njt_kv_reg_handler_t));
h.key = &topic_key ;
h.rpc_get_handler = njt_test_get_handler;
h.rpc_put_handler = njt_test_put_handler;
h.handler = njt_test_handler;
h.api_type = NJT_KV_API_TYPE_DECLATIVE;
njt_kv_reg_handler(&h);
return NJT_OK;
}
事件框架常见问题
如何关闭事件框架
从架构图上,可以看到 Copilot: Ctrl, Copilot: broker, Privileged Agent, KV 模块共同构成了整个事件处理的框架,要关闭事件处理框架:
- 不启动 Copilot
- privileged_agent 配置指令设置成 off。
- dyn_kv_conf 配置指令设置成 off。
。。。
privileged_agent off;
http {
dyn_kv_conf off;
。。。
}
。。。
如何跟踪消息流向,定位问题
要跟踪消息的流向,可以使用上文提到的 mosquitto_sub 工具,订阅所有的主题或指定主题,然后结合broker 输出的日志,进行查看。
mosquitto_sub --unix /usr/local/njet/data/mosquitto.sock -F "%t : %p" -t "/#"
假设我们需要查看http_log 模块的GET 请求
curl localhost:8081/config/2/config/http_log
查询后返回的数据截图如下:
mosquitto_sub 返回:
logs/mosquitto.log 日志 (需要打开debug 级别)