/* BSD-2-Clause license * * Copyright (c) 2018-2023 NST , sss . * */ #include #include #include #include #include #include #include "wrdp_thpool.h" #include "wrdp_thpool_internals.h" #include "globals.h" #include "curl_helpers.h" #include "ws_protocol.h" #include "wrdp_thpool.h" #include "webrdp_module_api.h" #include "ws_session.h" #include "task.h" #include "thread_impl.h" #include "exports.h" #include "webrdp_core_api.h" #include "log.h" /* global curl code */ /* TODO: per thread watcher list */ struct socket_watcher_s; typedef struct socket_watcher_s socket_watcher; struct socket_watcher_s { ev_io *watcher; int fd, what; wrdp_thpool_thread *t; CURL *easy_handle; socket_watcher *prev, *next; }; typedef struct { CURLM *cm; socket_watcher *watcher_list; } curl_thread_data; static socket_watcher * find_socket_watcher(int fd, int what, socket_watcher **head) { socket_watcher *w = *head; while (w) { if (w->fd == fd) { if (what != -1 && w->what == what) { return w; } } w = w->next; } return 0; } static socket_watcher * find_watcher_by_handle(CURL *easy_handle, socket_watcher **head) { socket_watcher *w = *head; while (w) { if (w->easy_handle == easy_handle) { return w; } w = w->next; } return 0; } static void append_socket_watcher(socket_watcher *_w, socket_watcher **head) { _w->next = *head; if (*head) { (*head)->prev = _w; } *head = _w; } typedef struct { CURLM *multi; wrdp_thpool_thread *t; } curl_socket_data; typedef struct { curl_socket_data *csd; int fd; } c_f_i_cb_data; static void remove_socket_watcher_impl(socket_watcher *w, socket_watcher **head) { if (w->prev) { w->prev->next = w->next; } if (w->next) { w->next->prev = w->prev; } if (w == *head) { *head = w->next; } if (w->watcher) { if (ev_is_active(w->watcher)) { ev_io_stop(w->t->ev_th_loop, w->watcher); } if (w->watcher->data) { free(w->watcher->data); } free(w->watcher); } free(w); } static void remove_socket_watcher(int fd, int what, socket_watcher **head) { socket_watcher *w = 0; for (w = find_socket_watcher(fd, what, head); w; w = find_socket_watcher(fd, what, head)) { remove_socket_watcher_impl(w, head); } } static void remove_all_watchers(CURL *easy_handle, socket_watcher **head) { socket_watcher *w = 0; for (w = find_watcher_by_handle(easy_handle, head); w; w = find_watcher_by_handle(easy_handle, head)) { remove_socket_watcher_impl(w, head); } } static void curl_check_finished(CURLM *cm) { CURLMsg *msg = 0; int finished = 0; while ((msg = curl_multi_info_read(cm, &finished))) { { const char *msg = "curl transfer finished"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_trace, 0); } { curl_request_info *cri = 0; if (msg->msg != CURLMSG_DONE) { //TODO: handle errors continue; } curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &cri); if (!cri) { continue; } curl_multi_remove_handle(cm, msg->easy_handle); ws_session *t = cri->session; task_info *info = t->task_info; wrdp_thpool_task *task = 0; if (info) task = info->wrdp_thpool_task; if (task) { curl_thread_data *ctd = task->thread->pool->userdata; remove_all_watchers(msg->easy_handle, &(ctd[task->thread->thread_id] .watcher_list)); } if (!cri->data) { continue; } if (info && info->stopped) { goto clean; } if (msg->data.result != CURLE_OK) { char msg_str[64], log_str[64]; log_msg_info i = {0}; snprintf(log_str, 63, "curl transfer failed" " with error:\n\t%s", curl_easy_strerror(msg->data.result)); i.buf = (uint8_t *)log_str; i.task_info = info; i.ws_session = t; i.level = wrdp_log_level_error; log_msg_ex(&i); snprintf(msg_str, 63, "curl_transfer_error_%d", (int)(msg->data.result)); send_error_msg(msg_str, info); t->session_state = ws_session_error; goto clean; } cri->data[cri->data_size - 1] = 0; #ifdef DEBUG /* print server request/reply data */ { size_t buf_size = cri->data_size + 256; char buf[buf_size]; log_msg_info i = {0}; i.task_info = info; i.ws_session = t; snprintf(buf, buf_size - 1, "curl raw data:\nrequest type: %s\ndata:\n " "%.*s", cri->type == curl_request_type_get ? "GET" : "POST", (int)cri->data_size, cri->data); i.buf = (uint8_t *)buf; i.level = wrdp_log_level_trace; log_msg_ex(&i); } #endif if (cri->user_data_handler_cb) { if (!cri->user_data_handler_cb( cri->data, cri->session, cri->userdata)) { cri->session->session_state = ws_session_error; task_destroy_client_connection( cri->session); } } clean: if (!SLIST_EMPTY(&(t->curls_easy_head))) { struct curls_easy_s *curl_node = SLIST_FIRST(&(t->curls_easy_head)); while (curl_node) { CURL *curl = curl_node->curl; struct curls_easy_s *node = curl_node; curl_node = SLIST_NEXT(curl_node, entries); if (curl == msg->easy_handle) { SLIST_REMOVE( &t->curls_easy_head, node, curls_easy_s, entries); free(node); } } } if (cri->free_cb) cri->free_cb(cri->userdata); free(cri->data); cri->data = 0; free(cri); curl_easy_cleanup(msg->easy_handle); { log_msg( (const uint8_t *)"clearing finished curl transfer", 0, wrdp_log_level_trace, 0); } } } } void curl_list_session_destroy(ws_session *session) { CURLM *cm = session->curlm; if (!SLIST_EMPTY(&(session->curls_easy_head))) { struct curls_easy_s *curl_node = SLIST_FIRST(&(session->curls_easy_head)); while (curl_node) { CURL *curl = curl_node->curl; curl_request_info *cri = 0; curl_easy_getinfo(curl, CURLINFO_PRIVATE, &cri); { struct curls_easy_s *node = curl_node; curl_node = SLIST_NEXT(curl_node, entries); SLIST_REMOVE(&(session->curls_easy_head), node, curls_easy_s, entries); free(node); } if (!cri) { curl_multi_remove_handle(cm, curl); curl_easy_cleanup(curl); continue; } curl_multi_remove_handle(cm, curl); curl_easy_cleanup(curl); if (!cri->data) { continue; } free(cri->data); free(cri); } } } static void curl_fd_io_cb(EV_P_ ev_io *w, int revents) { c_f_i_cb_data *d = w->data; int pending = 0; int ev_bitmask = 0; if (revents & EV_READ && revents & EV_WRITE) { ev_bitmask |= CURL_POLL_INOUT; } else if (revents & EV_READ) { ev_bitmask |= CURL_POLL_IN; } else if (revents & EV_WRITE) { ev_bitmask |= CURL_POLL_OUT; } CURLMcode code = curl_multi_socket_action( d->csd->multi, d->fd, ev_bitmask, &pending); if (code != CURLM_OK) { char msg[128]; snprintf(msg, 127, "curl_multi_socket_action failed with error: %d", code); log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); } curl_check_finished(d->csd->multi); } static int curl_socket_cb(CURL *easy, /* easy handle */ curl_socket_t s, /* socket */ int what, /* describes the socket */ void *userp, /* private callback pointer */ void *socketp) /* private socket pointer */ { curl_socket_data *csd = userp; if (what == CURL_POLL_REMOVE) { { curl_thread_data *ctd = csd->t->pool->userdata; remove_socket_watcher( s, -1, &(ctd[csd->t->thread_id].watcher_list)); } goto finish; } socket_watcher *sw = calloc(1, sizeof(socket_watcher)); if (!sw) { perror("calloc"); goto finish; } ev_io *w = calloc(1, sizeof(ev_io)); if (!w) { free(sw); perror("calloc"); goto finish; } c_f_i_cb_data *d = calloc(1, sizeof(c_f_i_cb_data)); if (!d) { free(sw); free(w); perror("calloc"); goto finish; } d->csd = csd; d->fd = s; sw->t = csd->t; sw->watcher = w; sw->fd = s; sw->what = what; sw->easy_handle = easy; w->data = d; { curl_thread_data *ctd = csd->t->pool->userdata; append_socket_watcher( sw, &(ctd[csd->t->thread_id].watcher_list)); } if (what == CURL_POLL_IN) { ev_io_init(w, curl_fd_io_cb, s, EV_READ); } else if (what == CURL_POLL_OUT) { ev_io_init(w, curl_fd_io_cb, s, EV_WRITE); } else if (what == CURL_POLL_INOUT) { ev_io_init(w, curl_fd_io_cb, s, EV_READ | EV_WRITE); } ev_io_start(csd->t->ev_th_loop, w); finish: curl_check_finished(csd->multi); return CURLM_OK; } static void timeouts_check_cb(EV_P_ ev_timer *w, int revents) { /* TODO: test this */ CURLM *multi = w->data; int handles = 0; free(w); curl_check_finished(multi); CURLMcode code = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &handles); if (code != CURLM_OK) { char msg[128]; snprintf(msg, 127, "curl_multi_socket_action failed with error: %d", code); log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); } } static int curl_timer_cb(CURLM *multi, long timeout_ms, void *userp) { if (timeout_ms > 0) { int handles = 0; CURLMcode code = curl_multi_socket_action( multi, CURL_SOCKET_TIMEOUT, 0, &handles); if (code != CURLM_OK) { char msg[128]; snprintf(msg, 127, "curl_multi_socket_action failed with error: %d", code); log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); } return 0; } wrdp_thpool_thread *t = userp; ev_timer *timer = calloc(1, sizeof(ev_timer)); if (!timer) { perror("calloc"); return -1; } ev_timer_init(timer, timeouts_check_cb, timeout_ms / 1000.0, 0.); timer->data = multi; ev_timer_start(t->ev_th_loop, timer); return 0; } void per_thread_curl_init(void *user_pool_data, wrdp_thpool_thread *t) { curl_thread_data *ctd = user_pool_data; ctd[t->thread_id].cm = curl_multi_init(); if (!ctd[t->thread_id].cm) { printf("FATAL: curl_multi_init failed\n"); exit(EXIT_FAILURE); } curl_socket_data *csd = calloc(1, sizeof(curl_socket_data)); if (!csd) { perror("calloc"); exit(EXIT_FAILURE); } csd->multi = ctd[t->thread_id].cm; csd->t = t; if (curl_multi_setopt( ctd[t->thread_id].cm, CURLMOPT_SOCKETFUNCTION, curl_socket_cb) != CURLM_OK) { printf("curl_multi_setopt(curlms[t->thread_id]," "CURLMOPT_SOCKETFUNCTION, curl_socket_cb) failed\n"); exit(EXIT_FAILURE); } if (curl_multi_setopt(ctd[t->thread_id].cm, CURLMOPT_SOCKETDATA, csd) != CURLM_OK) { printf("curl_multi_setopt(curlms[t->thread_id], " "CURLMOPT_SOCKETDATA," " csd) failed failed\n"); exit(EXIT_FAILURE); } if (curl_multi_setopt( ctd[t->thread_id].cm, CURLMOPT_TIMERFUNCTION, curl_timer_cb) != CURLM_OK) { printf("curl_multi_setopt(curlms[t->thread_id], " "CURLMOPT_TIMERFUNCTION," "curl_timer_cb) failed\n"); exit(EXIT_FAILURE); } if (curl_multi_setopt(ctd[t->thread_id].cm, CURLMOPT_TIMERDATA, t) != CURLM_OK) { printf("curl_multi_setopt(curlms[t->thread_id], " "CURLMOPT_TIMERDATA," " t) failed\n"); exit(EXIT_FAILURE); } } void * init_curl_global() { curl_thread_data *ret; CURLcode cc = curl_global_init(CURL_GLOBAL_ALL | CURL_GLOBAL_ACK_EINTR); if (cc != CURLE_OK) { printf("FATAL: curl_global_init failed\n"); exit(EXIT_FAILURE); } ret = calloc(g_globals.settings.thread_count, sizeof(curl_thread_data)); if (!ret) { perror("calloc"); exit(EXIT_FAILURE); } return ret; } /* transfer related code */ static size_t curl_read_cb(char *buffer, size_t size, size_t nitems, void *userdata) { curl_request_info *cri = userdata; if (!cri || !cri->data) { return 0; } ws_session *t = cri->session; task_info *info = t->task_info; if (info && info->stopped) return 0; size_t len = size * nitems; if (len > (cri->data_size - cri->written - 1)) { char msg[128]; snprintf(msg, 127, "curl_read_cb: error: buffer" " of size %ld is too small\n", cri->data_size); log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto error; } if (!len) { const char *msg = "curl_read_cb: error: zero length reply"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto error; } memcpy(cri->data + cri->written, buffer, len); cri->data[len] = 0; cri->written += len; curl_check_finished(cri->cm); return len; error: if (cri->free_cb) { cri->free_cb(cri->userdata); } free(cri->data); free(cri); return 0; } char * get_url(const char *token, curl_request_type type) { switch (type) { case curl_request_type_get: { size_t len = strlen(token) + strlen(g_globals.settings.auth_server_url) + strlen("?") + 1; char *url = malloc(len); if (!url) { perror("malloc"); return NULL; } snprintf(url, len, "%s?%s", g_globals.settings.auth_server_url, token); return url; } break; case curl_request_type_post: { char *url = malloc( strlen(g_globals.settings.auth_server_url) + 1); if (!url) { perror("malloc"); return NULL; } strcpy(url, g_globals.settings.auth_server_url); return url; } break; default: return NULL; } } curl_request_info * curl_init_request(ws_session *session, curl_request_type type, uint8_t *data, size_t data_size, bool (*user_data_handler_cb)(uint8_t *, ws_session *, void *), void (*free_cb)(void *), void *userdata) { curl_request_info *request = calloc(1, sizeof(*request)); request->type = type; request->session = session; if (type == curl_request_type_get) { request->data_size = 4096; request->data = malloc(request->data_size); request->data[0] = 0; } if (type == curl_request_type_post) { request->data_size = data_size; request->data = malloc(request->data_size + 1); memcpy(request->data, data, data_size); request->data[data_size] = 0; } request->url = get_url(session->token_base64, type); request->user_data_handler_cb = user_data_handler_cb; request->free_cb = free_cb; request->userdata = userdata; return request; } bool curl_request(curl_request_info *r) { CURL *c = 0; curl_thread_data *ctd = 0; CURLM *cm = 0; ws_session *session = 0; task_info *t_info = 0; wrdp_thpool_task *task = 0; if (!r) return false; session = r->session; if (!session) { return false; } t_info = session->task_info; if (!t_info) { return false; } task = t_info->wrdp_thpool_task; if (!task) { return false; } ctd = task->thread->pool->userdata; cm = ctd[task->thread->thread_id].cm; r->cm = cm; session->curlm = cm; c = curl_easy_init(); if (!r->url || !r->data) { goto curl_easy_setopt_error; } if (!c) { goto curl_easy_setopt_error; } if (curl_easy_setopt(c, CURLOPT_URL, r->url) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_URL, url) failed"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } switch (r->type) { case curl_request_type_get: if (curl_easy_setopt(c, CURLOPT_WRITEDATA, r) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_WRITEDATA," " crd) failed"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } if (curl_easy_setopt( c, CURLOPT_WRITEFUNCTION, curl_read_cb) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, " "CURLOPT_WRITEFUNCTION," " curl_read_cb) failed"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } break; case curl_request_type_post: if (curl_easy_setopt(c, CURLOPT_POST, 1L) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_POST," " 1L) failed"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } if (curl_easy_setopt( c, CURLOPT_POSTFIELDSIZE, r->data_size) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_WRITEDATA," " crd) failed"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } if (curl_easy_setopt(c, CURLOPT_POSTFIELDS, r->data) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_WRITEDATA," " crd) failed"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } { log_msg_info i = {0}; size_t buf_size = r->data_size + 256; uint8_t buf[buf_size]; i.task_info = t_info; i.ws_session = session; i.level = wrdp_log_level_trace; snprintf((char *)buf, buf_size - 1, "curl postfields data: %.*s", (int)r->data_size, r->data); i.buf = buf; log_msg_ex(&i); } break; default: { const char *msg = "request type is wrong"; log_msg((const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } break; } if (curl_easy_setopt(c, CURLOPT_PRIVATE, r) != CURLE_OK) { const char *msg = "curl_easy_setopt (c, CURLOPT_WRITEDATA," " crd) failed"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); goto curl_easy_setopt_error; } #ifdef DEBUG curl_easy_setopt(c, CURLOPT_SSL_VERIFYPEER, 0); curl_easy_setopt(c, CURLOPT_SSL_VERIFYHOST, 0); #endif if (curl_multi_add_handle(cm, c) != CURLM_OK) { const char *msg = "curl_multi_add_handle failed"; log_msg( (const uint8_t *)msg, strlen(msg), wrdp_log_level_error, 0); free(r->url); return false; } { struct curls_easy_s *entry = calloc(1, sizeof(struct curls_easy_s)); if (!entry) { perror("calloc"); goto curl_easy_setopt_error; } entry->session = session; entry->curl = c; SLIST_INSERT_HEAD(&(session->curls_easy_head), entry, entries); } free(r->url); return true; curl_easy_setopt_error: if (r->free_cb) r->free_cb(r->userdata); free(r->url); free(r->data); free(r); return false; } size_t curl_prepare_post_request_data(uint8_t **buf, ws_session *session) { size_t data_size = strlen("{\"sid\": \"%s\", \"type\": " "\"sessionupdate\", \"status\": %d}") - 4 + //format strlen(session->token_base64) + 1; //one digit (session_state) uint8_t *data = malloc(data_size + 1); sprintf((char *)data, "{\"sid\": \"%s\", \"type\": \"sessionupdate\", \"status\": %d}", session->token_base64, session->session_state); *buf = data; return data_size; }