diff --git a/TODO b/TODO index 436363e836..3b3fe2e323 100644 --- a/TODO +++ b/TODO @@ -132,17 +132,18 @@ Features: * run0: maybe enable utmp for run0 sessions, so that they are easily visible. +* maybe beef up sd-event: optionally, allow sd-event to query the timestamp of + next pending datagram inside a SOCK_DGRAM IO fd, and order event source + dispatching by that. Enable this on the native + syslog sockets in journald, + so that we add correct ordering between the two. Use MSG_PEEK + SCM_TIMESTAMP + for this. + * maybe replace nss-machines with logic in networkd that registers records with systemd-resolved, based on DHCP leases, so that we gain compat with VMs. Implementation idea: encode in an ifaltname the intended local name to expose this under and then parse that out and map it to the combined A/AAAA of all handed out leases. -* make journalctl something we can invoke like the askpw/polkit agents, and - then spawn it from "systemctl start" and similar with a precise filter on the - unit. Make sure that it syncs on the journal when done so that we show - "complete" logs. Make this easily reachable via a new "-v" switch or so. - * bsod: add target "bsod.target" or so, which invokes systemd-bsod.target and waits and then reboots. Then use OnFailure=bsod.target from various jobs that should result in system reboots, such as TPM tamper detection cases. @@ -2420,7 +2421,6 @@ Features: journalctl /usr/bin/X11 --invocation=-1 - systemctl: change 'status' to show logs for the last invocation, not a fixed number of lines - - systemctl: expand --wait to show logs for the invocation with a new switch - improve journalctl performance by loading journal files lazily. Encode just enough information in the file name, so that we do not have to open it to know that it is not interesting for us, for @@ -2690,11 +2690,6 @@ Features: ensure deterministic behaviour if two unit files conflict (like DMs do, for example) -* add "systemctl start -v foobar.service" that shows logs of a service - while the start command runs. This is non-trivial to do without - races though, since we should flush out all journal messages before - returning from the "systemctl stop". - * systemctl: if some operation fails, show log output? * Add a new verb "systemctl top" diff --git a/man/journalctl.xml b/man/journalctl.xml index b977c7521a..2a3af870fd 100644 --- a/man/journalctl.xml +++ b/man/journalctl.xml @@ -773,8 +773,13 @@ - Show only the most recent journal entries, and continuously print new entries as - they are appended to the journal. + Show only the most recent journal entries, and continuously print new entries as they + are appended to the journal, until Ctrl-C is hit (or the tool is otherwise terminated). + + journalctl will send an + sd_notify3 + READY=1 message once it initialized and successfully established its watch on the + journal. @@ -792,6 +797,20 @@ --"), any warning messages regarding inaccessible system journals when run as a normal user. + + + + + Takes a boolean argument. If true and operating in mode, a + journal synchronization request (equivalent to journalctl --sync) is issued when + SIGTERM/SIGINT is received, and log output continues until + this request completes. This is useful for synchronizing journal log output to the runtime of + services or external events, ensuring that any log data enqueued to the logging subsystem by + the time SIGTERM/SIGINT is issued is guaranteed to be + processed and displayed by the time log output ends. Defaults to false. + + + diff --git a/man/systemctl.xml b/man/systemctl.xml index b1fb6cec8c..a966e3ea7a 100644 --- a/man/systemctl.xml +++ b/man/systemctl.xml @@ -2348,6 +2348,17 @@ Jan 12 10:46:45 example.com bluetoothd[8900]: gatt-time-server: Input/output err + + + + + + Display unit log output while executing unit operations. + + + + + diff --git a/man/systemd-run.xml b/man/systemd-run.xml index 998e934911..a9ccc167b6 100644 --- a/man/systemd-run.xml +++ b/man/systemd-run.xml @@ -394,6 +394,17 @@ + + + + + + Display unit log output while running. + + + + + diff --git a/shell-completion/bash/journalctl b/shell-completion/bash/journalctl index e7b8d57902..c3e50a05a6 100644 --- a/shell-completion/bash/journalctl +++ b/shell-completion/bash/journalctl @@ -47,7 +47,7 @@ _journalctl() { --show-cursor --dmesg -k --pager-end -e -r --reverse --utc -x --catalog --no-full --force --dump-catalog --flush --rotate --sync --no-hostname -N --fields - --list-namespaces --list-invocations -I' + --list-namespaces --list-invocations -I -v --verbose' [ARG]='-b --boot -D --directory --file -F --field -t --identifier -T --exclude-identifier --facility -M --machine -o --output -u --unit --user-unit -p --priority --root --case-sensitive diff --git a/shell-completion/bash/systemd-run b/shell-completion/bash/systemd-run index 4524744bb5..60017c1c1c 100644 --- a/shell-completion/bash/systemd-run +++ b/shell-completion/bash/systemd-run @@ -42,7 +42,7 @@ _systemd_run() { ) local OPTS="${opts_with_values[*]} --no-ask-password --scope -u --slice-inherit -r --remain-after-exit --send-sighup -d --same-dir -t --pty -P --pipe -S --shell -q --quiet --ignore-failure - --on-clock-change --on-timezone-change --no-block --wait -G --collect --user --system -h --help --version" + --on-clock-change --on-timezone-change --no-block --wait -G --collect --user --system -h --help --version -v --verbose" local mode=--system local i for (( i=1; i <= COMP_CWORD; i++ )); do diff --git a/src/basic/compress.c b/src/basic/compress.c index 5f71eb6853..c8096635ef 100644 --- a/src/basic/compress.c +++ b/src/basic/compress.c @@ -495,11 +495,10 @@ int decompress_blob_zstd( }; size_t k = sym_ZSTD_decompressStream(dctx, &output, &input); - if (sym_ZSTD_isError(k)) { - log_debug("ZSTD decoder failed: %s", sym_ZSTD_getErrorName(k)); - return zstd_ret_to_errno(k); - } - assert(output.pos >= size); + if (sym_ZSTD_isError(k)) + return log_debug_errno(zstd_ret_to_errno(k), "ZSTD decoder failed: %s", sym_ZSTD_getErrorName(k)); + if (output.pos < size) + return log_debug_errno(SYNTHETIC_ERRNO(EBADMSG), "ZSTD decoded less data than indicated, probably corrupted stream."); *dst_size = size; return 0; diff --git a/src/basic/socket-util.c b/src/basic/socket-util.c index 2c003106c9..c5499d6e48 100644 --- a/src/basic/socket-util.c +++ b/src/basic/socket-util.c @@ -1560,7 +1560,6 @@ int socket_autobind(int fd, char **ret_name) { * "autobind" feature, but uses 64-bit random number internally. */ assert(fd >= 0); - assert(ret_name); random = random_u64(); @@ -1577,7 +1576,8 @@ int socket_autobind(int fd, char **ret_name) { if (bind(fd, &sa.sa, r) < 0) return -errno; - *ret_name = TAKE_PTR(name); + if (ret_name) + *ret_name = TAKE_PTR(name); return 0; } diff --git a/src/home/homed-manager.c b/src/home/homed-manager.c index 44468e04ea..ec3bfdab99 100644 --- a/src/home/homed-manager.c +++ b/src/home/homed-manager.c @@ -1154,7 +1154,8 @@ static int manager_listen_notify(Manager *m) { * of a client before it exits. */ on_notify_socket, m, - &m->notify_socket_path); + &m->notify_socket_path, + /* ret_event_source= */ NULL); if (r < 0) return log_error_errno(r, "Failed to prepare notify socket: %m"); diff --git a/src/import/importd.c b/src/import/importd.c index 6390b76d54..45b5434ca0 100644 --- a/src/import/importd.c +++ b/src/import/importd.c @@ -724,7 +724,8 @@ static int manager_new(Manager **ret) { SD_EVENT_PRIORITY_NORMAL, manager_on_notify, m, - &m->notify_socket_path); + &m->notify_socket_path, + /* ret_event_source= */ NULL); if (r < 0) return r; diff --git a/src/journal-remote/journal-remote-main.c b/src/journal-remote/journal-remote-main.c index 3dd72ddcf4..e5ede19e03 100644 --- a/src/journal-remote/journal-remote-main.c +++ b/src/journal-remote/journal-remote-main.c @@ -98,10 +98,11 @@ static MHDDaemonWrapper* MHDDaemonWrapper_free(MHDDaemonWrapper *d) { if (!d) return NULL; + d->io_event = sd_event_source_unref(d->io_event); + d->timer_event = sd_event_source_unref(d->timer_event); + if (d->daemon) MHD_stop_daemon(d->daemon); - sd_event_source_unref(d->io_event); - sd_event_source_unref(d->timer_event); return mfree(d); } diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c index eae8142c6b..2967318663 100644 --- a/src/journal-remote/journal-upload-journal.c +++ b/src/journal-remote/journal-upload-journal.c @@ -28,6 +28,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { u->current_cursor = mfree(u->current_cursor); r = sd_journal_get_cursor(u->journal, &u->current_cursor); + if (r == -EBADMSG) { + log_debug("Encountered bad or partially written entry while acquiring cursor, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } if (r < 0) return log_error_errno(r, "Failed to get cursor: %m"); @@ -53,6 +58,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { usec_t realtime; r = sd_journal_get_realtime_usec(u->journal, &realtime); + if (r == -EBADMSG) { + log_debug("Encountered bad or partially written realtime timestamp, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } if (r < 0) return log_error_errno(r, "Failed to get realtime timestamp: %m"); @@ -79,6 +89,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { sd_id128_t boot_id; r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); + if (r == -EBADMSG) { + log_debug("Encountered bad or partially written monotonic timestamp, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } if (r < 0) return log_error_errno(r, "Failed to get monotonic timestamp: %m"); @@ -103,7 +118,12 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { case ENTRY_BOOT_ID: { sd_id128_t boot_id; - r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); + r = sd_journal_get_monotonic_usec(u->journal, /* ret_monotonic= */ NULL, &boot_id); + if (r == -EBADMSG) { + log_debug("Encountered bad or partially written boot ID, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } if (r < 0) return log_error_errno(r, "Failed to get monotonic timestamp: %m"); @@ -131,9 +151,14 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { r = sd_journal_enumerate_data(u->journal, &u->field_data, &u->field_length); + if (r == -EBADMSG) { + log_debug("Encountered bad or partially written data field, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } if (r < 0) return log_error_errno(r, "Failed to move to next field in entry: %m"); - else if (r == 0) { + if (r == 0) { u->entry_state = ENTRY_OUTRO; continue; } @@ -171,10 +196,10 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { pos += tocopy + 1; u->entry_state = ENTRY_NEW_FIELD; continue; - } else { - u->field_pos += tocopy; - return size; } + + u->field_pos += tocopy; + return size; } case ENTRY_BINARY_FIELD_START: { @@ -182,9 +207,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { size_t len; c = memchr(u->field_data, '=', u->field_length); - if (!c || c == u->field_data) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), - "Invalid field."); + if (!c || c == u->field_data) { + log_debug("Encountered field without '='. Assuming field is still being written, leaving."); + u->entry_state = ENTRY_OUTRO; + continue; + } len = c - (const char*)u->field_data; @@ -198,8 +225,9 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { u->field_pos = len + 1; u->entry_state++; - } + _fallthrough_; + } case ENTRY_BINARY_FIELD_SIZE: { uint64_t le64; @@ -274,10 +302,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void while (j && filled < size * nmemb) { if (u->entry_state == ENTRY_DONE) { r = sd_journal_next(j); - if (r < 0) { - log_error_errno(r, "Failed to move to next entry in journal: %m"); - return CURL_READFUNC_ABORT; - } else if (r == 0) { + if (r == 0) { if (u->input_event) log_debug("No more entries, waiting for journal."); else { @@ -286,10 +311,23 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void } u->uploading = false; - break; } + if (r == -EBADMSG) { + if (u->input_event) + log_debug("Read bad or partially written entry, waiting for journal."); + else { + log_info("Read bad or partially written entry, waiting for journal."); + close_journal_input(u); + } + u->uploading = false; + break; + } + if (r < 0) { + log_error_errno(r, "Failed to move to next entry in journal: %m"); + return CURL_READFUNC_ABORT; + } u->entry_state = ENTRY_CURSOR; } diff --git a/src/journal/journalctl-show.c b/src/journal/journalctl-show.c index 7515db0db9..d4a7c730c5 100644 --- a/src/journal/journalctl-show.c +++ b/src/journal/journalctl-show.c @@ -2,7 +2,9 @@ #include +#include "sd-daemon.h" #include "sd-event.h" +#include "sd-varlink.h" #include "ansi-color.h" #include "fileio.h" @@ -10,7 +12,7 @@ #include "journalctl-filter.h" #include "journalctl-show.h" #include "journalctl-util.h" -#include "log.h" +#include "journalctl-varlink.h" #include "logs-show.h" #include "terminal-util.h" @@ -26,11 +28,15 @@ typedef struct Context { sd_id128_t previous_boot_id; sd_id128_t previous_boot_id_output; dual_timestamp previous_ts_output; + sd_event *event; + sd_varlink *synchronize_varlink; } Context; static void context_done(Context *c) { assert(c); + c->synchronize_varlink = sd_varlink_flush_close_unref(c->synchronize_varlink); + c->event = sd_event_unref(c->event); sd_journal_close(c->journal); } @@ -269,15 +275,14 @@ static int show(Context *c) { return n_shown; } -static int show_and_fflush(Context *c, sd_event_source *s) { +static int show_and_fflush(Context *c) { int r; assert(c); - assert(s); r = show(c); if (r < 0) - return sd_event_exit(sd_event_source_get_event(s), r); + return sd_event_exit(c->event, r); fflush(stdout); return 0; @@ -292,39 +297,136 @@ static int on_journal_event(sd_event_source *s, int fd, uint32_t revents, void * r = sd_journal_process(c->journal); if (r < 0) { log_error_errno(r, "Failed to process journal events: %m"); - return sd_event_exit(sd_event_source_get_event(s), r); + return sd_event_exit(c->event, r); } - return show_and_fflush(c, s); + return show_and_fflush(c); } static int on_first_event(sd_event_source *s, void *userdata) { - return show_and_fflush(userdata, s); + Context *c = ASSERT_PTR(userdata); + int r; + + assert(s); + + r = show_and_fflush(c); + if (r < 0) + return r; + + if (arg_follow && !arg_reverse && !arg_cursor && !arg_after_cursor && !arg_cursor_file && !arg_since_set) { + r = sd_journal_get_cursor(c->journal, /* ret_cursor= */ NULL); + if (r == -EADDRNOTAVAIL) { + /* If we shall operate in --follow mode, and we are unable to get a cursor after + * doing our first round of output, then this means there was no data to show + * whatsoever, and we hence have no stable position on any line at all. This means, + * when we get notified about changes, we shouldn't try to position the cursor at the + * end of the logs anymore, but at the beginning, since anything showing up from now + * that matches our filters is good now. Hence, simply disable the effect of --lines= + * now. */ + + r = sd_journal_seek_head(c->journal); + if (r < 0) + return log_error_errno(r, "Failed to seek to head: %m"); + + c->need_seek = true; + + } else if (r < 0) + return log_error_errno(r, "Failed to get cursor: %m"); + } + + (void) sd_notify(/* unset_environment= */ false, "READY=1"); + return 0; +} + +static int on_synchronize_reply( + sd_varlink *vl, + sd_json_variant *parameters, + const char *error_id, + sd_varlink_reply_flags_t flags, + void *userdata) { + + Context *c = ASSERT_PTR(userdata); + int r; + + assert(vl); + + if (error_id) + log_warning("Failed to synchronize on Journal, ignoring: %s", error_id); + + r = show_and_fflush(c); + if (r < 0) + return r; + + return sd_event_exit(c->event, EXIT_SUCCESS); } static int on_signal(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) { + _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *vl = NULL; + Context *c = ASSERT_PTR(userdata); + int r; + assert(s); assert(si); assert(IN_SET(si->ssi_signo, SIGTERM, SIGINT)); - return sd_event_exit(sd_event_source_get_event(s), si->ssi_signo); + if (!arg_synchronize_on_exit) + goto finish; + + if (c->synchronize_varlink) /* Already pending? Then exit immediately, so that user can cancel the sync */ + return sd_event_exit(c->event, EXIT_SUCCESS); + + r = varlink_connect_journal(&vl); + if (r < 0) { + log_error_errno(r, "Failed to connect to Journal Varlink IPC interface, ignoring: %m"); + goto finish; + } + + /* Set a low priority on the idle event handler, so that we show any log messages first */ + r = sd_varlink_attach_event(vl, c->event, SD_EVENT_PRIORITY_IDLE); + if (r < 0) { + log_warning_errno(r, "Failed to attach Varlink connectio to event loop: %m"); + goto finish; + } + + r = sd_varlink_bind_reply(vl, on_synchronize_reply); + if (r < 0) { + log_warning_errno(r, "Failed to bind synchronization reply: %m"); + goto finish; + } + + (void) sd_varlink_set_userdata(vl, c); + + r = sd_varlink_invokebo( + vl, + "io.systemd.Journal.Synchronize", + SD_JSON_BUILD_PAIR_BOOLEAN("offline", false)); + if (r < 0) { + log_warning_errno(r, "Failed to issue synchronization request: %m"); + goto finish; + } + + c->synchronize_varlink = TAKE_PTR(vl); + return 0; + +finish: + return sd_event_exit(c->event, si->ssi_signo); } -static int setup_event(Context *c, int fd, sd_event **ret) { - _cleanup_(sd_event_unrefp) sd_event *e = NULL; +static int setup_event(Context *c, int fd) { int r; assert(arg_follow); assert(c); assert(fd >= 0); - assert(ret); + assert(!c->event); + _cleanup_(sd_event_unrefp) sd_event *e = NULL; r = sd_event_default(&e); if (r < 0) return log_error_errno(r, "Failed to allocate sd_event object: %m"); - (void) sd_event_add_signal(e, NULL, SIGTERM | SD_EVENT_SIGNAL_PROCMASK, on_signal, NULL); - (void) sd_event_add_signal(e, NULL, SIGINT | SD_EVENT_SIGNAL_PROCMASK, on_signal, NULL); + (void) sd_event_add_signal(e, /* ret_event_source= */ NULL, SIGTERM | SD_EVENT_SIGNAL_PROCMASK, on_signal, c); + (void) sd_event_add_signal(e, /* ret_event_source= */ NULL, SIGINT | SD_EVENT_SIGNAL_PROCMASK, on_signal, c); r = sd_event_add_io(e, NULL, fd, EPOLLIN, &on_journal_event, c); if (r < 0) @@ -346,7 +448,7 @@ static int setup_event(Context *c, int fd, sd_event **ret) { return log_error_errno(r, "Failed to add defer event source: %m"); } - *ret = TAKE_PTR(e); + c->event = TAKE_PTR(e); return 0; } @@ -434,16 +536,15 @@ int action_show(char **matches) { } if (arg_follow) { - _cleanup_(sd_event_unrefp) sd_event *e = NULL; int sig; assert(poll_fd >= 0); - r = setup_event(&c, poll_fd, &e); + r = setup_event(&c, poll_fd); if (r < 0) return r; - r = sd_event_loop(e); + r = sd_event_loop(c.event); if (r < 0) return r; sig = r; @@ -456,6 +557,8 @@ int action_show(char **matches) { return sig; } + (void) sd_notify(/* unset_environment= */ false, "READY=1"); + r = show(&c); if (r < 0) return r; diff --git a/src/journal/journalctl-varlink.c b/src/journal/journalctl-varlink.c index 1f6671c01d..5c5f5827ba 100644 --- a/src/journal/journalctl-varlink.c +++ b/src/journal/journalctl-varlink.c @@ -11,7 +11,7 @@ #include "log.h" #include "varlink-util.h" -static int varlink_connect_journal(sd_varlink **ret) { +int varlink_connect_journal(sd_varlink **ret) { _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *vl = NULL; const char *address; int r; diff --git a/src/journal/journalctl-varlink.h b/src/journal/journalctl-varlink.h index e10983a048..8b35a30d16 100644 --- a/src/journal/journalctl-varlink.h +++ b/src/journal/journalctl-varlink.h @@ -1,6 +1,10 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ #pragma once +#include "sd-varlink.h" + +int varlink_connect_journal(sd_varlink **ret); + int action_flush_to_var(void); int action_relinquish_var(void); int action_rotate(void); diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c index 9125831a9d..13ebfda579 100644 --- a/src/journal/journalctl.c +++ b/src/journal/journalctl.c @@ -94,7 +94,8 @@ Set *arg_output_fields = NULL; char *arg_pattern = NULL; pcre2_code *arg_compiled_pattern = NULL; PatternCompileCase arg_case = PATTERN_COMPILE_CASE_AUTO; -static ImagePolicy *arg_image_policy = NULL; +ImagePolicy *arg_image_policy = NULL; +bool arg_synchronize_on_exit = false; STATIC_DESTRUCTOR_REGISTER(arg_cursor, freep); STATIC_DESTRUCTOR_REGISTER(arg_cursor_file, freep); @@ -269,6 +270,8 @@ static int help(void) { " --no-tail Show all lines, even in follow mode\n" " --truncate-newline Truncate entries by first newline character\n" " -q --quiet Do not show info messages and privilege warning\n" + " --synchronize-on-exit=BOOL\n" + " Wait for Journal synchronization before exiting\n" "\n%3$sPager Control Options:%4$s\n" " --no-pager Do not pipe output into a pager\n" " -e --pager-end Immediately jump to the end in the pager\n" @@ -357,6 +360,7 @@ static int parse_argv(int argc, char *argv[]) { ARG_OUTPUT_FIELDS, ARG_NAMESPACE, ARG_LIST_NAMESPACES, + ARG_SYNCHRONIZE_ON_EXIT, }; static const struct option options[] = { @@ -430,6 +434,7 @@ static int parse_argv(int argc, char *argv[]) { { "output-fields", required_argument, NULL, ARG_OUTPUT_FIELDS }, { "namespace", required_argument, NULL, ARG_NAMESPACE }, { "list-namespaces", no_argument, NULL, ARG_LIST_NAMESPACES }, + { "synchronize-on-exit", required_argument, NULL, ARG_SYNCHRONIZE_ON_EXIT }, {} }; @@ -973,6 +978,14 @@ static int parse_argv(int argc, char *argv[]) { break; } + + case ARG_SYNCHRONIZE_ON_EXIT: + r = parse_boolean_argument("--synchronize-on-exit", optarg, &arg_synchronize_on_exit); + if (r < 0) + return r; + + break; + case '?': return -EINVAL; diff --git a/src/journal/journalctl.h b/src/journal/journalctl.h index 9db6689759..25b9c9c3eb 100644 --- a/src/journal/journalctl.h +++ b/src/journal/journalctl.h @@ -7,6 +7,7 @@ #include "sd-id128.h" #include "sd-json.h" +#include "image-policy.h" #include "output-mode.h" #include "pager.h" #include "pcre2-util.h" @@ -97,6 +98,8 @@ extern Set *arg_output_fields; extern char *arg_pattern; extern pcre2_code *arg_compiled_pattern; extern PatternCompileCase arg_case; +extern ImagePolicy *arg_image_policy; +extern bool arg_synchronize_on_exit; static inline bool arg_lines_needs_seek_end(void) { return arg_lines >= 0 && !arg_lines_oldest; diff --git a/src/journal/journald-kmsg.c b/src/journal/journald-kmsg.c index d2942315ef..767e590b26 100644 --- a/src/journal/journald-kmsg.c +++ b/src/journal/journald-kmsg.c @@ -160,7 +160,7 @@ void dev_kmsg_record(Server *s, char *p, size_t l) { *s->kernel_seqnum = serial + 1; } - /* monotonic timestamp */ + /* CLOCK_BOOTTIME timestamp */ l -= (e - p) + 1; p = e + 1; e = memchr(p, ',', l); @@ -316,6 +316,9 @@ void dev_kmsg_record(Server *s, char *p, size_t l) { if (saved_log_max_level != INT_MAX) log_set_max_level(saved_log_max_level); + s->dev_kmsg_timestamp = usec; + sync_req_revalidate_by_timestamp(s); + finish: for (j = 0; j < z; j++) free(iovec[j].iov_base); @@ -410,7 +413,7 @@ int server_open_dev_kmsg(Server *s) { if (r < 0) return log_error_errno(r, "Failed to add /dev/kmsg fd to event loop: %m"); - r = sd_event_source_set_priority(es, SD_EVENT_PRIORITY_IMPORTANT+10); + r = sd_event_source_set_priority(es, SD_EVENT_PRIORITY_NORMAL+5); if (r < 0) return log_error_errno(r, "Failed to adjust priority of kmsg event source: %m"); diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c index 0593b97b16..419dc040cb 100644 --- a/src/journal/journald-server.c +++ b/src/journal/journald-server.c @@ -1596,6 +1596,9 @@ int server_process_datagram( log_ratelimit_warning(JOURNAL_LOG_RATELIMIT, "Got file descriptors via syslog socket. Ignoring."); + if (tv) + s->syslog_timestamp = timeval_load(tv); + } else if (fd == s->native_fd) { if (n > 0 && n_fds == 0) server_process_native_message(s, s->buffer, n, ucred, tv, label, label_len); @@ -1605,6 +1608,9 @@ int server_process_datagram( log_ratelimit_warning(JOURNAL_LOG_RATELIMIT, "Got too many file descriptors via native socket. Ignoring."); + if (tv) + s->native_timestamp = timeval_load(tv); + } else { assert(fd == s->audit_fd); @@ -1617,6 +1623,9 @@ int server_process_datagram( close_many(fds, n_fds); + if (tv) + sync_req_revalidate_by_timestamp(s); + server_refresh_idle_timer(s); return 0; } @@ -2753,6 +2762,15 @@ Server* server_free(Server *s) { mmap_cache_unref(s->mmap); + SyncReq *req; + while ((req = prioq_peek(s->sync_req_realtime_prioq))) + sync_req_free(req); + prioq_free(s->sync_req_realtime_prioq); + + while ((req = prioq_peek(s->sync_req_boottime_prioq))) + sync_req_free(req); + prioq_free(s->sync_req_boottime_prioq); + return mfree(s); } diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h index e565769779..fd07ca8d07 100644 --- a/src/journal/journald-server.h +++ b/src/journal/journald-server.h @@ -13,6 +13,7 @@ #include "journal-file.h" #include "journald-context.h" #include "journald-stream.h" +#include "journald-sync.h" #include "list.h" #include "prioq.h" #include "ratelimit.h" @@ -183,6 +184,17 @@ typedef struct Server { ClientContext *pid1_context; /* the context of PID 1 */ sd_varlink_server *varlink_server; + + /* timestamp of most recently processed log messages from each source (CLOCK_REALTIME for the first + * two, CLOCK_BOOTTIME for the other) */ + usec_t native_timestamp, syslog_timestamp, dev_kmsg_timestamp; + + /* Pending synchronization requests, ordered by their timestamp */ + Prioq *sync_req_realtime_prioq; + Prioq *sync_req_boottime_prioq; + + /* Pending synchronization requests with non-zero rqlen counter */ + LIST_HEAD(SyncReq, sync_req_pending_rqlen); } Server; #define SERVER_MACHINE_ID(s) ((s)->machine_id_field + STRLEN("_MACHINE_ID=")) diff --git a/src/journal/journald-stream.c b/src/journal/journald-stream.c index 71b0db639d..4afcc7be03 100644 --- a/src/journal/journald-stream.c +++ b/src/journal/journald-stream.c @@ -49,17 +49,6 @@ * let's enforce a line length matching the maximum unit name length (255) */ #define STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX (UNIT_NAME_MAX-1U) -typedef enum StdoutStreamState { - STDOUT_STREAM_IDENTIFIER, - STDOUT_STREAM_UNIT_ID, - STDOUT_STREAM_PRIORITY, - STDOUT_STREAM_LEVEL_PREFIX, - STDOUT_STREAM_FORWARD_TO_SYSLOG, - STDOUT_STREAM_FORWARD_TO_KMSG, - STDOUT_STREAM_FORWARD_TO_CONSOLE, - STDOUT_STREAM_RUNNING, -} StdoutStreamState; - /* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length * was reached, or the end of the stream was reached */ @@ -73,44 +62,13 @@ typedef enum LineBreak { _LINE_BREAK_INVALID = -EINVAL, } LineBreak; -struct StdoutStream { - Server *server; - StdoutStreamState state; - - int fd; - - struct ucred ucred; - char *label; - char *identifier; - char *unit_id; - int priority; - bool level_prefix:1; - bool forward_to_syslog:1; - bool forward_to_kmsg:1; - bool forward_to_console:1; - - bool fdstore:1; - bool in_notify_queue:1; - - char *buffer; - size_t length; - - sd_event_source *event_source; - - char *state_file; - - ClientContext *context; - - LIST_FIELDS(StdoutStream, stdout_stream); - LIST_FIELDS(StdoutStream, stdout_stream_notify_queue); - - char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX]; -}; - StdoutStream* stdout_stream_free(StdoutStream *s) { if (!s) return NULL; + while (s->stream_sync_reqs) + stream_sync_req_free(s->stream_sync_reqs); + if (s->server) { if (s->context) client_context_release(s->server, s->context); @@ -121,8 +79,6 @@ StdoutStream* stdout_stream_free(StdoutStream *s) { if (s->in_notify_queue) LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s); - - (void) server_start_or_stop_idle_timer(s->server); /* Maybe we are idle now? */ } sd_event_source_disable_unref(s->event_source); @@ -145,7 +101,16 @@ void stdout_stream_terminate(StdoutStream *s) { if (s->state_file) (void) unlink(s->state_file); - stdout_stream_free(s); + StreamSyncReq *ssr; + while ((ssr = s->stream_sync_reqs)) { + SyncReq *req = ssr->req; + stream_sync_req_free(TAKE_PTR(ssr)); + sync_req_revalidate(TAKE_PTR(req)); + } + + Server *server = s->server; + stdout_stream_free(TAKE_PTR(s)); + (void) server_start_or_stop_idle_timer(server); /* Maybe we are idle now? */ } static int stdout_stream_save(StdoutStream *s) { @@ -647,6 +612,10 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, s->length = l - consumed; memmove(s->buffer, p + consumed, s->length); + LIST_FOREACH(by_stdout_stream, ssr, s->stream_sync_reqs) + /* NB: this might invalidate the stdout stream! */ + stream_sync_req_advance_revalidate(ssr, consumed); + return 1; terminate: @@ -747,14 +716,23 @@ static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revent fd = safe_close(fd); server_driver_message(s, u.pid, LOG_MESSAGE("Too many stdout streams, refusing connection.")); + + server_notify_stream(s, /* stream= */ NULL); return 0; } - r = stdout_stream_install(s, fd, NULL); - if (r < 0) + StdoutStream *stream; + r = stdout_stream_install(s, fd, &stream); + if (r < 0) { + server_notify_stream(s, /* stream= */ NULL); return r; + } TAKE_FD(fd); + + /* Tell the synchronization logic that we dropped one item from the incoming connection queue */ + server_notify_stream(s, stream); + return 0; } @@ -840,10 +818,9 @@ static int stdout_stream_restore(Server *s, const char *fname, int fd) { assert(fname); assert(fd >= 0); - if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) { - log_warning("Too many stdout streams, refusing restoring of stream."); - return -ENOBUFS; - } + if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) + return log_warning_errno(SYNTHETIC_ERRNO(ENOBUFS), + "Too many stdout streams, refusing restoring of stream."); r = stdout_stream_install(s, fd, &stream); if (r < 0) diff --git a/src/journal/journald-stream.h b/src/journal/journald-stream.h index 4bb4cbbe2f..ba15b97a37 100644 --- a/src/journal/journald-stream.h +++ b/src/journal/journald-stream.h @@ -1,10 +1,59 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ #pragma once -#include "fdset.h" - typedef struct Server Server; typedef struct StdoutStream StdoutStream; +typedef struct StreamSyncReq StreamSyncReq; + +#include "fdset.h" +#include "list.h" + +typedef enum StdoutStreamState { + STDOUT_STREAM_IDENTIFIER, + STDOUT_STREAM_UNIT_ID, + STDOUT_STREAM_PRIORITY, + STDOUT_STREAM_LEVEL_PREFIX, + STDOUT_STREAM_FORWARD_TO_SYSLOG, + STDOUT_STREAM_FORWARD_TO_KMSG, + STDOUT_STREAM_FORWARD_TO_CONSOLE, + STDOUT_STREAM_RUNNING, +} StdoutStreamState; + +struct StdoutStream { + Server *server; + StdoutStreamState state; + + int fd; + + struct ucred ucred; + char *label; + char *identifier; + char *unit_id; + int priority; + bool level_prefix:1; + bool forward_to_syslog:1; + bool forward_to_kmsg:1; + bool forward_to_console:1; + + bool fdstore:1; + bool in_notify_queue:1; + + char *buffer; + size_t length; + + sd_event_source *event_source; + + char *state_file; + + ClientContext *context; + + LIST_FIELDS(StdoutStream, stdout_stream); + LIST_FIELDS(StdoutStream, stdout_stream_notify_queue); + + char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX]; + + LIST_HEAD(StreamSyncReq, stream_sync_reqs); +}; int server_open_stdout_socket(Server *s, const char *stdout_socket); int server_restore_streams(Server *s, FDSet *fds); diff --git a/src/journal/journald-sync.c b/src/journal/journald-sync.c new file mode 100644 index 0000000000..c3b8de7412 --- /dev/null +++ b/src/journal/journald-sync.c @@ -0,0 +1,372 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include + +#include "sd-varlink.h" + +#include "io-util.h" +#include "journald-server.h" +#include "journald-stream.h" +#include "journald-sync.h" +#include "journald-varlink.h" +#include "socket-netlink.h" +#include "time-util.h" + +StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr) { + if (!ssr) + return NULL; + + if (ssr->req) + LIST_REMOVE(by_sync_req, ssr->req->stream_sync_reqs, ssr); + if (ssr->stream) + LIST_REMOVE(by_stdout_stream, ssr->stream->stream_sync_reqs, ssr); + + return mfree(ssr); +} + +void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p) { + assert(ssr); + + /* Subtract the specified number of bytes from the byte counter. And when we hit zero we consider + * this stream processed for the synchronization request */ + + /* NB: This might invalidate the 'ssr' object! */ + + if (p < ssr->pending_siocinq) { + ssr->pending_siocinq -= p; + return; + } + + SyncReq *req = ASSERT_PTR(ssr->req); + stream_sync_req_free(TAKE_PTR(ssr)); + + /* Maybe we are done now? */ + sync_req_revalidate(TAKE_PTR(req)); +} + +static bool sync_req_is_complete(SyncReq *req) { + int r; + + assert(req); + assert(req->server); + + /* In case the clock jumped backwards, let's adjust the timestamp, to guarantee reasonably quick + * termination */ + usec_t n = now(CLOCK_REALTIME); + if (n < req->realtime) + req->realtime = n; + + if (req->realtime_prioq_idx != PRIOQ_IDX_NULL) { + /* If this sync request is still in the priority queue it means we still need to check if + * incoming message timestamps are now newer than then sync request timestamp. */ + + if (req->server->native_fd >= 0 && + req->server->native_timestamp < req->realtime) { + r = fd_wait_for_event(req->server->native_fd, POLLIN, /* timeout= */ 0); + if (r < 0) + log_debug_errno(r, "Failed to determine pending IO events of native socket, ignoring: %m"); + else if (r != 0) /* if there's more queued we need to wait for the timestamp to pass. If it's idle though we are done here. */ + return false; + } + + if (req->server->syslog_fd >= 0&& + req->server->syslog_timestamp < req->realtime) { + r = fd_wait_for_event(req->server->syslog_fd, POLLIN, /* timeout= */ 0); + if (r < 0) + log_debug_errno(r, "Failed to determine pending IO events of syslog socket, ignoring: %m"); + else if (r != 0) + return false; + } + + /* This sync request is fulfilled for the native + syslog datagram streams? Then, let's + * remove this sync request from the priority queue, so that we dont need to consider it + * anymore. */ + assert(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0); + } + + if (req->boottime_prioq_idx != PRIOQ_IDX_NULL) { + /* Very similar to the above, but for /dev/kmsg we operate on the CLOCK_BOOTTIME clock */ + + if (req->server->dev_kmsg_fd >= 0 && + req->server->dev_kmsg_timestamp < req->boottime) { + r = fd_wait_for_event(req->server->dev_kmsg_fd, POLLIN, /* timeout= */ 0); + if (r < 0) + log_debug_errno(r, "Failed to determine pending IO events of /dev/kmsg file descriptor, ignoring: %m"); + else if (r != 0) + return false; + } + + assert(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0); + } + + /* If there are still streams with pending counters, we still need to look into things */ + if (req->stream_sync_reqs) + return false; + + /* If there are still pending connections from before the sync started, we still need to look into things */ + if (req->pending_rqlen > 0) + return false; + + return true; +} + +static int on_idle(sd_event_source *s, void *userdata) { + SyncReq *req = ASSERT_PTR(userdata); + + req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source); + + /* When this idle event triggers, then we definitely are done with the synchronization request. This + * is a safety net of a kind, to ensure we'll definitely put an end to any synchronization request, + * even if we are confused by CLOCK_REALTIME jumps or similar. */ + sync_req_varlink_reply(TAKE_PTR(req)); + return 0; +} + +SyncReq* sync_req_free(SyncReq *req) { + if (!req) + return NULL; + + if (req->server) { + if (req->realtime_prioq_idx != PRIOQ_IDX_NULL) + assert_se(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0); + + if (req->boottime_prioq_idx != PRIOQ_IDX_NULL) + assert_se(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0); + + if (req->pending_rqlen > 0) + LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req); + } + + req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source); + + sd_varlink_unref(req->link); + + while (req->stream_sync_reqs) + stream_sync_req_free(req->stream_sync_reqs); + + return mfree(req); +} + +static int sync_req_realtime_compare(const SyncReq *x, const SyncReq *y) { + return CMP(ASSERT_PTR(x)->realtime, ASSERT_PTR(y)->realtime); +} + +static int sync_req_boottime_compare(const SyncReq *x, const SyncReq *y) { + return CMP(ASSERT_PTR(x)->boottime, ASSERT_PTR(y)->boottime); +} + +static int sync_req_add_stream(SyncReq *req, StdoutStream *ss) { + assert(req); + assert(ss); + + int v = 0; + if (ioctl(ss->fd, SIOCINQ, &v) < 0) + log_debug_errno(errno, "Failed to issue SIOCINQ on stream socket, ignoring: %m"); + if (v <= 0) + return 0; /* Pending messages are zero anyway? then there's nothing to track */ + + _cleanup_(stream_sync_req_freep) StreamSyncReq *ssr = new(StreamSyncReq, 1); + if (!ssr) + return -ENOMEM; + + *ssr = (StreamSyncReq) { + .stream = ss, + .pending_siocinq = v, + .req = req, + }; + + LIST_PREPEND(by_sync_req, req->stream_sync_reqs, ssr); + LIST_PREPEND(by_stdout_stream, ss->stream_sync_reqs, ssr); + + TAKE_PTR(ssr); + return 1; +} + +int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret) { + int r; + + assert(s); + assert(link); + assert(ret); + + _cleanup_(sync_req_freep) SyncReq *req = new(SyncReq, 1); + if (!req) + return -ENOMEM; + + *req = (SyncReq) { + .server = s, + .link = sd_varlink_ref(link), + .realtime_prioq_idx = PRIOQ_IDX_NULL, + .boottime_prioq_idx = PRIOQ_IDX_NULL, + }; + + /* We use five distinct mechanisms to determine when the synchronization request is complete: + * + * 1. For the syslog/native AF_UNIX/SOCK_DGRAM sockets we look at the datagram timestamps: once the + * most recently seen datagram on the socket is newer than the timestamp when we initiated the + * sync request we know that all previously enqueued messages have been processed by us. + * + * 2. For established stream AF_UNIX/SOCK_STREAM sockets we have no timestamps. For them we take the + * SIOCINQ counter at the moment the synchronization request was enqueued. And once we processed + * the indicated number of input bytes we know that anything further was enqueued later than the + * original synchronization request we started from. + * + * 3. For pending new, un-accept()ed stream AF_UNIX/SOCK_STREAM sockets we have no timestamps either, + * but we can query the number of pending connections via the sockdiag netlink protocol (I so wish + * there was an easier, quicker way!). Once we accept()ed that many connections we know all + * further connections are definitely more recent than the sync request. + * + * 4. For /dev/kmsg we look at the log message timestamps, similar to the AF_UNIX/SOCK_DGRAM case, + * and they are in CLOCK_BOOTTIME clock. + * + * 5. Finally, as safety net we install an idle handler with a very low priority (lower than the + * syslog/native/stream IO handlers). If this handler is called we know that there's no pending + * IO, hence everything so far queued is definitely processed. + * + * Note the asymmetry: for AF_UNIX/SOCK_DGRAM + /dev/kmsg we go by timestamp, for established + * AF_UNIX/SOCK_STREAM we count bytes. That's because for SOCK_STREAM we have no timestamps, and for + * SOCK_DGRAM we have no API to query all pending bytes (as SIOCINQ on SOCK_DGRAM reports size of + * next datagram, not size of all pending datagrams). Ideally, we'd actually use neither of this, and + * the kernel would provide us CLOCK_MONOTONIC timestamps... + * + * Note that CLOCK_REALTIME is not necessarily monotonic (that's the whole point of it after all). If + * the clock jumps then we know the algorithm will eventually terminate, because of the idle handler + * that is our safety net. (Also, whenever we see poll() return an empty revents for some source we + * know everything is processed by now regardless of any timestamps or pending byte or connection + * counts.) */ + + req->realtime = now(CLOCK_REALTIME); + req->boottime = now(CLOCK_BOOTTIME); + + if (s->native_event_source || s->syslog_event_source) { + r = prioq_ensure_put(&s->sync_req_realtime_prioq, sync_req_realtime_compare, req, &req->realtime_prioq_idx); + if (r < 0) + return r; + } + + if (s->dev_kmsg_event_source) { + r = prioq_ensure_put(&s->sync_req_boottime_prioq, sync_req_boottime_compare, req, &req->boottime_prioq_idx); + if (r < 0) + return r; + } + + r = sd_event_add_defer(s->event, &req->idle_event_source, on_idle, req); + if (r < 0) + return r; + + r = sd_event_source_set_priority(req->idle_event_source, SD_EVENT_PRIORITY_NORMAL+15); + if (r < 0) + return r; + + (void) sd_event_source_set_description(req->idle_event_source, "deferred-sync"); + + /* Now determine the pending byte counter for each stdout stream. If non-zero allocate a + * StreamSyncReq for the stream to keep track of it */ + LIST_FOREACH(stdout_stream, ss, s->stdout_streams) { + r = sync_req_add_stream(req, ss); + if (r < 0) + return r; + } + + /* Also track how many pending, incoming stream sockets there are currently, so that we process them + * too */ + r = af_unix_get_qlen(s->stdout_fd, &req->pending_rqlen); + if (r < 0) + log_warning_errno(r, "Failed to determine current incoming queue length, ignoring: %m"); + if (req->pending_rqlen > 0) + LIST_PREPEND(pending_rqlen, s->sync_req_pending_rqlen, req); + + *ret = TAKE_PTR(req); + return 0; +} + +static void sync_req_advance_rqlen_revalidate(SyncReq *req, uint32_t current_rqlen, StdoutStream *ss) { + int r; + + assert(req); + + /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming + * connections. We decrease the qlen counter by one here, except if the new overall counter is + * already below our target. */ + + uint32_t n; + if (req->pending_rqlen <= 0) + n = 0; + else if (req->pending_rqlen > current_rqlen) + n = current_rqlen; + else + n = req->pending_rqlen - 1; + + if (req->pending_rqlen > 0) { + /* if this synchronization request is supposed to process a non-zero number of connections we + * need to also track what's inside those stream connections */ + if (ss) { + r = sync_req_add_stream(req, ss); + if (r < 0) + log_warning_errno(r, "Failed to track stream queue size, ignoring: %m"); + } + + /* If there are no more connections to wait for, remove us from the list of synchronization + * requests with non-zero pending connection counters */ + if (n == 0) + LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req); + } + + req->pending_rqlen = n; + + sync_req_revalidate(req); +} + +void server_notify_stream(Server *s, StdoutStream *ss) { + int r; + + assert(s); + + /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming + * connections. */ + + if (!s->sync_req_pending_rqlen) + return; + + uint32_t current_qlen; + + r = af_unix_get_qlen(s->stdout_fd, ¤t_qlen); + if (r < 0) { + log_warning_errno(r, "Failed to determine current AF_UNIX stream socket pending connections, ignoring: %m"); + current_qlen = UINT32_MAX; + } + + LIST_FOREACH(pending_rqlen, sr, s->sync_req_pending_rqlen) + /* NB: this might invalidate the SyncReq object! */ + sync_req_advance_rqlen_revalidate(sr, current_qlen, ss); +} + +bool sync_req_revalidate(SyncReq *req) { + assert(req); + + /* Check if the synchronization request is complete now. If so, answer the Varlink client. NB: this + * might invalidate the SyncReq object */ + + if (!sync_req_is_complete(req)) + return false; + + sync_req_varlink_reply(TAKE_PTR(req)); + return true; +} + +void sync_req_revalidate_by_timestamp(Server *s) { + assert(s); + + /* Go through the pending sync requests by timestamp, and complete those for which a sync is now + * complete. */ + + SyncReq *req; + while ((req = prioq_peek(s->sync_req_realtime_prioq))) + if (!sync_req_revalidate(req)) + break; + + while ((req = prioq_peek(s->sync_req_boottime_prioq))) + if (!sync_req_revalidate(req)) + break; +} diff --git a/src/journal/journald-sync.h b/src/journal/journald-sync.h new file mode 100644 index 0000000000..3f6d3769db --- /dev/null +++ b/src/journal/journald-sync.h @@ -0,0 +1,56 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +typedef struct Server Server; +typedef struct StreamSyncReq StreamSyncReq; +typedef struct SyncReq SyncReq; + +#include "journald-stream.h" +#include "list.h" +#include "macro.h" + +/* Encapsulates the synchronization request data we need to keep per STDOUT stream. Primarily a byte counter + * to count down. */ +struct StreamSyncReq { + SyncReq *req; + StdoutStream *stream; + + uint64_t pending_siocinq; /* The SIOCINQ counter when the sync was initiated */ + + LIST_FIELDS(StreamSyncReq, by_sync_req); + LIST_FIELDS(StreamSyncReq, by_stdout_stream); +}; + +/* Encapsulates a synchronization request */ +struct SyncReq { + Server *server; + sd_varlink *link; + + bool offline; /* if true, we'll offline the journal files after sync is complete */ + + usec_t realtime; /* CLOCK_REALTIME timestamp when synchronization request was initiated (for syncing on AF_UNIX/SOCK_DGRAM) */ + usec_t boottime; /* CLOCK_BOOTTIME timestamp when synchronization request was initiated (for syncing on /dev/kmsg) */ + + sd_event_source *idle_event_source; + + uint32_t pending_rqlen; /* The rqlen counter on the stream AF_UNIX socket when the sync was initiated */ + LIST_FIELDS(SyncReq, pending_rqlen); + + LIST_HEAD(StreamSyncReq, stream_sync_reqs); + + unsigned realtime_prioq_idx; + unsigned boottime_prioq_idx; +}; + +StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr); +DEFINE_TRIVIAL_CLEANUP_FUNC(StreamSyncReq*, stream_sync_req_free); +void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p); + +int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret); +SyncReq* sync_req_free(SyncReq *req); +DEFINE_TRIVIAL_CLEANUP_FUNC(SyncReq*, sync_req_free); + +bool sync_req_revalidate(SyncReq *req); +void sync_req_revalidate_by_timestamp(Server *s); + +void server_notify_stream(Server *s, StdoutStream *ss); diff --git a/src/journal/journald-varlink.c b/src/journal/journald-varlink.c index d2ba34c747..1f9ad3ad3e 100644 --- a/src/journal/journald-varlink.c +++ b/src/journal/journald-varlink.c @@ -1,76 +1,79 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ #include "journald-server.h" +#include "journald-sync.h" #include "journald-varlink.h" #include "varlink-io.systemd.Journal.h" #include "varlink-io.systemd.service.h" #include "varlink-util.h" -static int synchronize_second_half(sd_event_source *event_source, void *userdata) { - sd_varlink *link = ASSERT_PTR(userdata); - Server *s; +void sync_req_varlink_reply(SyncReq *req) { int r; - assert_se(s = sd_varlink_get_userdata(link)); + assert(req); - /* This is the "second half" of the Synchronize() varlink method. This function is called as deferred - * event source at a low priority to ensure the synchronization completes after all queued log - * messages are processed. */ - server_full_sync(s, /* wait = */ true); + /* This is the "second half" of the Synchronize() varlink method. This function is called when we + * determine that no messages that were enqueued to us when the request was initiated is pending + * anymore. */ - /* Let's get rid of the event source now, by marking it as non-floating again. It then has no ref - * anymore and is immediately destroyed after we return from this function, i.e. from this event - * source handler at the end. */ - r = sd_event_source_set_floating(event_source, false); + if (req->offline) + server_full_sync(req->server, /* wait = */ true); + + /* Disconnect the SyncReq from the Varlink connection object, and free it */ + _cleanup_(sd_varlink_unrefp) sd_varlink *vl = TAKE_PTR(req->link); + sd_varlink_set_userdata(vl, req->server); /* reinstall server object */ + req = sync_req_free(req); + + r = sd_varlink_reply(vl, NULL); if (r < 0) - return log_error_errno(r, "Failed to mark event source as non-floating: %m"); - - return sd_varlink_reply(link, NULL); -} - -static void synchronize_destroy(void *userdata) { - sd_varlink_unref(userdata); + log_debug_errno(r, "Failed to reply to Synchronize() client, ignoring: %m"); } static int vl_method_synchronize(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { - _cleanup_(sd_event_source_unrefp) sd_event_source *event_source = NULL; + int offline = -1; + + static const sd_json_dispatch_field dispatch_table[] = { + { "offline", SD_JSON_VARIANT_BOOLEAN, sd_json_dispatch_tristate, 0, 0}, + {} + }; + Server *s = ASSERT_PTR(userdata); int r; assert(link); - r = sd_varlink_dispatch(link, parameters, /* dispatch_table = */ NULL, /* userdata = */ NULL); + r = sd_varlink_dispatch(link, parameters, dispatch_table, &offline); if (r != 0) return r; - log_info("Received client request to sync journal."); + if (offline > 0) { + /* Do not allow unprivileged clients to offline the journal files, since that's potentially slow */ + r = varlink_check_privileged_peer(link); + if (r < 0) + return r; + } else if (offline < 0) { + uid_t uid = 0; - /* We don't do the main work now, but instead enqueue a deferred event loop job which will do - * it. That job is scheduled at low priority, so that we return from this method call only after all - * queued but not processed log messages are written to disk, so that this method call returning can - * be used as nice synchronization point. */ - r = sd_event_add_defer(s->event, &event_source, synchronize_second_half, link); + r = sd_varlink_get_peer_uid(link, &uid); + if (r < 0) + return r; + + offline = uid == 0; /* for compat, if not specified default to offlining, except for non-root */ + } + + log_full(offline ? LOG_INFO : LOG_DEBUG, + "Received client request to sync journal (%s offlining).", offline ? "with" : "without"); + + _cleanup_(sync_req_freep) SyncReq *sr = NULL; + + r = sync_req_new(s, link, &sr); if (r < 0) - return log_error_errno(r, "Failed to allocate defer event source: %m"); + return r; - r = sd_event_source_set_destroy_callback(event_source, synchronize_destroy); - if (r < 0) - return log_error_errno(r, "Failed to set event source destroy callback: %m"); - - sd_varlink_ref(link); /* The varlink object is now left to the destroy callback to unref */ - - r = sd_event_source_set_priority(event_source, SD_EVENT_PRIORITY_NORMAL+15); - if (r < 0) - return log_error_errno(r, "Failed to set defer event source priority: %m"); - - /* Give up ownership of this event source. It will now be destroyed along with event loop itself, - * unless it destroys itself earlier. */ - r = sd_event_source_set_floating(event_source, true); - if (r < 0) - return log_error_errno(r, "Failed to mark event source as floating: %m"); - - (void) sd_event_source_set_description(event_source, "deferred-sync"); + sr->offline = offline; + sd_varlink_set_userdata(link, sr); + sync_req_revalidate(TAKE_PTR(sr)); return 0; } @@ -84,6 +87,10 @@ static int vl_method_rotate(sd_varlink *link, sd_json_variant *parameters, sd_va if (r != 0) return r; + r = varlink_check_privileged_peer(link); + if (r < 0) + return r; + log_info("Received client request to rotate journal, rotating."); server_full_rotate(s); @@ -100,6 +107,10 @@ static int vl_method_flush_to_var(sd_varlink *link, sd_json_variant *parameters, if (r != 0) return r; + r = varlink_check_privileged_peer(link); + if (r < 0) + return r; + if (s->namespace) return sd_varlink_error(link, "io.systemd.Journal.NotSupportedByNamespaces", NULL); @@ -119,6 +130,10 @@ static int vl_method_relinquish_var(sd_varlink *link, sd_json_variant *parameter if (r != 0) return r; + r = varlink_check_privileged_peer(link); + if (r < 0) + return r; + if (s->namespace) return sd_varlink_error(link, "io.systemd.Journal.NotSupportedByNamespaces", NULL); @@ -145,6 +160,15 @@ static void vl_disconnect(sd_varlink_server *server, sd_varlink *link, void *use assert(server); assert(link); + void *u = sd_varlink_get_userdata(link); + if (u != s) { + /* If this is a Varlink connection that does not have the Server object as userdata, then it has a SyncReq object instead. Let's finish it. */ + + SyncReq *req = u; + sd_varlink_set_userdata(link, s); /* reinstall server object */ + sync_req_free(req); + } + (void) server_start_or_stop_idle_timer(s); /* maybe we are idle now */ } @@ -155,7 +179,7 @@ int server_open_varlink(Server *s, const char *socket, int fd) { r = varlink_server_new( &s->varlink_server, - SD_VARLINK_SERVER_ROOT_ONLY|SD_VARLINK_SERVER_INHERIT_USERDATA, + SD_VARLINK_SERVER_ACCOUNT_UID|SD_VARLINK_SERVER_INHERIT_USERDATA, s); if (r < 0) return log_error_errno(r, "Failed to allocate varlink server object: %m"); @@ -188,7 +212,7 @@ int server_open_varlink(Server *s, const char *socket, int fd) { return r; if (fd < 0) - r = sd_varlink_server_listen_address(s->varlink_server, socket, 0600); + r = sd_varlink_server_listen_address(s->varlink_server, socket, 0666); else r = sd_varlink_server_listen_fd(s->varlink_server, fd); if (r < 0) diff --git a/src/journal/journald-varlink.h b/src/journal/journald-varlink.h index ab34e01117..e2c3ce40a0 100644 --- a/src/journal/journald-varlink.h +++ b/src/journal/journald-varlink.h @@ -1,6 +1,9 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ #pragma once -typedef struct Server Server; +#include "journald-server.h" +#include "journald-sync.h" int server_open_varlink(Server *s, const char *socket, int fd); + +void sync_req_varlink_reply(SyncReq *req); diff --git a/src/journal/meson.build b/src/journal/meson.build index a7fcd9a41f..1b8d9831c1 100644 --- a/src/journal/meson.build +++ b/src/journal/meson.build @@ -14,6 +14,7 @@ systemd_journald_extract_sources = files( 'journald-server.c', 'journald-socket.c', 'journald-stream.c', + 'journald-sync.c', 'journald-syslog.c', 'journald-varlink.c', 'journald-wall.c', diff --git a/src/libsystemd/meson.build b/src/libsystemd/meson.build index 988e5e0ce7..60b73f4da1 100644 --- a/src/libsystemd/meson.build +++ b/src/libsystemd/meson.build @@ -109,10 +109,12 @@ sd_netlink_sources = files( 'sd-netlink/netlink-message-rtnl.c', 'sd-netlink/netlink-message.c', 'sd-netlink/netlink-slot.c', + 'sd-netlink/netlink-sock-diag.c', 'sd-netlink/netlink-socket.c', 'sd-netlink/netlink-types-genl.c', 'sd-netlink/netlink-types-nfnl.c', 'sd-netlink/netlink-types-rtnl.c', + 'sd-netlink/netlink-types-sdnl.c', 'sd-netlink/netlink-types.c', 'sd-netlink/netlink-util.c', 'sd-netlink/sd-netlink.c', diff --git a/src/libsystemd/sd-netlink/netlink-genl.c b/src/libsystemd/sd-netlink/netlink-genl.c index 8d2269b54d..9c31105772 100644 --- a/src/libsystemd/sd-netlink/netlink-genl.c +++ b/src/libsystemd/sd-netlink/netlink-genl.c @@ -244,7 +244,7 @@ static int genl_message_new( if (!policy_set) return -EOPNOTSUPP; - r = message_new_full(nl, family->id, policy_set, + r = message_new_full(nl, family->id, NLM_F_REQUEST | NLM_F_ACK, policy_set, sizeof(struct genlmsghdr) + family->additional_header_size, &m); if (r < 0) return r; diff --git a/src/libsystemd/sd-netlink/netlink-internal.h b/src/libsystemd/sd-netlink/netlink-internal.h index 5b9ccb336e..7346902bdc 100644 --- a/src/libsystemd/sd-netlink/netlink-internal.h +++ b/src/libsystemd/sd-netlink/netlink-internal.h @@ -131,10 +131,11 @@ int message_new_empty(sd_netlink *nl, sd_netlink_message **ret); int message_new_full( sd_netlink *nl, uint16_t nlmsg_type, + uint16_t nlmsg_flags, const NLAPolicySet *policy_set, size_t header_size, sd_netlink_message **ret); -int message_new(sd_netlink *nl, sd_netlink_message **ret, uint16_t type); +int message_new(sd_netlink *nl, sd_netlink_message **ret, uint16_t type, uint16_t flags); int message_new_synthetic_error(sd_netlink *nl, int error, uint32_t serial, sd_netlink_message **ret); static inline uint32_t message_get_serial(sd_netlink_message *m) { diff --git a/src/libsystemd/sd-netlink/netlink-message-nfnl.c b/src/libsystemd/sd-netlink/netlink-message-nfnl.c index 72bcdde1bd..321b9b2006 100644 --- a/src/libsystemd/sd-netlink/netlink-message-nfnl.c +++ b/src/libsystemd/sd-netlink/netlink-message-nfnl.c @@ -34,7 +34,7 @@ int sd_nfnl_message_new(sd_netlink *nfnl, sd_netlink_message **ret, int nfproto, assert_return(nfproto_is_valid(nfproto), -EINVAL); assert_return(NFNL_MSG_TYPE(msg_type) == msg_type, -EINVAL); - r = message_new(nfnl, &m, subsys << 8 | msg_type); + r = message_new(nfnl, &m, subsys << 8 | msg_type, NLM_F_REQUEST | NLM_F_ACK); if (r < 0) return r; diff --git a/src/libsystemd/sd-netlink/netlink-message-rtnl.c b/src/libsystemd/sd-netlink/netlink-message-rtnl.c index fdde1fb2c8..b730b63fb0 100644 --- a/src/libsystemd/sd-netlink/netlink-message-rtnl.c +++ b/src/libsystemd/sd-netlink/netlink-message-rtnl.c @@ -244,13 +244,13 @@ int sd_rtnl_message_new_route( IN_SET(family, AF_INET, AF_INET6), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST|NLM_F_ACK|(nlmsg_type == RTM_NEWROUTE ? NLM_F_CREATE | NLM_F_APPEND : 0)); if (r < 0) return r; - if (nlmsg_type == RTM_NEWROUTE) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_APPEND; - rtm = NLMSG_DATA((*ret)->hdr); rtm->rtm_family = family; @@ -281,13 +281,13 @@ int sd_rtnl_message_new_nexthop(sd_netlink *rtnl, sd_netlink_message **ret, } assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST | NLM_F_ACK | (nlmsg_type == RTM_NEWNEXTHOP ? NLM_F_CREATE | NLM_F_REPLACE : 0)); if (r < 0) return r; - if (nlmsg_type == RTM_NEWNEXTHOP) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_REPLACE; - nhm = NLMSG_DATA((*ret)->hdr); nhm->nh_family = family; @@ -311,17 +311,18 @@ int sd_rtnl_message_new_neigh( assert_return(IN_SET(family, AF_UNSPEC, AF_INET, AF_INET6, AF_BRIDGE), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); - if (r < 0) - return r; - + uint16_t flags = NLM_F_REQUEST | NLM_F_ACK; if (nlmsg_type == RTM_NEWNEIGH) { if (family == AF_BRIDGE) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_APPEND; + flags |= NLM_F_CREATE | NLM_F_APPEND; else - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_REPLACE; + flags |= NLM_F_CREATE | NLM_F_REPLACE; } + r = message_new(rtnl, ret, nlmsg_type, flags); + if (r < 0) + return r; + ndm = NLMSG_DATA((*ret)->hdr); ndm->ndm_family = family; @@ -337,15 +338,16 @@ int sd_rtnl_message_new_link(sd_netlink *rtnl, sd_netlink_message **ret, uint16_ assert_return(rtnl_message_type_is_link(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + uint16_t flags = NLM_F_REQUEST | NLM_F_ACK; + if (nlmsg_type == RTM_NEWLINK && ifindex == 0) + flags |= NLM_F_CREATE | NLM_F_EXCL; + else if (nlmsg_type == RTM_NEWLINKPROP) + flags |= NLM_F_CREATE | NLM_F_EXCL | NLM_F_APPEND; + + r = message_new(rtnl, ret, nlmsg_type, flags); if (r < 0) return r; - if (nlmsg_type == RTM_NEWLINK && ifindex == 0) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_EXCL; - else if (nlmsg_type == RTM_NEWLINKPROP) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_EXCL | NLM_F_APPEND; - ifi = NLMSG_DATA((*ret)->hdr); ifi->ifi_family = AF_UNSPEC; @@ -371,7 +373,7 @@ int sd_rtnl_message_new_addr( IN_SET(family, AF_INET, AF_INET6), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, ret, nlmsg_type, NLM_F_REQUEST | NLM_F_ACK); if (r < 0) return r; @@ -439,13 +441,13 @@ int sd_rtnl_message_new_addrlabel( assert_return(rtnl_message_type_is_addrlabel(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST | NLM_F_ACK | (nlmsg_type == RTM_NEWADDRLABEL ? NLM_F_CREATE | NLM_F_REPLACE : 0)); if (r < 0) return r; - if (nlmsg_type == RTM_NEWADDRLABEL) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_REPLACE; - addrlabel = NLMSG_DATA((*ret)->hdr); addrlabel->ifal_family = family; @@ -466,13 +468,13 @@ int sd_rtnl_message_new_routing_policy_rule( assert_return(rtnl_message_type_is_routing_policy_rule(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST | NLM_F_ACK | (nlmsg_type == RTM_NEWRULE ? NLM_F_CREATE | NLM_F_EXCL : 0)); if (r < 0) return r; - if (nlmsg_type == RTM_NEWRULE) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_EXCL; - frh = NLMSG_DATA((*ret)->hdr); frh->family = family; @@ -493,13 +495,13 @@ int sd_rtnl_message_new_traffic_control( assert_return(rtnl_message_type_is_traffic_control(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST | NLM_F_ACK | (IN_SET(nlmsg_type, RTM_NEWQDISC, RTM_NEWTCLASS) ? NLM_F_CREATE | NLM_F_REPLACE : 0)); if (r < 0) return r; - if (IN_SET(nlmsg_type, RTM_NEWQDISC, RTM_NEWTCLASS)) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_REPLACE; - tcm = NLMSG_DATA((*ret)->hdr); tcm->tcm_ifindex = ifindex; tcm->tcm_handle = handle; @@ -520,13 +522,13 @@ int sd_rtnl_message_new_mdb( assert_return(rtnl_message_type_is_mdb(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, + ret, + nlmsg_type, + NLM_F_REQUEST | NLM_F_ACK | (nlmsg_type == RTM_NEWMDB ? NLM_F_CREATE | NLM_F_REPLACE : 0)); if (r < 0) return r; - if (nlmsg_type == RTM_NEWMDB) - (*ret)->hdr->nlmsg_flags |= NLM_F_CREATE | NLM_F_REPLACE; - bpm = NLMSG_DATA((*ret)->hdr); bpm->family = AF_BRIDGE; bpm->ifindex = ifindex; @@ -545,7 +547,7 @@ int sd_rtnl_message_new_nsid( assert_return(rtnl_message_type_is_nsid(nlmsg_type), -EINVAL); assert_return(ret, -EINVAL); - r = message_new(rtnl, ret, nlmsg_type); + r = message_new(rtnl, ret, nlmsg_type, NLM_F_REQUEST | NLM_F_ACK); if (r < 0) return r; diff --git a/src/libsystemd/sd-netlink/netlink-message.c b/src/libsystemd/sd-netlink/netlink-message.c index eb88ae5e2b..8052e46768 100644 --- a/src/libsystemd/sd-netlink/netlink-message.c +++ b/src/libsystemd/sd-netlink/netlink-message.c @@ -44,6 +44,7 @@ int message_new_empty(sd_netlink *nl, sd_netlink_message **ret) { int message_new_full( sd_netlink *nl, uint16_t nlmsg_type, + uint16_t nlmsg_flags, const NLAPolicySet *policy_set, size_t header_size, sd_netlink_message **ret) { @@ -69,7 +70,7 @@ int message_new_full( if (!m->hdr) return -ENOMEM; - m->hdr->nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK; + m->hdr->nlmsg_flags = nlmsg_flags; m->hdr->nlmsg_len = size; m->hdr->nlmsg_type = nlmsg_type; @@ -77,7 +78,7 @@ int message_new_full( return 0; } -int message_new(sd_netlink *nl, sd_netlink_message **ret, uint16_t nlmsg_type) { +int message_new(sd_netlink *nl, sd_netlink_message **ret, uint16_t nlmsg_type, uint16_t nlmsg_flags) { const NLAPolicySet *policy_set; size_t size; int r; @@ -85,11 +86,11 @@ int message_new(sd_netlink *nl, sd_netlink_message **ret, uint16_t nlmsg_type) { assert_return(nl, -EINVAL); assert_return(ret, -EINVAL); - r = netlink_get_policy_set_and_header_size(nl, nlmsg_type, &policy_set, &size); + r = netlink_get_policy_set_and_header_size(nl, nlmsg_type, nlmsg_flags, &policy_set, &size); if (r < 0) return r; - return message_new_full(nl, nlmsg_type, policy_set, size, ret); + return message_new_full(nl, nlmsg_type, nlmsg_flags, policy_set, size, ret); } int message_new_synthetic_error(sd_netlink *nl, int error, uint32_t serial, sd_netlink_message **ret) { @@ -98,7 +99,7 @@ int message_new_synthetic_error(sd_netlink *nl, int error, uint32_t serial, sd_n assert(error <= 0); - r = message_new(nl, ret, NLMSG_ERROR); + r = message_new(nl, ret, NLMSG_ERROR, 0); if (r < 0) return r; @@ -1328,8 +1329,12 @@ int sd_netlink_message_rewind(sd_netlink_message *m, sd_netlink *nl) { assert(m->hdr); - r = netlink_get_policy_set_and_header_size(nl, m->hdr->nlmsg_type, - &m->containers[0].policy_set, &size); + r = netlink_get_policy_set_and_header_size( + nl, + m->hdr->nlmsg_type, + m->hdr->nlmsg_flags, + &m->containers[0].policy_set, + &size); if (r < 0) return r; diff --git a/src/libsystemd/sd-netlink/netlink-sock-diag.c b/src/libsystemd/sd-netlink/netlink-sock-diag.c new file mode 100644 index 0000000000..0ed222ea20 --- /dev/null +++ b/src/libsystemd/sd-netlink/netlink-sock-diag.c @@ -0,0 +1,43 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include + +#include "netlink-internal.h" +#include "netlink-sock-diag.h" +#include "netlink-util.h" + +int sd_sock_diag_socket_open(sd_netlink **ret) { + return netlink_open_family(ret, NETLINK_SOCK_DIAG); +} + +int sd_sock_diag_message_new_unix( + sd_netlink *sdnl, + sd_netlink_message **ret, + ino_t inode, + uint64_t cookie, + uint32_t show) { + + _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *m = NULL; + int r; + + assert_return(sdnl, -EINVAL); + assert_return(ret, -EINVAL); + + r = message_new(sdnl, &m, SOCK_DIAG_BY_FAMILY, NLM_F_REQUEST | NLM_F_ACK); + if (r < 0) + return r; + + *(struct unix_diag_req*) NLMSG_DATA(m->hdr) = (struct unix_diag_req) { + .sdiag_family = AF_UNIX, + .udiag_ino = inode, + .udiag_show = show, + .udiag_cookie = { + cookie & UINT32_MAX, + (cookie >> 32) & UINT32_MAX, + }, + }; + + *ret = TAKE_PTR(m); + return 0; +} diff --git a/src/libsystemd/sd-netlink/netlink-sock-diag.h b/src/libsystemd/sd-netlink/netlink-sock-diag.h new file mode 100644 index 0000000000..043913889c --- /dev/null +++ b/src/libsystemd/sd-netlink/netlink-sock-diag.h @@ -0,0 +1,11 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include + +#include "sd-netlink.h" + +/* TODO: to be exported later */ + +int sd_sock_diag_socket_open(sd_netlink **ret); +int sd_sock_diag_message_new_unix(sd_netlink *sdnl, sd_netlink_message **ret, ino_t inode, uint64_t cookie, uint32_t show); diff --git a/src/libsystemd/sd-netlink/netlink-socket.c b/src/libsystemd/sd-netlink/netlink-socket.c index 1b098cadee..0f00e3f2fc 100644 --- a/src/libsystemd/sd-netlink/netlink-socket.c +++ b/src/libsystemd/sd-netlink/netlink-socket.c @@ -307,7 +307,7 @@ static int parse_message_one(sd_netlink *nl, uint32_t group, const struct nlmsgh goto finalize; /* check that we support this message type */ - r = netlink_get_policy_set_and_header_size(nl, hdr->nlmsg_type, NULL, &size); + r = netlink_get_policy_set_and_header_size(nl, hdr->nlmsg_type, hdr->nlmsg_flags, NULL, &size); if (r == -EOPNOTSUPP) { log_debug("sd-netlink: ignored message with unknown type: %i", hdr->nlmsg_type); goto finalize; diff --git a/src/libsystemd/sd-netlink/netlink-types-sdnl.c b/src/libsystemd/sd-netlink/netlink-types-sdnl.c new file mode 100644 index 0000000000..146f4177fd --- /dev/null +++ b/src/libsystemd/sd-netlink/netlink-types-sdnl.c @@ -0,0 +1,37 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include + +#include "missing_network.h" +#include "netlink-types-internal.h" +#include "netlink-types.h" + +static const NLAPolicy unix_diag_req_policies[] = { +}; +DEFINE_POLICY_SET(unix_diag_req); + +static const NLAPolicy sdnl_req_policies[] = { + [SOCK_DIAG_BY_FAMILY] = BUILD_POLICY_NESTED_WITH_SIZE(unix_diag_req, sizeof(struct unix_diag_req)), +}; + +DEFINE_POLICY_SET(sdnl_req); + +static const NLAPolicy unix_diag_msg_policies[] = { + [UNIX_DIAG_RQLEN] = BUILD_POLICY_WITH_SIZE(BINARY, sizeof(struct unix_diag_rqlen)), +}; +DEFINE_POLICY_SET(unix_diag_msg); + +static const NLAPolicy sdnl_msg_policies[] = { + [SOCK_DIAG_BY_FAMILY] = BUILD_POLICY_NESTED_WITH_SIZE(unix_diag_msg, sizeof(struct unix_diag_msg)), +}; + +DEFINE_POLICY_SET(sdnl_msg); + +const NLAPolicy *sdnl_get_policy(uint16_t nlmsg_type, uint16_t flags) { + /* for sock_diag we need to look at whether a message is a response or request to determine how to decode it. */ + if (flags & NLM_F_REQUEST) + return policy_set_get_policy(&sdnl_req_policy_set, nlmsg_type); + + return policy_set_get_policy(&sdnl_msg_policy_set, nlmsg_type); +} diff --git a/src/libsystemd/sd-netlink/netlink-types.c b/src/libsystemd/sd-netlink/netlink-types.c index 21ef80c2ec..e10a90c5aa 100644 --- a/src/libsystemd/sd-netlink/netlink-types.c +++ b/src/libsystemd/sd-netlink/netlink-types.c @@ -51,6 +51,7 @@ const NLAPolicySetUnion *policy_get_policy_set_union(const NLAPolicy *policy) { int netlink_get_policy_set_and_header_size( sd_netlink *nl, uint16_t type, + uint16_t flags, const NLAPolicySet **ret_policy_set, size_t *ret_header_size) { @@ -70,6 +71,9 @@ int netlink_get_policy_set_and_header_size( break; case NETLINK_GENERIC: return genl_get_policy_set_and_header_size(nl, type, ret_policy_set, ret_header_size); + case NETLINK_SOCK_DIAG: + policy = sdnl_get_policy(type, flags); + break; default: return -EOPNOTSUPP; } diff --git a/src/libsystemd/sd-netlink/netlink-types.h b/src/libsystemd/sd-netlink/netlink-types.h index e034a984e9..85ff925edd 100644 --- a/src/libsystemd/sd-netlink/netlink-types.h +++ b/src/libsystemd/sd-netlink/netlink-types.h @@ -37,6 +37,7 @@ typedef struct NLAPolicySetUnion NLAPolicySetUnion; const NLAPolicy *rtnl_get_policy(uint16_t nlmsg_type); const NLAPolicy *nfnl_get_policy(uint16_t nlmsg_type); +const NLAPolicy *sdnl_get_policy(uint16_t nlmsg_type, uint16_t nlmsg_flags); const NLAPolicySet *genl_get_policy_set_by_name(const char *name); int genl_get_policy_set_and_header_size( sd_netlink *nl, @@ -52,6 +53,7 @@ const NLAPolicySetUnion *policy_get_policy_set_union(const NLAPolicy *policy); int netlink_get_policy_set_and_header_size( sd_netlink *nl, uint16_t type, + uint16_t flags, const NLAPolicySet **ret_policy_set, size_t *ret_header_size); diff --git a/src/libsystemd/sd-netlink/test-netlink.c b/src/libsystemd/sd-netlink/test-netlink.c index fd481dced0..92138ec556 100644 --- a/src/libsystemd/sd-netlink/test-netlink.c +++ b/src/libsystemd/sd-netlink/test-netlink.c @@ -5,18 +5,22 @@ #include #include #include +#include #include #include #include +#include #include #include "sd-netlink.h" #include "alloc-util.h" #include "ether-addr-util.h" +#include "fd-util.h" #include "macro.h" #include "netlink-genl.h" #include "netlink-internal.h" +#include "netlink-sock-diag.h" #include "netlink-util.h" #include "socket-util.h" #include "stdio-util.h" @@ -699,4 +703,28 @@ TEST(rtnl_set_link_name) { ASSERT_NULL(resolved = mfree(resolved)); } +TEST(sock_diag_unix) { + _cleanup_(sd_netlink_unrefp) sd_netlink *nl = NULL; + + ASSERT_OK(sd_sock_diag_socket_open(&nl)); + + _cleanup_close_ int unix_fd = ASSERT_FD(socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0)); + ASSERT_OK(socket_autobind(unix_fd, /* ret_name= */ NULL)); + ASSERT_OK_ERRNO(listen(unix_fd, 123)); + + struct stat st; + ASSERT_OK_ERRNO(fstat(unix_fd, &st)); + + uint64_t cookie; + socklen_t cookie_len = sizeof(cookie); + ASSERT_OK_ERRNO(getsockopt(unix_fd, SOL_SOCKET, SO_COOKIE, &cookie, &cookie_len)); + ASSERT_EQ(cookie_len, sizeof(cookie)); + + _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *message = NULL; + ASSERT_OK(sd_sock_diag_message_new_unix(nl, &message, st.st_ino, cookie, UDIAG_SHOW_RQLEN)); + + _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *reply = NULL; + ASSERT_OK(sd_netlink_call(nl, message, /* usec= */ 0, &reply)); +} + DEFINE_TEST_MAIN(LOG_DEBUG); diff --git a/src/notify/notify.c b/src/notify/notify.c index 6f924d719c..4a0f88a40e 100644 --- a/src/notify/notify.c +++ b/src/notify/notify.c @@ -513,7 +513,8 @@ static int action_fork(char *const *_command) { * more interesting, "positive" information. */ on_notify_socket, &child, - &addr_string); + &addr_string, + /* ret_event_source= */ NULL); if (r < 0) return log_error_errno(r, "Failed to prepare notify socket: %m"); diff --git a/src/run/run.c b/src/run/run.c index 44949d0bcf..4005d26d45 100644 --- a/src/run/run.c +++ b/src/run/run.c @@ -30,6 +30,8 @@ #include "exec-util.h" #include "exit-status.h" #include "fd-util.h" +#include "fork-journal.h" +#include "format-table.h" #include "format-util.h" #include "fs-util.h" #include "hostname-setup.h" @@ -92,6 +94,7 @@ static char **arg_socket_property = NULL; static char **arg_timer_property = NULL; static bool arg_with_timer = false; static bool arg_quiet = false; +static bool arg_verbose = false; static bool arg_aggressive_gc = false; static char *arg_working_directory = NULL; static bool arg_shell = false; @@ -159,6 +162,7 @@ static int help(void) { " agents until unit is started up\n" " -P --pipe Pass STDIN/STDOUT/STDERR directly to service\n" " -q --quiet Suppress information messages during runtime\n" + " -v --verbose Show unit logs while executing operation\n" " --json=pretty|short|off Print unit name and invocation id as JSON\n" " -G --collect Unload unit after it ran, even when failed\n" " -S --shell Invoke a $SHELL interactively\n" @@ -338,6 +342,7 @@ static int parse_argv(int argc, char *argv[]) { { "pty-late", no_argument, NULL, 'T' }, { "pipe", no_argument, NULL, 'P' }, { "quiet", no_argument, NULL, 'q' }, + { "verbose", no_argument, NULL, 'v' }, { "on-active", required_argument, NULL, ARG_ON_ACTIVE }, { "on-boot", required_argument, NULL, ARG_ON_BOOT }, { "on-startup", required_argument, NULL, ARG_ON_STARTUP }, @@ -371,7 +376,7 @@ static int parse_argv(int argc, char *argv[]) { /* Resetting to 0 forces the invocation of an internal initialization routine of getopt_long() * that checks for GNU extensions in optstring ('-' or '+' at the beginning). */ optind = 0; - while ((c = getopt_long(argc, argv, "+hrC:H:M:E:p:tTPqGdSu:", options, NULL)) >= 0) + while ((c = getopt_long(argc, argv, "+hrC:H:M:E:p:tTPqvGdSu:", options, NULL)) >= 0) switch (c) { @@ -498,6 +503,10 @@ static int parse_argv(int argc, char *argv[]) { arg_quiet = true; break; + case 'v': + arg_verbose = true; + break; + case ARG_ON_ACTIVE: r = add_timer_property("OnActiveSec", optarg); if (r < 0) @@ -2177,6 +2186,142 @@ static int run_context_setup_ptyfwd(RunContext *c) { return 0; } +static int run_context_show_result(RunContext *c) { + int r; + + assert(c); + + _cleanup_(table_unrefp) Table *t = table_new_vertical(); + if (!t) + return log_oom(); + + if (!isempty(c->result)) { + r = table_add_many( + t, + TABLE_FIELD, "Finished with result", + TABLE_STRING, c->result, + TABLE_SET_COLOR, streq(c->result, "success") ? ansi_highlight_green() : ansi_highlight_red()); + if (r < 0) + return table_log_add_error(r); + } + + if (c->exit_code > 0) { + r = table_add_cell( + t, + /* ret_cell= */ NULL, + TABLE_FIELD, + "Main processes terminated with"); + if (r < 0) + return table_log_add_error(r); + + r = table_add_cell_stringf( + t, + /* ret_cell= */ NULL, + "code=%s, status=%u/%s", + sigchld_code_to_string(c->exit_code), + c->exit_status, + strna(c->exit_code == CLD_EXITED ? + exit_status_to_string(c->exit_status, EXIT_STATUS_FULL) : + signal_to_string(c->exit_status))); + if (r < 0) + return table_log_add_error(r); + } + + if (timestamp_is_set(c->inactive_enter_usec) && + timestamp_is_set(c->inactive_exit_usec) && + c->inactive_enter_usec > c->inactive_exit_usec) { + r = table_add_many( + t, + TABLE_FIELD, "Service runtime", + TABLE_TIMESPAN_MSEC, c->inactive_enter_usec - c->inactive_exit_usec); + if (r < 0) + return table_log_add_error(r); + } + + if (c->cpu_usage_nsec != NSEC_INFINITY) { + r = table_add_many( + t, + TABLE_FIELD, "CPU time consumed", + TABLE_TIMESPAN_MSEC, DIV_ROUND_UP(c->cpu_usage_nsec, NSEC_PER_USEC)); + if (r < 0) + return table_log_add_error(r); + } + + if (c->memory_peak != UINT64_MAX) { + const char *swap; + + if (c->memory_swap_peak != UINT64_MAX) + swap = strjoina(" (swap: ", FORMAT_BYTES(c->memory_swap_peak), ")"); + else + swap = ""; + + r = table_add_cell( + t, + /* ret_cell= */ NULL, + TABLE_FIELD, "Memory peak"); + if (r < 0) + return table_log_add_error(r); + + r = table_add_cell_stringf( + t, + /* ret_cell= */ NULL, + "%s%s", + FORMAT_BYTES(c->memory_peak), swap); + if (r < 0) + return table_log_add_error(r); + } + + const char *ip_ingress = NULL, *ip_egress = NULL; + if (!IN_SET(c->ip_ingress_bytes, 0, UINT64_MAX)) + ip_ingress = strjoina("received ", FORMAT_BYTES(c->ip_ingress_bytes)); + if (!IN_SET(c->ip_egress_bytes, 0, UINT64_MAX)) + ip_egress = strjoina("sent ", FORMAT_BYTES(c->ip_egress_bytes)); + + if (ip_ingress || ip_egress) { + r = table_add_cell( + t, + /* ret_cell= */ NULL, + TABLE_FIELD, "IP Traffic"); + if (r < 0) + return table_log_add_error(r); + + r = table_add_cell_stringf( + t, + /* ret_cell= */ NULL, + "%s%s%s", strempty(ip_ingress), ip_ingress && ip_egress ? ", " : "", strempty(ip_egress)); + if (r < 0) + return table_log_add_error(r); + } + + const char *io_read = NULL, *io_write = NULL; + if (!IN_SET(c->io_read_bytes, 0, UINT64_MAX)) + io_read = strjoina("read ", FORMAT_BYTES(c->io_read_bytes)); + if (!IN_SET(c->io_write_bytes, 0, UINT64_MAX)) + io_write = strjoina("written ", FORMAT_BYTES(c->io_write_bytes)); + + if (io_read || io_write) { + r = table_add_cell( + t, + /* ret_cell= */ NULL, + TABLE_FIELD, "IO Bytes"); + if (r < 0) + return table_log_add_error(r); + + r = table_add_cell_stringf( + t, + /* ret_cell= */ NULL, + "%s%s%s", strempty(io_read), io_read && io_write ? ", " : "", strempty(io_write)); + if (r < 0) + return table_log_add_error(r); + } + + r = table_print(t, stderr); + if (r < 0) + return table_log_print_error(r); + + return 0; +} + static int start_transient_service(sd_bus *bus) { _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL; _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; @@ -2291,6 +2436,10 @@ static int start_transient_service(sd_bus *bus) { return r; peer_fd = safe_close(peer_fd); + _cleanup_(journal_terminate) PidRef journal_pid = PIDREF_NULL; + if (arg_verbose) + (void) journal_fork(arg_runtime_scope, (const char**) STRV_MAKE(c.unit), &journal_pid); + r = bus_call_with_hint(bus, m, "service", &reply); if (r < 0) return r; @@ -2365,60 +2514,11 @@ static int start_transient_service(sd_bus *bus) { if (r < 0) return log_error_errno(r, "Failed to run event loop: %m"); - if (arg_wait && !arg_quiet) { + /* Close the journal watch logic before we output the exit summary */ + journal_terminate(&journal_pid); - if (!isempty(c.result)) - log_info("Finished with result: %s", strna(c.result)); - - if (c.exit_code > 0) - log_info("Main processes terminated with: code=%s, status=%u/%s", - sigchld_code_to_string(c.exit_code), - c.exit_status, - strna(c.exit_code == CLD_EXITED ? - exit_status_to_string(c.exit_status, EXIT_STATUS_FULL) : - signal_to_string(c.exit_status))); - - if (timestamp_is_set(c.inactive_enter_usec) && - timestamp_is_set(c.inactive_exit_usec) && - c.inactive_enter_usec > c.inactive_exit_usec) - log_info("Service runtime: %s", - FORMAT_TIMESPAN(c.inactive_enter_usec - c.inactive_exit_usec, USEC_PER_MSEC)); - - if (c.cpu_usage_nsec != NSEC_INFINITY) - log_info("CPU time consumed: %s", - FORMAT_TIMESPAN(DIV_ROUND_UP(c.cpu_usage_nsec, NSEC_PER_USEC), USEC_PER_MSEC)); - - if (c.memory_peak != UINT64_MAX) { - const char *swap; - - if (c.memory_swap_peak != UINT64_MAX) - swap = strjoina(" (swap: ", FORMAT_BYTES(c.memory_swap_peak), ")"); - else - swap = ""; - - log_info("Memory peak: %s%s", FORMAT_BYTES(c.memory_peak), swap); - } - - const char *ip_ingress = NULL, *ip_egress = NULL; - - if (!IN_SET(c.ip_ingress_bytes, 0, UINT64_MAX)) - ip_ingress = strjoina(" received: ", FORMAT_BYTES(c.ip_ingress_bytes)); - if (!IN_SET(c.ip_egress_bytes, 0, UINT64_MAX)) - ip_egress = strjoina(" sent: ", FORMAT_BYTES(c.ip_egress_bytes)); - - if (ip_ingress || ip_egress) - log_info("IP traffic%s%s", strempty(ip_ingress), strempty(ip_egress)); - - const char *io_read = NULL, *io_write = NULL; - - if (!IN_SET(c.io_read_bytes, 0, UINT64_MAX)) - io_read = strjoina(" read: ", FORMAT_BYTES(c.io_read_bytes)); - if (!IN_SET(c.io_write_bytes, 0, UINT64_MAX)) - io_write = strjoina(" written: ", FORMAT_BYTES(c.io_write_bytes)); - - if (io_read || io_write) - log_info("IO bytes%s%s", strempty(io_read), strempty(io_write)); - } + if (arg_wait && !arg_quiet) + run_context_show_result(&c); /* Try to propagate the service's return value. But if the service defines * e.g. SuccessExitStatus, honour this, and return 0 to mean "success". */ diff --git a/src/shared/fork-journal.c b/src/shared/fork-journal.c new file mode 100644 index 0000000000..ccefc74df0 --- /dev/null +++ b/src/shared/fork-journal.c @@ -0,0 +1,203 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include + +#include "build-path.h" +#include "escape.h" +#include "event-util.h" +#include "exit-status.h" +#include "fd-util.h" +#include "fork-journal.h" +#include "log.h" +#include "notify-recv.h" +#include "parse-util.h" +#include "process-util.h" +#include "signal-util.h" +#include "socket-util.h" +#include "strv.h" + +static int on_child_exit(sd_event_source *s, const siginfo_t *si, void *userdata) { + PidRef *child = ASSERT_PTR(userdata); + + assert(si); + assert(si->si_pid == child->pid); + + /* Let's first do some debug logging about the exit status of the child */ + + if (si->si_code == CLD_EXITED) { + if (si->si_status == EXIT_SUCCESS) + log_debug("journalctl " PID_FMT " exited successfully.", si->si_pid); + else + log_debug("journalctl " PID_FMT " died with a failure exit status %i, ignoring.", si->si_pid, si->si_status); + } else if (si->si_code == CLD_KILLED) + log_debug("journalctl " PID_FMT " was killed by signal %s, ignoring.", si->si_pid, signal_to_string(si->si_status)); + else if (si->si_code == CLD_DUMPED) + log_debug("journalctl " PID_FMT " dumped core by signal %s, ignoring.", si->si_pid, signal_to_string(si->si_status)); + else + log_debug("Got unexpected exit code %i via SIGCHLD, ignoring.", si->si_code); + + /* And let's then fail the whole thing, because regardless what the exit status of the child is + * (i.e. even if successful), if it exits before sending READY=1 something is wrong. */ + + return log_debug_errno(SYNTHETIC_ERRNO(EPROTO), "Child " PID_FMT " died before sending notification message.", child->pid); +} + +static int on_child_notify(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + PidRef *child = ASSERT_PTR(userdata); + int r; + + assert(s); + assert(fd >= 0); + + _cleanup_strv_free_ char **msg = NULL; + _cleanup_(pidref_done) PidRef sender = PIDREF_NULL; + r = notify_recv_strv(fd, &msg, /* ret_ucred= */ NULL, &sender); + if (r == -EAGAIN) + return 0; + if (r < 0) + return r; + + if (!pidref_equal(child, &sender)) { + log_warning("Received notification message from unexpected process " PID_FMT " (expected " PID_FMT "), ignoring.", + sender.pid, child->pid); + return 0; + } + + if (strv_contains(msg, "READY=1")) + return sd_event_exit(sd_event_source_get_event(s), EXIT_SUCCESS); + + const char *e = strv_find_startswith(msg, "ERRNO="); + if (e) { + int error; + + r = safe_atoi(e, &error); + if (r < 0) { + log_debug_errno(r, "Received invalid ERRNO= notification message, ignoring: %s", e); + return 0; + } + if (error <= 0) { + log_debug("Received non-positive ERRNO= notification message, ignoring: %m"); + return 0; + } + + return -error; + } + + return 0; +} + +int journal_fork(RuntimeScope scope, const char * const *units, PidRef *ret_pidref) { + int r; + + assert(scope >= 0); + assert(scope < _RUNTIME_SCOPE_MAX); + assert(ret_pidref); + + if (!is_main_thread()) + return -EPERM; + + if (strv_isempty((char**) units)) + return 0; + + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + r = sd_event_new(&event); + if (r < 0) + return r; + + _cleanup_(sd_event_source_disable_unrefp) sd_event_source *notify_event_source = NULL; + _cleanup_(pidref_done_sigkill_wait) PidRef child = PIDREF_NULL; + _cleanup_free_ char *addr_string = NULL; + r = notify_socket_prepare( + event, + SD_EVENT_PRIORITY_NORMAL-10, /* We want the notification message from the child before the SIGCHLD */ + on_child_notify, + &child, + &addr_string, + ¬ify_event_source); + if (r < 0) + return r; + + r = sd_event_source_set_exit_on_failure(notify_event_source, true); + if (r < 0) + return r; + + _cleanup_strv_free_ char **argv = strv_new( + "journalctl", + "-q", + "--follow", + "--no-pager", + "--lines=1", + "--synchronize-on-exit=yes"); + if (!argv) + return log_oom_debug(); + + STRV_FOREACH(u, units) + if (strv_extendf(&argv, + scope == RUNTIME_SCOPE_SYSTEM ? "--unit=%s" : "--user-unit=%s", + *u) < 0) + return log_oom_debug(); + + if (DEBUG_LOGGING) { + _cleanup_free_ char *l = quote_command_line(argv, SHELL_ESCAPE_EMPTY); + log_debug("Invoking '%s' as child.", strnull(l)); + } + + BLOCK_SIGNALS(SIGCHLD); + + r = pidref_safe_fork_full( + "(journalctl)", + (const int[3]) { -EBADF, STDOUT_FILENO, STDERR_FILENO }, + /* except_fds= */ NULL, + /* n_except_fds= */ 0, + FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_CLOSE_ALL_FDS|FORK_REARRANGE_STDIO, + &child); + if (r < 0) + return r; + if (r == 0) { + /* In the child: */ + + if (setenv("NOTIFY_SOCKET", addr_string, /* overwrite= */ true) < 0) { + log_debug_errno(errno, "Failed to set $NOTIFY_SOCKET: %m"); + _exit(EXIT_MEMORY); + } + + r = invoke_callout_binary(argv[0], (char**) argv); + log_debug_errno(r, "Failed to invoke journalctl: %m"); + _exit(EXIT_EXEC); + } + + _cleanup_(sd_event_source_disable_unrefp) sd_event_source *child_event_source = NULL; + r = event_add_child_pidref(event, &child_event_source, &child, WEXITED, on_child_exit, &child); + if (r < 0) + return r; + + r = sd_event_source_set_exit_on_failure(child_event_source, true); + if (r < 0) + return r; + + (void) sd_event_source_set_description(child_event_source, "fork-journal-child"); + + r = sd_event_loop(event); + if (r < 0) + return r; + assert(r == 0); + + *ret_pidref = TAKE_PIDREF(child); + + return 0; +} + +void journal_terminate(PidRef *pidref) { + int r; + + if (!pidref_is_set(pidref)) + return; + + r = pidref_kill(pidref, SIGTERM); + if (r < 0) + log_debug_errno(r, "Failed to send SIGTERM to journalctl child " PID_FMT ", ignoring: %m", pidref->pid); + + (void) pidref_wait_for_terminate_and_check("journalctl", pidref, /* flags= */ 0); + pidref_done(pidref); +} diff --git a/src/shared/fork-journal.h b/src/shared/fork-journal.h new file mode 100644 index 0000000000..c7ff87b8f7 --- /dev/null +++ b/src/shared/fork-journal.h @@ -0,0 +1,11 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include "macro.h" +#include "pidref.h" +#include "runtime-scope.h" +#include "set.h" + +int journal_fork(RuntimeScope scope, const char * const *units, PidRef *ret_pidref); + +void journal_terminate(PidRef *pidref); diff --git a/src/shared/logs-show.c b/src/shared/logs-show.c index 664bffbc71..270fe2bc34 100644 --- a/src/shared/logs-show.c +++ b/src/shared/logs-show.c @@ -363,7 +363,6 @@ finish: static int output_timestamp_realtime( FILE *f, - sd_journal *j, OutputMode mode, OutputFlags flags, usec_t usec) { @@ -372,7 +371,6 @@ static int output_timestamp_realtime( int r; assert(f); - assert(j); if (!VALID_REALTIME(usec)) return log_debug_errno(SYNTHETIC_ERRNO(EINVAL), "No valid realtime timestamp available, skipping showing journal entry."); @@ -621,7 +619,7 @@ static int output_short( } else { usec_t usec; parse_display_realtime(j, realtime, monotonic, &usec); - r = output_timestamp_realtime(f, j, mode, flags, usec); + r = output_timestamp_realtime(f, mode, flags, usec); } if (r == -EINVAL) return 0; @@ -813,7 +811,7 @@ static int output_verbose( r = get_display_realtime(j, &usec); if (IN_SET(r, -EBADMSG, -EADDRNOTAVAIL)) { - log_debug_errno(r, "Skipping message we can't read: %m"); + log_debug_errno(r, "Unable to read realtime timestamp from entry, assuming bad or partially written entry: %m"); return 0; } if (r < 0) @@ -823,6 +821,10 @@ static int output_verbose( return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "No valid realtime timestamp available"); r = sd_journal_get_cursor(j, &cursor); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to determine cursor for entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get cursor: %m"); @@ -843,12 +845,16 @@ static int output_verbose( size_t fieldlen, valuelen; c = memchr(data, '=', length); - if (!c) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Invalid field."); + if (!c) { + log_debug("Encountered field without '=', assuming bad or partially written entry, leaving."); + break; + } fieldlen = c - (const char*) data; - if (!journal_field_valid(data, fieldlen, true)) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Invalid field."); + if (!journal_field_valid(data, fieldlen, /* allow_protected= */ true)) { + log_debug("Encountered invalid field, assuming bad or partially written entry, leaving."); + break; + } r = field_set_test(output_fields, data, fieldlen); if (r < 0) @@ -860,10 +866,10 @@ static int output_verbose( p = c + 1; if (flags & OUTPUT_COLOR) { - if (startswith(data, "MESSAGE=")) { + if (memory_startswith(data, length, "MESSAGE=")) { on = ansi_highlight(); off = ansi_normal(); - } else if (startswith(data, "CONFIG_FILE=")) { + } else if (memory_startswith(data, length, "CONFIG_FILE=")) { _cleanup_free_ char *u = NULL; u = memdup_suffix0(p, valuelen); @@ -875,7 +881,7 @@ static int output_verbose( valuelen = strlen(urlified); } - } else if (startswith(data, "_")) { + } else if (memory_startswith(data, length, "_")) { /* Highlight trusted data as such */ on = ansi_green(); off = ansi_normal(); @@ -931,18 +937,34 @@ static int output_export( (void) sd_journal_set_data_threshold(j, 0); r = sd_journal_get_cursor(j, &cursor); + if (IN_SET(r, -EBADMSG, -EADDRNOTAVAIL)) { + log_debug_errno(r, "Unable to determine cursor of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get cursor: %m"); r = sd_journal_get_realtime_usec(j, &realtime); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read realtime timestamp of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get realtime timestamp: %m"); r = sd_journal_get_monotonic_usec(j, &monotonic, &journal_boot_id); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read monotonic timestamp of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get monotonic timestamp: %m"); r = sd_journal_get_seqnum(j, &seqnum, &seqnum_id); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read sequence number of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get seqnum: %m"); @@ -969,12 +991,16 @@ static int output_export( continue; c = memchr(data, '=', length); - if (!c) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Invalid field."); + if (!c) { + log_debug("Encountered data field without '=', assuming bad or partially written entry, leaving."); + break; + } fieldlen = c - (const char*) data; - if (!journal_field_valid(data, fieldlen, true)) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Invalid field."); + if (!journal_field_valid(data, fieldlen, /* allow_protected= */ true)) { + log_debug("Encountered invalid field, assuming bad or partially written entry, leaving."); + break; + } r = field_set_test(output_fields, data, fieldlen); if (r < 0) @@ -1160,8 +1186,10 @@ static int update_json_data_split( return 0; fieldlen = eq - (const char*) data; - if (!journal_field_valid(data, fieldlen, true)) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Invalid field."); + if (!journal_field_valid(data, fieldlen, /* allow_protected= */ true)) { + log_debug("Encountered invalid field, assuming bad or incompletely written field, leaving."); + return 0; + } name = strndupa_safe(data, fieldlen); if (output_fields && !set_contains(output_fields, name)) @@ -1198,18 +1226,34 @@ static int output_json( (void) sd_journal_set_data_threshold(j, flags & OUTPUT_SHOW_ALL ? 0 : JSON_THRESHOLD); r = sd_journal_get_cursor(j, &cursor); + if (IN_SET(r, -EBADMSG, -EADDRNOTAVAIL)) { + log_debug_errno(r, "Unable to determine cursor of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get cursor: %m"); r = sd_journal_get_realtime_usec(j, &realtime); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read realtime timestamp of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get realtime timestamp: %m"); r = sd_journal_get_monotonic_usec(j, &monotonic, &journal_boot_id); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read monotonic timestamp of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get monotonic timestamp: %m"); r = sd_journal_get_seqnum(j, &seqnum, &seqnum_id); + if (r == -EBADMSG) { + log_debug_errno(r, "Unable to read sequence number of entry, assuming bad or partially written entry: %m"); + return 0; + } if (r < 0) return log_error_errno(r, "Failed to get seqnum: %m"); @@ -1525,6 +1569,10 @@ int show_journal( if (need_seek) { r = sd_journal_next(j); + if (r == -EBADMSG) { + log_debug_errno(r, "Bad or partially written entry, leaving."); + break; + } if (r < 0) return log_error_errno(r, "Failed to iterate through journal: %m"); } diff --git a/src/shared/meson.build b/src/shared/meson.build index dfcbc136f7..b5522637f0 100644 --- a/src/shared/meson.build +++ b/src/shared/meson.build @@ -77,6 +77,7 @@ shared_sources = files( 'find-esp.c', 'firewall-util-nft.c', 'firewall-util.c', + 'fork-journal.c', 'format-table.c', 'fstab-util.c', 'generator.c', diff --git a/src/shared/notify-recv.c b/src/shared/notify-recv.c index 0e64bdde04..0169a6220c 100644 --- a/src/shared/notify-recv.c +++ b/src/shared/notify-recv.c @@ -14,12 +14,12 @@ int notify_socket_prepare( int64_t priority, sd_event_io_handler_t handler, void *userdata, - char **ret_path) { + char **ret_path, + sd_event_source **ret_event_source) { int r; assert(event); - assert(ret_path); /* This creates an autobind AF_UNIX socket and adds an IO event source for the socket, which helps * prepare the notification socket used to communicate with worker processes. */ @@ -58,11 +58,17 @@ int notify_socket_prepare( (void) sd_event_source_set_description(s, "notify-socket"); - r = sd_event_source_set_floating(s, true); - if (r < 0) - return log_debug_errno(r, "Failed to make notification event source floating: %m"); + if (ret_event_source) + *ret_event_source = TAKE_PTR(s); + else { + r = sd_event_source_set_floating(s, true); + if (r < 0) + return log_debug_errno(r, "Failed to make notification event source floating: %m"); + } + + if (ret_path) + *ret_path = TAKE_PTR(path); - *ret_path = TAKE_PTR(path); return 0; } diff --git a/src/shared/notify-recv.h b/src/shared/notify-recv.h index 24482fdf1d..9035ad4872 100644 --- a/src/shared/notify-recv.h +++ b/src/shared/notify-recv.h @@ -13,7 +13,8 @@ int notify_socket_prepare( int64_t priority, sd_event_io_handler_t handler, void *userdata, - char **ret_path); + char **ret_path, + sd_event_source **ret_event_source); int notify_recv_with_fds( int fd, diff --git a/src/shared/socket-netlink.c b/src/shared/socket-netlink.c index 0054641c22..1ecb2284fe 100644 --- a/src/shared/socket-netlink.c +++ b/src/shared/socket-netlink.c @@ -3,8 +3,12 @@ #include #include #include +#include #include #include +#include + +#include "sd-netlink.h" #include "alloc-util.h" #include "errno-util.h" @@ -13,6 +17,7 @@ #include "log.h" #include "memory-util.h" #include "namespace-util.h" +#include "netlink-sock-diag.h" #include "netlink-util.h" #include "parse-util.h" #include "socket-netlink.h" @@ -479,3 +484,63 @@ int netns_get_nsid(int netnsfd, uint32_t *ret) { return -ENXIO; } + +int af_unix_get_qlen(int fd, uint32_t *ret) { + int r; + + assert(fd >= 0); + assert(ret); + + /* Returns the current queue length for an AF_UNIX listening socket */ + + struct stat st; + if (fstat(fd, &st) < 0) + return -errno; + if (!S_ISSOCK(st.st_mode)) + return -ENOTSOCK; + + _cleanup_(sd_netlink_unrefp) sd_netlink *nl = NULL; + r = sd_sock_diag_socket_open(&nl); + if (r < 0) + return r; + + uint64_t cookie = 0; + socklen_t cookie_len = sizeof(cookie); + if (getsockopt(fd, SOL_SOCKET, SO_COOKIE, &cookie, &cookie_len) < 0) + return -errno; + + assert(cookie_len == sizeof(cookie)); + + _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *message = NULL; + r = sd_sock_diag_message_new_unix(nl, &message, st.st_ino, cookie, UDIAG_SHOW_RQLEN); + if (r < 0) + return r; + + _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *reply = NULL; + r = sd_netlink_call(nl, message, /* usec= */ 0, &reply); + if (r < 0) + return r; + + for (sd_netlink_message *m = reply; m; m = sd_netlink_message_next(m)) { + r = sd_netlink_message_get_errno(m); + if (r < 0) + return r; + + _cleanup_free_ void *data = NULL; + size_t size = 0; + + r = sd_netlink_message_read_data(m, UNIX_DIAG_RQLEN, &size, &data); + if (r == -ENODATA) + continue; + if (r < 0) + return r; + + assert(size == sizeof(struct unix_diag_rqlen)); + const struct unix_diag_rqlen *udrql = ASSERT_PTR(data); + + *ret = udrql->udiag_rqueue; + return 0; + } + + return -ENODATA; +} diff --git a/src/shared/socket-netlink.h b/src/shared/socket-netlink.h index 12647b3d14..88b9c7688b 100644 --- a/src/shared/socket-netlink.h +++ b/src/shared/socket-netlink.h @@ -45,3 +45,5 @@ int in_addr_full_new_from_string(const char *s, struct in_addr_full **ret); const char* in_addr_full_to_string(struct in_addr_full *a); int netns_get_nsid(int netnsfd, uint32_t *ret); + +int af_unix_get_qlen(int fd, uint32_t *ret); diff --git a/src/shared/varlink-io.systemd.Journal.c b/src/shared/varlink-io.systemd.Journal.c index ddc3637086..a084d8753d 100644 --- a/src/shared/varlink-io.systemd.Journal.c +++ b/src/shared/varlink-io.systemd.Journal.c @@ -2,7 +2,11 @@ #include "varlink-io.systemd.Journal.h" -static SD_VARLINK_DEFINE_METHOD(Synchronize); +static SD_VARLINK_DEFINE_METHOD( + Synchronize, + SD_VARLINK_FIELD_COMMENT("Controls whether to offline the journal files as part of the synchronization operation."), + SD_VARLINK_DEFINE_INPUT(offline, SD_VARLINK_BOOL, SD_VARLINK_NULLABLE)); + static SD_VARLINK_DEFINE_METHOD(Rotate); static SD_VARLINK_DEFINE_METHOD(FlushToVar); static SD_VARLINK_DEFINE_METHOD(RelinquishVar); diff --git a/src/systemctl/systemctl-start-unit.c b/src/systemctl/systemctl-start-unit.c index 7ea2cf8998..7e81afae00 100644 --- a/src/systemctl/systemctl-start-unit.c +++ b/src/systemctl/systemctl-start-unit.c @@ -10,6 +10,7 @@ #include "bus-util.h" #include "bus-wait-for-jobs.h" #include "bus-wait-for-units.h" +#include "fork-journal.h" #include "macro.h" #include "special.h" #include "string-util.h" @@ -388,9 +389,13 @@ int verb_start(int argc, char *argv[], void *userdata) { return log_error_errno(r, "Failed to allocate unit watch context: %m"); } + _cleanup_(journal_terminate) PidRef journal_pid = PIDREF_NULL; if (arg_marked) ret = enqueue_marked_jobs(bus, w); - else + else { + if (arg_verbose) + (void) journal_fork(arg_runtime_scope, (const char**) names, &journal_pid); + STRV_FOREACH(name, names) { _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; @@ -404,6 +409,7 @@ int verb_start(int argc, char *argv[], void *userdata) { return log_oom(); } } + } if (!arg_no_block) { const char *extra_args[4]; diff --git a/src/systemctl/systemctl.c b/src/systemctl/systemctl.c index fdbde7dc5e..3c691c5dee 100644 --- a/src/systemctl/systemctl.c +++ b/src/systemctl/systemctl.c @@ -90,6 +90,7 @@ bool arg_show_types = false; int arg_check_inhibitors = -1; bool arg_dry_run = false; bool arg_quiet = false; +bool arg_verbose = false; bool arg_no_warn = false; bool arg_full = false; bool arg_recursive = false; @@ -304,6 +305,7 @@ static int systemctl_help(void) { " suspend-then-hibernate, hybrid-sleep, default,\n" " rescue, emergency, and exit.\n" " -q --quiet Suppress output\n" + " -v --verbose Show unit logs while executing operation\n" " --no-warn Suppress several warnings shown by default\n" " --wait For (re)start, wait until service stopped again\n" " For is-system-running, wait until startup is completed\n" @@ -510,6 +512,7 @@ static int systemctl_parse_argv(int argc, char *argv[]) { { "no-wall", no_argument, NULL, ARG_NO_WALL }, { "dry-run", no_argument, NULL, ARG_DRY_RUN }, { "quiet", no_argument, NULL, 'q' }, + { "verbose", no_argument, NULL, 'v' }, { "no-warn", no_argument, NULL, ARG_NO_WARN }, { "root", required_argument, NULL, ARG_ROOT }, { "image", required_argument, NULL, ARG_IMAGE }, @@ -556,7 +559,7 @@ static int systemctl_parse_argv(int argc, char *argv[]) { /* We default to allowing interactive authorization only in systemctl (not in the legacy commands) */ arg_ask_password = true; - while ((c = getopt_long(argc, argv, "hC:t:p:P:alqfs:H:M:n:o:iTr.::", options, NULL)) >= 0) + while ((c = getopt_long(argc, argv, "hC:t:p:P:alqvfs:H:M:n:o:iTr.::", options, NULL)) >= 0) switch (c) { @@ -770,6 +773,10 @@ static int systemctl_parse_argv(int argc, char *argv[]) { break; + case 'v': + arg_verbose = true; + break; + case 'f': arg_force++; break; diff --git a/src/systemctl/systemctl.h b/src/systemctl/systemctl.h index 00405f4705..e282380323 100644 --- a/src/systemctl/systemctl.h +++ b/src/systemctl/systemctl.h @@ -68,6 +68,7 @@ extern bool arg_show_types; extern int arg_check_inhibitors; extern bool arg_dry_run; extern bool arg_quiet; +extern bool arg_verbose; extern bool arg_no_warn; extern bool arg_full; extern bool arg_recursive; diff --git a/src/sysupdate/sysupdate-transfer.c b/src/sysupdate/sysupdate-transfer.c index 0247864eca..9ed55dd816 100644 --- a/src/sysupdate/sysupdate-transfer.c +++ b/src/sysupdate/sysupdate-transfer.c @@ -1062,7 +1062,8 @@ static int run_callout( SD_EVENT_PRIORITY_NORMAL - 5, helper_on_notify, ctx, - &bind_name); + &bind_name, + /* ret_event_source= */ NULL); if (r < 0) return log_error_errno(r, "Failed to prepare notify socket: %m"); diff --git a/src/sysupdate/sysupdated.c b/src/sysupdate/sysupdated.c index e75bfb4a16..9a8402482c 100644 --- a/src/sysupdate/sysupdated.c +++ b/src/sysupdate/sysupdated.c @@ -1740,7 +1740,8 @@ static int manager_new(Manager **ret) { SD_EVENT_PRIORITY_NORMAL, manager_on_notify, m, - &m->notify_socket_path); + &m->notify_socket_path, + /* ret_event_source= */ NULL); if (r < 0) return r; diff --git a/src/test/test-notify-recv.c b/src/test/test-notify-recv.c index f4f96a74e2..3178b9da31 100644 --- a/src/test/test-notify-recv.c +++ b/src/test/test-notify-recv.c @@ -87,7 +87,7 @@ TEST(notify_socket_prepare) { .pidref = PIDREF_NULL, }; _cleanup_free_ char *path = NULL; - ASSERT_OK(notify_socket_prepare(e, SD_EVENT_PRIORITY_NORMAL - 10, on_recv, &c, &path)); + ASSERT_OK(notify_socket_prepare(e, SD_EVENT_PRIORITY_NORMAL - 10, on_recv, &c, &path, /* ret_event_source= */ NULL)); ASSERT_OK(sigprocmask_many(SIG_BLOCK, NULL, SIGCHLD)); diff --git a/src/test/test-socket-netlink.c b/src/test/test-socket-netlink.c index 659e583fa3..45144a16ea 100644 --- a/src/test/test-socket-netlink.c +++ b/src/test/test-socket-netlink.c @@ -1,6 +1,9 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ +#include + #include "alloc-util.h" +#include "fd-util.h" #include "missing_network.h" #include "socket-netlink.h" #include "string-util.h" @@ -381,4 +384,32 @@ TEST(netns_get_nsid) { log_info("Our NSID is %" PRIu32, u); } +TEST(af_unix_get_qlen) { + _cleanup_close_ int unix_fd = ASSERT_FD(socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0)); + ASSERT_OK(socket_autobind(unix_fd, /* ret_name= */ NULL)); + ASSERT_OK_ERRNO(listen(unix_fd, 123)); + + uint32_t q; + ASSERT_OK(af_unix_get_qlen(unix_fd, &q)); + ASSERT_EQ(q, 0U); + + _cleanup_close_ int conn_fd = ASSERT_FD(socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)); + union sockaddr_union sa; + socklen_t salen = sizeof(sa); + ASSERT_OK_ERRNO(getsockname(unix_fd, &sa.sa, &salen)); + ASSERT_OK(connect(conn_fd, &sa.sa, salen)); + + ASSERT_OK(af_unix_get_qlen(unix_fd, &q)); + ASSERT_EQ(q, 1U); + + _cleanup_close_ int conn2_fd = ASSERT_FD(socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)); + ASSERT_OK(connect(conn2_fd, &sa.sa, salen)); + + ASSERT_OK(af_unix_get_qlen(unix_fd, &q)); + ASSERT_EQ(q, 2U); + + _cleanup_close_ int efd = ASSERT_FD(eventfd(0, EFD_CLOEXEC)); + ASSERT_ERROR(af_unix_get_qlen(efd, &q), ENOTSOCK); +} + DEFINE_TEST_MAIN(LOG_DEBUG); diff --git a/src/udev/udev-manager.c b/src/udev/udev-manager.c index fe26496618..2cd40f1d1d 100644 --- a/src/udev/udev-manager.c +++ b/src/udev/udev-manager.c @@ -1227,7 +1227,8 @@ static int manager_start_worker_notify(Manager *manager) { EVENT_PRIORITY_WORKER_NOTIFY, on_worker_notify, manager, - &manager->worker_notify_socket_path); + &manager->worker_notify_socket_path, + /* ret_event_source= */ NULL); if (r < 0) return log_error_errno(r, "Failed to prepare worker notification socket: %m"); diff --git a/test/units/TEST-04-JOURNAL.LogFilterPatterns.sh b/test/units/TEST-04-JOURNAL.LogFilterPatterns.sh index 79482d6078..277e367b32 100755 --- a/test/units/TEST-04-JOURNAL.LogFilterPatterns.sh +++ b/test/units/TEST-04-JOURNAL.LogFilterPatterns.sh @@ -36,6 +36,7 @@ run_service_and_fetch_logs() { fi systemctl start "$unit" + journalctl --sync journalctl -q -u "$unit" -I -p notice } diff --git a/test/units/TEST-07-PID1.startv.sh b/test/units/TEST-07-PID1.startv.sh new file mode 100755 index 0000000000..bcc7c0bc68 --- /dev/null +++ b/test/units/TEST-07-PID1.startv.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# SPDX-License-Identifier: LGPL-2.1-or-later +# shellcheck disable=SC2016 +set -eux +set -o pipefail + +# shellcheck source=test/units/util.sh +. "$(dirname "$0")"/util.sh + +systemd-run -v --wait echo wampfl | grep wampfl + +systemd-run -v -p Type=notify bash -c 'echo brumfl ; systemd-notify --ready ; echo krass' | grep brumfl + +mkdir -p /run/systemd/journald.conf.d/ + +# Let's disable storage of debug messages, since we want to flood the journal +# daemon with messages that it will have to process, but we do not actually +# want to push out our own messages from storage while doing so +cat >> /run/systemd/journald.conf.d/50-disable-debug.conf <