Http2mqtt

1.功能描述

能够接收http1.1 post消息,通过连接mqtt broker, 把数据发送给mqtt broker。

2.依赖模块

njet.conf:

load_module modules/njt_http_mqtt_module.so;

3.指令说明

3.1 upstream块指令

mqtt_server

Syntax mqtt_server ip[:port] user={user} password={password}
Default -
Context upstream

mqtt upstream server 配置,匿名的不需要配置user和password,配置了也实际不使用

mqtt_keepalive

Syntax mqtt_keepalive off | max=count
Default max=10
Context upstream

keep_alive配置: max: 表示最多保持多少长连接

mqtt_retry_times

Syntax mqtt_retry_times {num}
Default 1,默认尝试一次
Context upstream

当前mqtt broker不可用时,尝试寻找下一个broker的次数,默认是1,尝试一次

mqtt_send_buffer_size

Syntax mqtt_send_buffer_size {size}
Default 4M
Context upstream

每个mqtt连接使用的发送队列缓存大小,默认4M,如果发送很长的消息或者频繁发送消息,应该设置该值比较足够大一点

mqtt_recv_buffer_size

Syntax mqtt_recv_buffer_size {size}
Default 1M
Context upstream

每个mqtt连接使用的接收数据缓存大小,默认1M,一般够用,mqtt主要是发送数据给mqtt broker,接收数据主要是一些状态数据等,所以不需要太大缓存

mqtt_ping_time

Syntax mqtt_ping_time {time}
Default 5s
Context upstream

mqtt broker发送ping包间隔,默认5s

mqtt_read_timeout

Syntax mqtt_read_timeout {timeout}
Default 30s
Context upstream

从Mqtt broker 接收消息超时时间

3.2 location块指令

mqtt_topic

Syntax mqtt_topic {topic_name}
Default -
Context upstream

配置发送mqtt消息的topic

mqtt_retain

Syntax mqtt_retain [0|1]
Default 0, 不保留
Context upstream

对应的topic的消息是否保留最后一条消息

0: 不保留,默认值

1:保留

mqtt_qos

Syntax mqtt_qos [0|1|2]
Default 0
Context upstream

MQTT 设计了 3 个 QoS 等级。

QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。

QoS 1:消息传递至少 1 次。

QoS 2:消息仅传送一次。

QoS 0 是一种 “fire and forget” 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。

QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。

QoS 2 设计了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次

mqtt_pass

Syntax mqtt_pass {upstream}
Default -
Context NJT_HTTP_LOC_CONF|NJT_HTTP_LIF_CONF|NJT_CONF_TAKE1

配置pass 到的mqtt upstream

4.配置样例

4.1 mqtt 实例配置

...
load_module modules/njt_http_mqtt_module.so;      //加载mqtt模块
...

http {
        client_max_body_size 1G; //如果mqtt消息超过1M,需要将此配置设置大一点
       
        upstream mqtt_upstream {
                mqtt_server 192.168.40.136:1885 user=test password=112233;
                mqtt_server 192.168.40.136:1884 user=test password=112233;
                #mqtt_keepalive off;     //默认是开启长连接,如果不用,设置为off
                #mqtt_keepalive max=10;  //默认开启长连接,max默认为10
                //每个链接使用的发送缓冲区大小,默认4M,如果是长消息或者短时间大量消息,则要将次缓存设置足够大
                #mqtt_send_buffer_size 10M;    
                mqtt_recv_buffer_size 1M;      //每个链接使用的接收缓冲区大小,默认1M
                mqtt_ping_time 10s;            //ping 消息发送间隔,默认5s
                mqtt_read_timeout 10s;
                #mqtt_retry_times 2;           //next upstream尝试次数,默认为1
        }

     server {
        listen       80;
        location /mqtt {
              mqtt_topic /test/mqtt;    //需要配置发送的topic
              mqtt_retain 1;            //保留topic的最后一条消息,默认为0,不保留
              mqtt_qos 0;               //mqtt qos级别,默认是0

              client_body_buffer_size 10M; //如果mqtt消息比较大,将此buffer设置大一点

              //目前只支持mqtt upstream, 不支持ip或者unix socket等
              mqtt_pass mqtt://mqtt_upstream;
        }
     }
}

4.2 mqtts 实例配置

...
load_module modules/njt_http_mqtt_module.so;      //加载mqtt模块
...

