sysupdate: Report download progress via sd_notify

We set up a NOTIFY_SOCKET to get download progress notifications from
each individual import helper. Along with the number of import jobs we
have to run, this gives an overall progress value which we report using
sd_notify
This commit is contained in:
Adrian Vovk
2023-06-22 19:47:20 -04:00
committed by Tom Coldrick
parent e37164cafc
commit 8db5e9b657
4 changed files with 283 additions and 22 deletions

View File

@@ -8,6 +8,7 @@
#include "chase.h"
#include "conf-parser.h"
#include "dirent-util.h"
#include "event-util.h"
#include "fd-util.h"
#include "glyph-util.h"
#include "gpt.h"
@@ -16,8 +17,12 @@
#include "mkdir.h"
#include "parse-helpers.h"
#include "parse-util.h"
#include "percent-util.h"
#include "process-util.h"
#include "random-util.h"
#include "rm-rf.h"
#include "signal-util.h"
#include "socket-util.h"
#include "specifier.h"
#include "stat-util.h"
#include "stdio-util.h"
@@ -53,7 +58,7 @@ Transfer *transfer_free(Transfer *t) {
return mfree(t);
}
Transfer *transfer_new(void) {
Transfer *transfer_new(Context *ctx) {
Transfer *t;
t = new(Transfer, 1);
@@ -78,6 +83,8 @@ Transfer *transfer_new(void) {
.install_read_only = -1,
.partition_info = PARTITION_INFO_NULL,
.context = ctx,
};
return t;
@@ -784,30 +791,234 @@ static void compile_pattern_fields(
memcpy(ret->sha256sum, i->metadata.sha256sum, sizeof(ret->sha256sum));
}
typedef struct CalloutContext {
const Transfer *transfer;
const Instance *instance;
TransferProgress callback;
PidRef pid;
const char *name;
void* userdata;
} CalloutContext;
static CalloutContext *callout_context_free(CalloutContext *ctx) {
if (!ctx)
return NULL;
/* We don't own any data but need to clean up the job pid */
pidref_done(&ctx->pid);
return mfree(ctx);
}
DEFINE_TRIVIAL_CLEANUP_FUNC(CalloutContext*, callout_context_free);
static int callout_context_new(const Transfer *t, const Instance *i, TransferProgress cb,
const char *name, void* userdata, CalloutContext **ret) {
_cleanup_(callout_context_freep) CalloutContext *ctx = NULL;
assert(t);
assert(i);
assert(cb);
ctx = new(CalloutContext, 1);
if (!ctx)
return -ENOMEM;
*ctx = (CalloutContext) {
.transfer = t,
.instance = i,
.callback = cb,
.pid = PIDREF_NULL,
.name = name,
.userdata = userdata,
};
*ret = TAKE_PTR(ctx);
return 0;
}
static int helper_on_exit(sd_event_source *s, const siginfo_t *si, void *userdata) {
_cleanup_(callout_context_freep) CalloutContext *ctx = ASSERT_PTR(userdata);
int code;
assert(s);
assert(si);
assert(ctx);
if (si->si_code == CLD_EXITED) {
code = si->si_status;
if (code != EXIT_SUCCESS)
log_error("%s failed with exit status %i.", ctx->name, code);
else
log_debug("%s succeeded.", ctx->name);
} else {
code = -EPROTO;
if (IN_SET(si->si_code, CLD_KILLED, CLD_DUMPED))
log_error("%s terminated by signal %s.", ctx->name, signal_to_string(si->si_status));
else
log_error("%s failed due to unknown reason.", ctx->name);
}
pidref_done(&ctx->pid);
return sd_event_exit(sd_event_source_get_event(s), code);
}
static int helper_on_notify(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
char buf[NOTIFY_BUFFER_MAX+1];
struct iovec iovec = {
.iov_base = buf,
.iov_len = sizeof(buf)-1,
};
CMSG_BUFFER_TYPE(CMSG_SPACE(sizeof(struct ucred))) control;
struct msghdr msghdr = {
.msg_iov = &iovec,
.msg_iovlen = 1,
.msg_control = &control,
.msg_controllen = sizeof(control),
};
struct ucred *ucred;
CalloutContext *ctx = ASSERT_PTR(userdata);
char* progress_str;
int progress;
ssize_t n;
n = recvmsg_safe(fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
if (n < 0) {
if (ERRNO_IS_TRANSIENT(n))
return 0;
return (int) n;
}
cmsg_close_all(&msghdr);
if (msghdr.msg_flags & MSG_TRUNC) {
log_warning("Got overly long notification datagram, ignoring.");
return 0;
}
ucred = CMSG_FIND_DATA(&msghdr, SOL_SOCKET, SCM_CREDENTIALS, struct ucred);
if (!ucred || ucred->pid <= 0) {
log_warning("Got notification datagram lacking credential information, ignoring.");
return 0;
}
if (ucred->pid != ctx->pid.pid) {
log_warning("Got notification datagram from unexpected peer, ignoring.");
return 0;
}
buf[n] = 0;
progress_str = find_line_startswith(buf, "X_IMPORT_PROGRESS=");
if (!progress_str)
return 0;
truncate_nl(progress_str);
progress = parse_percent(progress_str);
if (progress < 0) {
log_warning("Got invalid percent value '%s', ignoring.", progress_str);
return 0;
}
return ctx->callback(ctx->transfer, ctx->instance, progress);
}
static int run_callout(
const char *name,
char *cmdline[]) {
char *cmdline[],
const Transfer *transfer,
const Instance *instance,
TransferProgress callback,
void *userdata) {
_cleanup_(sd_event_unrefp) sd_event *event = NULL;
_cleanup_(sd_event_source_unrefp) sd_event_source *exit_source = NULL, *notify_source = NULL;
_cleanup_close_ int fd = -EBADF;
_cleanup_free_ char *bind_name = NULL;
union sockaddr_union bsa;
int r;
assert(name);
assert(cmdline);
assert(cmdline[0]);
r = safe_fork(name, FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG|FORK_WAIT, NULL);
_cleanup_(callout_context_freep) CalloutContext *ctx = NULL;
r = callout_context_new(transfer, instance, callback, name, userdata, &ctx);
if (r < 0)
return r;
return log_oom();
r = sd_event_new(&event);
if (r < 0)
return log_error_errno(r, "Failed to create event: %m");
/* Kill the helper & return an error if we get interrupted by a signal */
r = sd_event_add_signal(event, NULL, SIGINT | SD_EVENT_SIGNAL_PROCMASK, NULL, INT_TO_PTR(-ECANCELED));
if (r < 0)
return log_error_errno(r, "Failed to register signal to event: %m");
r = sd_event_add_signal(event, NULL, SIGTERM | SD_EVENT_SIGNAL_PROCMASK, NULL, INT_TO_PTR(-ECANCELED));
if (r < 0)
return log_error_errno(r, "Failed to register signal to event: %m");
fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
if (fd < 0)
return log_error_errno(errno, "Failed to create UNIX socket for notification: %m");
if (asprintf(&bind_name, "@%" PRIx64 "/sysupdate/" PID_FMT "/notify", random_u64(), getpid_cached()) < 0)
return log_oom();
r = sockaddr_un_set_path(&bsa.un, bind_name);
if (r < 0)
return log_error_errno(r, "Failed to set socket path: %m");
if (bind(fd, &bsa.sa, r) < 0)
return log_error_errno(errno, "Failed to bind to notification socket: %m");
r = setsockopt_int(fd, SOL_SOCKET, SO_PASSCRED, true);
if (r < 0)
return log_error_errno(r, "Failed to set socket options: %m");
r = pidref_safe_fork(ctx->name, FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &ctx->pid);
if (r < 0)
return log_error_errno(r, "Failed to fork process %s: %m", ctx->name);
if (r == 0) {
/* Child */
if (setenv("NOTIFY_SOCKET", bind_name, 1) < 0) {
log_error_errno(errno, "setenv() failed: %m");
_exit(EXIT_FAILURE);
}
r = invoke_callout_binary(cmdline[0], (char *const*) cmdline);
log_error_errno(r, "Failed to execute %s tool: %m", cmdline[0]);
_exit(EXIT_FAILURE);
}
return 0;
/* Quit the loop w/ when child process exits */
r = event_add_child_pidref(event, &exit_source, &ctx->pid, WEXITED, helper_on_exit, (void*) ctx);
if (r < 0)
return log_error_errno(r, "Failed to add child process to event loop: %m");
r = sd_event_source_set_child_process_own(exit_source, true);
if (r < 0)
return log_error_errno(r, "Failed to take ownership of child process: %m");
/* Propagate sd_notify calls */
r = sd_event_add_io(event, &notify_source, fd, EPOLLIN, helper_on_notify, TAKE_PTR(ctx));
if (r < 0)
return log_error_errno(r, "Failed to add notification propagation to event loop: %m");
(void) sd_event_source_set_description(notify_source, "notify-socket");
(void) sd_event_source_set_priority(notify_source, SD_EVENT_PRIORITY_NORMAL - 5);
r = sd_event_source_set_io_fd_own(notify_source, true);
if (r < 0)
return log_error_errno(r, "Event loop failed to take ownership of notification source: %m");
TAKE_FD(fd);
/* Process events until the helper quits */
return sd_event_loop(event);
}
int transfer_acquire_instance(Transfer *t, Instance *i) {
int transfer_acquire_instance(Transfer *t, Instance *i, TransferProgress cb, void *userdata) {
_cleanup_free_ char *formatted_pattern = NULL, *digest = NULL;
char offset[DECIMAL_STR_MAX(uint64_t)+1], max_size[DECIMAL_STR_MAX(uint64_t)+1];
const char *where = NULL;
@@ -819,6 +1030,7 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
assert(i);
assert(i->resource);
assert(t == container_of(i->resource, Transfer, source));
assert(cb);
/* Does this instance already exist in the target? Then we don't need to acquire anything */
existing = resource_find_instance(&t->target, i->metadata.version);
@@ -914,7 +1126,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
"--direct", /* just copy/unpack the specified file, don't do anything else */
arg_sync ? "--sync=yes" : "--sync=no",
i->path,
t->temporary_path));
t->temporary_path),
t, i, cb, userdata);
break;
case RESOURCE_PARTITION:
@@ -930,7 +1143,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
"--size-max", max_size,
arg_sync ? "--sync=yes" : "--sync=no",
i->path,
t->target.path));
t->target.path),
t, i, cb, userdata);
break;
default:
@@ -953,7 +1167,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
arg_sync ? "--sync=yes" : "--sync=no",
t->target.type == RESOURCE_SUBVOLUME ? "--btrfs-subvol=yes" : "--btrfs-subvol=no",
i->path,
t->temporary_path));
t->temporary_path),
t, i, cb, userdata);
break;
case RESOURCE_TAR:
@@ -969,7 +1184,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
arg_sync ? "--sync=yes" : "--sync=no",
t->target.type == RESOURCE_SUBVOLUME ? "--btrfs-subvol=yes" : "--btrfs-subvol=no",
i->path,
t->temporary_path));
t->temporary_path),
t, i, cb, userdata);
break;
case RESOURCE_URL_FILE:
@@ -988,7 +1204,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
"--verify", digest, /* validate by explicit SHA256 sum */
arg_sync ? "--sync=yes" : "--sync=no",
i->path,
t->temporary_path));
t->temporary_path),
t, i, cb, userdata);
break;
case RESOURCE_PARTITION:
@@ -1005,7 +1222,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
"--size-max", max_size,
arg_sync ? "--sync=yes" : "--sync=no",
i->path,
t->target.path));
t->target.path),
t, i, cb, userdata);
break;
default:
@@ -1026,7 +1244,8 @@ int transfer_acquire_instance(Transfer *t, Instance *i) {
t->target.type == RESOURCE_SUBVOLUME ? "--btrfs-subvol=yes" : "--btrfs-subvol=no",
arg_sync ? "--sync=yes" : "--sync=no",
i->path,
t->temporary_path));
t->temporary_path),
t, i, cb, userdata);
break;
default:

View File

@@ -12,6 +12,7 @@ typedef struct Transfer Transfer;
#include "sysupdate-partition.h"
#include "sysupdate-resource.h"
#include "sysupdate.h"
struct Transfer {
char *definition_path;
@@ -44,9 +45,13 @@ struct Transfer {
/* If we write to a partition in a partition table, the metrics of it */
PartitionInfo partition_info;
PartitionChange partition_change;
Context *context;
};
Transfer *transfer_new(void);
typedef int (*TransferProgress)(const Transfer *t, const Instance *inst, unsigned percentage);
Transfer *transfer_new(Context *ctx);
Transfer *transfer_free(Transfer *t);
DEFINE_TRIVIAL_CLEANUP_FUNC(Transfer*, transfer_free);
@@ -57,6 +62,6 @@ int transfer_resolve_paths(Transfer *t, const char *root, const char *node);
int transfer_vacuum(Transfer *t, uint64_t space, const char *extra_protected_version);
int transfer_acquire_instance(Transfer *t, Instance *i);
int transfer_acquire_instance(Transfer *t, Instance *i, TransferProgress cb, void *userdata);
int transfer_install_instance(Transfer *t, Instance *i, const char *root);

View File

@@ -25,6 +25,7 @@
#include "path-util.h"
#include "pretty-print.h"
#include "set.h"
#include "signal-util.h"
#include "sort-util.h"
#include "string-util.h"
#include "strv.h"
@@ -135,7 +136,7 @@ static int context_read_definitions(
if (!GREEDY_REALLOC(c->transfers, c->n_transfers + 1))
return log_oom();
t = transfer_new();
t = transfer_new(c);
if (!t)
return log_oom();
@@ -744,6 +745,30 @@ static int context_make_online(Context **ret, const char *node) {
return 0;
}
static int context_on_acquire_progress(const Transfer *t, const Instance *inst, unsigned percentage) {
const Context *c = ASSERT_PTR(t->context);
size_t i, n = c->n_transfers;
uint64_t base, scaled;
unsigned overall;
for (i = 0; i < n; i++)
if (c->transfers[i] == t)
break;
assert(i < n); /* We should have found the index */
base = (100 * 100 * i) / n;
scaled = (100 * percentage) / n;
overall = (unsigned) ((base + scaled) / 100);
assert(overall <= 100);
log_debug("Transfer %" PRIu64 "/%zu is %u%% complete (%u%% overall).", i+1, n, percentage, overall);
return sd_notifyf(/* unset= */ false, "X_SYSUPDATE_PROGRESS=%u\n"
"X_SYSUPDATE_TRANSFERS_LEFT=%zu\n"
"X_SYSUPDATE_TRANSFERS_DONE=%zu\n"
"STATUS=Updating to '%s' (%u%% complete).",
overall, n - i, i, inst->metadata.version, overall);
}
static int context_apply(
Context *c,
const char *version,
@@ -793,8 +818,10 @@ static int context_apply(
log_info("Selected update '%s' for install.", us->version);
(void) sd_notifyf(false,
"STATUS=Making room for '%s'.", us->version);
(void) sd_notifyf(/* unset= */ false,
"READY=1\n"
"X_SYSUPDATE_VERSION=%s\n"
"STATUS=Making room for '%s'.", us->version, us->version);
/* Let's make some room. We make sure for each transfer we have one free space to fill. While
* removing stuff we'll protect the version we are trying to acquire. Why that? Maybe an earlier
@@ -809,14 +836,15 @@ static int context_apply(
if (arg_sync)
sync();
(void) sd_notifyf(false,
"STATUS=Updating to '%s'.\n", us->version);
(void) sd_notifyf(/* unset= */ false,
"STATUS=Updating to '%s'.", us->version);
/* There should now be one instance picked for each transfer, and the order is the same */
assert(us->n_instances == c->n_transfers);
for (size_t i = 0; i < c->n_transfers; i++) {
r = transfer_acquire_instance(c->transfers[i], us->instances[i]);
r = transfer_acquire_instance(c->transfers[i], us->instances[i],
context_on_acquire_progress, c);
if (r < 0)
return r;
}
@@ -824,6 +852,9 @@ static int context_apply(
if (arg_sync)
sync();
(void) sd_notifyf(/* unset= */ false,
"STATUS=Installing '%s'.", us->version);
for (size_t i = 0; i < c->n_transfers; i++) {
r = transfer_install_instance(c->transfers[i], us->instances[i], arg_root);
if (r < 0)
@@ -1412,6 +1443,9 @@ static int run(int argc, char *argv[]) {
if (r <= 0)
return r;
/* SIGCHLD signal must be blocked for sd_event_add_child to work */
BLOCK_SIGNALS(SIGCHLD);
return sysupdate_main(argc, argv);
}

View File

@@ -4,6 +4,9 @@
#include <inttypes.h>
#include <stdbool.h>
/* Forward declare this type so that Transfers can point at it */
typedef struct Context Context;
extern bool arg_sync;
extern uint64_t arg_instances_max;
extern char *arg_root;