journalctl: optionally delay --follow exit for a journal synchronization

Let's optionally issue a Varlink Synchronize() call in --follow mode
when asked to terminate. This is useful so that the tool can be called
and it is guaranteed it processed all messages generated before the
request to exit before it exits.

We want this in "systemd-run -v" in particular, so that we can be sure
we are not missing any log output from the invoked service before it
exits
This commit is contained in:
Lennart Poettering
2025-03-25 08:01:50 -04:00
parent 5be930db9f
commit d2f45c7681
7 changed files with 118 additions and 19 deletions

View File

@@ -797,6 +797,20 @@
--"), any warning messages regarding inaccessible system journals when run as a normal
user.</para></listitem>
</varlistentry>
<varlistentry>
<term><option>--synchronize-on-exit=</option></term>
<listitem><para>Takes a boolean argument. If true and operating in <option>--follow</option> mode, a
journal synchronization request (equivalent to <command>journalctl --sync</command>) is issued when
<constant>SIGTERM</constant>/<constant>SIGINT</constant> is received, and log output continues until
this request completes. This is useful for synchronizing journal log output to the runtime of
services or external events, ensuring that any log data enqueued to the logging subsystem by
the time <constant>SIGTERM</constant>/<constant>SIGINT</constant> is issued is guaranteed to be
processed and displayed by the time log output ends. Defaults to false.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
</variablelist>
</refsect1>

View File

