/* BSD-2-Clause license * * Copyright (c) 2018-2023 NST , sss . * */ #include #include #include #include #include #include #include #include #include #include #include /* * HTTP and Websocket serialization/deserializtion headers */ #include #include #include #include "globals.h" #include "ws_session.h" #include "ws_server_internals.h" #include "ws_protocol.h" #include "thread_impl.h" #include "utilities.h" #include "wrdp_thpool.h" #include "wrdp_thpool_internals.h" #include "task.h" #include "socket_helpers.h" #include "log.h" static int ws_server_socket = -1; static int ws_server_socket_unix = -1; int ws_server_init() { if (g_globals.settings.ws_port <= 0) { ws_server_socket = -1; return ws_server_socket; } ws_server_socket = create_listen_socket_tcp(g_globals.settings.ws_port); if (ws_server_socket != -1) { socket_make_non_block(ws_server_socket); } return ws_server_socket; } int ws_server_init_unix() { if (!g_globals.settings.ws_socket_path || !g_globals.settings.ws_socket_path[0]) { ws_server_socket_unix = -1; return ws_server_socket_unix; } ws_server_socket_unix = create_listen_socket_unix(g_globals.settings.ws_socket_path); if (ws_server_socket_unix != -1) { socket_make_non_block(ws_server_socket_unix); } return ws_server_socket_unix; } static bool check_http_header_value_no_case(const struct phr_header *hdr, const char *value) { if (hdr->value_len != strlen(value)) return false; if (!strncasecmp(hdr->value, value, strlen(value))) return true; return false; } /*static bool check_http_header_value(const struct phr_header *hdr, const char *value) { if (memmem(hdr->value, hdr->value_len, value, strlen(value))) return true; return false; }*/ static struct phr_header * find_http_header( struct phr_header *headers, size_t num_headers, const char *header_name) { size_t i; for (i = 0; i != num_headers; ++i) { if (!strncmp(headers[i].name, header_name, headers[i].name_len)) return &headers[i]; } return 0; } #ifdef DEBUG static void print_http_header(const struct phr_header header, char *buf) { char msg_buf[4096] = {0}; char name_buf[1024] = {0}, value_buf[3072] = {0}; memcpy(name_buf, header.name, header.name_len); memcpy(value_buf, header.value, header.value_len); snprintf(msg_buf, 4095, "\n%s: %s", name_buf, value_buf); strcat(buf, msg_buf); } static void print_http_headers(struct phr_header *headers, size_t num_headers) { size_t i; char msg[81920] = {0}; strcpy(msg, "http headers:\n"); for (i = 0; i != num_headers; ++i) { print_http_header(headers[i], msg); } log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_trace, 0); } #endif typedef struct { struct phr_header *hdr_key, *hdr_ext; const char *path; size_t path_len; } http_get_request_info; #define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" static int create_accept_key(uint8_t *dst, const char *client_key, size_t *base64_len) { uint8_t sha1buf[20], key_src[60]; memcpy(key_src, client_key, 24); memcpy(key_src + 24, WS_GUID, 36); if (!sha1(sha1buf, key_src, sizeof(key_src))) return false; return base64_encode(sha1buf, 20, dst, 60, base64_len); } typedef struct { size_t pos; /* Should not be touched by user */ ev_io *socket_fd_writeable; bool can_write; } ws_s_a_s_i_internals; typedef struct { /* initialized task info */ ws_session *session; /* user allocated buffer filled with data for sending via socket * freed by function */ void *buf; /* buffer size */ size_t buf_size; /* initialzed connection socket returned by accept * set in non-blocking mode */ int socket_fd; void (*write_done_cb)(ws_session *session, bool success); ws_s_a_s_i_internals internals; } ws_server_async_send_info; static void ev_connection_writeable_cb(struct ev_loop *loop, ev_io *w, int revents) { ws_server_async_send_info *w_info = w->data; ssize_t len = 0; if ((len = send(w_info->socket_fd, w_info->buf + w_info->internals.pos, w_info->buf_size - w_info->internals.pos, MSG_DONTWAIT)) == -1) { if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { perror("send"); /* TODO: handle send error */ w_info->write_done_cb(w_info->session, false); goto cleanup; } } w_info->internals.pos += len; if (w_info->internals.pos == w_info->buf_size) { /* Writing done. * Doing cleanup. */ w_info->write_done_cb(w_info->session, true); goto cleanup; } return; cleanup: ev_io_stop(loop, w); free(w); /* NOTE: this is "info->internals.socket_fd_writeable" */ free(w_info->buf); free(w_info); } /* ws_server_async_send_info *info must be allocated by caller * will be freed by function */ static bool ws_server_async_send(ws_server_async_send_info *info) { info->internals.socket_fd_writeable = calloc(1, sizeof(ev_io)); wrdp_thpool_task *task = info->session->wrdp_thpool_task; if (!info->internals.socket_fd_writeable) { /* TODO: handle allocation error */ free(info->buf); perror("calloc"); return false; } ev_io_init(info->internals.socket_fd_writeable, ev_connection_writeable_cb, info->socket_fd, EV_WRITE); info->internals.socket_fd_writeable->data = info; ev_io_start( task->thread->ev_th_loop, info->internals.socket_fd_writeable); return true; } static ssize_t ws_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) { ws_session *session = user_data; ssize_t r = 0; int sflags = MSG_DONTWAIT; r = send(session->connection_fd, data, len, sflags); if (r == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); } else { wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); } } return r; } static ssize_t ws_recv_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data) { ws_session *session = user_data; ssize_t r; r = recv(session->connection_fd, buf, len, MSG_DONTWAIT); if (r == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); } else { wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); task_destroy_client_connection(session); } } else if (r == 0) { /* Unexpected EOF is also treated as an error */ wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); r = -1; } return r; } static void ws_on_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *_ws_session) { if (!wslay_is_ctrl_frame(arg->opcode)) { if (!ws_handle_message(arg, _ws_session)) { log_msg_info i; i.ws_session = _ws_session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"Failed to handle websocket message."; log_msg_ex(&i); } } } void ws_server_websocket_init(ws_session *session) { struct wslay_event_callbacks callbacks = {ws_recv_callback, ws_send_callback, NULL, NULL, NULL, NULL, ws_on_msg_recv_callback}; /* TODO: set TCP_NODELAY only for tcp socket, and not for unix */ /* int val = 1; if (setsockopt (server->connection_fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof (val)) == -1) { perror ("setsockopt: TCP_NODELAY"); thr_destroy_task (task); return; } */ wslay_event_context_server_init( &(session->wslay_ctx), &callbacks, session); } static void ws_server_send_http_reply_w_cb(ws_session *session, bool success) { if (!success) { log_msg_info i; i.ws_session = session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"ws_session: Failed to send reply " "with http protocol upgrade."; log_msg_ex(&i); /* TODO: handle error */ return; } session->http_state = ws_server_state_ws_running; ws_server_websocket_init(session); } static void ws_server_send_http_reply(http_get_request_info info, ws_session *session) { char accept_key[60] = {0}, res_header[255] = {0}; int header_len = 0; size_t base64_len = 0 /*, written = 0*/; create_accept_key( (uint8_t *)accept_key, info.hdr_key->value, &base64_len); header_len = snprintf(res_header, sizeof(res_header), "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %.*s\r\n" "\r\n", (int)base64_len, accept_key); { ws_server_async_send_info *info = calloc(1, sizeof(ws_server_async_send_info)); if (!info) { /* TODO: handle allocation error */ perror("calloc"); return; } info->buf = strdup(res_header); info->buf_size = header_len; info->socket_fd = session->connection_fd; info->write_done_cb = ws_server_send_http_reply_w_cb; info->session = session; /* here is no memory leak, info is freed by ws_server_async_send */ ws_server_async_send(info); } } static bool ws_server_handle_http_get(const char *path, size_t path_len, struct phr_header *headers, size_t num_headers, ws_session *session) { struct phr_header *hdr_ws_key = find_http_header(headers, num_headers, "Sec-WebSocket-Key"), /* *hdr_connection = 0, */ *hdr_upgrade = 0, *hdr_ws_ext = 0; #ifdef DEBUG print_http_headers(headers, num_headers); #endif if (!hdr_ws_key) { log_msg_info i; i.ws_session = session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"ws_session: error: Sec-WebSocket-Key " "header not found"; log_msg_ex(&i); return false; } if (hdr_ws_key->value_len != 24) { log_msg_info i; i.ws_session = session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"ws_session: error: Sec-WebSocket-Key " "header value length != 24"; log_msg_ex(&i); return false; } /* hdr_connection = find_http_header (headers, num_headers, "Connection"); if (!hdr_connection) return false; if (!check_http_header_value (hdr_connection, "Upgrade")) return false;*/ hdr_upgrade = find_http_header(headers, num_headers, "Upgrade"); if (!hdr_upgrade) { log_msg_info i; i.ws_session = session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"ws_session: error: Upgrade header not found"; log_msg_ex(&i); return false; } if (!check_http_header_value_no_case(hdr_upgrade, "websocket")) { log_msg_info i; i.ws_session = session; i.level = wrdp_log_level_warning; i.buf = (const uint8_t *)"ws_session: error: Upgrade header " "value != websocket"; log_msg_ex(&i); return false; } hdr_ws_ext = find_http_header( headers, num_headers, "Sec-WebSocket-Extensions"); http_get_request_info info = {0}; info.hdr_key = hdr_ws_key; info.hdr_ext = hdr_ws_ext; info.path = path; info.path_len = path_len; ws_server_send_http_reply(info, session); return true; } static bool ws_server_socket_read(void *taskdata) { ws_session *session = taskdata; ssize_t read_size = 0; /* int err = 0; socklen_t slen = sizeof (err); getsockopt (socket, SOL_SOCKET, SO_ERROR, &err, &slen); if(err) return false; */ session->prev_read_size = session->read_size; while ( (read_size = recv(session->connection_fd, session->read_buf + session->read_size, sizeof(session->read_buf) - session->read_size, MSG_DONTWAIT)) == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) { } if (read_size == -1) { perror("recv"); goto error; } if (!read_size) { const char *msg = "remote host closed connection"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_debug, 0); goto cleanup; } session->read_size += read_size; return true; error: { const char *msg = "connection error occurs, destroying task"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); } cleanup: return false; } static bool ws_server_handle_http_request(ws_session *session) { char *method, *path; int pret, minor_version; const short header_count = 256; struct phr_header headers[header_count]; size_t method_len, path_len, num_headers = header_count; #ifdef DEBUG { const char *msg_base = "RAW HTTP request:\n"; const size_t buf_size = strlen(msg_base) + session->read_size + 1; log_msg_info i = {0}; uint8_t *msg = malloc(buf_size); strcpy((char *)msg, msg_base); strcat((char *)msg, session->read_buf); i.buf_size = buf_size; i.ws_session = session; i.buf = msg; i.level = wrdp_log_level_trace; log_msg_ex(&i); free(msg); } #endif /* parse the request */ pret = phr_parse_request((const char *)session->read_buf, session->read_size, (const char **)&method, &method_len, (const char **)&path, &path_len, &minor_version, headers, &num_headers, session->prev_read_size); if (pret == 0) { if (session->read_size == sizeof(session->read_buf)) { const char *msg = "HTTP request is too large"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto error; } return true; /* need more data */ } else if (pret == -1) { const char *msg = "Failed to parse HTTP request"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto error; } /* Print request */ /* printf("request is %d bytes long\n", pret); printf("method is %.*s\n", (int)method_len, method); printf("path is %.*s\n", (int)path_len, path); printf("HTTP version is 1.%d\n", minor_version); printf("headers:\n"); { int i; for (i = 0; i != num_headers; ++i) { printf("%.*s: %.*s\n", (int)headers[i].name_len, headers[i].name, (int)headers[i].value_len, headers[i].value); } } */ session->prev_read_size = session->read_size = 0; if (!strncmp(method, "GET", method_len)) { if (!ws_server_handle_http_get( path, path_len, headers, num_headers, session)) { return false; } } else { char buf[128]; snprintf(buf, 127, "Unsupported HTTP method %.*s, destroying task", (int)method_len, method); log_msg( (const uint8_t *)buf, strlen(buf), wrdp_log_level_error, 0); goto error; } /* reset buffer state */ return true; error: { const char *msg = "HTTP error occurs, destroying task"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); } return false; } bool ws_server_handle_data(ws_session *session) { task_info *info; info = session->task_info; if (info && info->stopped) { return false; } switch (session->http_state) { case ws_server_state_http_handshake: { ws_server_socket_read(session); if (!ws_server_handle_http_request(session)) { return false; } } break; case ws_server_state_ws_running: { if (!wslay_event_want_read(session->wslay_ctx)) { if (!wslay_event_get_close_received( session->wslay_ctx)) { const char *msg = "Unexpected data"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); ws_server_socket_read(session); } else { return false; } } else if (wslay_event_recv(session->wslay_ctx)) { return false; } } break; default: break; } return true; }