summaryrefslogtreecommitdiff
path: root/protocols/Telegram/libevent/bufferevent_ratelim.c
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/libevent/bufferevent_ratelim.c')
-rw-r--r--protocols/Telegram/libevent/bufferevent_ratelim.c1092
1 files changed, 0 insertions, 1092 deletions
diff --git a/protocols/Telegram/libevent/bufferevent_ratelim.c b/protocols/Telegram/libevent/bufferevent_ratelim.c
deleted file mode 100644
index bde192021b..0000000000
--- a/protocols/Telegram/libevent/bufferevent_ratelim.c
+++ /dev/null
@@ -1,1092 +0,0 @@
-/*
- * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
- * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 3. The name of the author may not be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
- * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
- * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
- * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
- * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-#include "evconfig-private.h"
-
-#include <sys/types.h>
-#include <limits.h>
-#include <string.h>
-#include <stdlib.h>
-
-#include "event2/event.h"
-#include "event2/event_struct.h"
-#include "event2/util.h"
-#include "event2/bufferevent.h"
-#include "event2/bufferevent_struct.h"
-#include "event2/buffer.h"
-
-#include "ratelim-internal.h"
-
-#include "bufferevent-internal.h"
-#include "mm-internal.h"
-#include "util-internal.h"
-#include "event-internal.h"
-
-int
-ev_token_bucket_init_(struct ev_token_bucket *bucket,
- const struct ev_token_bucket_cfg *cfg,
- ev_uint32_t current_tick,
- int reinitialize)
-{
- if (reinitialize) {
- /* on reinitialization, we only clip downwards, since we've
- already used who-knows-how-much bandwidth this tick. We
- leave "last_updated" as it is; the next update will add the
- appropriate amount of bandwidth to the bucket.
- */
- if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
- bucket->read_limit = cfg->read_maximum;
- if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
- bucket->write_limit = cfg->write_maximum;
- } else {
- bucket->read_limit = cfg->read_rate;
- bucket->write_limit = cfg->write_rate;
- bucket->last_updated = current_tick;
- }
- return 0;
-}
-
-int
-ev_token_bucket_update_(struct ev_token_bucket *bucket,
- const struct ev_token_bucket_cfg *cfg,
- ev_uint32_t current_tick)
-{
- /* It's okay if the tick number overflows, since we'll just
- * wrap around when we do the unsigned substraction. */
- unsigned n_ticks = current_tick - bucket->last_updated;
-
- /* Make sure some ticks actually happened, and that time didn't
- * roll back. */
- if (n_ticks == 0 || n_ticks > INT_MAX)
- return 0;
-
- /* Naively, we would say
- bucket->limit += n_ticks * cfg->rate;
-
- if (bucket->limit > cfg->maximum)
- bucket->limit = cfg->maximum;
-
- But we're worried about overflow, so we do it like this:
- */
-
- if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
- bucket->read_limit = cfg->read_maximum;
- else
- bucket->read_limit += n_ticks * cfg->read_rate;
-
-
- if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
- bucket->write_limit = cfg->write_maximum;
- else
- bucket->write_limit += n_ticks * cfg->write_rate;
-
-
- bucket->last_updated = current_tick;
-
- return 1;
-}
-
-static inline void
-bufferevent_update_buckets(struct bufferevent_private *bev)
-{
- /* Must hold lock on bev. */
- struct timeval now;
- unsigned tick;
- event_base_gettimeofday_cached(bev->bev.ev_base, &now);
- tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
- if (tick != bev->rate_limiting->limit.last_updated)
- ev_token_bucket_update_(&bev->rate_limiting->limit,
- bev->rate_limiting->cfg, tick);
-}
-
-ev_uint32_t
-ev_token_bucket_get_tick_(const struct timeval *tv,
- const struct ev_token_bucket_cfg *cfg)
-{
- /* This computation uses two multiplies and a divide. We could do
- * fewer if we knew that the tick length was an integer number of
- * seconds, or if we knew it divided evenly into a second. We should
- * investigate that more.
- */
-
- /* We cast to an ev_uint64_t first, since we don't want to overflow
- * before we do the final divide. */
- ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
- return (unsigned)(msec / cfg->msec_per_tick);
-}
-
-struct ev_token_bucket_cfg *
-ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
- size_t write_rate, size_t write_burst,
- const struct timeval *tick_len)
-{
- struct ev_token_bucket_cfg *r;
- struct timeval g;
- if (! tick_len) {
- g.tv_sec = 1;
- g.tv_usec = 0;
- tick_len = &g;
- }
- if (read_rate > read_burst || write_rate > write_burst ||
- read_rate < 1 || write_rate < 1)
- return NULL;
- if (read_rate > EV_RATE_LIMIT_MAX ||
- write_rate > EV_RATE_LIMIT_MAX ||
- read_burst > EV_RATE_LIMIT_MAX ||
- write_burst > EV_RATE_LIMIT_MAX)
- return NULL;
- r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
- if (!r)
- return NULL;
- r->read_rate = read_rate;
- r->write_rate = write_rate;
- r->read_maximum = read_burst;
- r->write_maximum = write_burst;
- memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
- r->msec_per_tick = (tick_len->tv_sec * 1000) +
- (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
- return r;
-}
-
-void
-ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
-{
- mm_free(cfg);
-}
-
-/* Default values for max_single_read & max_single_write variables. */
-#define MAX_SINGLE_READ_DEFAULT 16384
-#define MAX_SINGLE_WRITE_DEFAULT 16384
-
-#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
-#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
-
-static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
-static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
-static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
-static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
-
-/** Helper: figure out the maximum amount we should write if is_write, or
- the maximum amount we should read if is_read. Return that maximum, or
- 0 if our bucket is wholly exhausted.
- */
-static inline ev_ssize_t
-bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
-{
- /* needs lock on bev. */
- ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
-
-#define LIM(x) \
- (is_write ? (x).write_limit : (x).read_limit)
-
-#define GROUP_SUSPENDED(g) \
- (is_write ? (g)->write_suspended : (g)->read_suspended)
-
- /* Sets max_so_far to MIN(x, max_so_far) */
-#define CLAMPTO(x) \
- do { \
- if (max_so_far > (x)) \
- max_so_far = (x); \
- } while (0);
-
- if (!bev->rate_limiting)
- return max_so_far;
-
- /* If rate-limiting is enabled at all, update the appropriate
- bucket, and take the smaller of our rate limit and the group
- rate limit.
- */
-
- if (bev->rate_limiting->cfg) {
- bufferevent_update_buckets(bev);
- max_so_far = LIM(bev->rate_limiting->limit);
- }
- if (bev->rate_limiting->group) {
- struct bufferevent_rate_limit_group *g =
- bev->rate_limiting->group;
- ev_ssize_t share;
- LOCK_GROUP(g);
- if (GROUP_SUSPENDED(g)) {
- /* We can get here if we failed to lock this
- * particular bufferevent while suspending the whole
- * group. */
- if (is_write)
- bufferevent_suspend_write_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- else
- bufferevent_suspend_read_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- share = 0;
- } else {
- /* XXXX probably we should divide among the active
- * members, not the total members. */
- share = LIM(g->rate_limit) / g->n_members;
- if (share < g->min_share)
- share = g->min_share;
- }
- UNLOCK_GROUP(g);
- CLAMPTO(share);
- }
-
- if (max_so_far < 0)
- max_so_far = 0;
- return max_so_far;
-}
-
-ev_ssize_t
-bufferevent_get_read_max_(struct bufferevent_private *bev)
-{
- return bufferevent_get_rlim_max_(bev, 0);
-}
-
-ev_ssize_t
-bufferevent_get_write_max_(struct bufferevent_private *bev)
-{
- return bufferevent_get_rlim_max_(bev, 1);
-}
-
-int
-bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
-{
- /* XXXXX Make sure all users of this function check its return value */
- int r = 0;
- /* need to hold lock on bev */
- if (!bev->rate_limiting)
- return 0;
-
- if (bev->rate_limiting->cfg) {
- bev->rate_limiting->limit.read_limit -= bytes;
- if (bev->rate_limiting->limit.read_limit <= 0) {
- bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
- if (event_add(&bev->rate_limiting->refill_bucket_event,
- &bev->rate_limiting->cfg->tick_timeout) < 0)
- r = -1;
- } else if (bev->read_suspended & BEV_SUSPEND_BW) {
- if (!(bev->write_suspended & BEV_SUSPEND_BW))
- event_del(&bev->rate_limiting->refill_bucket_event);
- bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
- }
- }
-
- if (bev->rate_limiting->group) {
- LOCK_GROUP(bev->rate_limiting->group);
- bev->rate_limiting->group->rate_limit.read_limit -= bytes;
- bev->rate_limiting->group->total_read += bytes;
- if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
- bev_group_suspend_reading_(bev->rate_limiting->group);
- } else if (bev->rate_limiting->group->read_suspended) {
- bev_group_unsuspend_reading_(bev->rate_limiting->group);
- }
- UNLOCK_GROUP(bev->rate_limiting->group);
- }
-
- return r;
-}
-
-int
-bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
-{
- /* XXXXX Make sure all users of this function check its return value */
- int r = 0;
- /* need to hold lock */
- if (!bev->rate_limiting)
- return 0;
-
- if (bev->rate_limiting->cfg) {
- bev->rate_limiting->limit.write_limit -= bytes;
- if (bev->rate_limiting->limit.write_limit <= 0) {
- bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
- if (event_add(&bev->rate_limiting->refill_bucket_event,
- &bev->rate_limiting->cfg->tick_timeout) < 0)
- r = -1;
- } else if (bev->write_suspended & BEV_SUSPEND_BW) {
- if (!(bev->read_suspended & BEV_SUSPEND_BW))
- event_del(&bev->rate_limiting->refill_bucket_event);
- bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
- }
- }
-
- if (bev->rate_limiting->group) {
- LOCK_GROUP(bev->rate_limiting->group);
- bev->rate_limiting->group->rate_limit.write_limit -= bytes;
- bev->rate_limiting->group->total_written += bytes;
- if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
- bev_group_suspend_writing_(bev->rate_limiting->group);
- } else if (bev->rate_limiting->group->write_suspended) {
- bev_group_unsuspend_writing_(bev->rate_limiting->group);
- }
- UNLOCK_GROUP(bev->rate_limiting->group);
- }
-
- return r;
-}
-
-/** Stop reading on every bufferevent in <b>g</b> */
-static int
-bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
-{
- /* Needs group lock */
- struct bufferevent_private *bev;
- g->read_suspended = 1;
- g->pending_unsuspend_read = 0;
-
- /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
- to prevent a deadlock. (Ordinarily, the group lock nests inside
- the bufferevent locks. If we are unable to lock any individual
- bufferevent, it will find out later when it looks at its limit
- and sees that its group is suspended.)
- */
- LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
- if (EVLOCK_TRY_LOCK_(bev->lock)) {
- bufferevent_suspend_read_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- }
- }
- return 0;
-}
-
-/** Stop writing on every bufferevent in <b>g</b> */
-static int
-bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
-{
- /* Needs group lock */
- struct bufferevent_private *bev;
- g->write_suspended = 1;
- g->pending_unsuspend_write = 0;
- LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
- if (EVLOCK_TRY_LOCK_(bev->lock)) {
- bufferevent_suspend_write_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- }
- }
- return 0;
-}
-
-/** Timer callback invoked on a single bufferevent with one or more exhausted
- buckets when they are ready to refill. */
-static void
-bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
-{
- unsigned tick;
- struct timeval now;
- struct bufferevent_private *bev = arg;
- int again = 0;
- BEV_LOCK(&bev->bev);
- if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
- BEV_UNLOCK(&bev->bev);
- return;
- }
-
- /* First, update the bucket */
- event_base_gettimeofday_cached(bev->bev.ev_base, &now);
- tick = ev_token_bucket_get_tick_(&now,
- bev->rate_limiting->cfg);
- ev_token_bucket_update_(&bev->rate_limiting->limit,
- bev->rate_limiting->cfg,
- tick);
-
- /* Now unsuspend any read/write operations as appropriate. */
- if ((bev->read_suspended & BEV_SUSPEND_BW)) {
- if (bev->rate_limiting->limit.read_limit > 0)
- bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
- else
- again = 1;
- }
- if ((bev->write_suspended & BEV_SUSPEND_BW)) {
- if (bev->rate_limiting->limit.write_limit > 0)
- bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
- else
- again = 1;
- }
- if (again) {
- /* One or more of the buckets may need another refill if they
- started negative.
-
- XXXX if we need to be quiet for more ticks, we should
- maybe figure out what timeout we really want.
- */
- /* XXXX Handle event_add failure somehow */
- event_add(&bev->rate_limiting->refill_bucket_event,
- &bev->rate_limiting->cfg->tick_timeout);
- }
- BEV_UNLOCK(&bev->bev);
-}
-
-/** Helper: grab a random element from a bufferevent group.
- *
- * Requires that we hold the lock on the group.
- */
-static struct bufferevent_private *
-bev_group_random_element_(struct bufferevent_rate_limit_group *group)
-{
- int which;
- struct bufferevent_private *bev;
-
- /* requires group lock */
-
- if (!group->n_members)
- return NULL;
-
- EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
-
- which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
-
- bev = LIST_FIRST(&group->members);
- while (which--)
- bev = LIST_NEXT(bev, rate_limiting->next_in_group);
-
- return bev;
-}
-
-/** Iterate over the elements of a rate-limiting group 'g' with a random
- starting point, assigning each to the variable 'bev', and executing the
- block 'block'.
-
- We do this in a half-baked effort to get fairness among group members.
- XXX Round-robin or some kind of priority queue would be even more fair.
- */
-#define FOREACH_RANDOM_ORDER(block) \
- do { \
- first = bev_group_random_element_(g); \
- for (bev = first; bev != LIST_END(&g->members); \
- bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
- block ; \
- } \
- for (bev = LIST_FIRST(&g->members); bev && bev != first; \
- bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
- block ; \
- } \
- } while (0)
-
-static void
-bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
-{
- int again = 0;
- struct bufferevent_private *bev, *first;
-
- g->read_suspended = 0;
- FOREACH_RANDOM_ORDER({
- if (EVLOCK_TRY_LOCK_(bev->lock)) {
- bufferevent_unsuspend_read_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- } else {
- again = 1;
- }
- });
- g->pending_unsuspend_read = again;
-}
-
-static void
-bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
-{
- int again = 0;
- struct bufferevent_private *bev, *first;
- g->write_suspended = 0;
-
- FOREACH_RANDOM_ORDER({
- if (EVLOCK_TRY_LOCK_(bev->lock)) {
- bufferevent_unsuspend_write_(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- } else {
- again = 1;
- }
- });
- g->pending_unsuspend_write = again;
-}
-
-/** Callback invoked every tick to add more elements to the group bucket
- and unsuspend group members as needed.
- */
-static void
-bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
-{
- struct bufferevent_rate_limit_group *g = arg;
- unsigned tick;
- struct timeval now;
-
- event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
-
- LOCK_GROUP(g);
-
- tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
- ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
-
- if (g->pending_unsuspend_read ||
- (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
- bev_group_unsuspend_reading_(g);
- }
- if (g->pending_unsuspend_write ||
- (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
- bev_group_unsuspend_writing_(g);
- }
-
- /* XXXX Rather than waiting to the next tick to unsuspend stuff
- * with pending_unsuspend_write/read, we should do it on the
- * next iteration of the mainloop.
- */
-
- UNLOCK_GROUP(g);
-}
-
-int
-bufferevent_set_rate_limit(struct bufferevent *bev,
- struct ev_token_bucket_cfg *cfg)
-{
- struct bufferevent_private *bevp =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
- int r = -1;
- struct bufferevent_rate_limit *rlim;
- struct timeval now;
- ev_uint32_t tick;
- int reinit = 0, suspended = 0;
- /* XXX reference-count cfg */
-
- BEV_LOCK(bev);
-
- if (cfg == NULL) {
- if (bevp->rate_limiting) {
- rlim = bevp->rate_limiting;
- rlim->cfg = NULL;
- bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
- bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
- if (event_initialized(&rlim->refill_bucket_event))
- event_del(&rlim->refill_bucket_event);
- }
- r = 0;
- goto done;
- }
-
- event_base_gettimeofday_cached(bev->ev_base, &now);
- tick = ev_token_bucket_get_tick_(&now, cfg);
-
- if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
- /* no-op */
- r = 0;
- goto done;
- }
- if (bevp->rate_limiting == NULL) {
- rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
- if (!rlim)
- goto done;
- bevp->rate_limiting = rlim;
- } else {
- rlim = bevp->rate_limiting;
- }
- reinit = rlim->cfg != NULL;
-
- rlim->cfg = cfg;
- ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
-
- if (reinit) {
- EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
- event_del(&rlim->refill_bucket_event);
- }
- event_assign(&rlim->refill_bucket_event, bev->ev_base,
- -1, EV_FINALIZE, bev_refill_callback_, bevp);
-
- if (rlim->limit.read_limit > 0) {
- bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
- } else {
- bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
- suspended=1;
- }
- if (rlim->limit.write_limit > 0) {
- bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
- } else {
- bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
- suspended = 1;
- }
-
- if (suspended)
- event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
-
- r = 0;
-
-done:
- BEV_UNLOCK(bev);
- return r;
-}
-
-struct bufferevent_rate_limit_group *
-bufferevent_rate_limit_group_new(struct event_base *base,
- const struct ev_token_bucket_cfg *cfg)
-{
- struct bufferevent_rate_limit_group *g;
- struct timeval now;
- ev_uint32_t tick;
-
- event_base_gettimeofday_cached(base, &now);
- tick = ev_token_bucket_get_tick_(&now, cfg);
-
- g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
- if (!g)
- return NULL;
- memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
- LIST_INIT(&g->members);
-
- ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
-
- event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
- bev_group_refill_callback_, g);
- /*XXXX handle event_add failure */
- event_add(&g->master_refill_event, &cfg->tick_timeout);
-
- EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
-
- bufferevent_rate_limit_group_set_min_share(g, 64);
-
- evutil_weakrand_seed_(&g->weakrand_seed,
- (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
-
- return g;
-}
-
-int
-bufferevent_rate_limit_group_set_cfg(
- struct bufferevent_rate_limit_group *g,
- const struct ev_token_bucket_cfg *cfg)
-{
- int same_tick;
- if (!g || !cfg)
- return -1;
-
- LOCK_GROUP(g);
- same_tick = evutil_timercmp(
- &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
- memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
-
- if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
- g->rate_limit.read_limit = cfg->read_maximum;
- if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
- g->rate_limit.write_limit = cfg->write_maximum;
-
- if (!same_tick) {
- /* This can cause a hiccup in the schedule */
- event_add(&g->master_refill_event, &cfg->tick_timeout);
- }
-
- /* The new limits might force us to adjust min_share differently. */
- bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
-
- UNLOCK_GROUP(g);
- return 0;
-}
-
-int
-bufferevent_rate_limit_group_set_min_share(
- struct bufferevent_rate_limit_group *g,
- size_t share)
-{
- if (share > EV_SSIZE_MAX)
- return -1;
-
- g->configured_min_share = share;
-
- /* Can't set share to less than the one-tick maximum. IOW, at steady
- * state, at least one connection can go per tick. */
- if (share > g->rate_limit_cfg.read_rate)
- share = g->rate_limit_cfg.read_rate;
- if (share > g->rate_limit_cfg.write_rate)
- share = g->rate_limit_cfg.write_rate;
-
- g->min_share = share;
- return 0;
-}
-
-void
-bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
-{
- LOCK_GROUP(g);
- EVUTIL_ASSERT(0 == g->n_members);
- event_del(&g->master_refill_event);
- UNLOCK_GROUP(g);
- EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
- mm_free(g);
-}
-
-int
-bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
- struct bufferevent_rate_limit_group *g)
-{
- int wsuspend, rsuspend;
- struct bufferevent_private *bevp =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
- BEV_LOCK(bev);
-
- if (!bevp->rate_limiting) {
- struct bufferevent_rate_limit *rlim;
- rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
- if (!rlim) {
- BEV_UNLOCK(bev);
- return -1;
- }
- event_assign(&rlim->refill_bucket_event, bev->ev_base,
- -1, EV_FINALIZE, bev_refill_callback_, bevp);
- bevp->rate_limiting = rlim;
- }
-
- if (bevp->rate_limiting->group == g) {
- BEV_UNLOCK(bev);
- return 0;
- }
- if (bevp->rate_limiting->group)
- bufferevent_remove_from_rate_limit_group(bev);
-
- LOCK_GROUP(g);
- bevp->rate_limiting->group = g;
- ++g->n_members;
- LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
-
- rsuspend = g->read_suspended;
- wsuspend = g->write_suspended;
-
- UNLOCK_GROUP(g);
-
- if (rsuspend)
- bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
- if (wsuspend)
- bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
-
- BEV_UNLOCK(bev);
- return 0;
-}
-
-int
-bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
-{
- return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
-}
-
-int
-bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
- int unsuspend)
-{
- struct bufferevent_private *bevp =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
- BEV_LOCK(bev);
- if (bevp->rate_limiting && bevp->rate_limiting->group) {
- struct bufferevent_rate_limit_group *g =
- bevp->rate_limiting->group;
- LOCK_GROUP(g);
- bevp->rate_limiting->group = NULL;
- --g->n_members;
- LIST_REMOVE(bevp, rate_limiting->next_in_group);
- UNLOCK_GROUP(g);
- }
- if (unsuspend) {
- bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
- bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
- }
- BEV_UNLOCK(bev);
- return 0;
-}
-
-/* ===
- * API functions to expose rate limits.
- *
- * Don't use these from inside Libevent; they're meant to be for use by
- * the program.
- * === */
-
-/* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_read_max_() is more likely what you want*/
-ev_ssize_t
-bufferevent_get_read_limit(struct bufferevent *bev)
-{
- ev_ssize_t r;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
- bufferevent_update_buckets(bevp);
- r = bevp->rate_limiting->limit.read_limit;
- } else {
- r = EV_SSIZE_MAX;
- }
- BEV_UNLOCK(bev);
- return r;
-}
-
-/* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_write_max_() is more likely what you want*/
-ev_ssize_t
-bufferevent_get_write_limit(struct bufferevent *bev)
-{
- ev_ssize_t r;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
- bufferevent_update_buckets(bevp);
- r = bevp->rate_limiting->limit.write_limit;
- } else {
- r = EV_SSIZE_MAX;
- }
- BEV_UNLOCK(bev);
- return r;
-}
-
-int
-bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
-{
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (size == 0 || size > EV_SSIZE_MAX)
- bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
- else
- bevp->max_single_read = size;
- BEV_UNLOCK(bev);
- return 0;
-}
-
-int
-bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
-{
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (size == 0 || size > EV_SSIZE_MAX)
- bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
- else
- bevp->max_single_write = size;
- BEV_UNLOCK(bev);
- return 0;
-}
-
-ev_ssize_t
-bufferevent_get_max_single_read(struct bufferevent *bev)
-{
- ev_ssize_t r;
-
- BEV_LOCK(bev);
- r = BEV_UPCAST(bev)->max_single_read;
- BEV_UNLOCK(bev);
- return r;
-}
-
-ev_ssize_t
-bufferevent_get_max_single_write(struct bufferevent *bev)
-{
- ev_ssize_t r;
-
- BEV_LOCK(bev);
- r = BEV_UPCAST(bev)->max_single_write;
- BEV_UNLOCK(bev);
- return r;
-}
-
-ev_ssize_t
-bufferevent_get_max_to_read(struct bufferevent *bev)
-{
- ev_ssize_t r;
- BEV_LOCK(bev);
- r = bufferevent_get_read_max_(BEV_UPCAST(bev));
- BEV_UNLOCK(bev);
- return r;
-}
-
-ev_ssize_t
-bufferevent_get_max_to_write(struct bufferevent *bev)
-{
- ev_ssize_t r;
- BEV_LOCK(bev);
- r = bufferevent_get_write_max_(BEV_UPCAST(bev));
- BEV_UNLOCK(bev);
- return r;
-}
-
-const struct ev_token_bucket_cfg *
-bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
- struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
- struct ev_token_bucket_cfg *cfg;
-
- BEV_LOCK(bev);
-
- if (bufev_private->rate_limiting) {
- cfg = bufev_private->rate_limiting->cfg;
- } else {
- cfg = NULL;
- }
-
- BEV_UNLOCK(bev);
-
- return cfg;
-}
-
-/* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_read_max_() is more likely what you want*/
-ev_ssize_t
-bufferevent_rate_limit_group_get_read_limit(
- struct bufferevent_rate_limit_group *grp)
-{
- ev_ssize_t r;
- LOCK_GROUP(grp);
- r = grp->rate_limit.read_limit;
- UNLOCK_GROUP(grp);
- return r;
-}
-
-/* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_write_max_() is more likely what you want. */
-ev_ssize_t
-bufferevent_rate_limit_group_get_write_limit(
- struct bufferevent_rate_limit_group *grp)
-{
- ev_ssize_t r;
- LOCK_GROUP(grp);
- r = grp->rate_limit.write_limit;
- UNLOCK_GROUP(grp);
- return r;
-}
-
-int
-bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
-{
- int r = 0;
- ev_ssize_t old_limit, new_limit;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
- old_limit = bevp->rate_limiting->limit.read_limit;
-
- new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
- if (old_limit > 0 && new_limit <= 0) {
- bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
- if (event_add(&bevp->rate_limiting->refill_bucket_event,
- &bevp->rate_limiting->cfg->tick_timeout) < 0)
- r = -1;
- } else if (old_limit <= 0 && new_limit > 0) {
- if (!(bevp->write_suspended & BEV_SUSPEND_BW))
- event_del(&bevp->rate_limiting->refill_bucket_event);
- bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
- }
-
- BEV_UNLOCK(bev);
- return r;
-}
-
-int
-bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
-{
- /* XXXX this is mostly copy-and-paste from
- * bufferevent_decrement_read_limit */
- int r = 0;
- ev_ssize_t old_limit, new_limit;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
- old_limit = bevp->rate_limiting->limit.write_limit;
-
- new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
- if (old_limit > 0 && new_limit <= 0) {
- bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
- if (event_add(&bevp->rate_limiting->refill_bucket_event,
- &bevp->rate_limiting->cfg->tick_timeout) < 0)
- r = -1;
- } else if (old_limit <= 0 && new_limit > 0) {
- if (!(bevp->read_suspended & BEV_SUSPEND_BW))
- event_del(&bevp->rate_limiting->refill_bucket_event);
- bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
- }
-
- BEV_UNLOCK(bev);
- return r;
-}
-
-int
-bufferevent_rate_limit_group_decrement_read(
- struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
-{
- int r = 0;
- ev_ssize_t old_limit, new_limit;
- LOCK_GROUP(grp);
- old_limit = grp->rate_limit.read_limit;
- new_limit = (grp->rate_limit.read_limit -= decr);
-
- if (old_limit > 0 && new_limit <= 0) {
- bev_group_suspend_reading_(grp);
- } else if (old_limit <= 0 && new_limit > 0) {
- bev_group_unsuspend_reading_(grp);
- }
-
- UNLOCK_GROUP(grp);
- return r;
-}
-
-int
-bufferevent_rate_limit_group_decrement_write(
- struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
-{
- int r = 0;
- ev_ssize_t old_limit, new_limit;
- LOCK_GROUP(grp);
- old_limit = grp->rate_limit.write_limit;
- new_limit = (grp->rate_limit.write_limit -= decr);
-
- if (old_limit > 0 && new_limit <= 0) {
- bev_group_suspend_writing_(grp);
- } else if (old_limit <= 0 && new_limit > 0) {
- bev_group_unsuspend_writing_(grp);
- }
-
- UNLOCK_GROUP(grp);
- return r;
-}
-
-void
-bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
- ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
-{
- EVUTIL_ASSERT(grp != NULL);
- if (total_read_out)
- *total_read_out = grp->total_read;
- if (total_written_out)
- *total_written_out = grp->total_written;
-}
-
-void
-bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
-{
- grp->total_read = grp->total_written = 0;
-}
-
-int
-bufferevent_ratelim_init_(struct bufferevent_private *bev)
-{
- bev->rate_limiting = NULL;
- bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
- bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
-
- return 0;
-}