python脚本支持
1. 功能说明
python脚本支持是对websocket server功能的扩展。主要功能是在消息处理阶段,可以使用python提供的on_msg函数来处理接收到的消息。
2. 依赖模块
python脚本支持依赖模块:
load_module modules/njt_python_module.so;
以及python3.8环境。
3. python环境配置
python环境配置详见章节可编程性-http-python_wsgi。
4.指令说明
Syntax | proto_server_py_module module_name [module_path] |
---|---|
Default | 如果没有给出module_path, 默认在PYTHONPATH环境变量指定的目录及conf文件所在目录查找module_name对应文件 |
Context | stream,server |
5.调用样例
njet.conf
helper broker modules/njt_helper_broker_module.so conf/mqtt.conf;
helper ctrl modules/njt_helper_ctrl_module.so conf/ctrl.conf;
load_module modules/njt_python_module.so;
user root root;
worker_processes 2;
cluster_name helper;
node_name node1;
error_log logs/error.log info;
pid logs/njet.pid;
events {
worker_connections 1024;
}
http {
dyn_kv_conf conf/iot-work.conf;
include mime.types;
access_log logs/access.log;
sendfile on;
keepalive_timeout 65;
server {
listen 5555;
server_name localhost;
location / {
return 200 "5555 ok";
}
}
}
stream {
server {
listen 22223;
preread_buffer_size 1024;
proto_buffer_size 1024;
proto_server on;
proto_server_code_file "conf/ws_proto_server_py.c";
proto_server_code_file "conf/tcc_ws_py.c";
proto_server_py_module "ws_py""conf/apps";
}
}
ws_proto_server_py.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
#include <string.h>
// global vari
int max_frm_size = 6553500;
// ws_headers headers;
typedef struct
{
uint32_t state[5];
uint32_t count[2];
uint8_t buffer[64];
} SHA1_CTX;
#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits))))
/* blk0() and blk() perform the initial expand. */
/* I got the idea of expanding during the round function from SSLeay */
#ifdef LITTLE_ENDIAN
#define blk0(i) (block->l[i] = (rol(block->l[i], 24) & 0xFF00FF00) | (rol(block->l[i], 8) & 0x00FF00FF))
#else
#define blk0(i) block->l[i]
#endif
#define blk(i) (block->l[i & 15] = rol(block->l[(i + 13) & 15] ^ block->l[(i + 8) & 15] ^ block->l[(i + 2) & 15] ^ block->l[i & 15], 1))
/* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */
#define R0(v, w, x, y, z, i) \
z += ((w & (x ^ y)) ^ y) + blk0(i) + 0x5A827999 + rol(v, 5); \
w = rol(w, 30);
#define R1(v, w, x, y, z, i) \
z += ((w & (x ^ y)) ^ y) + blk(i) + 0x5A827999 + rol(v, 5); \
w = rol(w, 30);
#define R2(v, w, x, y, z, i) \
z += (w ^ x ^ y) + blk(i) + 0x6ED9EBA1 + rol(v, 5); \
w = rol(w, 30);
#define R3(v, w, x, y, z, i) \
z += (((w | x) & y) | (w & x)) + blk(i) + 0x8F1BBCDC + rol(v, 5); \
w = rol(w, 30);
#define R4(v, w, x, y, z, i) \
z += (w ^ x ^ y) + blk(i) + 0xCA62C1D6 + rol(v, 5); \
w = rol(w, 30);
#define CRLF "\r\n"
#define WS_MAGIC_STR "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
#define WS_SWITCH_PROTO_STR "HTTP/1.1 101 Switching Protocols"
#ifndef SHA_DIGEST_LENGTH
#define SHA_DIGEST_LENGTH 20
#endif
//
void *
xmalloc(size_t size)
{
void *ptr;
if ((ptr = malloc(size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to allocate memory - failed.");
return (ptr);
}
char *
xstrdup(const char *s)
{
char *ptr;
size_t len;
len = strlen(s) + 1;
ptr = xmalloc(len);
strncpy(ptr, s, len);
return (ptr);
}
/* Self-checking wrapper to calloc() */
void *
xcalloc(size_t nmemb, size_t size)
{
void *ptr;
if ((ptr = calloc(nmemb, size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to calloc memory - failed.");
return (ptr);
}
void *
xrealloc(void *oldptr, size_t size)
{
void *newptr;
if ((newptr = realloc(oldptr, size)) == NULL)
proto_server_log(NJT_LOG_DEBUG, "Unable to reallocate memory - failed");
return (newptr);
}
static char *
strtoupper(char *str)
{
char *p = str;
if (str == NULL || *str == '\0')
return str;
while (*p != '\0')
{
*p = toupper((int)*p);
p++;
}
return str;
}
static const char *
ws_get_method(const char *token)
{
const char *lookfor = NULL;
if ((lookfor = "GET", !memcmp(token, "GET ", 4)) ||
(lookfor = "get", !memcmp(token, "get ", 4)))
return lookfor;
return NULL;
}
char *
base64_encode(const void *buf, size_t size)
{
static const char base64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
char *str = (char *)xmalloc((size + 3) * 4 / 3 + 1);
char *p = str;
const unsigned char *q = (const unsigned char *)buf;
size_t i = 0;
while (i < size)
{
int c = q[i++];
c *= 256;
if (i < size)
c += q[i];
i++;
c *= 256;
if (i < size)
c += q[i];
i++;
*p++ = base64[(c & 0x00fc0000) >> 18];
*p++ = base64[(c & 0x0003f000) >> 12];
if (i > size + 1)
*p++ = '=';
else
*p++ = base64[(c & 0x00000fc0) >> 6];
if (i > size)
*p++ = '=';
else
*p++ = base64[c & 0x0000003f];
}
*p = 0;
return str;
}
static void
ws_set_header_key_value(ws_headers *headers, char *key, char *value)
{
if (strcasecmp("Host", key) == 0)
headers->host = xstrdup(value);
else if (strcasecmp("Origin", key) == 0)
headers->origin = xstrdup(value);
else if (strcasecmp("Upgrade", key) == 0)
headers->upgrade = xstrdup(value);
else if (strcasecmp("Connection", key) == 0)
headers->connection = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Protocol", key) == 0)
headers->ws_protocol = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Key", key) == 0)
headers->ws_key = xstrdup(value);
else if (strcasecmp("Sec-WebSocket-Version", key) == 0)
headers->ws_sock_ver = xstrdup(value);
else if (strcasecmp("User-Agent", key) == 0)
headers->agent = xstrdup(value);
else if (strcasecmp("Referer", key) == 0)
headers->referer = xstrdup(value);
}
static char *
ws_parse_request(char *line, char **method, char **protocol)
{
const char *meth;
char *req = NULL, *request = NULL, *proto = NULL;
ptrdiff_t rlen;
if ((meth = ws_get_method(line)) == NULL)
{
return NULL;
}
else
{
req = line + strlen(meth);
if ((proto = strstr(line, " HTTP/1.0")) == NULL &&
(proto = strstr(line, " HTTP/1.1")) == NULL)
return NULL;
req++;
if ((rlen = proto - req) <= 0)
return NULL;
request = xmalloc(rlen + 1);
strncpy(request, req, rlen);
request[rlen] = 0;
(*method) = strtoupper(xstrdup(meth));
(*protocol) = strtoupper(xstrdup(++proto));
}
return request;
}
static int
ws_set_header_fields(char *line, ws_headers *headers)
{
char *path = NULL, *method = NULL, *proto = NULL, *p, *value;
if (line[0] == '\n' || line[0] == '\r')
return 1;
if ((strstr(line, "GET ")) || (strstr(line, "get ")))
{
if ((path = ws_parse_request(line, &method, &proto)) == NULL)
return 1;
headers->path = path;
headers->method = method;
headers->protocol = proto;
return 0;
}
if ((p = strchr(line, ':')) == NULL)
return 1;
value = p + 1;
while (p != line && isspace((unsigned char)*(p - 1)))
p--;
if (p == line)
return 1;
*p = '\0';
if (strpbrk(line, " \t") != NULL)
{
*p = ' ';
return 1;
}
while (isspace((unsigned char)*value))
value++;
ws_set_header_key_value(headers, line, value);
return 0;
}
static int
parse_headers(ws_headers *headers)
{
char *tmp = NULL;
const char *buffer = headers->buf;
const char *line = buffer, *next = NULL;
int len = 0;
while (line)
{
if ((next = strstr(line, "\r\n")) != NULL)
len = (next - line);
else
len = strlen(line);
if (len <= 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc content parse_headers error!");
return 1;
}
tmp = xmalloc(len + 1);
memcpy(tmp, line, len);
tmp[len] = '\0';
if (ws_set_header_fields(tmp, headers) == 1)
{
free(tmp);
proto_server_log(NJT_LOG_DEBUG, "2 tcc content parse_headers error!");
return 1;
}
free(tmp);
line = next ? (next + 2) : NULL;
if (next && strcmp(next, "\r\n\r\n") == 0)
break;
}
return 0;
}
static int
ws_verify_req_headers(ws_headers *headers)
{
if (!headers->host)
return 1;
if (!headers->method)
return 1;
if (!headers->protocol)
return 1;
if (!headers->path)
return 1;
if (!headers->connection)
return 1;
if (!headers->ws_key)
return 1;
if (!headers->ws_sock_ver)
return 1;
return 0;
}
static void
SHA1Init(SHA1_CTX *context)
{
context->state[0] = 0x67452301;
context->state[1] = 0xEFCDAB89;
context->state[2] = 0x98BADCFE;
context->state[3] = 0x10325476;
context->state[4] = 0xC3D2E1F0;
context->count[0] = context->count[1] = 0;
}
void SHA1Transform(uint32_t state[5], uint8_t buffer[64])
{
uint32_t a, b, c, d, e;
typedef union
{
uint8_t c[64];
uint32_t l[16];
} CHAR64LONG16;
CHAR64LONG16 *block;
#ifdef SHA1HANDSOFF
static uint8_t workspace[64];
block = (CHAR64LONG16 *)(void *)workspace;
memcpy(block, buffer, 64);
#else
block = (CHAR64LONG16 *)(void *)buffer;
#endif
/* Copy context->state[] to working vars */
a = state[0];
b = state[1];
c = state[2];
d = state[3];
e = state[4];
/* 4 rounds of 20 operations each. Loop unrolled. */
R0(a, b, c, d, e, 0);
R0(e, a, b, c, d, 1);
R0(d, e, a, b, c, 2);
R0(c, d, e, a, b, 3);
R0(b, c, d, e, a, 4);
R0(a, b, c, d, e, 5);
R0(e, a, b, c, d, 6);
R0(d, e, a, b, c, 7);
R0(c, d, e, a, b, 8);
R0(b, c, d, e, a, 9);
R0(a, b, c, d, e, 10);
R0(e, a, b, c, d, 11);
R0(d, e, a, b, c, 12);
R0(c, d, e, a, b, 13);
R0(b, c, d, e, a, 14);
R0(a, b, c, d, e, 15);
R1(e, a, b, c, d, 16);
R1(d, e, a, b, c, 17);
R1(c, d, e, a, b, 18);
R1(b, c, d, e, a, 19);
R2(a, b, c, d, e, 20);
R2(e, a, b, c, d, 21);
R2(d, e, a, b, c, 22);
R2(c, d, e, a, b, 23);
R2(b, c, d, e, a, 24);
R2(a, b, c, d, e, 25);
R2(e, a, b, c, d, 26);
R2(d, e, a, b, c, 27);
R2(c, d, e, a, b, 28);
R2(b, c, d, e, a, 29);
R2(a, b, c, d, e, 30);
R2(e, a, b, c, d, 31);
R2(d, e, a, b, c, 32);
R2(c, d, e, a, b, 33);
R2(b, c, d, e, a, 34);
R2(a, b, c, d, e, 35);
R2(e, a, b, c, d, 36);
R2(d, e, a, b, c, 37);
R2(c, d, e, a, b, 38);
R2(b, c, d, e, a, 39);
R3(a, b, c, d, e, 40);
R3(e, a, b, c, d, 41);
R3(d, e, a, b, c, 42);
R3(c, d, e, a, b, 43);
R3(b, c, d, e, a, 44);
R3(a, b, c, d, e, 45);
R3(e, a, b, c, d, 46);
R3(d, e, a, b, c, 47);
R3(c, d, e, a, b, 48);
R3(b, c, d, e, a, 49);
R3(a, b, c, d, e, 50);
R3(e, a, b, c, d, 51);
R3(d, e, a, b, c, 52);
R3(c, d, e, a, b, 53);
R3(b, c, d, e, a, 54);
R3(a, b, c, d, e, 55);
R3(e, a, b, c, d, 56);
R3(d, e, a, b, c, 57);
R3(c, d, e, a, b, 58);
R3(b, c, d, e, a, 59);
R4(a, b, c, d, e, 60);
R4(e, a, b, c, d, 61);
R4(d, e, a, b, c, 62);
R4(c, d, e, a, b, 63);
R4(b, c, d, e, a, 64);
R4(a, b, c, d, e, 65);
R4(e, a, b, c, d, 66);
R4(d, e, a, b, c, 67);
R4(c, d, e, a, b, 68);
R4(b, c, d, e, a, 69);
R4(a, b, c, d, e, 70);
R4(e, a, b, c, d, 71);
R4(d, e, a, b, c, 72);
R4(c, d, e, a, b, 73);
R4(b, c, d, e, a, 74);
R4(a, b, c, d, e, 75);
R4(e, a, b, c, d, 76);
R4(d, e, a, b, c, 77);
R4(c, d, e, a, b, 78);
R4(b, c, d, e, a, 79);
state[0] += a;
state[1] += b;
state[2] += c;
state[3] += d;
state[4] += e;
a = b = c = d = e = 0;
}
static void
SHA1Update(SHA1_CTX *context, uint8_t *data, unsigned int len)
{
unsigned int i, j;
j = (context->count[0] >> 3) & 63;
if ((context->count[0] += len << 3) < (len << 3))
context->count[1]++;
context->count[1] += (len >> 29);
if ((j + len) > 63)
{
memcpy(&context->buffer[j], data, (i = 64 - j));
SHA1Transform(context->state, context->buffer);
for (; i + 63 < len; i += 64)
{
SHA1Transform(context->state, &data[i]);
}
j = 0;
}
else
i = 0;
memcpy(&context->buffer[j], &data[i], len - i);
}
static void
SHA1Final(uint8_t digest[20], SHA1_CTX *context)
{
uint32_t i, j;
uint8_t finalcount[8];
for (i = 0; i < 8; i++)
{
finalcount[i] = (uint8_t)((context->count[(i >= 4 ? 0 : 1)] >> ((3 - (i & 3)) * 8)) & 255); /* Endian independent */
}
SHA1Update(context, (uint8_t *)"\200", 1);
while ((context->count[0] & 504) != 448)
{
SHA1Update(context, (uint8_t *)"\0", 1);
}
SHA1Update(context, finalcount, 8); /* Should cause a SHA1Transform() */
for (i = 0; i < 20; i++)
{
digest[i] = (uint8_t)((context->state[i >> 2] >> ((3 - (i & 3)) * 8)) & 255);
}
/* Wipe variables */
i = j = 0;
memset(context->buffer, 0, 64);
memset(context->state, 0, 20);
memset(context->count, 0, 8);
memset(&finalcount, 0, 8);
#ifdef SHA1HANDSOFF /* make SHA1Transform overwrite its own static vars */
SHA1Transform(context->state, context->buffer);
#endif
}
static void
ws_sha1_digest(char *s, int len, unsigned char *digest)
{
SHA1_CTX sha;
SHA1Init(&sha);
SHA1Update(&sha, (uint8_t *)s, len);
SHA1Final(digest, &sha);
}
static void
ws_set_handshake_headers(ws_headers *headers)
{
size_t klen = strlen(headers->ws_key);
size_t mlen = strlen(WS_MAGIC_STR);
size_t len = klen + mlen;
char *s = xmalloc(klen + mlen + 1);
uint8_t digest[SHA_DIGEST_LENGTH];
memset(digest, 0, sizeof *digest);
memcpy(s, headers->ws_key, klen);
memcpy(s + klen, WS_MAGIC_STR, mlen + 1);
ws_sha1_digest(s, len, digest);
headers->ws_accept = base64_encode((unsigned char *)digest, sizeof(digest));
headers->ws_resp = xstrdup(WS_SWITCH_PROTO_STR);
if (!headers->upgrade)
headers->upgrade = xstrdup("websocket");
if (!headers->connection)
headers->connection = xstrdup("Upgrade");
free(s);
}
static void
ws_append_str(char **dest, const char *src)
{
size_t curlen = strlen(*dest);
size_t srclen = strlen(src);
size_t newlen = curlen + srclen;
char *str = xrealloc(*dest, newlen + 1);
memcpy(str + curlen, src, srclen + 1);
*dest = str;
}
int ws_send_handshake_headers(tcc_stream_request_t *r, ws_headers *headers)
{
int bytes = 0;
char *str = xstrdup("");
ws_append_str(&str, headers->ws_resp);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Upgrade: ");
ws_append_str(&str, headers->upgrade);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Connection: ");
ws_append_str(&str, headers->connection);
ws_append_str(&str, CRLF);
ws_append_str(&str, "Sec-WebSocket-Accept: ");
ws_append_str(&str, headers->ws_accept);
ws_append_str(&str, CRLF CRLF);
bytes = proto_server_send(r, str, strlen(str));
free(str);
return bytes;
}
/* Allocate memory for a websocket frame */
static WSFrame *
new_wsframe(void)
{
WSFrame *frame = xcalloc(1, sizeof(WSFrame));
memset(frame->buf, 0, sizeof(frame->buf));
frame->reading = 1;
return frame;
}
static int ws_get_data(WSClient *client, char *buffer, int size)
{
int len;
len = size;
if (client->msg.len < size)
{
len = client->msg.len;
}
memcpy(buffer, client->msg.data, len);
client->msg.data = client->msg.data + len;
client->msg.len = client->msg.len - len;
client->r->used_len = client->r->used_len + len;
return len;
}
static int
read_socket(WSClient *client, char *buffer, int size)
{
int bytes = 0;
bytes = ws_get_data(client, buffer, size);
return bytes;
}
/* Read a websocket frame's header.
*
* On success, the number of bytes read is returned. */
static int
ws_read_header(WSClient *client, WSFrame *frm, int pos, int need)
{
char *buf = frm->buf;
int bytes = 0;
if (client->msg.len == 0)
{
return 0;
}
/* read the first 2 bytes for basic frame info */
if ((bytes = read_socket(client, buf + pos, need)) < 1)
{
return bytes;
}
frm->buflen += bytes;
frm->buf[frm->buflen] = '\0'; /* null-terminate */
return bytes;
}
static int
ws_set_status(WSClient *client, WSStatus status, int bytes)
{
client->status = status;
return bytes;
}
static int
ws_set_front_header_fields(WSClient *client)
{
WSFrame **frm = &client->frame;
char *buf = (*frm)->buf;
(*frm)->fin = WS_FRM_FIN(*(buf));
(*frm)->masking = WS_FRM_MASK(*(buf + 1));
(*frm)->opcode = WS_FRM_OPCODE(*(buf));
(*frm)->res = WS_FRM_R1(*(buf)) || WS_FRM_R2(*(buf)) || WS_FRM_R3(*(buf));
/* should be masked and can't be using RESVd bits */
if (!(*frm)->masking || (*frm)->res)
{
proto_server_log(NJT_LOG_DEBUG, "tcc ws_set_front_header_fields masking=%d,res=%d", (*frm)->masking, (*frm)->res);
return ws_set_status(client, WS_ERR | WS_CLOSE, 1);
}
return 0;
}
/* Set the extended payload length into the given pointer. */
static void
ws_set_extended_header_size(const char *buf, int *extended)
{
uint64_t payloadlen = 0;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
*extended = 2;
break;
case WS_PAYLOAD_EXT64:
*extended = 8;
break;
}
}
/* Set the masking key into our frame structure. */
static void
ws_set_masking_key(WSFrame *frm, const char *buf)
{
uint64_t payloadlen = 0;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
memcpy(&frm->mask, buf + 4, sizeof(frm->mask));
break;
case WS_PAYLOAD_EXT64:
memcpy(&frm->mask, buf + 10, sizeof(frm->mask));
break;
default:
memcpy(&frm->mask, buf + 2, sizeof(frm->mask));
}
}
/* Set the extended payload length into our frame structure. */
static void
ws_set_payloadlen(WSFrame *frm, const char *buf)
{
uint64_t payloadlen = 0, len64;
uint16_t len16;
/* determine the payload length, else read more data */
payloadlen = WS_FRM_PAYLOAD(*(buf + 1));
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
memcpy(&len16, (buf + 2), sizeof(uint16_t));
frm->payloadlen = ntohs(len16);
break;
case WS_PAYLOAD_EXT64:
memcpy(&len64, (buf + 2), sizeof(uint64_t));
frm->payloadlen = be64toh(len64);
break;
default:
frm->payloadlen = payloadlen;
}
}
static int
ws_realloc_frm_payload(WSFrame *frm, WSMessage *msg)
{
char *tmp = NULL;
uint64_t newlen = 0;
newlen = msg->payloadsz + frm->payloadlen;
tmp = realloc(msg->payload, newlen);
if (tmp == NULL && newlen > 0)
{
free(msg->payload);
msg->payload = NULL;
return 1;
}
msg->payload = tmp;
return 0;
}
static void
ws_unmask_payload(char *buf, int len, int offset, unsigned char mask[])
{
int i, j = 0;
/* unmask data */
for (i = offset; i < len; ++i, ++j)
{
buf[i] ^= mask[j % 4];
}
}
static int
ws_error(tcc_stream_request_t *r, unsigned short code, const char *err)
{
unsigned int len;
unsigned short code_be;
char buf[128] = {0};
len = 2;
code_be = htobe16(code);
memcpy(buf, &code_be, 2);
if (err)
len += snprintf(buf + 2, sizeof buf - 4, "%s", err);
return ws_send_frame(r, WS_OPCODE_CLOSE, buf, len);
}
static int
ws_read_payload(WSClient *client, WSMessage *msg, int pos, int need)
{
char *buf = msg->payload;
int bytes = 0;
/* read the first 2 bytes for basic frame info */
if ((bytes = read_socket(client, buf + pos, need)) < 1)
{
if (client->status & WS_CLOSE)
ws_error(client->r, WS_CLOSE_UNEXPECTED, "Unable to read payload");
return bytes;
}
msg->buflen += bytes;
msg->payloadsz += bytes;
return bytes;
}
static void
ws_free_message(WSClient *client)
{
if (client->message && client->message->payload)
free(client->message->payload);
if (client->message)
free(client->message);
client->message = NULL;
}
static uint32_t
verify_utf8(uint32_t *state, const char *str, int len)
{
int i;
uint32_t type;
for (i = 0; i < len; ++i)
{
type = utf8d[(uint8_t)str[i]];
*state = utf8d[256 + (*state) * 16 + type];
if (*state == UTF8_INVAL)
break;
}
return *state;
}
int ws_validate_string(const char *str, int len)
{
uint32_t state = UTF8_VALID;
if (verify_utf8(&state, str, len) == UTF8_INVAL)
{
return 1;
}
if (state != UTF8_VALID)
{
return 1;
}
return 0;
}
static int
ws_handle_err(WSClient *client, unsigned short code, WSStatus status, const char *m)
{
client->status = status;
return ws_error(client->r, code, m);
}
static void
ws_handle_text_bin(WSClient *client, WSServer *server)
{
tcc_str_t content, out_data;
WSFrame **frm = &client->frame;
WSMessage **msg = &client->message;
int offset = (*msg)->mask_offset;
if ((*frm)->opcode == WS_OPCODE_CONTINUATION)
{
// proto_server_log(NJT_LOG_DEBUG,"2 tcc websocket CONTINUATION\n");
}
/* All data frames after the initial data frame must have opcode 0 */
if ((*msg)->fragmented && (*frm)->opcode != WS_OPCODE_CONTINUATION)
{
client->status = WS_ERR | WS_CLOSE;
return;
}
/* RFC states that there is a new masking key per frame, therefore,
* time to unmask... */
ws_unmask_payload((*msg)->payload, (*msg)->payloadsz, offset, (*frm)->mask);
/* Done with the current frame's payload */
(*msg)->buflen = 0;
/* Reading a fragmented frame */
(*msg)->fragmented = 1;
content.data = (*msg)->payload;
content.len = (*msg)->payloadsz;
if (!(*frm)->fin)
{
proto_server_log(NJT_LOG_DEBUG, "tcc frm CONTINUATION ws_get_frm_payload = %V!", &content);
return;
}
else
{
proto_server_log(NJT_LOG_DEBUG, "tcc frm ws_get_frm_payload = %V!", &content);
}
// proto_server_log(NJT_LOG_DEBUG, "2 tcc ws_get_frm_payload = %V!",&content);
/* validate text data encoded as UTF-8 */
if ((*msg)->opcode == WS_OPCODE_TEXT)
{
if (ws_validate_string((*msg)->payload, (*msg)->payloadsz) != 0)
{
ws_handle_err(client, WS_CLOSE_INVALID_UTF8, WS_ERR | WS_CLOSE, NULL);
proto_server_log(NJT_LOG_DEBUG, "3 tcc ws_get_frm_payload = %V!", &content);
return;
}
}
if ((*msg)->opcode != WS_OPCODE_CONTINUATION)
{
// ws_write_fifo (server->pipeout, (*msg)->payload, (*msg)->payloadsz);
}
content.data = (*msg)->payload;
content.len = (*msg)->payloadsz;
if ((*msg)->payloadsz > 0)
{
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
// free(out_data.data);
}
// proto_server_log(NJT_LOG_DEBUG, "5 tcc ws_get_frm_payload = %V!", &content);
ws_free_message(client);
}
static void
ws_handle_pong(WSClient *client)
{
WSFrame **frm = &client->frame;
if (!(*frm)->fin)
{
return;
}
ws_free_message(client);
}
static int
ws_respond(tcc_stream_request_t *r, const char *buffer, int len)
{
int bytes = 0;
// size_t length = len;
proto_server_send(r, (char *)buffer, len);
return bytes;
}
static int
ws_send_frame(tcc_stream_request_t *r, WSOpcode opcode, const char *p, int sz)
{
unsigned char buf[32] = {0};
char *frm = NULL;
uint64_t payloadlen = 0, u64;
int hsize = 2;
if (sz < 126)
{
payloadlen = sz;
}
else if (sz < (1 << 16))
{
payloadlen = WS_PAYLOAD_EXT16;
hsize += 2;
}
else
{
payloadlen = WS_PAYLOAD_EXT64;
hsize += 8;
}
buf[0] = 0x80 | ((uint8_t)opcode);
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
buf[1] = WS_PAYLOAD_EXT16;
buf[2] = (sz & 0xff00) >> 8;
buf[3] = (sz & 0x00ff) >> 0;
break;
case WS_PAYLOAD_EXT64:
buf[1] = WS_PAYLOAD_EXT64;
u64 = htobe64(sz);
memcpy(buf + 2, &u64, sizeof(uint64_t));
break;
default:
buf[1] = (sz & 0xff);
}
frm = xcalloc(hsize + sz, sizeof(unsigned char));
memcpy(frm, buf, hsize);
if (p != NULL && sz > 0)
memcpy(frm + hsize, p, sz);
ws_respond(r, frm, hsize + sz);
free(frm);
return 0;
}
int ws_generate_frame(WSOpcode opcode, const char *p, int sz, tcc_str_t *out_message)
{
unsigned char buf[32] = {0};
char *frm = NULL;
uint64_t payloadlen = 0, u64;
int hsize = 2;
if (sz < 126)
{
payloadlen = sz;
}
else if (sz < (1 << 16))
{
payloadlen = WS_PAYLOAD_EXT16;
hsize += 2;
}
else
{
payloadlen = WS_PAYLOAD_EXT64;
hsize += 8;
}
buf[0] = 0x80 | ((uint8_t)opcode);
switch (payloadlen)
{
case WS_PAYLOAD_EXT16:
buf[1] = WS_PAYLOAD_EXT16;
buf[2] = (sz & 0xff00) >> 8;
buf[3] = (sz & 0x00ff) >> 0;
break;
case WS_PAYLOAD_EXT64:
buf[1] = WS_PAYLOAD_EXT64;
u64 = htobe64(sz);
memcpy(buf + 2, &u64, sizeof(uint64_t));
break;
default:
buf[1] = (sz & 0xff);
}
frm = xcalloc(hsize + sz, sizeof(unsigned char));
memcpy(frm, buf, hsize);
if (p != NULL && sz > 0)
memcpy(frm + hsize, p, sz);
out_message->data = frm;
out_message->len = hsize + sz;
return 0;
}
static void
ws_handle_ping(WSClient *client)
{
WSFrame **frm = &client->frame;
tcc_str_t content;
WSMessage **msg = &client->message;
char *buf = NULL, *tmp = NULL;
int pos = 0, len = (*frm)->payloadlen, newlen = 0;
/* RFC states that Control frames themselves MUST NOT be
* fragmented. */
if (!(*frm)->fin)
{
ws_handle_err(client, WS_CLOSE_PROTO_ERR, WS_ERR | WS_CLOSE, NULL);
return;
}
/* Control frames are only allowed to have payload up to and
* including 125 octets */
if ((*frm)->payloadlen > 125)
{
ws_handle_err(client, WS_CLOSE_PROTO_ERR, WS_ERR | WS_CLOSE, NULL);
return;
}
/* No payload from ping */
if (len == 0)
{
// ws_send_frame(client->r, WS_OPCODE_PONG, NULL, 0);
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
return;
}
/* Copy the ping payload */
pos = (*msg)->payloadsz - len;
buf = xcalloc(len, sizeof(char));
memcpy(buf, (*msg)->payload + pos, len);
/* Unmask it */
ws_unmask_payload(buf, len, 0, (*frm)->mask);
/* Resize the current payload (keep an eye on this realloc) */
newlen = (*msg)->payloadsz - len;
tmp = realloc((*msg)->payload, newlen);
if (tmp == NULL && newlen > 0)
{
free((*msg)->payload);
free(buf);
(*msg)->payload = NULL;
client->status = WS_ERR | WS_CLOSE;
return;
}
(*msg)->payload = tmp;
(*msg)->payloadsz -= len;
content.data = buf;
content.len = len;
// proto_server_log(NJT_LOG_DEBUG, "tcc ping!");
ws_send_frame(client->r, WS_OPCODE_PONG, buf, len);
// proto_server_log(NJT_LOG_DEBUG, "tcc ping len=%d,payloadsz=%d!",len,(*msg)->payloadsz);
client->message->headers = &client->headers;
ws_app_on_message(client->r, client->message);
(*msg)->buflen = 0; /* done with the current frame's payload */
/* Control frame injected in the middle of a fragmented message. */
if (!(*msg)->fragmented)
{
ws_free_message(client);
}
free(buf);
}
static int
ws_handle_close(WSClient *client)
{
client->status = WS_ERR | WS_CLOSE;
return ws_send_frame(client->r, WS_OPCODE_CLOSE, NULL, 0);
}
static void
ws_manage_payload_opcode(WSClient *client, WSServer *server)
{
WSFrame **frm = &client->frame;
WSMessage **msg = &client->message;
switch ((*frm)->opcode)
{
case WS_OPCODE_CONTINUATION:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket CONTINUATION\n");
/* first frame can't be a continuation frame */
if (!(*msg)->fragmented)
{
client->status = WS_ERR | WS_CLOSE;
break;
}
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_TEXT:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket TEXT\n");
client->message->opcode = (*frm)->opcode;
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_BIN:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket BIN\n");
client->message->opcode = (*frm)->opcode;
ws_handle_text_bin(client, server);
break;
case WS_OPCODE_PONG:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket PONG\n");
client->message->opcode = (*frm)->opcode;
ws_handle_pong(client);
break;
case WS_OPCODE_PING:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket PING\n");
client->message->opcode = (*frm)->opcode;
ws_handle_ping(client);
break;
default:
proto_server_log(NJT_LOG_DEBUG, "tcc websocket CLOSE\n");
ws_handle_close(client);
}
}
static void
ws_free_frame(WSClient *client)
{
if (client->frame)
free(client->frame);
client->frame = NULL;
}
static WSMessage *
new_wsmessage(void)
{
WSMessage *msg = xcalloc(1, sizeof(WSMessage));
return msg;
}
static int
ws_get_frm_payload(WSClient *client, WSServer *server)
{
WSFrame **frm = NULL;
WSMessage **msg = NULL;
int bytes = 0, readh = 0, need = 0;
if (client->message == NULL)
client->message = new_wsmessage();
frm = &client->frame;
msg = &client->message;
/* message within the same frame */
if ((*msg)->payload == NULL && (*frm)->payloadlen)
(*msg)->payload = xcalloc((*frm)->payloadlen, sizeof(char));
/* handle a new frame */
else if ((*msg)->buflen == 0 && (*frm)->payloadlen)
{
if (ws_realloc_frm_payload((*frm), (*msg)) == 1)
return ws_set_status(client, WS_ERR | WS_CLOSE, 0);
}
readh = (*msg)->buflen; /* read from so far */
need = (*frm)->payloadlen - readh; /* need to read */
if (need > 0)
{
if ((bytes = ws_read_payload(client, (*msg), (*msg)->payloadsz, need)) < 0)
return bytes;
if (bytes != need)
return ws_set_status(client, WS_READING, bytes);
}
(*msg)->mask_offset = (*msg)->payloadsz - (*msg)->buflen;
ws_manage_payload_opcode(client, server);
ws_free_frame(client);
return bytes;
}
static int
ws_get_frm_header(WSClient *client)
{
WSFrame **frm = NULL;
int bytes = 0, readh = 0, need = 0, offset = 0, extended = 0;
if (client->frame == NULL)
{
client->frame = new_wsframe();
proto_server_log(NJT_LOG_DEBUG, "tcc new_wsframe!");
}
else
{
proto_server_log(NJT_LOG_DEBUG, "tcc client->frame->reading=%d!", client->frame->reading);
}
frm = &client->frame;
/* Read the first 2 bytes for basic frame info */
readh = (*frm)->buflen; /* read from header so far */
need = 2 - readh; /* need to read */
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "1 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "2 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += 2;
if (ws_set_front_header_fields(client) != 0)
{
// proto_server_log(NJT_LOG_DEBUG, "3 tcc read %d!",bytes);
return bytes;
}
ws_set_extended_header_size((*frm)->buf, &extended);
/* read the extended header */
readh = (*frm)->buflen; /* read from header so far */
need = (extended + offset) - readh; /* read from header field so far */
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "4 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "5 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += extended;
/* read the masking key */
readh = (*frm)->buflen; /* read from header so far */
need = (4 + offset) - readh;
if (need > 0)
{
if ((bytes = ws_read_header(client, (*frm), readh, need)) < 1)
{
// proto_server_log(NJT_LOG_DEBUG, "6 tcc read %d!",bytes);
return bytes;
}
if (bytes != need)
{
// proto_server_log(NJT_LOG_DEBUG, "7 tcc read %d!",bytes);
return ws_set_status(client, WS_READING, bytes);
}
}
offset += 4;
ws_set_payloadlen((*frm), (*frm)->buf);
ws_set_masking_key((*frm), (*frm)->buf);
if ((*frm)->payloadlen > max_frm_size)
{
// proto_server_log(NJT_LOG_DEBUG, "8 tcc read %d!",bytes);
return ws_set_status(client, WS_ERR | WS_CLOSE, bytes);
}
(*frm)->buflen = 0;
(*frm)->reading = 0;
(*frm)->payload_offset = offset;
// proto_server_log(NJT_LOG_DEBUG, "9 tcc read %d!",bytes);
return ws_set_status(client, WS_OK, bytes);
}
static int
ws_get_message(WSClient *client, WSServer *server)
{
int bytes = 0;
if ((client->frame == NULL) || (client->frame->reading))
{
if ((bytes = ws_get_frm_header(client)) < 1 || client->frame->reading)
{
proto_server_log(NJT_LOG_DEBUG, "tcc ws_get_frm_header bytes=%d!", bytes);
return bytes;
}
}
proto_server_log(NJT_LOG_DEBUG, "tcc ws_get_frm_payload!");
return ws_get_frm_payload(client, server);
return 1;
}
//===============================================================
int proto_server_process_connection(tcc_stream_request_t *r)
{
char ch = 'a';
tcc_str_t ip = njt_string("127.0.0.1");
if (ip.len == r->addr_text->len && memcmp((void *)r->addr_text->data, ip.data,ip.len) == 0)
{
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%V,NJT_STREAM_FORBIDDEN !",r->addr_text);
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc connetion ip=%Xd ok!",ch);
return NJT_OK;
}
int proto_server_process_preread(tcc_stream_request_t *r, tcc_str_t *msg)
{
proto_server_log(NJT_LOG_DEBUG, "2 tcc preread ip=%V ok!",r->addr_text);
proto_server_log(NJT_LOG_DEBUG, "tcc preread msg=%V!",msg);
r->used_len = msg->len;
return NJT_DECLINED;
}
int proto_server_process_log(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "4 tcc log ip=%V ok!",r->addr_text);
return NJT_OK;
}
int proto_server_process_message(tcc_stream_request_t *r, tcc_str_t *msg)
{
char *data = NULL;
WSctx *cli_ctx;
int bytes;
int rc;
char *p;
WSMessage message;
WSServer *server;
tcc_str_t end_flag = njt_string("\r\n\r\n");
proto_server_log(NJT_LOG_DEBUG, "3 tcc content tcc get=%V,len=%d", msg, msg->len);
cli_ctx = tcc_get_client_ctx(r, TCC_PROTO_CTX_ID);
if (cli_ctx == NULL)
{
cli_ctx = proto_malloc(r, sizeof(WSctx));
if(cli_ctx == NULL) {
return NJT_ERROR;
}
memset(cli_ctx, 0, sizeof(WSctx));
cli_ctx->client.r = r;
tcc_set_client_ctx(r,TCC_PROTO_CTX_ID,cli_ctx);
}
if (cli_ctx->handshake == 0)
{
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_flag.data,end_flag.len - 1);
if (p == NULL)
{
cli_ctx->handshake = 1;
}
}
if (cli_ctx->handshake == 0)
{
cli_ctx->client.headers.buflen = msg->len;
memcpy(cli_ctx->client.headers.buf, msg->data, cli_ctx->client.headers.buflen);
cli_ctx->client.headers.buf[cli_ctx->client.headers.buflen] = '\0';
data = cli_ctx->client.headers.buf;
if (strstr(data, "\r\n\r\n") == NULL)
{
proto_server_log(NJT_LOG_DEBUG, "tcc http error!");
return NJT_AGAIN;
}
if (parse_headers(&cli_ctx->client.headers) != 0)
{
proto_server_log(NJT_LOG_DEBUG, "tcc content parse_headers error!");
return NJT_ERROR;
}
if (ws_verify_req_headers(&cli_ctx->client.headers) != 0)
{
proto_server_log(NJT_LOG_DEBUG, "tcc content ws_verify_req_headers error!");
return NJT_ERROR;
}
ws_set_handshake_headers(&cli_ctx->client.headers);
njt_memzero(&message, sizeof(WSMessage));
message.headers = &cli_ctx->client.headers;
rc = ws_app_on_connection(r, &message);
if (rc == NJT_OK)
{
cli_ctx->handshake = WS_HANDSHAKE_OK;
cli_ctx->client.r->used_len = msg->len;
proto_server_log(NJT_LOG_DEBUG, "3 tcc content WS_HANDSHAKE_OK [%p,%p]!", cli_ctx, cli_ctx->client);
}
return NJT_OK;
}
else
{
if (msg->len > 0)
{
cli_ctx->client.msg = *msg;
server = tcc_client_get_srv_ctx(r);
bytes = ws_get_message(&cli_ctx->client, server);
}
}
proto_server_log(NJT_LOG_DEBUG, "tcc get ws data3 msg->len=%d,used_len=%d!", msg->len, cli_ctx->client.r->used_len);
if (r->used_len != msg->len)
{
return NJT_AGAIN;
}
return NJT_OK;
}
int proto_server_process_client_update(tcc_stream_request_t *r)
{
//ws_app_client_update(r);
return NJT_OK;
}
int proto_server_process_connection_close(tcc_stream_request_t *r)
{
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_process_connection_close!");
ws_app_on_close(r);
return NJT_OK;
}
int proto_server_update(tcc_stream_server_ctx *srv_ctx)
{
//ws_app_server_update(srv_ctx);
return NJT_OK;
}
int proto_server_init(tcc_stream_server_ctx *srv_ctx)
{
WSServer *srv_data = proto_malloc(srv_ctx, sizeof(WSServer));
if (srv_data != NULL)
{
tcc_set_srv_ctx(srv_ctx, srv_data);
}
ws_app_server_init(srv_ctx);
return NJT_OK;
}
int proto_server_upstream_message(tcc_stream_request_t *r, tcc_str_t *msg){
u_char *p;
tcc_str_t end_tok = njt_string("\r\n\r\n");
tcc_str_t add_data = njt_string("\r\ntcc-down-set: downset");
tcc_str_t new_msg;
tcc_str_t old_end_msg;
proto_server_log(NJT_LOG_DEBUG, "tcc from upstream msg=%V",msg);
p = njt_strlcasestrn(msg->data,msg->data + msg->len,end_tok.data,end_tok.len - 1);
if (p == NULL) {
proto_server_send(r,msg->data,msg->len);
} else {
new_msg.data = msg->data;
new_msg.len = p - msg->data;
old_end_msg.data = p;
old_end_msg.len = msg->data + msg->len - old_end_msg.data;
proto_server_send(r,new_msg.data,new_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg1=%V",&new_msg);
proto_server_send(r,add_data.data,add_data.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client msg2=%V",&add_data);
proto_server_send(r,old_end_msg.data,old_end_msg.len);
proto_server_log(NJT_LOG_DEBUG, "tcc send client len=%d,msg3=%V",old_end_msg.len,&old_end_msg);
}
r->used_len = msg->len;
return NJT_OK;
}
void* proto_server_check_upstream_peer(tcc_stream_client_upstream_data_t *cli_ups_info) {
int i;
tcc_uint_t weight;
int mask_len = 7;
tcc_stream_upstream_rr_peer_t *peer_list = cli_ups_info->peer_list;
for(i = 0; i < cli_ups_info->peer_num; i++) {
if(peer_list[i].server->len >= mask_len && cli_ups_info->cli_addr_text->len >= mask_len && njt_memcmp(peer_list[i].server->data,cli_ups_info->cli_addr_text->data,mask_len) == 0) {
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "1 tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
}
if(cli_ups_info->peer_num > 0) {
i = 0;
weight = proto_get_peer_weight(peer_list[i].peer);
proto_server_log(NJT_LOG_DEBUG, "default tcc upstream server=%V,%d!",peer_list[i].server,weight);
return peer_list[i].peer;
}
return NULL; // NJT_BUSY;
}
int proto_server_upstream_connection_close(tcc_stream_request_t *r) {
proto_server_log(NJT_LOG_DEBUG, "tcc proto_server_upstream_connection_close!");
}
typedef struct{
WSOpcode code;
char *msg;
size_t msg_len;
} ws_in_data_t;
int proto_server_create_message(tcc_stream_server_ctx *srv_ctx,void *in_data,tcc_str_t *out_data){
ws_in_data_t *data;
data = (ws_in_data_t *)in_data;
ws_generate_frame(data->code, data->msg, data->msg_len, out_data);
}
tcc_ws_py.c
#include <tcclib.h>
#include <njt_tcc.h>
#include <tcc_ws.h>
#include <ctype.h>
typedef struct app_server_s {
/* Server Status */
int gen_client_id;
int server_id;
} app_server_t;
typedef struct app_client_s {
/* Server Status */
tcc_str_t *init_data;
int id;
} app_client_t;
int ws_app_on_connection(tcc_stream_request_t *r,WSMessage *msg) {
app_server_t *app_server;
app_client_t *app_client = proto_malloc(r,sizeof(app_client_t));
if(app_client != NULL) {
app_client->init_data = r->addr_text;
app_server = tcc_client_get_app_srv_ctx(r);
app_client->id = ++app_server->gen_client_id;
tcc_set_client_app_ctx(r,app_client);
}
ws_send_handshake_headers(r, msg->headers);
if(msg->headers != NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_on_connection path: %s!",msg->headers->path);
}
return NJT_OK;
}
int ws_app_on_message(tcc_stream_request_t *r,WSMessage *msg) {
tcc_str_t data,out_data;
tcc_str_t buffer;
u_char *p;
int len;
app_client_t *app_client = tcc_get_client_app_ctx(r);
if(app_client == NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc app_client null!");
return NJT_ERROR;
}
app_server_t *app_server = tcc_client_get_app_srv_ctx(r);
if(app_server == NULL) {
proto_server_log(NJT_LOG_DEBUG, "tcc app_server null!");
return NJT_ERROR;
}
if(msg->opcode == WS_OPCODE_PING) {
proto_server_log(NJT_LOG_DEBUG, "1 tcc from ws_app_on_message ping!");
return NJT_OK;
}
proto_server_log(NJT_LOG_DEBUG, "1 tcc from ws_app_on_message msg->opcode=%d!",msg->opcode);
data.data = msg->payload;
data.len = msg->payloadsz;
len = msg->payloadsz + app_client->init_data->len + 100;
buffer.data = proto_malloc(r,len);
if(buffer.data == NULL) {
return NJT_ERROR;
}
buffer.len = len;
njt_memzero(buffer.data,len);
njt_memcpy(buffer.data, data.data, data.len);
// printf("need call python handler\n");
// for (int i = 0; i < 1; i++) {
// for (int i = 0; i < 1000000; i++) {
njt_stream_proto_python_on_msg(r, data.data, data.len);
// }
proto_free(r, buffer.data);
return NJT_OK;
}
int ws_app_on_close(tcc_stream_request_t *r) {
app_client_t *app_client = tcc_get_client_app_ctx(r);
if(app_client != NULL) {
proto_free(r,app_client);
}
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_on_close!");
return NJT_OK;
}
int ws_app_client_update(tcc_stream_request_t *r) {
app_client_t *app_client = tcc_get_client_app_ctx(r);
tcc_str_t data = njt_string("tcc ws_app_client_update!\n");
if(app_client != NULL) {
//ws_send_frame(r, WS_OPCODE_TEXT, data.data, data.len);
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_client_update !");
}
return NJT_OK;
}
int ws_app_server_update(tcc_stream_server_ctx *srv_ctx)
{
tcc_str_t data = njt_string("tcc ws_app_server_update!\n");
tcc_str_t out_data;
app_server_t * srv_data = tcc_get_app_srv_ctx(srv_ctx);
if(srv_data) {
ws_generate_frame(WS_OPCODE_TEXT, data.data, data.len, &out_data);
//proto_server_send_broadcast(srv_ctx,out_data.data, out_data.len);
if(out_data.len > 0) {
free(out_data.data);
}
// proto_server_send_broadcast(srv_ctx,buf,strlen(buf));
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_server_update !");
}
return NJT_OK;
}
int ws_app_server_init(tcc_stream_server_ctx *srv_ctx) {
app_server_t *app_server = proto_malloc(srv_ctx,sizeof(app_server_t));
if(app_server != NULL) {
app_server->gen_client_id = 0;
app_server->server_id = 1;
tcc_set_app_srv_ctx(srv_ctx,app_server);
}
proto_server_log(NJT_LOG_DEBUG, "tcc from ws_app_server_init!");
return NJT_OK;
}
ws_py.py
def on_msg(r):
import time
msg = r.msg
r.send("py test ok")
r.send("py test send {}".format(msg))
r.log("=======TEST==========================")
r.send("test log ok")
使用websocket连接22223
向ws server发送数据123abc,收到py test ok
日志中记录收到的123abc,以及返回值。