From cc3f33db7a8d3c4ad373e646b199808e01bc5d9b Mon Sep 17 00:00:00 2001 From: sss Date: Tue, 17 Jan 2023 00:38:19 +0300 Subject: added webrdp public code --- src/core/thread_impl.c | 387 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 src/core/thread_impl.c (limited to 'src/core/thread_impl.c') diff --git a/src/core/thread_impl.c b/src/core/thread_impl.c new file mode 100644 index 0000000..2be8ed0 --- /dev/null +++ b/src/core/thread_impl.c @@ -0,0 +1,387 @@ +/* BSD-2-Clause license + * + * Copyright (c) 2018-2023 NST , sss . + * + */ + +#include +#include +#include +#include + +#include + +#include "wrdp_thpool.h" +//required to work with thpool internals +#include "wrdp_thpool_internals.h" + +//required to work with ws_server internals +#include "ws_session.h" +#include "ws_server_internals.h" + +#include + +#include "globals.h" +#include "task.h" +#include "remote_control.h" +#include "ctl_task.h" + +#include "rdp_backend_api.h" + +#include "thread_sync.h" +#include "backend_helpers.h" +#include "curl_helpers.h" + +#include "curl_helpers.h" + +#include "log.h" + +#include +#include "base64_url.h" + +void +ws_session_init_cb(wrdp_thpool_task *task, void *userdata) +{ + ws_session *session = userdata; + task_info *info = session->task_info; + info->wrdp_thpool_task = task; + session->wrdp_thpool_task = task; +} + +void +task_destroy_timers(wrdp_thpool_task *task) +{ + task_info *info; + if (!task) + { + { + log_msg_info i = {0}; + i.buf = (const uint8_t + *)"task_destroy_timers: task is null"; + i.level = wrdp_log_level_trace; + log_msg_ex(&i); + } + return; + } + info = task->userdata; + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"task_destroy_timers"; + i.level = wrdp_log_level_trace; + i.task_info = info; + log_msg_ex(&i); + } + if (!info || !info->ev_timer_watcher) + { + return; + } + if (ev_is_active(info->ev_timer_watcher)) + { + ev_timer_stop(task->thread->ev_th_loop, info->ev_timer_watcher); + } + free(info->ev_timer_watcher); + info->ev_timer_watcher = 0; +} + +static void +task_destroy_client_connection_impl(ws_session *session) +{ + if (session->session_state != ws_session_error) + session->session_state = ws_session_ended; + wrdp_thpool_task *task = session->wrdp_thpool_task; + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"task_destroy_client_connection"; + i.level = wrdp_log_level_trace; + i.ws_session = session; + log_msg_ex(&i); + } + if (task) + { + ev_io_stop(task->thread->ev_th_loop, &(session->ev_con_fd_r)); + if (ev_is_active(&(session->ev_con_fd_w))) + { + ev_io_stop( + task->thread->ev_th_loop, &(session->ev_con_fd_w)); + } + } + if (session->http_state == ws_server_state_ws_running + && session->wslay_ctx) + { + /* send all pending messages + * this may block ? */ + wslay_event_shutdown_read(session->wslay_ctx); + wslay_event_queue_close(session->wslay_ctx, 0, 0, 0); + while (wslay_event_want_write(session->wslay_ctx) + && !wslay_event_get_close_received(session->wslay_ctx)) + { + wslay_event_send(session->wslay_ctx); + } + wslay_event_shutdown_write(session->wslay_ctx); + wslay_event_context_free(session->wslay_ctx); + session->wslay_ctx = 0; + } + close(session->connection_fd); + free(session->sid_base64); +} + +static void +task_destroy_server_connection(wrdp_thpool_task *task) +{ + task_info *info = task->userdata; + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"task_destroy_server_connection"; + i.level = wrdp_log_level_trace; + i.task_info = info; + log_msg_ex(&i); + } + if (!info) + { + return; + } + /* TODO: implement backend self-destruction based on session timeout + * instead */ + if (info->backend && info->backend->callbacks_module->destroy) + { + info->backend->callbacks_module->destroy( + info->backend->backend_internals); + info->backend->callbacks_module->destroy = 0; + } + if (info->backend) + { + backend_destroy(info->backend); + info->backend = 0; + } +} + +void +task_destroy_empty(wrdp_thpool_task *task) +{ + task_info *info = task->userdata; + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"destroy empty task"; + i.level = wrdp_log_level_trace; + i.task_info = info; + log_msg_ex(&i); + } + task_destroy_timers(task); + task_destroy_server_connection(task); + wrdp_thread_pool_destroy_task(task, 0); + free(info); +} + +static void +task_remove_session_from_list_and_destroy(void *session_) +{ + ws_session *session = session_; + task_info *info = session->task_info; + task_destroy_client_connection_impl(session); + if (!info) + { + /* in rare case this may be called when task_info not created + * yet */ + return; + } + SLIST_HEAD(sessions_head, ws_session_list_entry_s) *sessions_list_head_p + = info->backend->sessions_list_head; + if (!SLIST_EMPTY(sessions_list_head_p)) + { + for (struct ws_session_list_entry_s *s + = SLIST_FIRST(sessions_list_head_p); + s; s = SLIST_NEXT(s, entries)) + { + if (s->session == session) + { + SLIST_REMOVE(sessions_list_head_p, s, + ws_session_list_entry_s, entries); + free(s); + break; + } + } + } + curl_list_session_destroy(session); + + if (SLIST_EMPTY(sessions_list_head_p)) + { + task_destroy_empty((wrdp_thpool_task *)info->wrdp_thpool_task); + } + else + { + wrdp_thread_pool_destroy_task(session->wrdp_thpool_task, 0); + } + free(session); +} + +void +task_destroy_client_connection(ws_session *session) +{ + if (session->token_base64) + { + uint8_t *data = 0; + size_t data_size + = curl_prepare_post_request_data(&data, session); + curl_request_info *request = curl_init_request(session, + curl_request_type_post, data, data_size, 0, + task_remove_session_from_list_and_destroy, session); + free(data); + curl_request(request); + } +} + +static void +on_task_destroy(wrdp_thpool_task *task) +{ + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"cleaning task slot"; + i.wrdp_thpool_task = task; + i.level = wrdp_log_level_debug; + log_msg_ex(&i); + } + task_destroy_timers(task); + task_info *info = task->userdata; + if (info) + { + SLIST_HEAD(sessions_head, + ws_session_list_entry_s) *sessions_list_head_p + = info->backend->sessions_list_head; + if (!SLIST_EMPTY(sessions_list_head_p)) + { + for (struct ws_session_list_entry_s *s + = SLIST_FIRST(sessions_list_head_p); + s; s = SLIST_NEXT(s, entries)) + { + if (!s->session) + continue; + if (s->session->session_state + != ws_session_error) + s->session->session_state + = ws_session_ended; + task_destroy_client_connection(s->session); + } + } + while (!SLIST_EMPTY(sessions_list_head_p)) + SLIST_REMOVE_HEAD(sessions_list_head_p, entries); + } +} + +void +destroy_task(task_info *t_info) +{ + t_info->stopped = true; + if (t_info->backend) + { + t_info->backend->stopped = true; + } + /* TODO: + * eliminate possibility of small race condition here + * which lead to inconsistant task count in thread of thread pool + * for one second + */ +} + +void +task_timeouts_check_cb(EV_P_ ev_timer *w, int revents) +{ + + task_info *info = w->data; + if (!info) + { + return; + } + if (info->stopped) + { + w->data = 0; + on_task_destroy(info->wrdp_thpool_task); + return; + } + if (info->settings.session_idle_timeout + && (info->idle_time > info->settings.session_idle_timeout)) + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"Session idle timeout occured"; + i.task_info = info; + i.level = wrdp_log_level_debug; + log_msg_ex(&i); + destroy_task(info); + } + if (info->settings.session_time_limit + && (info->run_time > info->settings.session_time_limit)) + { + log_msg_info i = {0}; + i.buf = (const uint8_t *)"Session timeout occured"; + i.task_info = info; + i.level = wrdp_log_level_debug; + log_msg_ex(&i); + destroy_task(info); + } + info->run_time++; + info->idle_time++; +} + +static void +ws_ev_connection_readable_cb(struct ev_loop *loop, ev_io *w, int revents) +{ + ws_session *session = w->data; + if (!session) + { + return; + } + task_info *info = session->task_info; + if (info && info->stopped) + { + return; + } + if (!ws_server_handle_data(session)) + { + w->data = 0; + if (session->session_state != ws_session_error) + session->session_state = ws_session_ended; + task_destroy_client_connection(session); + } +} + +void +ws_run_session(wrdp_thpool_task *task, void *_ws_session) +{ + ws_session *session = _ws_session; + ev_io *io = &(session->ev_con_fd_r); + ev_io_init( + io, ws_ev_connection_readable_cb, session->connection_fd, EV_READ); + session->ev_con_fd_r.data = session; + ev_io_start(task->thread->ev_th_loop, io); +} + +void +ws_stop_session(wrdp_thpool_task *current_task, void *_ws_session) +{ + ws_session *session = _ws_session; + ev_io *io = &(session->ev_con_fd_r); + if (ev_is_active(io)) + { + ev_io_stop(current_task->thread->ev_th_loop, io); + } +} + +static void +ctl_ev_connection_readable_cb(struct ev_loop *loop, ev_io *w, int revents) +{ + if (!ctl_server_handle_data(w->data)) + { + ctl_destroy_task(w->data); + } +} + +void +ctl_run_task(wrdp_thpool_task *task) +{ + ctl_task_info *info = task->userdata; + ctl_session *session = info->session; + ev_io *io = &(session->ev_con_fd_r); + ev_io_init( + io, ctl_ev_connection_readable_cb, session->connection_fd, EV_READ); + session->ev_con_fd_r.data = task; + ev_io_start(task->thread->ev_th_loop, io); +} -- cgit v1.2.3