diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h index a8d61bf72a..04864d4df0 100644 --- a/src/libsystemd/sd-bus/bus-internal.h +++ b/src/libsystemd/sd-bus/bus-internal.h @@ -219,11 +219,11 @@ struct sd_bus { size_t rbuffer_size; sd_bus_message **rqueue; - unsigned rqueue_size; + size_t rqueue_size; size_t rqueue_allocated; sd_bus_message **wqueue; - unsigned wqueue_size; + size_t wqueue_size; size_t windex; size_t wqueue_allocated; diff --git a/src/libsystemd/sd-bus/bus-message.c b/src/libsystemd/sd-bus/bus-message.c index bb7e09c945..700c57d362 100644 --- a/src/libsystemd/sd-bus/bus-message.c +++ b/src/libsystemd/sd-bus/bus-message.c @@ -118,7 +118,8 @@ static sd_bus_message* message_free(sd_bus_message *m) { message_reset_parts(m); - sd_bus_unref(m->bus); + /* Note that we don't unref m->bus here. That's already done by sd_bus_message_unref() as each user + * reference to the bus message also is considered a reference to the bus connection itself. */ if (m->free_fds) { close_many(m->fds, m->n_fds); @@ -136,8 +137,6 @@ static sd_bus_message* message_free(sd_bus_message *m) { return mfree(m); } -DEFINE_TRIVIAL_CLEANUP_FUNC(sd_bus_message*, message_free); - static void *message_extend_fields(sd_bus_message *m, size_t align, size_t sz, bool add_offset) { void *op, *np; size_t old_size, new_size, start; @@ -459,7 +458,6 @@ int bus_message_from_header( if (!m) return -ENOMEM; - m->n_ref = 1; m->sealed = true; m->header = header; m->header_accessible = header_accessible; @@ -513,7 +511,9 @@ int bus_message_from_header( m->creds.mask |= SD_BUS_CREDS_SELINUX_CONTEXT; } + m->n_ref = 1; m->bus = sd_bus_ref(bus); + *ret = TAKE_PTR(m); return 0; @@ -528,7 +528,7 @@ int bus_message_from_malloc( const char *label, sd_bus_message **ret) { - _cleanup_(message_freep) sd_bus_message *m = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL; size_t sz; int r; @@ -585,13 +585,13 @@ _public_ int sd_bus_message_new( return -ENOMEM; t->n_ref = 1; + t->bus = sd_bus_ref(bus); t->header = (struct bus_header*) ((uint8_t*) t + ALIGN(sizeof(struct sd_bus_message))); t->header->endian = BUS_NATIVE_ENDIAN; t->header->type = type; t->header->version = bus->message_version; t->allow_fds = bus->can_fds || !IN_SET(bus->state, BUS_HELLO, BUS_RUNNING); t->root_container.need_offsets = BUS_MESSAGE_IS_GVARIANT(t); - t->bus = sd_bus_ref(bus); if (bus->allow_interactive_authorization) t->header->flags |= BUS_MESSAGE_ALLOW_INTERACTIVE_AUTHORIZATION; @@ -647,7 +647,7 @@ _public_ int sd_bus_message_new_method_call( const char *interface, const char *member) { - _cleanup_(message_freep) sd_bus_message *t = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL; int r; assert_return(bus, -ENOTCONN); @@ -692,7 +692,7 @@ static int message_new_reply( uint8_t type, sd_bus_message **m) { - _cleanup_(message_freep) sd_bus_message *t = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL; uint64_t cookie; int r; @@ -743,7 +743,7 @@ _public_ int sd_bus_message_new_method_error( sd_bus_message **m, const sd_bus_error *e) { - _cleanup_(message_freep) sd_bus_message *t = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL; int r; assert_return(sd_bus_error_is_set(e), -EINVAL); @@ -846,7 +846,7 @@ int bus_message_new_synthetic_error( const sd_bus_error *e, sd_bus_message **m) { - _cleanup_(message_freep) sd_bus_message *t = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL; int r; assert(bus); @@ -890,7 +890,80 @@ int bus_message_new_synthetic_error( return 0; } -DEFINE_PUBLIC_TRIVIAL_REF_UNREF_FUNC(sd_bus_message, sd_bus_message, message_free); + +_public_ sd_bus_message* sd_bus_message_ref(sd_bus_message *m) { + if (!m) + return NULL; + + /* We are fine if this message so far was either explicitly reffed or not reffed but queued into at + * least one bus connection object. */ + assert(m->n_ref > 0 || m->n_queued > 0); + + m->n_ref++; + + /* Each user reference to a bus message shall also be considered a ref on the bus */ + sd_bus_ref(m->bus); + return m; +} + +_public_ sd_bus_message* sd_bus_message_unref(sd_bus_message *m) { + if (!m) + return NULL; + + assert(m->n_ref > 0); + + sd_bus_unref(m->bus); /* Each regular ref is also a ref on the bus connection. Let's hence drop it + * here. Note we have to do this before decrementing our own n_ref here, since + * otherwise, if this message is currently queued sd_bus_unref() might call + * bus_message_unref_queued() for this which might then destroy the message + * while we are still processing it. */ + m->n_ref--; + + if (m->n_ref > 0 || m->n_queued > 0) + return NULL; + + /* Unset the bus field if neither the user has a reference nor this message is queued. We are careful + * to reset the field only after the last reference to the bus is dropped, after all we might keep + * multiple references to the bus, once for each reference kept on outselves. */ + m->bus = NULL; + + return message_free(m); +} + +sd_bus_message* bus_message_ref_queued(sd_bus_message *m, sd_bus *bus) { + if (!m) + return NULL; + + /* If this is a different bus than the message is associated with, then implicitly turn this into a + * regular reference. This means that you can create a memory leak by enqueuing a message generated + * on one bus onto another at the same time as enqueueing a message from the second one on the first, + * as we'll not detect the cyclic references there. */ + if (bus != m->bus) + return sd_bus_message_ref(m); + + assert(m->n_ref > 0 || m->n_queued > 0); + m->n_queued++; + + return m; +} + +sd_bus_message* bus_message_unref_queued(sd_bus_message *m, sd_bus *bus) { + if (!m) + return NULL; + + if (bus != m->bus) + return sd_bus_message_unref(m); + + assert(m->n_queued > 0); + m->n_queued--; + + if (m->n_ref > 0 || m->n_queued > 0) + return NULL; + + m->bus = NULL; + + return message_free(m); +} _public_ int sd_bus_message_get_type(sd_bus_message *m, uint8_t *type) { assert_return(m, -EINVAL); diff --git a/src/libsystemd/sd-bus/bus-message.h b/src/libsystemd/sd-bus/bus-message.h index 0115437d26..a7c4f81c4b 100644 --- a/src/libsystemd/sd-bus/bus-message.h +++ b/src/libsystemd/sd-bus/bus-message.h @@ -48,7 +48,16 @@ struct bus_body_part { }; struct sd_bus_message { - unsigned n_ref; + /* Caveat: a message can be referenced in two different ways: the main (user-facing) way will also + * pin the bus connection object the message is associated with. The secondary way ("queued") is used + * when a message is in the read or write queues of the bus connection object, which will not pin the + * bus connection object. This is necessary so that we don't have to have a pair of cyclic references + * between a message that is queued and its connection: as soon as a message is only referenced by + * the connection (by means of being queued) and the connection itself has no other references it + * will be freed. */ + + unsigned n_ref; /* Counter of references that pin the connection */ + unsigned n_queued; /* Counter of references that do not pin the connection */ sd_bus *bus; @@ -211,3 +220,6 @@ int bus_message_remarshal(sd_bus *bus, sd_bus_message **m); void bus_message_set_sender_driver(sd_bus *bus, sd_bus_message *m); void bus_message_set_sender_local(sd_bus *bus, sd_bus_message *m); + +sd_bus_message* bus_message_ref_queued(sd_bus_message *m, sd_bus *bus); +sd_bus_message* bus_message_unref_queued(sd_bus_message *m, sd_bus *bus); diff --git a/src/libsystemd/sd-bus/bus-socket.c b/src/libsystemd/sd-bus/bus-socket.c index 441b4a816f..df9b2631fd 100644 --- a/src/libsystemd/sd-bus/bus-socket.c +++ b/src/libsystemd/sd-bus/bus-socket.c @@ -1110,8 +1110,10 @@ static int bus_socket_make_message(sd_bus *bus, size_t size) { bus->fds = NULL; bus->n_fds = 0; - if (t) - bus->rqueue[bus->rqueue_size++] = t; + if (t) { + bus->rqueue[bus->rqueue_size++] = bus_message_ref_queued(t, bus); + sd_bus_message_unref(t); + } return 1; } diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index 9eeb3c448c..69f1519976 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -146,13 +146,13 @@ static void bus_reset_queues(sd_bus *b) { assert(b); while (b->rqueue_size > 0) - sd_bus_message_unref(b->rqueue[--b->rqueue_size]); + bus_message_unref_queued(b->rqueue[--b->rqueue_size], b); b->rqueue = mfree(b->rqueue); b->rqueue_allocated = 0; while (b->wqueue_size > 0) - sd_bus_message_unref(b->wqueue[--b->wqueue_size]); + bus_message_unref_queued(b->wqueue[--b->wqueue_size], b); b->wqueue = mfree(b->wqueue); b->wqueue_allocated = 0; @@ -248,12 +248,12 @@ _public_ int sd_bus_new(sd_bus **ret) { .close_on_exit = true, }; - assert_se(pthread_mutex_init(&b->memfd_cache_mutex, NULL) == 0); - /* We guarantee that wqueue always has space for at least one entry */ if (!GREEDY_REALLOC(b->wqueue, b->wqueue_allocated, 1)) return -ENOMEM; + assert_se(pthread_mutex_init(&b->memfd_cache_mutex, NULL) == 0); + *ret = TAKE_PTR(b); return 0; } @@ -493,7 +493,7 @@ static int synthesize_connected_signal(sd_bus *bus) { /* Insert at the very front */ memmove(bus->rqueue + 1, bus->rqueue, sizeof(sd_bus_message*) * bus->rqueue_size); - bus->rqueue[0] = TAKE_PTR(m); + bus->rqueue[0] = bus_message_ref_queued(m, bus); bus->rqueue_size++; return 0; @@ -1811,7 +1811,7 @@ static int dispatch_wqueue(sd_bus *bus) { * anyway. */ bus->wqueue_size--; - sd_bus_message_unref(bus->wqueue[0]); + bus_message_unref_queued(bus->wqueue[0], bus); memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size); bus->windex = 0; @@ -1840,6 +1840,15 @@ int bus_rqueue_make_room(sd_bus *bus) { return 0; } +static void rqueue_drop_one(sd_bus *bus, size_t i) { + assert(bus); + assert(i < bus->rqueue_size); + + bus_message_unref_queued(bus->rqueue[i], bus); + memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1)); + bus->rqueue_size--; +} + static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) { int r, ret = 0; @@ -1854,10 +1863,8 @@ static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd for (;;) { if (bus->rqueue_size > 0) { /* Dispatch a queued message */ - - *m = bus->rqueue[0]; - bus->rqueue_size--; - memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size); + *m = sd_bus_message_ref(bus->rqueue[0]); + rqueue_drop_one(bus, 0); return 1; } @@ -1865,8 +1872,10 @@ static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd r = bus_read_message(bus, hint_priority, priority); if (r < 0) return r; - if (r == 0) + if (r == 0) { + *m = NULL; return ret; + } ret = 1; } @@ -1933,7 +1942,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) { * of the wqueue array is always allocated so * that we always can remember how much was * written. */ - bus->wqueue[0] = sd_bus_message_ref(m); + bus->wqueue[0] = bus_message_ref_queued(m, bus); bus->wqueue_size = 1; bus->windex = idx; } @@ -1947,7 +1956,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) { if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1)) return -ENOMEM; - bus->wqueue[bus->wqueue_size++] = sd_bus_message_ref(m); + bus->wqueue[bus->wqueue_size++] = bus_message_ref_queued(m, bus); } finish: @@ -2125,7 +2134,7 @@ _public_ int sd_bus_call( _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m); usec_t timeout; uint64_t cookie; - unsigned i; + size_t i; int r; bus_assert_return(m, -EINVAL, error); @@ -2167,37 +2176,30 @@ _public_ int sd_bus_call( usec_t left; while (i < bus->rqueue_size) { - sd_bus_message *incoming = NULL; + _cleanup_(sd_bus_message_unrefp) sd_bus_message *incoming = NULL; - incoming = bus->rqueue[i]; + incoming = sd_bus_message_ref(bus->rqueue[i]); if (incoming->reply_cookie == cookie) { /* Found a match! */ - memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1)); - bus->rqueue_size--; + rqueue_drop_one(bus, i); log_debug_bus_message(incoming); if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) { if (incoming->n_fds <= 0 || bus->accept_fd) { if (reply) - *reply = incoming; - else - sd_bus_message_unref(incoming); + *reply = TAKE_PTR(incoming); return 1; } - r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry."); - sd_bus_message_unref(incoming); - return r; + return sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry."); - } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) { - r = sd_bus_error_copy(error, &incoming->error); - sd_bus_message_unref(incoming); - return r; - } else { + } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) + return sd_bus_error_copy(error, &incoming->error); + else { r = -EIO; goto fail; } @@ -2207,15 +2209,11 @@ _public_ int sd_bus_call( incoming->sender && streq(bus->unique_name, incoming->sender)) { - memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1)); - bus->rqueue_size--; + rqueue_drop_one(bus, i); - /* Our own message? Somebody is trying - * to send its own client a message, - * let's not dead-lock, let's fail - * immediately. */ + /* Our own message? Somebody is trying to send its own client a message, + * let's not dead-lock, let's fail immediately. */ - sd_bus_message_unref(incoming); r = -ELOOP; goto fail; } @@ -2673,7 +2671,6 @@ static int process_builtin(sd_bus *bus, sd_bus_message *m) { SD_BUS_ERROR_UNKNOWN_METHOD, "Unknown method '%s' on interface '%s'.", m->member, m->interface); } - if (r < 0) return r; @@ -2797,7 +2794,6 @@ static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd return r; *ret = TAKE_PTR(m); - return 1; } diff --git a/src/libsystemd/sd-bus/test-bus-address.c b/src/libsystemd/sd-bus/test-bus-address.c index db5ff72ef4..c58c52a778 100644 --- a/src/libsystemd/sd-bus/test-bus-address.c +++ b/src/libsystemd/sd-bus/test-bus-address.c @@ -40,8 +40,8 @@ static void test_bus_set_address_system_remote(char **args) { -EINVAL, NULL); test_one_address(b, "user@host", 0, "unixexec:path=ssh,argv1=-xT,argv2=--,argv3=user%40host,argv4=systemd-stdio-bridge"); - test_one_address(b, "user@host@host", - -EINVAL, NULL); + test_one_address(b, "user@host@host", + -EINVAL, NULL); test_one_address(b, "[::1]", 0, "unixexec:path=ssh,argv1=-xT,argv2=--,argv3=%3a%3a1,argv4=systemd-stdio-bridge"); test_one_address(b, "user@[::1]", diff --git a/src/libsystemd/sd-bus/test-bus-queue-ref-cycle.c b/src/libsystemd/sd-bus/test-bus-queue-ref-cycle.c new file mode 100644 index 0000000000..70901c30b0 --- /dev/null +++ b/src/libsystemd/sd-bus/test-bus-queue-ref-cycle.c @@ -0,0 +1,44 @@ +/* SPDX-License-Identifier: LGPL-2.1+ */ + +#include "main-func.h" +#include "sd-bus.h" +#include "tests.h" + +static int run(int argc, char *argv[]) { + sd_bus_message *m = NULL; + sd_bus *bus = NULL; + int r; + + /* This test will result in a memory leak in <= v240, but not on v241. Hence to be really useful it + * should be run through a leak tracker such as valgrind. */ + + r = sd_bus_open_system(&bus); + if (r < 0) + return log_tests_skipped("Failed to connect to bus"); + + /* Create a message and enqueue it (this shouldn't send it though as the connection setup is not complete yet) */ + assert_se(sd_bus_message_new_method_call(bus, &m, "foo.bar", "/foo", "quux.quux", "waldo") >= 0); + assert_se(sd_bus_send(bus, m, NULL) >= 0); + + /* Let's now unref the message first and the bus second. */ + m = sd_bus_message_unref(m); + bus = sd_bus_unref(bus); + + /* We should have a memory leak now on <= v240. Let's do this again, but destory in the opposite + * order. On v240 that too should be a leak. */ + + r = sd_bus_open_system(&bus); + if (r < 0) + return log_tests_skipped("Failed to connect to bus"); + + assert_se(sd_bus_message_new_method_call(bus, &m, "foo.bar", "/foo", "quux.quux", "waldo") >= 0); + assert_se(sd_bus_send(bus, m, NULL) >= 0); + + /* Let's now unref things in the opposite order */ + bus = sd_bus_unref(bus); + m = sd_bus_message_unref(m); + + return 0; +} + +DEFINE_MAIN_FUNCTION(run); diff --git a/src/test/meson.build b/src/test/meson.build index 86d8a30afa..c53b9653f9 100644 --- a/src/test/meson.build +++ b/src/test/meson.build @@ -867,6 +867,10 @@ tests += [ [], [threads]], + [['src/libsystemd/sd-bus/test-bus-queue-ref-cycle.c'], + [], + [threads]], + [['src/libsystemd/sd-bus/test-bus-watch-bind.c'], [], [threads], '', 'timeout=120'],