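Dynamic Protocol Server
1. Overview
This feature extends the OpenNJet Stream module to parse specific protocols. It provides script callback interfaces and helper functions for proxy features including, but not limited to, dynamic protocol analysis, analysis and processing of upstream and downstream data, and custom routing algorithms, as well as general-purpose web server and WebSocket (ws) server functionality.
2. Required configuration
To use the dynamic protocol feature, configure the following in njet.conf: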
http {
dyn_kv_conf conf/iot-work.conf;
}
iot-work.conf must additionally subscribe to the following topics:
topic $share/njet//${CLUSTER_NAME}_${NODE_NAME}/stream_proto/disconnect
will_topic /${CLUSTER_NAME}_${NODE_NAME}/stream_proto/disconnect
will_payload ${PID}
topic /worker_all/${NODE_NAME}/session/#
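topic /worker_pid_${PID}/${NODE_NAME}/session/#
Starting with version 4.0.1.0, the proto server module is a dynamic module, so its .so file must be loaded in njet.conf. Because it calls functions in the stream dynamic VS module, the stream dynamic VS .so must be loaded first: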
load_module modules/njt_stream_dyn_server_module.so;
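load_module modules/njt_stream_proto_server_module.so;
Note: the stream dynamic VS module must be loaded first; the two modules must be configured in exactly this order.
In addition, configure the following in ctrl.conf:
load_module modules/njt_stream_dyn_server_api_module.so;
3. Directives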
3.1 proto_server
| Syntax: | proto_server 【on/off】; |
|---|---|
| Default: | |
| Context: | stream server |
proto_server: enables the protocol server feature.
3.2 proto_buffer_size
| Syntax: | proto_buffer_size 【size】; |
|---|---|
| Default: | |
| Context: | stream server |
proto_buffer_size: size of the buffer for each client connection.
3.3 proto_server_code_file
| Syntax: | proto_server_code_file 【path】; |
|---|---|
| Default: | May be specified on multiple lines to load multiple files |
| Context: | stream server |
proto_server_code_file: path to the script source file.
3.4 proto_server_idle_timeout
| Syntax: | proto_server_idle_timeout 【time】; |
|---|---|
| Default: | |
| Context: | stream server |
proto_server_idle_timeout: idle timeout for a connection.
3.5 proto_server_client_update_interval
| Syntax: | proto_server_client_update_interval 【time】; |
|---|---|
| Default: | |
| Context: | stream server |
proto_server_client_update_interval: interval of the periodic per-client update (the timer that drives proto_server_process_client_update).
3.6 proto_server_update_interval
| Syntax: | proto_server_update_interval 【time】; |
|---|---|
| Default: | |
| Context: | stream server |
proto_server_update_interval: interval of the periodic server-side update (the timer that drives proto_server_update).
3.7 proto_upstream
| Syntax: | proto_upstream 【on/off】; |
|---|---|
| Default: | off |
| Context: | upstream |
proto_upstream: enables TCC script support for the upstream selection algorithm. If the requesting server has no TCC script configured, the default algorithm is used.
3.8 proto_pass
| Syntax: | proto_pass address; |
|---|---|
| Default: | |
| Context: | server |
proto_pass: address to which the protocol traffic is proxied; works like the proxy_pass directive.
3.9 proto_session_max_mem_size
| Syntax: | proto_session_max_mem_size 【size】; |
|---|---|
| Default: | |
| Context: | server |
proto_session_max_mem_size: size of the send buffer.
3.10 proto_session_zone
| Syntax: | proto_session_zone zone_name size; |
|---|---|
| Default: | |
| Context: | server |
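proto_session_zone: sets the name and size of the shared memory zone used across multiple workers.
zone_name: name of the zone; a business (service) name can be used.
Example: proto_session_zone 0.0.0.0:2225 10M;
Important:
For proto_server to send data across worker processes, dynamic shared memory must be enabled and the proto_session_zone directive must be configured.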
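4. Function reference
4.1 Script callback interfaces and functions:
- int proto_server_process_connection(tcc_stream_request_t *r)
  - Purpose: called once a connection has been accepted and the client's IP and port are known. IP addresses can be filtered at this stage.
  - Client IP: r->addr_text
  - Parameters:
    - r: handle identifying the client
  - Return values:
    - Reject the connection: NJT_STREAM_FORBIDDEN
    - Accept the connection: NJT_OK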
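- int proto_server_process_preread(tcc_stream_request_t *r, tcc_str_t *msg)
  - Purpose: data can be pre-read at this stage, for example to pre-process the SSL protocol.
  - Parameters:
    - r: handle identifying the client
    - msg: the received data; msg->data points to the data and msg->len is its length.
  - Return values:
    - Finished, or not handled: NJT_DECLINED
    - Not enough data yet, wait for more: NJT_AGAIN
    - Error: NJT_ERROR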
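- int proto_server_process_message(tcc_stream_request_t *r, tcc_str_t *msg)
  - Purpose: callback invoked when data is read from the downstream client.
  - Parameters:
    - r: handle identifying the client
    - msg: the received data; msg->data points to the data and msg->len is its length.
  - Return values:
    - Finished: NJT_OK
    - Call back again: NJT_AGAIN
    - Error: NJT_ERROR; the client connection is closed.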
- int proto_server_process_log(tcc_stream_request_t *r)
  - Purpose: callback for the log phase.
  - Parameters:
    - r: handle identifying the client
  - Return values:
    - Finished: NJT_OK
    - Error: NJT_ERROR
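- int proto_server_process_client_update(tcc_stream_request_t *r, tcc_str_t *msg)
  - Purpose: periodic per-client callback, for example to update client data on a timer. msg contains data when processing of earlier input was deferred.
  - Parameters:
    - r: handle identifying the client
    - msg: the received data; msg->data points to the data and msg->len is its length.
  - Return values:
    - Finished: NJT_OK
    - Error: NJT_ERROR; the client connection is closed.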
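- int proto_server_process_connection_close(tcc_stream_request_t *r)
  - Purpose: callback invoked when the client connection closes abnormally.
- int proto_server_init(tcc_stream_server_ctx *srv_ctx)
  - Purpose: called during the configuration phase to initialize and bind the server context data.
- int proto_server_process_init(tcc_stream_server_ctx *srv_ctx)
  - Purpose: called when a worker starts, to initialize resources.
- int proto_server_exit(tcc_stream_server_ctx *srv_ctx)
  - Purpose: called when a worker exits, to release resources.
- int proto_server_update(tcc_stream_server_ctx *srv_ctx)
  - Purpose: periodic server-side callback, for example for periodic broadcast logic.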
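- Macro tcc_get_client_ctx(r, module_id)
  - Purpose: in a client session, gets the protocol client context. module_id is user-defined in the range 0–127; 0 is already in use.
- Macro tcc_set_client_ctx(r, module_id, c)
  - Purpose: in a client session, sets the protocol client context. module_id is user-defined in the range 0–127; 0 is already in use.
- Macro tcc_client_get_srv_ctx(r)
  - Purpose: in a client session, gets the protocol server context.
- Macro tcc_set_srv_ctx(s, c)
  - Purpose: in a server session, sets the protocol server context (the sample scripts call it as tcc_set_srv_ctx(srv_ctx, srv_data)).
- Macro tcc_get_srv_ctx(s)
  - Purpose: in a server session, gets the protocol server context.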
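- int proto_server_upstream_connection_close(tcc_stream_request_t *r)
  - Purpose: callback invoked when the upstream backend server closes the connection.
  - Parameters:
    - r: handle identifying the client
- int proto_server_upstream_message(tcc_stream_request_t *r, tcc_str_t *msg)
  - Purpose: callback invoked when a message is read from the upstream backend server.
  - Parameters:
    - r: handle identifying the client
    - msg: the received data; msg->data points to the data and msg->len is its length.
  - Return values:
    - Finished: NJT_OK
    - Call back again: NJT_AGAIN
    - Error: NJT_ERROR; the client connection is closed.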
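- void *proto_server_check_upstream_peer(tcc_stream_client_upstream_data_t *cli_ups_info)
  - Purpose: custom algorithm for selecting an upstream server.
  - Return values:
    - NULL: no server is available.
    - Non-NULL: the peer object of the selected server; see the struct tcc_stream_client_upstream_data_s.
- tcc_stream_request_t *cli_local_find_by_session(tcc_stream_server_ctx *srv_ctx, tcc_str_t *session)
  - Purpose: looks up a client object by session.
  - Return values:
    - NULL: no client object was found.
    - Non-NULL: pointer to the client object.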
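The return-value contract of the read callbacks above (wait for more data with NJT_AGAIN, report completion with NJT_OK, fail with NJT_ERROR) can be illustrated without the NJet headers. The following is a minimal standalone sketch, not module code: frame_str_t, the FRAME_* codes, and frame_parse are hypothetical stand-ins, and the 2-byte big-endian length prefix is an assumed wire format chosen only for the example.

```c
#include <stddef.h>

/* Hypothetical stand-ins for NJT_OK / NJT_AGAIN / NJT_ERROR. */
enum { FRAME_OK = 0, FRAME_ERROR = -1, FRAME_AGAIN = -2 };

/* Hypothetical stand-in for tcc_str_t: a pointer plus a length. */
typedef struct {
    size_t len;
    const unsigned char *data;
} frame_str_t;

/*
 * Try to extract one frame of the assumed format
 *     [2-byte big-endian payload length][payload]
 * from msg. On FRAME_OK, *used is the number of bytes consumed and
 * *payload describes the payload. On FRAME_AGAIN or FRAME_ERROR
 * nothing is consumed and *used is 0.
 */
static int frame_parse(const frame_str_t *msg, frame_str_t *payload,
                       size_t *used, size_t max_payload)
{
    size_t need;

    *used = 0;
    if (msg->len < 2) {
        return FRAME_AGAIN;               /* length prefix incomplete */
    }
    need = ((size_t)msg->data[0] << 8) | msg->data[1];
    if (need > max_payload) {
        return FRAME_ERROR;               /* refuse oversized frames */
    }
    if (msg->len < 2 + need) {
        return FRAME_AGAIN;               /* payload incomplete */
    }
    payload->data = msg->data + 2;
    payload->len = need;
    *used = 2 + need;
    return FRAME_OK;
}
```

In the sample scripts later in this document, a callback records how many bytes it consumed in r->used_len; the used output of this sketch plays the same role.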
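4.2 Other helper functions:
- extern void *proto_malloc(void *ctx, int len)
  - Parameters: ctx is the handle of the context from which memory is allocated.
    - tcc_stream_request_t *ctx: allocated from the client connection pool; must be freed explicitly, and is also released automatically when the client disconnects.
    - tcc_stream_server_ctx *ctx: allocated from the server pool; must be freed explicitly, and is released on reload.
  - Purpose: allocates memory. Call proto_free to release it after use.
- extern void *proto_realloc(void *ctx, void *p, int len)
  - Parameters: ctx is the handle of the context from which memory is allocated.
    - tcc_stream_request_t *ctx: allocated from the client connection pool; must be freed explicitly, and is also released automatically when the client disconnects.
    - tcc_stream_server_ctx *ctx: allocated from the server pool; must be freed explicitly, and is released on reload.
  - Purpose: resizes memory obtained from proto_malloc to the specified len.
- extern void proto_free(void *ctx, void *p)
  - Parameters: ctx is the handle of the context from which memory is allocated.
    - tcc_stream_request_t *ctx: allocated from the client connection pool; must be freed explicitly, and is also released automatically when the client disconnects.
    - tcc_stream_server_ctx *ctx: allocated from the server pool; must be freed explicitly, and is released on reload.
  - Purpose: releases memory obtained from proto_malloc or proto_realloc.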
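- extern tcc_str_t cli_get_variable(tcc_stream_request_t *r, char *name)
  - Purpose: gets the value of a variable as a string.
- extern void cli_close(tcc_stream_request_t *r)
  - Purpose: closes the client connection.
- int cli_set_session(tcc_stream_request_t *r, tcc_str_t *session, tcc_str_t *data)
  - Parameters:
    - r: handle identifying the client
    - session: the client's session
    - data: client data such as a nick_name; mainly used for the online list kept in shared memory.
  - Return values:
    - Success: APP_OK
    - Failure: APP_ERROR
  - Purpose: sets the client's session key and session value.
- void cli_session_foreach(tcc_stream_server_ctx *server_ctx, njt_proto_session_foreach_pt foreach_handler, void *data)
  - Parameters:
    - server_ctx: context of the server
    - foreach_handler: function pointer called for each visited object
    - data: extra data passed to the handler
  - Purpose: iterates over the client list.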
- int proto_server_send(tcc_stream_request_t *r, char *data, size_t len)
  - Purpose: sends a message to one client.
  - Return values:
    - Success: the number of bytes sent
    - Failure: NJT_ERROR
- int proto_server_send_broadcast(tcc_str_t *sender_session, tcc_stream_server_ctx *srv_ctx, char *data, size_t len)
  - Purpose: broadcasts a message to all clients of the server.
  - Return values:
    - Success: NJT_OK
    - Failure: NJT_ERROR
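- int proto_server_send_others(tcc_stream_request_t *sender, char *data, size_t len)
  - Purpose: broadcasts a message to every user of the server except the sender.
  - Parameters:
    - sender_session: session of the sender; must not be empty.
    - srv_ctx: context of the server
    - data: pointer to the data
    - len: length of the data
  - Return values:
    - Success: NJT_OK
    - Failure: NJT_ERROR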
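- int proto_server_send_upstream(tcc_stream_request_t *r, char *data, size_t len)
  - Purpose: sends a message to the upstream server.
  - Return values:
    - Success: the number of bytes sent
    - Failure: NJT_ERROR
    - Wait: NJT_AGAIN
- void proto_server_log(int level, const char *fmt, ...)
  - Purpose: writes a log entry to the log file.
  - Parameters:
    - level: log level, one of NJT_LOG_DEBUG, NJT_LOG_INFO, NJT_LOG_ERR.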
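Because the send functions above report the number of bytes sent, and proto_server_send_upstream can also ask the caller to wait, a script may want a small wrapper that tracks how much of a buffer has actually gone out. The sketch below is a standalone illustration under assumptions: send_fn_t, send_all, the SEND_* codes, and the test sink are hypothetical, and whether the real functions ever accept fewer bytes than requested is not stated in this document.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for NJT_ERROR / NJT_AGAIN. */
enum { SEND_ERROR = -1, SEND_AGAIN = -2 };

/* Hypothetical sender: returns bytes accepted, SEND_AGAIN, or SEND_ERROR. */
typedef int (*send_fn_t)(void *conn, const char *data, size_t len);

/*
 * Call send repeatedly until all data is accepted, the sender asks to
 * wait, or it fails. Returns the number of bytes sent so far (the caller
 * resumes from that offset later), or SEND_ERROR on failure.
 */
static long send_all(send_fn_t send, void *conn, const char *data, size_t len)
{
    size_t off = 0;

    while (off < len) {
        int n = send(conn, data + off, len - off);
        if (n == SEND_AGAIN || n == 0) {
            break;                        /* not writable now: stop, do not spin */
        }
        if (n < 0) {
            return SEND_ERROR;
        }
        off += (size_t)n;
    }
    return (long)off;
}

/* Test sink: accepts at most 3 bytes per call until its budget runs out. */
typedef struct {
    char buf[32];
    size_t len;
    size_t budget;
} sink_t;

static int sink_send(void *conn, const char *data, size_t len)
{
    sink_t *s = conn;
    size_t n = len < 3 ? len : 3;

    if (s->budget == 0) {
        return SEND_AGAIN;
    }
    if (n > s->budget) {
        n = s->budget;
    }
    if (s->len + n > sizeof(s->buf)) {
        return SEND_ERROR;
    }
    memcpy(s->buf + s->len, data, n);
    s->len += n;
    s->budget -= n;
    return (int)n;
}
```

The wrapper deliberately stops on the wait code instead of looping, since a callback in an event-driven server should return and let the remaining bytes be sent on a later invocation.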
5. Configuration examples
5.1 tcc_proxy script with a custom tcc_proxy routing algorithm
njet.conf
helper broker modules/njt_helper_broker_module.so conf/mqtt.conf;
helper ctrl modules/njt_helper_ctrl_module.so conf/ctrl.conf;
user root root;
worker_processes 2;
cluster_name helper;
node_name node1;
error_log logs/error.log info;
pid logs/njet.pid;
events {
worker_connections 1024;
}
http {
dyn_kv_conf conf/iot-work.conf;
include mime.types;
access_log logs/access.log;
sendfile on;
keepalive_timeout 65;
upstream backend1 {
zone backend2 128k;
server 127.0.0.1:5999;
}
server {
listen 5555;
server_name localhost;
location / {
proxy_pass http://backend1;
}
}
server {
listen 5999;
location / {
return 200 "test 5999 ok";
}
}
}
stream {
upstream backend_s1 {
zone backend_s1 128k;
server 127.0.0.1:22222;
server 192.168.40.119:22222;
proto_upstream on;
}
upstream backend_s2 {
zone backend_s2 128k;
server 127.0.0.1:5999;
proto_upstream on;
}
server {
listen 22222 ;
listen [::]:22222;
return "test 22222 ok";
}
server {
listen 22223;
preread_buffer_size 1024;
proto_buffer_size 1024;
proto_server on;
proto_server_code_file "conf/test_proto_server.c";
proto_pass backend_s1;
}
}
test_proto_server.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
#include <string.h>
int proto_server_process_connection(tcc_stream_request_t *r)
{
tcc_str_t ip = njt_string("127.0.0.1");
if (ip.len == r->addr_text->len && memcmp((void *)r->addr_text->data, ip.data,ip.len) == 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%V,NJT_STREAM_FORBIDDEN !",r->addr_text);
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%V ok!", r->addr_text);
return NJT_OK;
}
int proto_server_process_preread(tcc_stream_request_t *r, tcc_str_t *msg)
{
proto_server_log(NJT_LOG_DEBUG, "2 tcc preread ip=%V ok!",r->addr_text);
proto_server_log(NJT_LOG_DEBUG, "tcc preread msg=%V!",msg);
r->used_len = msg->len;
return NJT_DECLINED;
}
int proto_server_process_log(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "4 tcc log ip=%V ok!",r->addr_text);
return NJT_OK;
}
int proto_server_process_client_update(tcc_stream_request_t *r)
{
return NJT_OK;
}
int proto_server_process_connection_close(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_process_connection_close!");
return NJT_OK;
}
int proto_server_update(tcc_stream_server_ctx *srv_ctx)
{
return NJT_OK;
}
int proto_server_init(tcc_stream_server_ctx *srv_ctx)
{
WSServer *srv_data = proto_malloc(srv_ctx, sizeof(WSServer));
if (srv_data != NULL)
{
tcc_set_srv_ctx(srv_ctx, srv_data);
}
return NJT_OK;
}
void* proto_server_check_upstream_peer(tcc_stream_client_upstream_data_t *cli_ups_info) { // custom selection algorithm
int i;
tcc_uint_t weight;
int mask_len = 7; // number of leading address characters compared between client and peer
tcc_stream_upstream_rr_peer_t *peer_list = cli_ups_info->peer_list;
for(i = 0; i < cli_ups_info->peer_num; i++) {
if(peer_list[i].server->len >= mask_len && cli_ups_info->cli_addr_text->len >= mask_len && njt_memcmp(peer_list[i].server->data,cli_ups_info->cli_addr_text->data,mask_len) == 0) {
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "1 tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
}
if(cli_ups_info->peer_num > 0) {
i = 0;
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "default tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
return NULL; // NJT_BUSY;
}
int proto_server_upstream_connection_close(tcc_stream_request_t *r) {
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_upstream_connection_close!");
return NJT_OK;
}
Access the server with curl:
The algorithm matches upstream 192.168.40.119:22222
The algorithm matches upstream 127.0.0.1:22222
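The selection rule used by proto_server_check_upstream_peer in this example is a plain text-prefix comparison: the first peer whose address shares its first 7 characters with the client address wins, otherwise the first peer is used. The following standalone sketch restates that rule so it can be tried outside NJet; pick_peer_t and pick_peer are hypothetical stand-ins for the real peer list types.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one entry of the upstream peer list. */
typedef struct {
    const char *server;   /* "ip:port" text of the upstream server */
} pick_peer_t;

/*
 * Return the index of the first peer whose address text shares its first
 * prefix_len characters with client_addr. Fall back to peer 0 when nothing
 * matches, and return -1 when the list is empty.
 */
static int pick_peer(const pick_peer_t *peers, size_t n,
                     const char *client_addr, size_t prefix_len)
{
    size_t i;
    size_t client_len = strlen(client_addr);

    for (i = 0; i < n; i++) {
        if (strlen(peers[i].server) >= prefix_len
            && client_len >= prefix_len
            && memcmp(peers[i].server, client_addr, prefix_len) == 0) {
            return (int)i;
        }
    }
    return n > 0 ? 0 : -1;
}
```

With the two servers of backend_s1, a client at 192.168.40.7 maps to 192.168.40.119:22222 and a client at 127.0.0.1 maps to 127.0.0.1:22222. A 7-character text prefix is a coarse match (for example, "192.168" covers every 192.168.x.x address), so this is a demonstration rule rather than a subnet test.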
5.2 tcc_proxy script that modifies upstream and downstream data
njet.conf
helper broker modules/njt_helper_broker_module.so conf/mqtt.conf;
helper ctrl modules/njt_helper_ctrl_module.so conf/ctrl.conf;
user root root;
worker_processes 2;
cluster_name helper;
node_name node1;
error_log logs/error.log info;
pid logs/njet.pid;
events {
worker_connections 1024;
}
http {
dyn_kv_conf conf/iot-work.conf;
include mime.types;
access_log logs/access.log;
sendfile on;
keepalive_timeout 65;
upstream backend1 {
zone backend2 128k;
server 127.0.0.1:5999;
}
server {
listen 5555;
server_name localhost;
location / {
proxy_pass http://backend1;
}
}
server {
listen 5999;
location / {
return 200 "test 5999 ok";
}
}
}
stream {
upstream backend_s1 {
zone backend_s1 128k;
server 127.0.0.1:22222;
proto_upstream on;
}
upstream backend_s2 {
zone backend_s2 128k;
server 127.0.0.1:5999;
proto_upstream on;
}
server {
listen 22222 ;
listen [::]:22222;
return "test 22222 ok";
}
server {
listen 22223;
preread_buffer_size 1024;
proto_buffer_size 1024;
proto_server on;
proto_server_code_file "conf/test_proto_server.c";
proto_pass backend_s1;
}
}
test_proto_server.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
#include <string.h>
int proto_server_process_connection(tcc_stream_request_t *r)
{
tcc_str_t ip = njt_string("127.0.0.1");
if (ip.len == r->addr_text->len && memcmp((void *)r->addr_text->data, ip.data,ip.len) == 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%V,NJT_STREAM_FORBIDDEN !",r->addr_text);
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%V ok!", r->addr_text);
return NJT_OK;
}
int proto_server_process_preread(tcc_stream_request_t *r, tcc_str_t *msg)
{
proto_server_log(NJT_LOG_DEBUG, "2 tcc preread ip=%V ok!",r->addr_text);
proto_server_log(NJT_LOG_DEBUG, "tcc preread msg=%V!",msg);
r->used_len = msg->len;
return NJT_DECLINED;
}
int proto_server_process_log(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "4 tcc log ip=%V ok!",r->addr_text);
return NJT_OK;
}
int proto_server_process_message(tcc_stream_request_t *r, tcc_str_t *msg) // modify data sent to the upstream
{
u_char *p;
tcc_str_t end_tok = njt_string("\r\n\r\n");
tcc_str_t add_data = njt_string("\r\ntcc-up-set:upset");
tcc_str_t new_msg;
tcc_str_t old_end_msg;
// locate the blank line that ends the HTTP header block
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_tok.data,end_tok.len - 1);
if (p == NULL) {
proto_server_send_upstream(r,msg->data,msg->len);
} else {
new_msg.data = msg->data;
new_msg.len = p - msg->data;
old_end_msg.data = p;
old_end_msg.len = msg->data + msg->len - old_end_msg.data;
proto_server_send_upstream(r,new_msg.data,new_msg.len); // original headers
proto_server_send_upstream(r,add_data.data,add_data.len); // injected header
proto_server_send_upstream(r,old_end_msg.data,old_end_msg.len); // blank line and any body
}
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_send_upstream msg=%V",msg);
//proto_server_send_upstream(r,new_url.data,new_url.len);
r->used_len = msg->len;
return NJT_OK;
}
int proto_server_process_client_update(tcc_stream_request_t *r)
{
return NJT_OK;
}
int proto_server_process_connection_close(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_process_connection_close!");
return NJT_OK;
}
int proto_server_update(tcc_stream_server_ctx *srv_ctx)
{
return NJT_OK;
}
int proto_server_init(tcc_stream_server_ctx *srv_ctx)
{
WSServer *srv_data = proto_malloc(srv_ctx, sizeof(WSServer));
if (srv_data != NULL)
{
tcc_set_srv_ctx(srv_ctx, srv_data);
}
return NJT_OK;
}
void* proto_server_check_upstream_peer(tcc_stream_client_upstream_data_t *cli_ups_info) {
int i;
tcc_uint_t weight;
int mask_len = 7;
tcc_stream_upstream_rr_peer_t *peer_list = cli_ups_info->peer_list;
for(i = 0; i < cli_ups_info->peer_num; i++) {
if(peer_list[i].server->len >= mask_len && cli_ups_info->cli_addr_text->len >= mask_len && njt_memcmp(peer_list[i].server->data,cli_ups_info->cli_addr_text->data,mask_len) == 0) {
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "1 tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
}
if(cli_ups_info->peer_num > 0) {
i = 0;
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "default tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
return NULL; // NJT_BUSY;
}
int proto_server_upstream_connection_close(tcc_stream_request_t *r) {
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_upstream_connection_close!");
return NJT_OK;
}
int proto_server_upstream_message(tcc_stream_request_t *r, tcc_str_t *msg){ // modify data sent to the client
u_char *p;
tcc_str_t end_tok = njt_string("\r\n\r\n");
tcc_str_t add_data = njt_string("\r\ntcc-down-set: downset");
tcc_str_t new_msg;
tcc_str_t old_end_msg;
proto_server_log(NJT_LOG_DEBUG, "tcc from upstream msg=%V",msg);
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_tok.data,end_tok.len - 1);
if (p == NULL) {
proto_server_send(r,msg->data,msg->len);
} else {
new_msg.data = msg->data;
new_msg.len = p - msg->data;
old_end_msg.data = p;
old_end_msg.len = msg->data + msg->len - old_end_msg.data;
proto_server_send(r,new_msg.data,new_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg1=%V",&new_msg);
proto_server_send(r,add_data.data,add_data.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg2=%V",&add_data);
proto_server_send(r,old_end_msg.data,old_end_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client len=%d,msg3=%V",old_end_msg.len,&old_end_msg);
}
r->used_len = msg->len;
return NJT_OK;
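}
Send a request with curl:
An HTTP header is added in each direction: upset on the request sent upstream and downset on the response returned to the client.
The operations are recorded in the log.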
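The header injection performed by both callbacks in this example comes down to one buffer operation: find the blank line that ends the header block and splice a new header line in front of it. The sketch below shows that operation on plain C buffers; inject_header is a hypothetical helper, not part of the NJet script API, and it assumes the whole header block is present in a single buffer.

```c
#include <stddef.h>
#include <string.h>

/*
 * Copy msg into out, inserting header_line (given without CRLF) as the last
 * header before the "\r\n\r\n" that ends the header block. If no terminator
 * is found, msg is copied unchanged. Returns the number of bytes written,
 * or 0 if out is too small (or msg is empty).
 */
static size_t inject_header(const char *msg, size_t msg_len,
                            const char *header_line,
                            char *out, size_t out_cap)
{
    static const char end_tok[] = "\r\n\r\n";
    size_t hdr_len = strlen(header_line);
    size_t i;
    size_t head = 0;      /* offset of the terminating "\r\n\r\n" */
    int found = 0;

    for (i = 0; i + 4 <= msg_len; i++) {
        if (memcmp(msg + i, end_tok, 4) == 0) {
            head = i;
            found = 1;
            break;
        }
    }
    if (!found) {
        if (msg_len > out_cap) {
            return 0;
        }
        memcpy(out, msg, msg_len);
        return msg_len;
    }
    if (msg_len + 2 + hdr_len > out_cap) {
        return 0;
    }
    memcpy(out, msg, head);                        /* original headers */
    memcpy(out + head, "\r\n", 2);                 /* end the previous header line */
    memcpy(out + head + 2, header_line, hdr_len);  /* injected header */
    memcpy(out + head + 2 + hdr_len, msg + head, msg_len - head); /* blank line and body */
    return msg_len + 2 + hdr_len;
}
```

The terminator and everything after it are copied unchanged, so a message body that follows the headers is preserved. Like the sample script, the sketch passes data through untouched when the terminator is not in the current buffer, which means a header block split across two reads is not modified.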
5.3 WebSocket (ws) server: sending data in text mode
njet.conf
helper broker modules/njt_helper_broker_module.so conf/mqtt.conf;
helper ctrl modules/njt_helper_ctrl_module.so conf/ctrl.conf;
user root root;
worker_processes 2;
cluster_name helper;
node_name node1;
error_log logs/error.log info;
pid logs/njet.pid;
events {
worker_connections 1024;
}
http {
dyn_kv_conf conf/iot-work.conf;
include mime.types;
access_log logs/access.log;
sendfile on;
keepalive_timeout 65;
server {
listen 5999;
location / {
return 200 "test 5999 ok";
}
}
}
stream {
server {
listen 22223;
preread_buffer_size 1024;
proto_buffer_size 1024;
proto_server on;
proto_server_code_file "conf/ws_proto_server.c";
proto_server_code_file "conf/tcc_ws.c";
}
}
ws_proto_server.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
// global variables
int max_frm_size = 6553500;
// ws_headers headers;
typedef struct
{
uint32_t state[5];
uint32_t count[2];
uint8_t buffer[64];
} SHA1_CTX;
#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits))))
/* blk0() and blk() perform the initial expand. */
/* I got the idea of expanding during the round function from SSLeay */
#ifdef LITTLE_ENDIAN
#define blk0(i) (block->l[i] = (rol(block->l[i], 24) & 0xFF00FF00) | (rol(block->l[i], 8) & 0x00FF00FF))
#else
#define blk0(i) block->l[i]
#endif
#define blk(i) (block->l[i & 15] = rol(block->l[(i + 13) & 15] ^ block->l[(i + 8) & 15] ^ block->l[(i + 2) & 15] ^ block->l[i & 15], 1))
/* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */
#define R0(v, w, x, y, z, i) \
z += ((w & (x ^ y)) ^ y) + blk0(i) + 0x5A827999 + rol(v, 5); \
w = rol(w, 30);
#define R1(v, w, x, y, z, i) \
z += ((w & (x ^ y)) ^ y) + blk(i) + 0x5A827999 + rol(v, 5); \
w = rol(w, 30);
#define R2(v, w, x, y, z, i) \
z += (w ^ x ^ y) + blk(i) + 0x6ED9EBA1 + rol(v, 5); \
w = rol(w, 30);
#define R3(v, w, x, y, z, i) \
z += (((w | x) & y) | (w & x)) + blk(i) + 0x8F1BBCDC + rol(v, 5); \
w = rol(w, 30);
#define R4(v, w, x, y, z, i) \
z += (w ^ x ^ y) + blk(i) + 0xCA62C1D6 + rol(v, 5); \
w = rol(w, 30);
#define CRLF "\r\n"
#define WS_MAGIC_STR "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
#define WS_SWITCH_PROTO_STR "HTTP/1.1 101 Switching Protocols"
#ifndef SHA_DIGEST_LENGTH
#define SHA_DIGEST_LENGTH 20
#endif
//
void *
xmalloc(size_t size)
{
void *ptr;
if ((ptr = malloc(size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to allocate memory - failed.");
return (ptr);
}
char *
xstrdup(const char *s)
{
char *ptr;
size_t len;
len = strlen(s) + 1;
ptr = xmalloc(len);
if (ptr == NULL) /* xmalloc already logged the failure */
return NULL;
strncpy(ptr, s, len);
return (ptr);
}
/* Self-checking wrapper to calloc() */
void *
xcalloc(size_t nmemb, size_t size)
{
void *ptr;
if ((ptr = calloc(nmemb, size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to calloc memory - failed.");
return (ptr);
}
void *
xrealloc(void *oldptr, size_t size)
{
void *newptr;
if ((newptr = realloc(oldptr, size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to reallocate memory - failed");
return (newptr);
}
static char *
strtoupper(char *str)
{
char *p = str;
if (str == NULL || *str == '\0')
return str;
while (*p != '\0')
{
*p = toupper((int)*p);
p++;
}
return str;
}
static const char *
ws_get_method(const char *token)
{
const char *lookfor = NULL;
if ((lookfor = "GET", !memcmp(token, "GET ", 4)) ||
(lookfor = "get", !memcmp(token, "get ", 4)))
return lookfor;
return NULL;
}
char *
base64_encode(const void *buf, size_t size)
{
static const char base64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
char *str = (char *)xmalloc((size + 3) * 4 / 3 + 1);
char *p = str;
const unsigned char *q = (const unsigned char *)buf;
size_t i = 0;
while (i < size)
{
int c = q[i++];
c *= 256;
if (i < size)
c += q[i];
i++;
c *= 256;
if (i < size)
c += q[i];
i++;
*p++ = base64[(c & 0x00fc0000) >> 18];
*p++ = base64[(c & 0x0003f000) >> 12];
if (i > size + 1)
*p++ = '=';
else
*p++ = base64[(c & 0x00000fc0) >> 6];
if (i > size)
*p++ = '=';
else
*p++ = base64[c & 0x0000003f];
}
*p = 0;
return str;
}
static void
ws_set_header_key_value(ws_headers *headers, char *key, char *value)
{
if (strcasecmp("Host", key) == 0)
headers->host = xstrdup(value);
else if (strcasecmp("Origin", key) == 0)
headers->origin = xstrdup(value);
else if (strcasecmp("Upgrade", key) == 0)
headers->upgrade = xstrdup(value);
else if (strcasecmp("Connection", key) == 0)
headers->connection = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Protocol", key) == 0)
headers->ws_protocol = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Key", key) == 0)
headers->ws_key = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Version", key) == 0)
headers->ws_sock_ver = xstrdup(value);
else if (strcasecmp("User-Agent", key) == 0)
headers->agent = xstrdup(value);
else if (strcasecmp("Referer", key) == 0)
headers->referer = xstrdup(value);
}
static char *
ws_parse_request(char *line, char **method, char **protocol)
{
const char *meth;
char *req = NULL, *request = NULL, *proto = NULL;
ptrdiff_t rlen;
if ((meth = ws_get_method(line)) == NULL)
{
return NULL;
}
else
{
req = line + strlen(meth);
if ((proto = strstr(line, " HTTP/1.0")) == NULL &&
(proto = strstr(line, " HTTP/1.1")) == NULL)
return NULL;
req++;
if ((rlen = proto - req) <= 0)
return NULL;
request = xmalloc(rlen + 1);
strncpy(request, req, rlen);
request[rlen] = 0;
(*method) = strtoupper(xstrdup(meth));
(*protocol) = strtoupper(xstrdup(++proto));
}
return request;
}
static int
ws_set_header_fields(char *line, ws_headers *headers)
{
char *path = NULL, *method = NULL, *proto = NULL, *p, *value;
if (line[0] == '\n' || line[0] == '\r')
return 1;
if ((strstr(line, "GET ")) || (strstr(line, "get ")))
{
if ((path = ws_parse_request(line, &method, &proto)) == NULL)
return 1;
headers->path = path;
headers->method = method;
headers->protocol = proto;
return 0;
}
if ((p = strchr(line, ':')) == NULL)
return 1;
value = p + 1;
while (p != line && isspace((unsigned char)*(p - 1)))
p--;
if (p == line)
return 1;
*p = '\0';
if (strpbrk(line, " \t") != NULL)
{
*p = ' ';
return 1;
}
while (isspace((unsigned char)*value))
value++;
ws_set_header_key_value(headers, line, value);
return 0;
}
static int
parse_headers(ws_headers *headers)
{
char *tmp = NULL;
const char *buffer = headers->buf;
const char *line = buffer, *next = NULL;
int len = 0;
while (line)
{
if ((next = strstr(line, "\r\n")) != NULL)
len = (next - line);
else
len = strlen(line);
if (len <= 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc content parse_headers error!");
return 1;
}
tmp = xmalloc(len + 1);
memcpy(tmp, line, len);
tmp[len] = '\0';
if (ws_set_header_fields(tmp, headers) == 1)
{
free(tmp);
proto_server_log(NJT_LOG_DEBUG, "2 tcc content parse_headers error!");
return 1;
}
free(tmp);
line = next ? (next + 2) : NULL;
if (next && strcmp(next, "\r\n\r\n") == 0)
break;
}
return 0;
}
static int
ws_verify_req_headers(ws_headers *headers)
{
if (!headers->host)
return 1;
if (!headers->method)
return 1;
if (!headers->protocol)
return 1;
if (!headers->path)
return 1;
if (!headers->connection)
return 1;
if (!headers->ws_key)
return 1;
if (!headers->ws_sock_ver)
return 1;
return 0;
}
static void
SHA1Init(SHA1_CTX *context)
{
context->state[0] = 0x67452301;
context->state[1] = 0xEFCDAB89;
context->state[2] = 0x98BADCFE;
context->state[3] = 0x10325476;
context->state[4] = 0xC3D2E1F0;
context->count[0] = context->count[1] = 0;
}
void SHA1Transform(uint32_t state[5], uint8_t buffer[64])
{
uint32_t a, b, c, d, e;
typedef union
{
uint8_t c[64];
uint32_t l[16];
} CHAR64LONG16;
CHAR64LONG16 *block;
#ifdef SHA1HANDSOFF
static uint8_t workspace[64];
block = (CHAR64LONG16 *)(void *)workspace;
memcpy(block, buffer, 64);
#else
block = (CHAR64LONG16 *)(void *)buffer;
#endif
/* Copy context->state[] to working vars */
a = state[0];
b = state[1];
c = state[2];
d = state[3];
e = state[4];
/* 4 rounds of 20 operations each. Loop unrolled. */
R0(a, b, c, d, e, 0);
R0(e, a, b, c, d, 1);
R0(d, e, a, b, c, 2);
R0(c, d, e, a, b, 3);
R0(b, c, d, e, a, 4);
R0(a, b, c, d, e, 5);
R0(e, a, b, c, d, 6);
R0(d, e, a, b, c, 7);
R0(c, d, e, a, b, 8);
R0(b, c, d, e, a, 9);
R0(a, b, c, d, e, 10);
R0(e, a, b, c, d, 11);
R0(d, e, a, b, c, 12);
R0(c, d, e, a, b, 13);
R0(b, c, d, e, a, 14);
R0(a, b, c, d, e, 15);
R1(e, a, b, c, d, 16);
R1(d, e, a, b, c, 17);
R1(c, d, e, a, b, 18);
R1(b, c, d, e, a, 19);
R2(a, b, c, d, e, 20);
R2(e, a, b, c, d, 21);
R2(d, e, a, b, c, 22);
R2(c, d, e, a, b, 23);
R2(b, c, d, e, a, 24);
R2(a, b, c, d, e, 25);
R2(e, a, b, c, d, 26);
R2(d, e, a, b, c, 27);
R2(c, d, e, a, b, 28);
R2(b, c, d, e, a, 29);
R2(a, b, c, d, e, 30);
R2(e, a, b, c, d, 31);
R2(d, e, a, b, c, 32);
R2(c, d, e, a, b, 33);
R2(b, c, d, e, a, 34);
R2(a, b, c, d, e, 35);
R2(e, a, b, c, d, 36);
R2(d, e, a, b, c, 37);
R2(c, d, e, a, b, 38);
R2(b, c, d, e, a, 39);
R3(a, b, c, d, e, 40);
R3(e, a, b, c, d, 41);
R3(d, e, a, b, c, 42);
R3(c, d, e, a, b, 43);
R3(b, c, d, e, a, 44);
R3(a, b, c, d, e, 45);
R3(e, a, b, c, d, 46);
R3(d, e, a, b, c, 47);
R3(c, d, e, a, b, 48);
R3(b, c, d, e, a, 49);
R3(a, b, c, d, e, 50);
R3(e, a, b, c, d, 51);
R3(d, e, a, b, c, 52);
R3(c, d, e, a, b, 53);
R3(b, c, d, e, a, 54);
R3(a, b, c, d, e, 55);
R3(e, a, b, c, d, 56);
R3(d, e, a, b, c, 57);
R3(c, d, e, a, b, 58);
R3(b, c, d, e, a, 59);
R4(a, b, c, d, e, 60);
R4(e, a, b, c, d, 61);
R4(d, e, a, b, c, 62);
R4(c, d, e, a, b, 63);
R4(b, c, d, e, a, 64);
R4(a, b, c, d, e, 65);
R4(e, a, b, c, d, 66);
R4(d, e, a, b, c, 67);
R4(c, d, e, a, b, 68);
R4(b, c, d, e, a, 69);
R4(a, b, c, d, e, 70);
R4(e, a, b, c, d, 71);
R4(d, e, a, b, c, 72);
R4(c, d, e, a, b, 73);
R4(b, c, d, e, a, 74);
R4(a, b, c, d, e, 75);
R4(e, a, b, c, d, 76);
R4(d, e, a, b, c, 77);
R4(c, d, e, a, b, 78);
R4(b, c, d, e, a, 79);
state[0] += a;
state[1] += b;
state[2] += c;
state[3] += d;
state[4] += e;
a = b = c = d = e = 0;
}
static void
SHA1Update(SHA1_CTX *context, uint8_t *data, unsigned int len)
{
unsigned int i, j;
j = (context->count[0] >> 3) & 63;
if ((context->count[0] += len << 3) < (len << 3))
context->count[1]++;
context->count[1] += (len >> 29);
if ((j + len) > 63)
{
memcpy(&context->buffer[j], data, (i = 64 - j));
SHA1Transform(context->state, context->buffer);
for (; i + 63 < len; i += 64)
{
SHA1Transform(context->state, &data[i]);
}
j = 0;
}
else
i = 0;
memcpy(&context->buffer[j], &data[i], len - i);
}
static void
SHA1Final(uint8_t digest[20], SHA1_CTX *context)
{
uint32_t i, j;
uint8_t finalcount[8];
for (i = 0; i < 8; i++)
{
finalcount[i] = (uint8_t)((context->count[(i >= 4 ? 0 : 1)] >> ((3 - (i & 3)) * 8)) & 255); /* Endian independent */
}
SHA1Update(context, (uint8_t *)"\200", 1);
while ((context->count[0] & 504) != 448)
{
SHA1Update(context, (uint8_t *)"\0", 1);
}
SHA1Update(context, finalcount, 8); /* Should cause a SHA1Transform() */
for (i = 0; i < 20; i++)
{
digest[i] = (uint8_t)((context->state[i >> 2] >> ((3 - (i & 3)) * 8)) & 255);
}
/* Wipe variables */
i = j = 0;
memset(context->buffer, 0, 64);
memset(context->state, 0, 20);
memset(context->count, 0, 8);
memset(&finalcount, 0, 8);
#ifdef SHA1HANDSOFF /* make SHA1Transform overwrite its own static vars */
SHA1Transform(context->state, context->buffer);
#endif
}
static void
ws_sha1_digest(char *s, int len, unsigned char *digest)
{
SHA1_CTX sha;
SHA1Init(&sha);
SHA1Update(&sha, (uint8_t *)s, len);
SHA1Final(digest, &sha);
}
static void
ws_set_handshake_headers(ws_headers *headers)
{
size_t klen = strlen(headers->ws_key);
size_t mlen = strlen(WS_MAGIC_STR);
size_t len = klen + mlen;
char *s = xmalloc(klen + mlen + 1);
uint8_t digest[SHA_DIGEST_LENGTH];
memset(digest, 0, sizeof digest); /* clear the whole 20-byte digest, not just its first byte */
memcpy(s, headers->ws_key, klen);
memcpy(s + klen, WS_MAGIC_STR, mlen + 1);
ws_sha1_digest(s, len, digest);
headers->ws_accept = base64_encode((unsigned char *)digest, sizeof(digest));
headers->ws_resp = xstrdup(WS_SWITCH_PROTO_STR);
if (!headers->upgrade)
headers->upgrade = xstrdup("websocket");
if (!headers->connection)
headers->connection = xstrdup("Upgrade");
free(s);
}
static void
ws_append_str(char **dest, const char *src)
{
size_t curlen = strlen(*dest);
size_t srclen = strlen(src);
size_t newlen = curlen + srclen;
char *str = xrealloc(*dest, newlen + 1);
if (str == NULL)
return; /* allocation failed: leave *dest unchanged */
memcpy(str + curlen, src, srclen + 1);
*dest = str;
}
int ws_send_handshake_headers(tcc_stream_request_t *r, ws_headers *headers)
{
int bytes = 0;
char *str = xstrdup("");
ws_append_str(&str, headers->ws_resp);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Upgrade: ");
ws_append_str(&str, headers->upgrade);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Connection: ");
ws_append_str(&str, headers->connection);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Sec-WebSocket-Accept: ");
ws_append_str(&str, headers->ws_accept);
ws_append_str(&str, CRLF CRLF);
bytes = proto_server_send(r, str, strlen(str));
free(str);
return bytes;
}
/* Allocate memory for a websocket frame */
static WSFrame *
new_wsframe(void)
{
WSFrame *frame = xcalloc(1, sizeof(WSFrame));
memset(frame->buf, 0, sizeof(frame->buf));
frame->reading = 1;
return frame;
}
static int ws_get_data(WSClient *client, char *buffer, int size)
{
int len;
len = size;
if (client->msg.len < size)
{
len = client->msg.len;
}
memcpy(buffer, client->msg.data, len);
client->msg.data = client->msg.data + len;
client->msg.len = client->msg.len - len;
client->r->used_len = client->r->used_len + len;
return len;
}
static int
read_socket(WSClient *client, char *buffer, int size)
{
int bytes = 0;
bytes = ws_get_data(client, buffer, size);
return bytes;
}
/* Read a websocket frame's header.
*
* On success, the number of bytes read is returned. */
static int
ws_read_header(WSClient *client, WSFrame *frm, int pos, int need)
{
char *buf = frm->buf;
int bytes = 0;
if (client->msg.len == 0)
{
return 0;
}
/* read the first 2 bytes for basic frame info */
if ((bytes = read_socket(client, buf + pos, need)) < 1)
{
return bytes;
}
frm->buflen += bytes;
frm->buf[frm->buflen] = '\0'; /* null-terminate */
return bytes;
}
static int
ws_set_status(WSClient *client, WSStatus status, int bytes)
{
client->status = status;
return bytes;
}
static int
ws_set_front_header_fields(WSClient *client)
{
WSFrame **frm = &client->frame;
char *buf = (*frm)->buf;
(*frm)->fin = WS_FRM_FIN(*(buf));
(*frm)->masking = WS_FRM_MASK(*(buf + 1));
(*frm)->opcode = WS_FRM_OPCODE(*(buf));
(*frm)->res = WS_FRM_R1(*(buf)) || WS_FRM_R2(*(buf)) || WS_FRM_R3(*(buf));
/* should be masked and can't be using RESVd bits */
if (!(*frm)->masking || (*frm)->res)
{
proto_server_log(NJT_LOG_DEBUG, "tcc ws_set_front_header_fields masking=%d,res=%d", (*frm)->masking, (*frm)->res);
return ws_set_status(client, WS_ERR | WS_CLOSE, 1);
}
return 0;
}
/* Set the extended payload length into the given pointer. */
static void
ws_set_extended_header_size(const char *buf, int *extended)
{
uint64_t payloadlen = 0;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
*extended = 2;
break;
case WS_PAYLOAD_EXT64:
*extended = 8;
break;
}
}
/* Set the masking key into our frame structure. */
static void
ws_set_masking_key(WSFrame *frm, const char *buf)
{
uint64_t payloadlen = 0;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
memcpy(&frm->mask, buf + 4, sizeof(frm->mask));
break;
case WS_PAYLOAD_EXT64:
memcpy(&frm->mask, buf + 10, sizeof(frm->mask));
break;
default:
memcpy(&frm->mask, buf + 2, sizeof(frm->mask));
}
}
/* Set the extended payload length into our frame structure. */
static void
ws_set_payloadlen(WSFrame *frm, const char *buf)
{
uint64_t payloadlen = 0, len64;
uint16_t len16;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
memcpy(&len16, (buf + 2), sizeof(uint16_t));
frm->payloadlen = ntohs(len16);
break;
case WS_PAYLOAD_EXT64:
memcpy(&len64, (buf + 2), sizeof(uint64_t));
frm->payloadlen = be64toh(len64);
break;
default:
frm->payloadlen = payloadlen;
}
}
static int
ws_realloc_frm_payload(WSFrame *frm, WSMessage *msg)
{
char *tmp = NULL;
uint64_t newlen = 0;
newlen = msg->payloadsz + frm->payloadlen;
tmp = realloc(msg->payload, newlen);
if (tmp == NULL && newlen > 0)
{
free(msg->payload);
msg->payload = NULL;
return 1;
}
msg->payload = tmp;
return 0;
}
static void
ws_unmask_payload(char *buf, int len, int offset, unsigned char mask[])
{
int i, j = 0;
/* unmask data */
for (i = offset; i < len; ++i, ++j)
{
buf[i] ^= mask[j % 4];
}
}
static int
ws_error(tcc_stream_request_t *r, unsigned short code, const char *err)
{
unsigned int len;
unsigned short code_be;
char buf[128] = {0};
len = 2;
code_be = htobe16(code);
memcpy(buf, &code_be, 2);
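if (err)
{
/* snprintf returns the untruncated length; clamp it to the bytes actually stored */
int n = snprintf(buf + 2, sizeof buf - 4, "%s", err);
if (n > (int)(sizeof buf - 5))
n = (int)(sizeof buf - 5);
if (n > 0)
len += n;
}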
return ws_send_frame(r, WS_OPCODE_CLOSE, buf, len);
}
static int
ws_read_payload(WSClient *client, WSMessage *msg, int pos, int need)
{
char *buf = msg->payload;
int bytes = 0;
/* read up to `need` bytes of payload data into the message buffer */
if ((bytes = read_socket(client, buf + pos, need)) < 1)
{
if (client->status & WS_CLOSE)
ws_error(client->r, WS_CLOSE_UNEXPECTED, "Unable to read payload");
return bytes;
}
msg->buflen += bytes;
msg->payloadsz += bytes;
return bytes;
}
static void
ws_free_message(WSClient *client)
{
if (client->message && client->message->payload)
free(client->message->payload);
if (client->message)
free(client->message);
client->message = NULL;
}
static uint32_t
verify_utf8(uint32_t *state, const char *str, int len)
{
int i;
uint32_t type;
for (i = 0; i < len; ++i)
{
type = utf8d[(uint8_t)str[i]];
*state = utf8d[256 + (*state) * 16 + type];
if (*state == UTF8_INVAL)
break;
}
return *state;
}
int ws_validate_string(const char *str, int len)
{
uint32_t state = UTF8_VALID;
if (verify_utf8(&state, str, len) == UTF8_INVAL)
{
return 1;
}
if (state != UTF8_VALID)
{
return 1;
}
return 0;
}
static int
ws_handle_err(WSClient *client, unsigned short code, WSStatus status, const char *m)
{
client->status = status;
return ws_error(client->r, code, m);
}
static void
ws_handle_text_bin(WSClient *client, WSServer *server)
{
tcc_str_t content, out_data;
WSFrame **frm = &client->frame;
WSMessage **msg = &client->message;
int offset = (*msg)->mask_offset;
if ((*frm)->opcode == WS_OPCODE_CONTINUATION)
{
// proto_server_log(NJT_LOG_DEBUG,"2 tcc websocket CONTINUATION\n");
}
/* All data frames after the initial data frame must have opcode 0 */
if ((*msg)->fragmented && (*frm)->opcode != WS_OPCODE_CONTINUATION)
{
client->status = WS_ERR | WS_CLOSE;
return;
}
/* RFC states that there is a new masking key per frame, therefore,
* time to unmask... */
ws_unmask_payload((*msg)->payload, (*msg)->payloadsz, offset, (*frm)->mask);
/* Done with the current frame's payload */
(*msg)->buflen = 0;
/* Reading a fragmented frame */
(*msg)->fragmented = 1;
content.data = (*msg)->payload;
content.len = (*msg)->payloadsz;
if (!(*frm)->fin)
{
proto_server_log(NJT_LOG_DEBUG, "tcc frm CONTINUATION ws_get_frm_payload = %V!", &content);
return;
}
else
{
proto_server_log(NJT_LOG_DEBUG, "tcc frm ws_get_frm_payload = %V!", &content);
}
// proto_server_log(NJT_LOG_DEBUG, "2 tcc ws_get_frm_payload = %V!",&content);
/* validate text data encoded as UTF-8 */
if ((*msg)->opcode == WS_OPCODE_TEXT)
{
if (ws_validate_string((*msg)->payload, (*msg)->payloadsz) != 0)
{
ws_handle_err(client, WS_CLOSE_INVALID_UTF8, WS_ERR | WS_CLOSE, NULL);
proto_server_log(NJT_LOG_DEBUG, "3 tcc ws_get_frm_payload = %V!", &content);
return;
}
}
if ((*msg)->opcode != WS_OPCODE_CONTINUATION)
{
// ws_write_fifo (server->pipeout, (*msg)->payload, (*msg)->payloadsz);
}
content.data = (*msg)->payload;
content.len = (*msg)->payloadsz;
if ((*msg)->payloadsz > 0)
{
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
// free(out_data.data);
}
// proto_server_log(NJT_LOG_DEBUG, "5 tcc ws_get_frm_payload = %V!", &content);
ws_free_message(client);
}
static void
ws_handle_pong(WSClient *client)
{
WSFrame **frm = &client->frame;
if (!(*frm)->fin)
{
return;
}
ws_free_message(client);
}
static int
ws_respond(tcc_stream_request_t *r, const char *buffer, int len)
{
int bytes = 0;
// size_t length = len;
proto_server_send(r, (char *)buffer, len);
return bytes;
}
static int
ws_send_frame(tcc_stream_request_t *r, WSOpcode opcode, const char *p, int sz)
{
unsigned char buf[32] = {0};
char *frm = NULL;
uint64_t payloadlen = 0, u64;
int hsize = 2;
if (sz < 126)
{
payloadlen = sz;
}
else if (sz < (1 << 16))
{
payloadlen = WS_PAYLOAD_EXT16;
hsize += 2;
}
else
{
payloadlen = WS_PAYLOAD_EXT64;
hsize += 8;
}
buf[0] = 0x80 | ((uint8_t)opcode);
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
buf[1] = WS_PAYLOAD_EXT16;
buf[2] = (sz & 0xff00) >> 8;
buf[3] = (sz & 0x00ff) >> 0;
break;
case WS_PAYLOAD_EXT64:
buf[1] = WS_PAYLOAD_EXT64;
u64 = htobe64(sz);
memcpy(buf + 2, &u64, sizeof(uint64_t));
break;
default:
buf[1] = (sz & 0xff);
}
frm = xcalloc(hsize + sz, sizeof(unsigned char));
memcpy(frm, buf, hsize);
if (p != NULL && sz > 0)
memcpy(frm + hsize, p, sz);
ws_respond(r, frm, hsize + sz);
free(frm);
return 0;
}
int ws_generate_frame(WSOpcode opcode, const char *p, int sz, tcc_str_t *out_message)
{
unsigned char buf[32] = {0};
char *frm = NULL;
uint64_t payloadlen = 0, u64;
int hsize = 2;
if (sz < 126)
{
payloadlen = sz;
}
else if (sz < (1 << 16))
{
payloadlen = WS_PAYLOAD_EXT16;
hsize += 2;
}
else
{
payloadlen = WS_PAYLOAD_EXT64;
hsize += 8;
}
buf[0] = 0x80 | ((uint8_t)opcode);
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
buf[1] = WS_PAYLOAD_EXT16;
buf[2] = (sz & 0xff00) >> 8;
buf[3] = (sz & 0x00ff) >> 0;
break;
case WS_PAYLOAD_EXT64:
buf[1] = WS_PAYLOAD_EXT64;
u64 = htobe64(sz);
memcpy(buf + 2, &u64, sizeof(uint64_t));
break;
default:
buf[1] = (sz & 0xff);
}
frm = xcalloc(hsize + sz, sizeof(unsigned char));
memcpy(frm, buf, hsize);
if (p != NULL && sz > 0)
memcpy(frm + hsize, p, sz);
out_message->data = frm;
out_message->len = hsize + sz;
return 0;
}
static void
ws_handle_ping(WSClient *client)
{
WSFrame **frm = &client->frame;
tcc_str_t content;
WSMessage **msg = &client->message;
char *buf = NULL, *tmp = NULL;
int pos = 0, len = (*frm)->payloadlen, newlen = 0;
/* RFC states that Control frames themselves MUST NOT be
* fragmented. */
if (!(*frm)->fin)
{
ws_handle_err(client, WS_CLOSE_PROTO_ERR, WS_ERR | WS_CLOSE, NULL);
return;
}
/* Control frames are only allowed to have payload up to and
* including 125 octets */
if ((*frm)->payloadlen > 125)
{
ws_handle_err(client, WS_CLOSE_PROTO_ERR, WS_ERR | WS_CLOSE, NULL);
return;
}
/* No payload from ping */
if (len == 0)
{
/* RFC 6455 requires a Pong in reply to every Ping, even an empty one */
ws_send_frame(client->r, WS_OPCODE_PONG, NULL, 0);
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
return;
}
/* Copy the ping payload */
pos = (*msg)->payloadsz - len;
buf = xcalloc(len, sizeof(char));
memcpy(buf, (*msg)->payload + pos, len);
/* Unmask it */
ws_unmask_payload(buf, len, 0, (*frm)->mask);
/* Resize the current payload (keep an eye on this realloc) */
newlen = (*msg)->payloadsz - len;
tmp = realloc((*msg)->payload, newlen);
if (tmp == NULL && newlen > 0)
{
free((*msg)->payload);
free(buf);
(*msg)->payload = NULL;
client->status = WS_ERR | WS_CLOSE;
return;
}
(*msg)->payload = tmp;
(*msg)->payloadsz -= len;
content.data = buf;
content.len = len;
// proto_server_log(NJT_LOG_DEBUG, "tcc ping!");
ws_send_frame(client->r, WS_OPCODE_PONG, buf, len);
// proto_server_log(NJT_LOG_DEBUG, "tcc ping len=%d,payloadsz=%d!",len,(*msg)->payloadsz);
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
(*msg)->buflen = 0; /* done with the current frame's payload */
/* Control frame injected in the middle of a fragmented message. */
if (!(*msg)->fragmented)
{
ws_free_message(client);
}
free(buf);
}
static int
ws_handle_close(WSClient *client)
{
client->status = WS_ERR | WS_CLOSE;
return ws_send_frame(client->r, WS_OPCODE_CLOSE, NULL, 0);
}
static void
ws_manage_payload_opcode(WSClient *client, WSServer *server)
{
WSFrame **frm = &client->frame;
WSMessage **msg = &client->message;
switch ((*frm)->opcode)
{
case WS_OPCODE_CONTINUATION:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket CONTINUATION\n");
/* first frame can't be a continuation frame */
if (!(*msg)->fragmented)
{
client->status = WS_ERR | WS_CLOSE;
break;
}
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_TEXT:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket TEXT\n");
client->message->opcode = (*frm)->opcode;
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_BIN:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket BIN\n");
client->message->opcode = (*frm)->opcode;
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_PONG:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket PONG\n");
client->message->opcode = (*frm)->opcode;
ws_handle_pong(client);
break;
case WS_OPCODE_PING:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket PING\n");
client->message->opcode = (*frm)->opcode;
ws_handle_ping(client);
break;
default:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket CLOSE\n");
ws_handle_close(client);
}
}
static void
ws_free_frame(WSClient *client)
{
if (client->frame)
free(client->frame);
client->frame = NULL;
}
static WSMessage *
new_wsmessage(void)
{
WSMessage *msg = xcalloc(1, sizeof(WSMessage));
return msg;
}
static int
ws_get_frm_payload(WSClient *client, WSServer *server)
{
WSFrame **frm = NULL;
WSMessage **msg = NULL;
int bytes = 0, readh = 0, need = 0;
if (client->message == NULL)
client->message = new_wsmessage();
frm = &client->frame;
msg = &client->message;
/* message within the same frame */
if ((*msg)->payload == NULL && (*frm)->payloadlen)
(*msg)->payload = xcalloc((*frm)->payloadlen, sizeof(char));
/* handle a new frame */
else if ((*msg)->buflen == 0 && (*frm)->payloadlen)
{
if (ws_realloc_frm_payload((*frm), (*msg)) == 1)
return ws_set_status(client, WS_ERR | WS_CLOSE, 0);
}
readh = (*msg)->buflen; /* read from so far */
need = (*frm)->payloadlen - readh; /* need to read */
if (need > 0)
{
if ((bytes = ws_read_payload(client, (*msg), (*msg)->payloadsz, need)) < 0)
return bytes;
if (bytes != need)
return ws_set_status(client, WS_READING, bytes);
}
(*msg)->mask_offset = (*msg)->payloadsz - (*msg)->buflen;
ws_manage_payload_opcode(client, server);
ws_free_frame(client);
return bytes;
}
static int
ws_get_frm_header(WSClient *client)
{
WSFrame **frm = NULL;
int bytes = 0, readh = 0, need = 0, offset = 0, extended = 0;
if (client->frame == NULL)
{
client->frame = new_wsframe();
proto_server_log(NJT_LOG_DEBUG, "tcc new_wsframe!");
}
else
{
proto_server_log(NJT_LOG_DEBUG, "tcc client->frame->reading=%d!", client->frame->reading);
}
frm = &client->frame;
/* Read the first 2 bytes for basic frame info */
readh = (*frm)->buflen; /* read from header so far */
need = 2 - readh; /* need to read */
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "1 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "2 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += 2;
if (ws_set_front_header_fields(client) != 0)
{
// proto_server_log(NJT_LOG_DEBUG, "3 tcc read %d!",bytes);
return bytes;
}
ws_set_extended_header_size((*frm)->buf, &extended);
/* read the extended header */
readh = (*frm)->buflen; /* read from header so far */
need = (extended + offset) - readh; /* need to read */
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "4 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "5 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += extended;
/* read the masking key */
readh = (*frm)->buflen; /* read from header so far */
need = (4 + offset) - readh;
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "6 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "7 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += 4;
ws_set_payloadlen((*frm), (*frm)->buf);
ws_set_masking_key((*frm), (*frm)->buf);
if ((*frm)->payloadlen > max_frm_size)
{
// proto_server_log(NJT_LOG_DEBUG, "8 tcc read %d!",bytes);
return ws_set_status(client, WS_ERR | WS_CLOSE, bytes);
}
(*frm)->buflen = 0;
(*frm)->reading = 0;
(*frm)->payload_offset = offset;
// proto_server_log(NJT_LOG_DEBUG, "9 tcc read %d!",bytes);
return ws_set_status(client, WS_OK, bytes);
}
static int
ws_get_message(WSClient *client, WSServer *server)
{
int bytes = 0;
if ((client->frame == NULL) || (client->frame->reading))
{
if ((bytes = ws_get_frm_header(client)) < 1 || client->frame->reading)
{
proto_server_log(NJT_LOG_DEBUG, "tcc ws_get_frm_header bytes=%d!", bytes);
return bytes;
}
}
proto_server_log(NJT_LOG_DEBUG, "tcc ws_get_frm_payload!");
return ws_get_frm_payload(client, server);
}
//===============================================================
int proto_server_process_connection(tcc_stream_request_t *r)
{
char ch = 'a';
tcc_str_t ip = njt_string("127.0.0.1");
if (ip.len == r->addr_text->len && memcmp((void *)r->addr_text->data, ip.data,ip.len) == 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc connection ip=%V,NJT_STREAM_FORBIDDEN !",r->addr_text);
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc connection ip=%Xd ok!",ch);
return NJT_OK;
}
int proto_server_process_preread(tcc_stream_request_t *r, tcc_str_t *msg)
{
proto_server_log(NJT_LOG_DEBUG, "2 tcc preread ip=%V ok!",r->addr_text);
proto_server_log(NJT_LOG_DEBUG, "tcc preread msg=%V!",msg);
r->used_len = msg->len;
return NJT_DECLINED;
}
int proto_server_process_log(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "4 tcc log ip=%V ok!",r->addr_text);
return NJT_OK;
}
int proto_server_process_message(tcc_stream_request_t *r, tcc_str_t *msg)
{
char *data = NULL;
WSctx *cli_ctx;
int bytes;
int rc;
char *p;
WSMessage message;
WSServer *server;
tcc_str_t end_flag = njt_string("\r\n\r\n");
proto_server_log(NJT_LOG_DEBUG, "3 tcc content tcc get=%V,len=%d", msg, msg->len);
cli_ctx = tcc_get_client_ctx(r, TCC_PROTO_CTX_ID);
if (cli_ctx == NULL)
{
cli_ctx = proto_malloc(r, sizeof(WSctx));
if(cli_ctx == NULL) {
return NJT_ERROR;
}
memset(cli_ctx, 0, sizeof(WSctx));
cli_ctx->client.r = r;
tcc_set_client_ctx(r,TCC_PROTO_CTX_ID,cli_ctx);
}
if (cli_ctx->handshake == 0)
{
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_flag.data,end_flag.len - 1);
if (p == NULL)
{
cli_ctx->handshake = 1;
}
}
if (cli_ctx->handshake == 0)
{
cli_ctx->client.headers.buflen = msg->len;
memcpy(cli_ctx->client.headers.buf, msg->data, cli_ctx->client.headers.buflen);
cli_ctx->client.headers.buf[cli_ctx->client.headers.buflen] = '\0';
data = cli_ctx->client.headers.buf;
if (strstr(data, "\r\n\r\n") == NULL)
{
proto_server_log(NJT_LOG_DEBUG, "tcc http error!");
return NJT_AGAIN;
}
if (parse_headers(&cli_ctx->client.headers) != 0)
{
proto_server_log(NJT_LOG_DEBUG, "tcc content parse_headers error!");
return NJT_ERROR;
}
if (ws_verify_req_headers(&cli_ctx->client.headers) != 0)
{
proto_server_log(NJT_LOG_DEBUG, "tcc content ws_verify_req_headers error!");
return NJT_ERROR;
}
ws_set_handshake_headers(&cli_ctx->client.headers);
njt_memzero(&message, sizeof(WSMessage));
message.headers = &cli_ctx->client.headers;
rc = ws_app_on_connection(r, &message);
if (rc == NJT_OK)
{
cli_ctx->handshake = WS_HANDSHAKE_OK;
cli_ctx->client.r->used_len = msg->len;
proto_server_log(NJT_LOG_DEBUG, "3 tcc content WS_HANDSHAKE_OK [%p,%p]!", cli_ctx, &cli_ctx->client);
}
return NJT_OK;
}
else
{
if (msg->len > 0)
{
cli_ctx->client.msg = *msg;
server = tcc_client_get_srv_ctx(r);
bytes = ws_get_message(&cli_ctx->client, server);
}
}
proto_server_log(NJT_LOG_DEBUG, "tcc get ws data3 msg->len=%d,used_len=%d!", msg->len, cli_ctx->client.r->used_len);
if (r->used_len != msg->len)
{
return NJT_AGAIN;
}
return NJT_OK;
}
int proto_server_process_client_update(tcc_stream_request_t *r)
{
//ws_app_client_update(r);
return NJT_OK;
}
int proto_server_process_connection_close(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_process_connection_close!");
ws_app_on_close(r);
return NJT_OK;
}
int proto_server_update(tcc_stream_server_ctx *srv_ctx)
{
//ws_app_server_update(srv_ctx);
return NJT_OK;
}
int proto_server_init(tcc_stream_server_ctx *srv_ctx)
{
WSServer *srv_data = proto_malloc(srv_ctx, sizeof(WSServer));
if (srv_data != NULL)
{
tcc_set_srv_ctx(srv_ctx, srv_data);
}
ws_app_server_init(srv_ctx);
return NJT_OK;
}
int proto_server_upstream_message(tcc_stream_request_t *r, tcc_str_t *msg){
u_char *p;
tcc_str_t end_tok = njt_string("\r\n\r\n");
tcc_str_t add_data = njt_string("\r\ntcc-down-set: downset");
tcc_str_t new_msg;
tcc_str_t old_end_msg;
proto_server_log(NJT_LOG_DEBUG, "tcc from upstream msg=%V",msg);
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_tok.data,end_tok.len - 1);
if (p == NULL) {
proto_server_send(r,msg->data,msg->len);
} else {
new_msg.data = msg->data;
new_msg.len = p - msg->data;
old_end_msg.data = p;
old_end_msg.len = msg->data + msg->len - old_end_msg.data;
proto_server_send(r,new_msg.data,new_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg1=%V",&new_msg);
proto_server_send(r,add_data.data,add_data.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg2=%V",&add_data);
proto_server_send(r,old_end_msg.data,old_end_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client len=%d,msg3=%V",old_end_msg.len,&old_end_msg);
}
r->used_len = msg->len;
return NJT_OK;
}
void* proto_server_check_upstream_peer(tcc_stream_client_upstream_data_t *cli_ups_info) {
int i;
tcc_uint_t weight;
int mask_len = 7;
tcc_stream_upstream_rr_peer_t *peer_list = cli_ups_info->peer_list;
for(i = 0; i < cli_ups_info->peer_num; i++) {
if(peer_list[i].server->len >= mask_len && cli_ups_info->cli_addr_text->len >= mask_len && njt_memcmp(peer_list[i].server->data,cli_ups_info->cli_addr_text->data,mask_len) == 0) {
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "1 tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
}
if(cli_ups_info->peer_num > 0) {
i = 0;
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "default tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
return NULL; // NJT_BUSY;
}
int proto_server_upstream_connection_close(tcc_stream_request_t *r) {
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_upstream_connection_close!");
return NJT_OK;
}
tcc_ws.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
typedef struct app_server_s {
/* Server Status */
int gen_client_id;
int server_id;
} app_server_t;
typedef struct app_client_s {
/* Client state */
tcc_str_t *init_data;
int id;
} app_client_t;
int ws_app_on_connection(tcc_stream_request_t *r,WSMessage *msg) {
app_server_t *app_server;
app_client_t *app_client = proto_malloc(r,sizeof(app_client_t));
if(app_client != NULL) {
app_client->init_data = r->addr_text;
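app_server = tcc_client_get_app_srv_ctx(r);
if(app_server == NULL) {
/* server context was never initialised; do not dereference it */
proto_free(r,app_client);
return NJT_ERROR;
}
app_client->id = ++app_server->gen_client_id;
tcc_set_client_app_ctx(r,app_client);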
}
ws_send_handshake_headers(r, msg->headers);
if(msg->headers != NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_on_connection path: %s!",msg->headers->path);
}
return NJT_OK;
}
int ws_app_on_message(tcc_stream_request_t *r,WSMessage *msg) {
tcc_str_t data,out_data;
tcc_str_t buffer;
u_char *p;
int len;
app_client_t *app_client = tcc_get_client_app_ctx(r);
if(app_client == NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc app_client null!");
return NJT_ERROR;
}
app_server_t *app_server = tcc_client_get_app_srv_ctx(r);
if(app_server == NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc app_server null!");
return NJT_ERROR;
}
if(msg->opcode == WS_OPCODE_PING) {
proto_server_log(NJT_LOG_DEBUG, "1 tcc from ws_app_on_message ping!");
return NJT_OK;
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc from ws_app_on_message msg->opcode=%d!",msg->opcode);
data.data = msg->payload;
data.len = msg->payloadsz;
len = msg->payloadsz + app_client->init_data->len + 100;
buffer.data = proto_malloc(r,len);
if(buffer.data == NULL) {
return NJT_ERROR;
}
buffer.len = len;
njt_memzero(buffer.data,len);
p = njt_snprintf(buffer.data,buffer.len,"[server]:\nclient_id=%d,addr:[%V],data=%V,server_id=%d",app_client->id,app_client->init_data,&data,app_server->server_id);
buffer.len = p - buffer.data;
proto_server_log(NJT_LOG_DEBUG, "3 tcc from ws_app_on_message %V!",&buffer);
ws_generate_frame(WS_OPCODE_TEXT, buffer.data, buffer.len, &out_data);
proto_server_send(r,out_data.data, out_data.len);
proto_free(r,buffer.data);
free(out_data.data);
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_on_message data=%V!",&data);
return NJT_OK;
}
int ws_app_on_close(tcc_stream_request_t *r) {
app_client_t *app_client = tcc_get_client_app_ctx(r);
if(app_client != NULL) {
proto_free(r,app_client);
}
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_on_close!");
return NJT_OK;
}
int ws_app_client_update(tcc_stream_request_t *r) {
app_client_t *app_client = tcc_get_client_app_ctx(r);
tcc_str_t data = njt_string("tcc ws_app_client_update!\n");
if(app_client != NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_client_update !");
}
return NJT_OK;
}
int ws_app_server_update(tcc_stream_server_ctx *srv_ctx)
{
tcc_str_t data = njt_string("tcc ws_app_server_update!\n");
tcc_str_t out_data;
app_server_t * srv_data = tcc_get_app_srv_ctx(srv_ctx);
if(srv_data) {
ws_generate_frame(WS_OPCODE_TEXT, data.data, data.len, &out_data);
if(out_data.len > 0) {
free(out_data.data);
}
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_server_update !");
}
return NJT_OK;
}
int ws_app_server_init(tcc_stream_server_ctx *srv_ctx) {
app_server_t *app_server = proto_malloc(srv_ctx,sizeof(app_server_t));
if(app_server != NULL) {
app_server->gen_client_id = 0;
app_server->server_id = 1;
tcc_set_app_srv_ctx(srv_ctx,app_server);
}
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_server_init!");
return NJT_OK;
}
Connect to the ws server with websocat and send the data 123.
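As a rough illustration of what arrives at the server when a client such as websocat sends the text 123, the sketch below builds a masked client-to-server text frame as described in RFC 6455 and then undoes the masking with the same XOR loop that ws_unmask_payload uses. The function names build_masked_text_frame and unmask_payload and the masking key are hypothetical, chosen only for this example; a real client picks a random key for every frame.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: build a masked text frame for a short payload
 * (at most 125 bytes). Returns the frame length, or 0 if it does not fit. */
static size_t build_masked_text_frame(const char *payload, size_t len,
                                      const uint8_t mask[4],
                                      uint8_t *out, size_t cap)
{
    size_t i;

    if (len > 125 || cap < 6 + len)
        return 0;
    out[0] = 0x80 | 0x1;            /* FIN = 1, opcode 0x1 (text) */
    out[1] = 0x80 | (uint8_t)len;   /* MASK = 1, 7-bit payload length */
    memcpy(out + 2, mask, 4);       /* masking key follows the 2-byte header */
    for (i = 0; i < len; i++)
        out[6 + i] = (uint8_t)payload[i] ^ mask[i % 4];
    return 6 + len;
}

/* Hypothetical helper: unmask a payload in place. */
static void unmask_payload(uint8_t *data, size_t len, const uint8_t mask[4])
{
    size_t i;

    for (i = 0; i < len; i++)
        data[i] ^= mask[i % 4];
}
```

With the example key 11 22 33 44, the payload 123 becomes the nine bytes 81 83 11 22 33 44 20 10 00 on the wire, and unmask_payload applied to the last three bytes restores 123.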
The data is sent to the ws server and the returned content is received.
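The reply travels in an unmasked server-to-client frame. The sketch below shows how such a frame header can be encoded for the three payload-length ranges defined in RFC 6455, which is the same branching that ws_generate_frame performs. The name ws_encode_header is hypothetical, and the values 126 and 127 are the RFC length markers, assumed here to match WS_PAYLOAD_EXT16 and WS_PAYLOAD_EXT64.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: write an unmasked frame header into out, which must
 * hold at least 10 bytes. Returns the header size: 2, 4 or 10 bytes. */
static size_t ws_encode_header(uint8_t opcode, uint64_t len, uint8_t *out)
{
    int i;

    out[0] = 0x80 | (opcode & 0x0f);      /* FIN = 1, reserved bits = 0 */
    if (len < 126) {
        out[1] = (uint8_t)len;            /* length fits in 7 bits */
        return 2;
    }
    if (len < 65536) {
        out[1] = 126;                     /* 16-bit big-endian length follows */
        out[2] = (uint8_t)(len >> 8);
        out[3] = (uint8_t)(len & 0xff);
        return 4;
    }
    out[1] = 127;                         /* 64-bit big-endian length follows */
    for (i = 0; i < 8; i++)
        out[2 + i] = (uint8_t)(len >> (56 - 8 * i));
    return 10;
}
```

Writing the length byte by byte avoids any dependency on the host byte order, at the cost of a small loop.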
The log records both the received message and the returned message.