@@ -4,6 +4,7 @@
#include "sd-daemon.h"
#include "sd-event.h"
#include "sd-varlink.h"
#include "ansi-color.h"
#include "fileio.h"
@@ -11,7 +12,7 @@
#include "journalctl-filter.h"
#include "journalctl-show.h"
#include "journalctl-util.h"
#include "log.h"
#include "journalctl-varlink.h"
#include "logs-show.h"
#include "terminal-util.h"
@@ -27,11 +28,15 @@ typedef struct Context {
sd_id128_t previous_boot_id;
sd_id128_t previous_boot_id_output;
dual_timestamp previous_ts_output;
sd_event *event;
sd_varlink *synchronize_varlink;
} Context;
static void context_done(Context *c) {
assert(c);
c->synchronize_varlink = sd_varlink_flush_close_unref(c->synchronize_varlink);
c->event = sd_event_unref(c->event);
sd_journal_close(c->journal);
}
@@ -270,15 +275,14 @@ static int show(Context *c) {
return n_shown;
}
static int show_and_fflush(Context *c, sd_event_source *s) {
static int show_and_fflush(Context *c) {
int r;
assert(c);
assert(s);
r = show(c);
if (r < 0)
return sd_event_exit(sd_event_source_get_event(s), r);
return sd_event_exit(c->event, r);
fflush(stdout);
return 0;
@@ -293,10 +297,10 @@ static int on_journal_event(sd_event_source *s, int fd, uint32_t revents, void *
r = sd_journal_process(c->journal);
if (r < 0) {
log_error_errno(r, "Failed to process journal events: %m");
return sd_event_exit(sd_event_source_get_event(s), r);
return sd_event_exit(c->event, r);
}
return show_and_fflush(c, s);
return show_and_fflush(c);
}
static int on_first_event(sd_event_source *s, void *userdata) {
@@ -305,7 +309,7 @@ static int on_first_event(sd_event_source *s, void *userdata) {
assert(s);
r = show_and_fflush(c, s);
r = show_and_fflush(c);
if (r < 0)
return r;
@@ -334,29 +338,92 @@ static int on_first_event(sd_event_source *s, void *userdata) {
return 0;
}
static int on_synchronize_reply(
sd_varlink *vl,
sd_json_variant *parameters,
const char *error_id,
sd_varlink_reply_flags_t flags,
void *userdata) {
Context *c = ASSERT_PTR(userdata);
int r;
assert(vl);
if (error_id)
log_warning("Failed to synchronize on Journal, ignoring: %s", error_id);
r = show_and_fflush(c);
if (r < 0)
return r;
return sd_event_exit(c->event, EXIT_SUCCESS);
}
static int on_signal(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *vl = NULL;
Context *c = ASSERT_PTR(userdata);
int r;
assert(s);
assert(si);
assert(IN_SET(si->ssi_signo, SIGTERM, SIGINT));
return sd_event_exit(sd_event_source_get_event(s), si->ssi_signo);
if (!arg_synchronize_on_exit)
goto finish;
if (c->synchronize_varlink) /* Already pending? Then exit immediately, so that user can cancel the sync */
return sd_event_exit(c->event, EXIT_SUCCESS);
r = varlink_connect_journal(&vl);
if (r < 0) {
log_error_errno(r, "Failed to connect to Journal Varlink IPC interface, ignoring: %m");
goto finish;
}
/* Set a low priority on the idle event handler, so that we show any log messages first */
r = sd_varlink_attach_event(vl, c->event, SD_EVENT_PRIORITY_IDLE);
if (r < 0) {
log_warning_errno(r, "Failed to attach Varlink connectio to event loop: %m");
goto finish;
}
r = sd_varlink_bind_reply(vl, on_synchronize_reply);
if (r < 0) {
log_warning_errno(r, "Failed to bind synchronization reply: %m");
goto finish;
}
(void) sd_varlink_set_userdata(vl, c);
r = sd_varlink_invoke(vl, "io.systemd.Journal.Synchronize", /* parameters= */ NULL);
if (r < 0) {
log_warning_errno(r, "Failed to issue synchronization request: %m");
goto finish;
}
c->synchronize_varlink = TAKE_PTR(vl);
return 0;
finish:
return sd_event_exit(c->event, si->ssi_signo);
}
static int setup_event(Context *c, int fd, sd_event **ret) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
static int setup_event(Context *c, int fd) {
int r;
assert(arg_follow);
assert(c);
assert(fd >= 0);
assert(ret);
assert(!c->event);
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
r = sd_event_default(&e);
if (r < 0)
return log_error_errno(r, "Failed to allocate sd_event object: %m");
(void) sd_event_add_signal(e, NULL, SIGTERM | SD_EVENT_SIGNAL_PROCMASK, on_signal, NULL);
(void) sd_event_add_signal(e, NULL, SIGINT | SD_EVENT_SIGNAL_PROCMASK, on_signal, NULL);
(void) sd_event_add_signal(e, /* ret_event_source= */ NULL, SIGTERM | SD_EVENT_SIGNAL_PROCMASK, on_signal, c);
(void) sd_event_add_signal(e, /* ret_event_source= */ NULL, SIGINT | SD_EVENT_SIGNAL_PROCMASK, on_signal, c);
r = sd_event_add_io(e, NULL, fd, EPOLLIN, &on_journal_event, c);
if (r < 0)
@@ -378,7 +445,7 @@ static int setup_event(Context *c, int fd, sd_event **ret) {
return log_error_errno(r, "Failed to add defer event source: %m");
}
*ret = TAKE_PTR(e);
c->event = TAKE_PTR(e);
return 0;
}
@@ -466,16 +533,15 @@ int action_show(char **matches) {
}
if (arg_follow) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
int sig;
assert(poll_fd >= 0);
r = setup_event(&c, poll_fd, &e);
r = setup_event(&c, poll_fd);
if (r < 0)
return r;
r = sd_event_loop(e);
r = sd_event_loop(c.event);
if (r < 0)
return r;
sig = r;

View File

@@ -11,7 +11,7 @@
#include "log.h"
#include "varlink-util.h"
static int varlink_connect_journal(sd_varlink **ret) {
int varlink_connect_journal(sd_varlink **ret) {
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *vl = NULL;
const char *address;
int r;

View File

@@ -1,6 +1,10 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
#include "sd-varlink.h"
int varlink_connect_journal(sd_varlink **ret);
int action_flush_to_var(void);
int action_relinquish_var(void);
int action_rotate(void);

View File

@@ -95,6 +95,7 @@ char *arg_pattern = NULL;
pcre2_code *arg_compiled_pattern = NULL;
PatternCompileCase arg_case = PATTERN_COMPILE_CASE_AUTO;
ImagePolicy *arg_image_policy = NULL;
bool arg_synchronize_on_exit = false;
STATIC_DESTRUCTOR_REGISTER(arg_cursor, freep);
STATIC_DESTRUCTOR_REGISTER(arg_cursor_file, freep);
@@ -269,6 +270,8 @@ static int help(void) {
" --no-tail Show all lines, even in follow mode\n"
" --truncate-newline Truncate entries by first newline character\n"
" -q --quiet Do not show info messages and privilege warning\n"
" --synchronize-on-exit=BOOL\n"
" Wait for Journal synchronization before exiting\n"
"\n%3$sPager Control Options:%4$s\n"
" --no-pager Do not pipe output into a pager\n"
" -e --pager-end Immediately jump to the end in the pager\n"
@@ -357,6 +360,7 @@ static int parse_argv(int argc, char *argv[]) {
ARG_OUTPUT_FIELDS,
ARG_NAMESPACE,
ARG_LIST_NAMESPACES,
ARG_SYNCHRONIZE_ON_EXIT,
};
static const struct option options[] = {
@@ -430,6 +434,7 @@ static int parse_argv(int argc, char *argv[]) {
{ "output-fields", required_argument, NULL, ARG_OUTPUT_FIELDS },
{ "namespace", required_argument, NULL, ARG_NAMESPACE },
{ "list-namespaces", no_argument, NULL, ARG_LIST_NAMESPACES },
{ "synchronize-on-exit", required_argument, NULL, ARG_SYNCHRONIZE_ON_EXIT },
{}
};
@@ -973,6 +978,14 @@ static int parse_argv(int argc, char *argv[]) {
break;
}
case ARG_SYNCHRONIZE_ON_EXIT:
r = parse_boolean_argument("--synchronize-on-exit", optarg, &arg_synchronize_on_exit);
if (r < 0)
return r;
break;
case '?':
return -EINVAL;

View File

@@ -99,6 +99,7 @@ extern char *arg_pattern;
extern pcre2_code *arg_compiled_pattern;
extern PatternCompileCase arg_case;
extern ImagePolicy *arg_image_policy;
extern bool arg_synchronize_on_exit;
static inline bool arg_lines_needs_seek_end(void) {
return arg_lines >= 0 && !arg_lines_oldest;

View File

@@ -127,7 +127,8 @@ int journal_fork(RuntimeScope scope, const char * const *units, PidRef *ret_pidr
"-q",
"--follow",
"--no-pager",
"--lines=1");
"--lines=1",
"--synchronize-on-exit=yes");
if (!argv)
return log_oom_debug();