http {
        client_max_body_size 1G; //如果mqtt消息超过1M,需要将此配置设置大一点
       
        upstream mqtt_upstream {
                mqtt_server 192.168.40.136:1885 user=test password=112233;
                mqtt_server 192.168.40.136:1884 user=test password=112233;
                #mqtt_keepalive off;     //默认是开启长连接,如果不用,设置为off
                #mqtt_keepalive max=10;  //默认开启长连接,max默认为10
                //每个链接使用的发送缓冲区大小,默认4M,如果是长消息或者短时间大量消息,则要将次缓存设置足够大
                #mqtt_send_buffer_size 10M;    
                mqtt_recv_buffer_size 1M;      //每个链接使用的接收缓冲区大小,默认1M
                mqtt_ping_time 10s;            //ping 消息发送间隔,默认5s
                mqtt_read_timeout 10s;
                #mqtt_retry_times 2;           //next upstream尝试次数,默认为1
        }

     server {
        listen       80;
        location /mqtt {
              mqtt_topic /test/mqtt;    //需要配置发送的topic
              mqtt_retain 1;            //保留topic的最后一条消息,默认为0,不保留
              mqtt_qos 0;               //mqtt qos级别,默认是0

              client_body_buffer_size 10M; //如果mqtt消息比较大,将此buffer设置大一点

              //目前只支持mqtt upstream, 不支持ip或者unix socket等
              mqtt_pass mqtt://mqtt_upstream;
        }
     }
}

5.调用样例

基于mosquitto相关工具进行测试

安装mosquitto

yum install mosquitto

5.1 匿名测试

配置匿名mqtt broker

编辑/etc/mosquitto/mosquitto.conf,

将allow_anonymous 配置为true, 也可不配置,默认为true,允许匿名访问

allow_anonymous true       #此处配置匿名访问, 不配置也可,默认就是true
  • Mosquito broker启动
mosquitto -c /etc/mosquitto/mosquitto.conf -p 1884
  • 消费者启动
mosquitto_sub -p 1884  -F '%t : %p' -t "#"

njet.conf配置

...
load_module modules/njt_http_mqtt_module.so;   #加载http2mqtt模块
...

http{
...   
     upstream mqtt_upstream {
                mqtt_server 192.168.40.136:1884;  #不配置用户名和密码
                mqtt_keepalive max=10;
                mqtt_send_buffer_size 10M;
                mqtt_recv_buffer_size 1M;
                mqtt_ping_time 10s;
     }

     server {
        listen       80;
        location /mqtt {
              mqtt_topic /test/mqtt;   #配置topic
              mqtt_retain 1;           #保留消息
              mqtt_pass mqtt_upstream;
        }
     }
...
}

正常消息发送与接收

curl 发送请求
curl -X POST http://localhost:80/mqtt -d '==========1' -v
查看消费者,正确收到消息
[root@CDN157 test_mqtt]$ mosquitto_sub -p 1884  -F '%t : %p' -t "#" 
/test/mqtt : ==========1

5.2 用户名密码测试

配置带用户名密码验证的mqtt broker

  • mosquitto配置改为非匿名

编辑/etc/mosquitto/mosquitto.conf,

将allow_anonymous 配置为false, 默认为true,允许匿名访问

同时配置密码文件路径

allow_anonymous false       #此处关闭匿名访问
password_file  /etc/mosquitto/password_file    #此处配置密码文件路径
  • mosquito生成用户名密码(使用mosquitto_passwd工具)
#配置用户名为admin 密码为admin, 第一个用户配置需要指定-c, 密码会让输入一遍确认一遍
mosquitto_passwd -c /etc/mosquitto/password_file admin

#去查看密码文件发现有两个用户
cat /etc/mosquitto/password_file
admin:$6$iVtan6BFKlJCOb4y$c6tz/+NCaulkIPh9wOfO9Ba2pmg4pZOG8kuQNHfe7ViHzuqZczsS90w38M9QvflBamDOzxKtRnnL//QnHGW4Lw==
  • Mosquito broker启动(带用户名密码验证)
mosquitto -c /etc/mosquitto/mosquitto.conf -p 1884
  • 消费者启动(指定用户名密码admin/admin)
mosquitto_sub -p 1884  -F '%t : %p' -t "#" -u admin -P admin

njet.conf配置

...
load_module modules/njt_http_mqtt_module.so;   #加载http2mqtt模块
...

http{
...   
     upstream mqtt_upstream {
                mqtt_server 192.168.40.136:1884 user=test password=112233;  #使用用户名密码访问
                mqtt_keepalive max=10;
                mqtt_send_buffer_size 10M;
                mqtt_recv_buffer_size 1M;
                mqtt_ping_time 10s;
     }

     server {
        listen       80;
        location /mqtt {
              mqtt_read_timeout 20s;
              mqtt_topic /test/mqtt;
              mqtt_pass mqtt_upstream;
        }
     }
...
}

正常消息发送与接收

curl 发送请求
curl -X POST http://localhost:80/mqtt -d '==========1' -v
查看消费者,正确收到消息
[root@CDN157 test_mqtt]$ mosquitto_sub -p 1884  -F '%t : %p' -t "#"  -u admin -P admin
/test/mqtt : ==========1