blob: b37e2d384abd35466fc7c7ad57487f21e1481935 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
/* BSD-2-Clause license
*
* Copyright (c) 2018-2023 NST <www.newinfosec.ru>, sss <sss at dark-alexandr dot net>.
*
*/
#pragma once
#include "wrdp_thpool.h"
/* A single task handled by the thread pool: a user data pointer, the
* thread that currently owns the task and the callbacks driving it.
* NOTE(review): "wrdp_thpool_task" / "wrdp_thpool_thread" are presumably
* typedefs of the structs in this file declared in "wrdp_thpool.h" - confirm.
*/
struct wrdp_thpool_task_s
{
/* Pass any user specified data pointer. */
void *userdata;
/* Thread owning the task. */
wrdp_thpool_thread *thread;
/* Task callbacks */
/* Task entry point callback.
* Receives the task itself and a user data pointer.
*/
void (*run_task)(wrdp_thpool_task *task, void *userdata);
/* Callback used to stop the task before moving it to another thread. */
void (*stop_task)(wrdp_thpool_task *task, void *userdata);
/* Function called just before adding the task to a thread,
* may be used to do additional task initialization.
* userdata is the user specified data passed to "wrdp_thread_pool_add_task"
*/
void (*task_init_cb)(wrdp_thpool_task *task, void *userdata);
};
/* State of one worker thread of the pool: its event loop, its pthread
* handle, the tasks assigned to it and a back pointer to the owning pool.
*/
struct wrdp_thpool_thread_s
{
/* Per thread libev based event loop. */
struct ev_loop *ev_th_loop;
/* libev I/O watcher.
* NOTE(review): presumably watches the read end of "pipe_fds" below for
* incoming messages - confirm against the implementation.
*/
ev_io ev_pipe_readable;
/* pthread handle of this worker. */
pthread_t thread;
/* Array of pointers to tasks.
* NOTE(review): presumably the tasks currently assigned to this thread;
* array size and ownership are not visible here - confirm.
*/
wrdp_thpool_task **tasks;
/* Number of running tasks (per the field name; where it is updated is
* not visible in this header).
*/
uint64_t running_task_count;
/* Identifier of this thread.
* NOTE(review): presumably its index in the pool's "threads" array -
* confirm.
*/
uint16_t thread_id;
/* Pair of pipe file descriptors.
* NOTE(review): presumably used for messages directed to this thread,
* [0] read end / [1] write end as returned by pipe() - confirm.
*/
int pipe_fds[2];
/* Pool owning the thread. */
wrdp_thpool *pool;
};
/* Thread pool state: worker threads, per thread task accounting, the
* internal messaging pipe, user data and optional user supplied callbacks.
*/
struct wrdp_thpool_s
{
/* Worker threads.
* NOTE(review): presumably an array of "thread_count" elements - confirm.
*/
wrdp_thpool_thread *threads;
/* worker threads count */
uint16_t thread_count;
/* dynamic variable holding the number of threads where the task count
* check is already done
*/
uint16_t checked_threads_tasks;
/* maximum tasks per thread */
uint64_t max_tasks;
/* buffer to hold tasks count for each thread */
uint64_t *tasks_per_thread;
/* internal messaging pipe */
int pipe_fds[2];
/* libev I/O watcher.
* NOTE(review): presumably watches the read end of "pipe_fds" above -
* confirm against the implementation.
*/
ev_io ev_pipe_readable;
/* data assigned by user */
void *userdata;
/* additional api callbacks */
/* function called in each thread to do additional initialization
* of user_pool_data
*/
void (*custom_thread_init)(void *user_pool_data, wrdp_thpool_thread *t);
/* function called in each thread to do additional cleanup of
* user_pool_data
*/
void (*custom_thread_deinit)(
void *user_pool_data, wrdp_thpool_thread *t);
/* function called in wrdp_thpool_create to do additional initialization
* of user_pool_data
*/
void (*custom_pool_create)(void *user_pool_data);
/* function called in wrdp_thpool_destroy to do additional cleanup of
* user_pool_data
*/
void (*custom_pool_destroy)(void *user_pool_data);
/* function called on incoming message with "void *user_data" directed
* to pool
*/
void (*pool_message_handler)(void *user_data);
/* function called on incoming message with "void *user_data" directed
* to thread
*/
void (*thread_message_handler)(void *user_data);
};
|