This commit is contained in:
2026-02-13 13:06:50 +09:00
commit b54066842b
249 changed files with 69547 additions and 0 deletions

515
grd-throttler.c Normal file
View File

@@ -0,0 +1,515 @@
/*
* Copyright (C) 2025 Red Hat
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
* 02111-1307, USA.
*/
#include "config.h"
#include "grd-throttler.h"
#include "grd-context.h"
#include "grd-settings.h"
#include "grd-utils.h"
#define DEFAULT_MAX_CONNECTIONS_PER_PEER 5
#define DEFAULT_MAX_PENDING_CONNECTIONS 5
#define DEFAULT_MAX_ATTEMPTS_PER_SECOND 10
struct _GrdThrottlerLimits
{
int max_global_connections;
int max_connections_per_peer;
int max_pending_connections;
int max_attempts_per_second;
};
#define PRUNE_TIME_CUTOFF_US (s2us (1))
typedef struct _GrdPeer
{
GrdThrottler *throttler;
char *name;
int active_connections;
GArray *connect_timestamps;
GQueue *delayed_connections;
int64_t last_accept_us;
} GrdPeer;
struct _GrdThrottler
{
GObject parent;
GrdThrottlerLimits *limits;
int active_connections;
GrdThrottlerAllowCallback allow_callback;
gpointer user_data;
GHashTable *peers;
GSource *delayed_connections_source;
};
G_DEFINE_TYPE (GrdThrottler, grd_throttler, G_TYPE_OBJECT)
static GQuark quark_remote_address;
static void
maybe_queue_timeout (GrdThrottler *throttler);
static void
prune_old_timestamps (GArray *timestamps,
int64_t now_us)
{
size_t i;
if (!timestamps)
return;
/* Prune all timestamps older than 1 second, so we can determine how many
* connections that happened the last second by looking at how many timestamps
* we have.
*/
for (i = 0; i < timestamps->len; i++)
{
int64_t ts_us = g_array_index (timestamps, int64_t, i);
if (now_us - ts_us < PRUNE_TIME_CUTOFF_US)
break;
}
g_array_remove_range (timestamps, 0, i);
}
static void
maybe_dispose_peer (GrdThrottler *throttler,
GrdPeer *peer,
GHashTableIter *iter)
{
if (peer->active_connections > 0)
return;
if (peer->connect_timestamps && peer->connect_timestamps->len > 0)
return;
if (!g_queue_is_empty (peer->delayed_connections))
return;
if (iter)
g_hash_table_iter_remove (iter);
else
g_hash_table_remove (throttler->peers, peer->name);
}
static void
grd_throttler_register_connection (GrdThrottler *throttler,
GrdPeer *peer)
{
int64_t now_us;
peer->active_connections++;
throttler->active_connections++;
if (!peer->connect_timestamps)
peer->connect_timestamps = g_array_new (FALSE, FALSE, sizeof (int64_t));
now_us = g_get_monotonic_time ();
prune_old_timestamps (peer->connect_timestamps, now_us);
g_array_append_val (peer->connect_timestamps, now_us);
}
static void
grd_throttler_unregister_connection (GrdThrottler *throttler,
GSocketConnection *connection,
GrdPeer *peer)
{
g_assert (peer->active_connections > 0);
g_assert (throttler->active_connections > 0);
peer->active_connections--;
throttler->active_connections--;
maybe_dispose_peer (throttler, peer, NULL);
maybe_queue_timeout (throttler);
}
static void
grd_throttler_deny_connection (GrdThrottler *throttler,
const char *peer_name,
GSocketConnection *connection)
{
g_debug ("Denying connection from %s", peer_name);
g_io_stream_close (G_IO_STREAM (connection), NULL, NULL);
}
static void
on_connection_closed_changed (GSocketConnection *connection,
GParamSpec *pspec,
GrdThrottler *throttler)
{
const char *peer_name;
GrdPeer *peer;
g_assert (g_io_stream_is_closed (G_IO_STREAM (connection)));
peer_name = g_object_get_qdata (G_OBJECT (connection), quark_remote_address);
peer = g_hash_table_lookup (throttler->peers, peer_name);
grd_throttler_unregister_connection (throttler, connection, peer);
}
static void
grd_throttler_allow_connection (GrdThrottler *throttler,
GSocketConnection *connection,
GrdPeer *peer)
{
g_debug ("Accepting connection from %s", peer->name);
throttler->allow_callback (throttler, connection, throttler->user_data);
peer->last_accept_us = g_get_monotonic_time ();
g_object_set_qdata_full (G_OBJECT (connection), quark_remote_address,
g_strdup (peer->name), g_free);
grd_throttler_register_connection (throttler, peer);
g_signal_connect (connection, "notify::closed",
G_CALLBACK (on_connection_closed_changed), throttler);
}
static gboolean
source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
g_source_set_ready_time (source, -1);
return callback (user_data);
}
static GSourceFuncs source_funcs =
{
.dispatch = source_dispatch,
};
static void
prune_closed_connections (GQueue *queue)
{
GList *l;
l = queue->head;
while (l)
{
GSocketConnection *connection = G_SOCKET_CONNECTION (l->data);
GList *l_next = l->next;
if (g_io_stream_is_closed (G_IO_STREAM (connection)))
{
g_queue_delete_link (queue, l);
g_object_unref (connection);
}
l = l_next;
}
}
static gboolean
is_connection_limit_reached (GrdThrottler *throttler,
GrdPeer *peer)
{
GrdThrottlerLimits *limits = throttler->limits;
if (peer->active_connections >= limits->max_connections_per_peer)
return TRUE;
if (throttler->active_connections >= limits->max_global_connections)
return TRUE;
return FALSE;
}
static gboolean
is_new_connection_allowed (GrdThrottler *throttler,
GrdPeer *peer)
{
GrdThrottlerLimits *limits = throttler->limits;
if (is_connection_limit_reached (throttler, peer))
return FALSE;
if (peer->connect_timestamps &&
peer->connect_timestamps->len >= limits->max_attempts_per_second)
return FALSE;
return TRUE;
}
static gboolean
dispatch_delayed_connections (gpointer user_data)
{
GrdThrottler *throttler = GRD_THROTTLER (user_data);
GHashTableIter iter;
gpointer key, value;
g_hash_table_iter_init (&iter, throttler->peers);
while (g_hash_table_iter_next (&iter, &key, &value))
{
GrdPeer *peer = value;
GQueue *queue = peer->delayed_connections;
GSocketConnection *connection;
prune_closed_connections (queue);
connection = g_queue_peek_head (queue);
if (!connection)
{
maybe_dispose_peer (throttler, peer, &iter);
continue;
}
if (is_new_connection_allowed (throttler, peer))
{
g_queue_pop_head (queue);
grd_throttler_allow_connection (throttler, connection, peer);
g_object_unref (connection);
}
}
maybe_queue_timeout (throttler);
return G_SOURCE_CONTINUE;
}
static void
ensure_delayed_connections_source (GrdThrottler *throttler)
{
if (throttler->delayed_connections_source)
return;
throttler->delayed_connections_source = g_source_new (&source_funcs,
sizeof (GSource));
g_source_set_callback (throttler->delayed_connections_source,
dispatch_delayed_connections, throttler, NULL);
g_source_attach (throttler->delayed_connections_source, NULL);
g_source_unref (throttler->delayed_connections_source);
}
static void
maybe_queue_timeout (GrdThrottler *throttler)
{
GrdThrottlerLimits *limits = throttler->limits;
GHashTableIter iter;
gpointer key, value;
int64_t next_timeout_us = INT64_MAX;
g_hash_table_iter_init (&iter, throttler->peers);
while (g_hash_table_iter_next (&iter, &key, &value))
{
GrdPeer *peer = value;
if (is_connection_limit_reached (throttler, peer))
continue;
if (g_queue_is_empty (peer->delayed_connections))
continue;
next_timeout_us = MIN (next_timeout_us,
peer->last_accept_us +
G_USEC_PER_SEC / limits->max_attempts_per_second);
}
if (next_timeout_us == INT64_MAX)
next_timeout_us = -1;
if (next_timeout_us >= 0)
{
ensure_delayed_connections_source (throttler);
g_source_set_ready_time (throttler->delayed_connections_source,
next_timeout_us);
}
}
static void
maybe_delay_connection (GrdThrottler *throttler,
GSocketConnection *connection,
GrdPeer *peer,
int64_t now_us)
{
GrdThrottlerLimits *limits = throttler->limits;
GQueue *delayed_connections;
delayed_connections = peer->delayed_connections;
if (!delayed_connections)
{
delayed_connections = g_queue_new ();
peer->delayed_connections = delayed_connections;
}
if (g_queue_get_length (delayed_connections) > limits->max_pending_connections)
{
grd_throttler_deny_connection (throttler, peer->name, connection);
return;
}
g_debug ("Delaying connection from %s", peer->name);
g_queue_push_tail (delayed_connections, g_object_ref (connection));
maybe_queue_timeout (throttler);
}
static GrdPeer *
ensure_peer (GrdThrottler *throttler,
const char *peer_name)
{
GrdPeer *peer;
peer = g_hash_table_lookup (throttler->peers, peer_name);
if (peer)
return peer;
peer = g_new0 (GrdPeer, 1);
peer->throttler = throttler;
peer->name = g_strdup (peer_name);
peer->delayed_connections = g_queue_new ();
g_hash_table_insert (throttler->peers,
g_strdup (peer_name), peer);
return peer;
}
void
grd_throttler_handle_connection (GrdThrottler *throttler,
GSocketConnection *connection)
{
g_autoptr (GError) error = NULL;
g_autoptr (GSocketAddress) remote_address = NULL;
GInetAddress *inet_address;
g_autofree char *peer_name = NULL;
GrdPeer *peer;
int64_t now_us;
remote_address = g_socket_connection_get_remote_address (connection, &error);
if (!remote_address)
{
g_warning ("Failed to get remote address: %s", error->message);
grd_throttler_deny_connection (throttler, "unknown peer", connection);
return;
}
inet_address =
g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (remote_address));
peer_name = g_inet_address_to_string (inet_address);
g_debug ("New incoming connection from %s", peer_name);
peer = ensure_peer (throttler, peer_name);
prune_closed_connections (peer->delayed_connections);
now_us = g_get_monotonic_time ();
prune_old_timestamps (peer->connect_timestamps, now_us);
if (is_new_connection_allowed (throttler, peer) &&
g_queue_get_length (peer->delayed_connections) == 0)
{
grd_throttler_allow_connection (throttler, connection, peer);
return;
}
maybe_delay_connection (throttler, connection, peer, now_us);
}
void
grd_throttler_limits_set_max_global_connections (GrdThrottlerLimits *limits,
int limit)
{
limits->max_global_connections = limit;
}
GrdThrottlerLimits *
grd_throttler_limits_new (GrdContext *context)
{
GrdSettings *settings = grd_context_get_settings (context);
GrdThrottlerLimits *limits;
limits = g_new0 (GrdThrottlerLimits, 1);
limits->max_global_connections =
grd_settings_get_max_parallel_connections (settings);
limits->max_connections_per_peer = DEFAULT_MAX_CONNECTIONS_PER_PEER;
limits->max_pending_connections = DEFAULT_MAX_PENDING_CONNECTIONS;
limits->max_attempts_per_second = DEFAULT_MAX_ATTEMPTS_PER_SECOND;
return limits;
}
GrdThrottler *
grd_throttler_new (GrdThrottlerLimits *limits,
GrdThrottlerAllowCallback allow_callback,
gpointer user_data)
{
GrdThrottler *throttler;
g_assert (limits);
throttler = g_object_new (GRD_TYPE_THROTTLER, NULL);
throttler->allow_callback = allow_callback;
throttler->user_data = user_data;
throttler->limits = limits;
return throttler;
}
static void
grd_peer_free (GrdPeer *peer)
{
if (peer->delayed_connections)
g_queue_free_full (peer->delayed_connections, g_object_unref);
g_clear_pointer (&peer->connect_timestamps, g_array_unref);
g_clear_pointer (&peer->name, g_free);
g_free (peer);
}
static void
grd_throttler_finalize (GObject *object)
{
GrdThrottler *throttler = GRD_THROTTLER(object);
g_clear_pointer (&throttler->delayed_connections_source, g_source_destroy);
g_clear_pointer (&throttler->peers, g_hash_table_unref);
g_clear_pointer (&throttler->limits, g_free);
G_OBJECT_CLASS (grd_throttler_parent_class)->finalize (object);
}
static void
grd_throttler_init (GrdThrottler *throttler)
{
throttler->peers =
g_hash_table_new_full (g_str_hash, g_str_equal,
g_free, (GDestroyNotify) grd_peer_free);
}
static void
grd_throttler_class_init (GrdThrottlerClass *klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->finalize = grd_throttler_finalize;
quark_remote_address =
g_quark_from_static_string ("grd-remote-address-string");
}