diff --git a/man/journal-remote.conf.xml b/man/journal-remote.conf.xml index 01dc94de55..34ef21a326 100644 --- a/man/journal-remote.conf.xml +++ b/man/journal-remote.conf.xml @@ -58,6 +58,19 @@ [Remote] section: + + Compression= + + Acceptable compression algorithms to be used by systemd-journal-upload. Compression algorithms are + used for Accept-Encoding header contruction with priorities set according to an order in configuration. + This parameter takes space separated list of compression algorithms. Example: + Compression=zstd lz4 + This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared. + + + + + Seal= diff --git a/man/journal-upload.conf.xml b/man/journal-upload.conf.xml index 7792617e51..4785ef1ab3 100644 --- a/man/journal-upload.conf.xml +++ b/man/journal-upload.conf.xml @@ -60,6 +60,37 @@ + + Compression= + + Takes a space separated list of compression algorithms to be applied to logs data before sending. + Supported algorithms are none, zstd, xz, + or lz4. Optionally, each algorithm (except for none) + followed by a colon (:) and its compression level, for example zstd:4. + The compression level is expected to be a positive integer. This option can be specified multiple times. + If an empty string is assigned, then all previous assignments are cleared. + Defaults to unset, and data will not be compressed. + + Example: + Compression=zstd:4 lz4:2 + + Even when compression is enabled, the initial requests are sent without compression. + It becomes effective either if ForceCompression= is enabled, + or the server response contains Accept-Encoding headers with a list of + compression algorithms that contains one of the algorithms specified in this option. + + + + + + ForceCompression= + + Takes a boolean value, enforces using compression without content encoding negotiation. + Defaults to false. + + + + ServerKeyFile= diff --git a/src/basic/compress.c b/src/basic/compress.c index 61e87addde..1d16f86038 100644 --- a/src/basic/compress.c +++ b/src/basic/compress.c @@ -8,6 +8,10 @@ #include #include +#if HAVE_LZ4 +#include +#endif + #if HAVE_XZ #include #endif @@ -43,6 +47,7 @@ static DLSYM_PROTOTYPE(LZ4F_freeCompressionContext) = NULL; static DLSYM_PROTOTYPE(LZ4F_freeDecompressionContext) = NULL; static DLSYM_PROTOTYPE(LZ4F_isError) = NULL; DLSYM_PROTOTYPE(LZ4_compress_default) = NULL; +DLSYM_PROTOTYPE(LZ4_compress_HC) = NULL; DLSYM_PROTOTYPE(LZ4_decompress_safe) = NULL; DLSYM_PROTOTYPE(LZ4_decompress_safe_partial) = NULL; DLSYM_PROTOTYPE(LZ4_versionNumber) = NULL; @@ -94,6 +99,7 @@ static DLSYM_PROTOTYPE(lzma_easy_encoder) = NULL; static DLSYM_PROTOTYPE(lzma_end) = NULL; static DLSYM_PROTOTYPE(lzma_stream_buffer_encode) = NULL; static DLSYM_PROTOTYPE(lzma_stream_decoder) = NULL; +static DLSYM_PROTOTYPE(lzma_lzma_preset) = NULL; /* We can't just do _cleanup_(sym_lzma_end) because a compiler bug makes * this fail with: @@ -116,7 +122,15 @@ static const char* const compression_table[_COMPRESSION_MAX] = { [COMPRESSION_ZSTD] = "ZSTD", }; +static const char* const compression_lowercase_table[_COMPRESSION_MAX] = { + [COMPRESSION_NONE] = "none", + [COMPRESSION_XZ] = "xz", + [COMPRESSION_LZ4] = "lz4", + [COMPRESSION_ZSTD] = "zstd", +}; + DEFINE_STRING_TABLE_LOOKUP(compression, Compression); +DEFINE_STRING_TABLE_LOOKUP(compression_lowercase, Compression); bool compression_supported(Compression c) { static const unsigned supported = @@ -145,12 +159,13 @@ int dlopen_lzma(void) { DLSYM_ARG(lzma_easy_encoder), DLSYM_ARG(lzma_end), DLSYM_ARG(lzma_stream_buffer_encode), + DLSYM_ARG(lzma_lzma_preset), DLSYM_ARG(lzma_stream_decoder)); } #endif int compress_blob_xz(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -159,12 +174,12 @@ int compress_blob_xz(const void *src, uint64_t src_size, assert(dst_size); #if HAVE_XZ - static const lzma_options_lzma opt = { + lzma_options_lzma opt = { 1u << 20u, NULL, 0, LZMA_LC_DEFAULT, LZMA_LP_DEFAULT, LZMA_PB_DEFAULT, LZMA_MODE_FAST, 128, LZMA_MF_HC3, 4 }; - static const lzma_filter filters[] = { - { LZMA_FILTER_LZMA2, (lzma_options_lzma*) &opt }, + lzma_filter filters[] = { + { LZMA_FILTER_LZMA2, &opt }, { LZMA_VLI_UNKNOWN, NULL } }; lzma_ret ret; @@ -175,13 +190,19 @@ int compress_blob_xz(const void *src, uint64_t src_size, if (r < 0) return r; + if (level >= 0) { + r = sym_lzma_lzma_preset(&opt, (uint32_t) level); + if (r < 0) + return r; + } + /* Returns < 0 if we couldn't compress the data or the * compressed result is longer than the original */ if (src_size < 80) return -ENOBUFS; - ret = sym_lzma_stream_buffer_encode((lzma_filter*) filters, LZMA_CHECK_NONE, NULL, + ret = sym_lzma_stream_buffer_encode(filters, LZMA_CHECK_NONE, NULL, src, src_size, dst, &out_pos, dst_alloc_size); if (ret != LZMA_OK) return -ENOBUFS; @@ -214,6 +235,7 @@ int dlopen_lz4(void) { DLSYM_ARG(LZ4F_freeDecompressionContext), DLSYM_ARG(LZ4F_isError), DLSYM_ARG(LZ4_compress_default), + DLSYM_ARG(LZ4_compress_HC), DLSYM_ARG(LZ4_decompress_safe), DLSYM_ARG(LZ4_decompress_safe_partial), DLSYM_ARG(LZ4_versionNumber)); @@ -221,7 +243,7 @@ int dlopen_lz4(void) { #endif int compress_blob_lz4(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -241,7 +263,10 @@ int compress_blob_lz4(const void *src, uint64_t src_size, if (src_size < 9) return -ENOBUFS; - r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8); + if (level <= 0) + r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8); + else + r = sym_LZ4_compress_HC(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8, level); if (r <= 0) return -ENOBUFS; @@ -285,7 +310,7 @@ int dlopen_zstd(void) { int compress_blob_zstd( const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -301,7 +326,7 @@ int compress_blob_zstd( if (r < 0) return r; - k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, 0); + k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, level < 0 ? 0 : level); if (sym_ZSTD_isError(k)) return zstd_ret_to_errno(k); diff --git a/src/basic/compress.h b/src/basic/compress.h index 1ad87ee878..fd265bb026 100644 --- a/src/basic/compress.h +++ b/src/basic/compress.h @@ -24,15 +24,17 @@ typedef enum Compression { const char* compression_to_string(Compression compression); Compression compression_from_string(const char *compression); +const char* compression_lowercase_to_string(Compression compression); +Compression compression_lowercase_from_string(const char *compression); bool compression_supported(Compression c); int compress_blob_xz(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int compress_blob_lz4(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int compress_blob_zstd(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int decompress_blob_xz(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -90,15 +92,15 @@ int dlopen_lzma(void); static inline int compress_blob( Compression compression, const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { switch (compression) { case COMPRESSION_ZSTD: - return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size, level); case COMPRESSION_LZ4: - return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size, level); case COMPRESSION_XZ: - return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size, level); default: return -EOPNOTSUPP; } diff --git a/src/fuzz/fuzz-compress.c b/src/fuzz/fuzz-compress.c index c3f68f62dd..6fcad736b1 100644 --- a/src/fuzz/fuzz-compress.c +++ b/src/fuzz/fuzz-compress.c @@ -42,7 +42,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { } size_t csize; - r = compress_blob(alg, h->data, data_len, buf, size, &csize); + r = compress_blob(alg, h->data, data_len, buf, size, &csize, /* level = */ -1); if (r < 0) { log_error_errno(r, "Compression failed: %m"); return 0; diff --git a/src/journal-remote/journal-compression-util.c b/src/journal-remote/journal-compression-util.c new file mode 100644 index 0000000000..4def84b2bd --- /dev/null +++ b/src/journal-remote/journal-compression-util.c @@ -0,0 +1,91 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include "extract-word.h" +#include "journal-compression-util.h" +#include "parse-util.h" + +void compression_args_clear(CompressionArgs *args) { + assert(args); + args->size = 0; + args->opts = mfree(args->opts); +} + +int config_parse_compression( + const char *unit, + const char *filename, + unsigned line, + const char *section, + unsigned section_line, + const char *lvalue, + int ltype, + const char *rvalue, + void *data, + void *userdata) { + + CompressionArgs *args = ASSERT_PTR(data); + bool parse_level = ltype; + int r; + + assert(filename); + assert(lvalue); + assert(rvalue); + + if (isempty(rvalue)) { + compression_args_clear(args); + return 1; + } + + for (const char *p = rvalue;;) { + _cleanup_free_ char *algorithm = NULL, *word = NULL; + int level = -1; + + r = extract_first_word(&p, &word, NULL, 0); + if (r < 0) + return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue); + if (r == 0) + return 1; + + if (parse_level) { + const char *q = word; + r = extract_first_word(&q, &algorithm, ":", 0); + if (r < 0) + return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue); + if (!isempty(q)) { + r = safe_atoi(q, &level); + if (r < 0) { + log_syntax(unit, LOG_WARNING, filename, line, r, + "Compression level %s should be positive, ignoring.", q); + continue; + } + } + } else + algorithm = TAKE_PTR(word); + + Compression c = compression_lowercase_from_string(algorithm); + if (c < 0 || !compression_supported(c)) { + log_syntax(unit, LOG_WARNING, filename, line, c, + "Compression=%s is not supported on a system, ignoring.", algorithm); + continue; + } + + bool found = false; + FOREACH_ARRAY(opt, args->opts, args->size) + if (opt->algorithm == c) { + found = true; + if (parse_level) + opt->level = level; + break; + } + + if (found) + continue; + + if (!GREEDY_REALLOC(args->opts, args->size + 1)) + return log_oom(); + + args->opts[args->size++] = (CompressionOpts) { + .algorithm = c, + .level = level, + }; + } +} diff --git a/src/journal-remote/journal-compression-util.h b/src/journal-remote/journal-compression-util.h new file mode 100644 index 0000000000..785ede6b11 --- /dev/null +++ b/src/journal-remote/journal-compression-util.h @@ -0,0 +1,19 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include "compress.h" +#include "conf-parser.h" + +typedef struct CompressionOpts { + Compression algorithm; + int level; +} CompressionOpts; + +typedef struct CompressionArgs { + CompressionOpts *opts; + size_t size; +} CompressionArgs; + +CONFIG_PARSER_PROTOTYPE(config_parse_compression); + +void compression_args_clear(CompressionArgs *args); diff --git a/src/journal-remote/journal-remote-main.c b/src/journal-remote/journal-remote-main.c index c48b7df201..3c780b8f76 100644 --- a/src/journal-remote/journal-remote-main.c +++ b/src/journal-remote/journal-remote-main.c @@ -11,6 +11,7 @@ #include "daemon-util.h" #include "fd-util.h" #include "fileio.h" +#include "journal-compression-util.h" #include "journal-remote-write.h" #include "journal-remote.h" #include "logs-show.h" @@ -37,6 +38,7 @@ static const char *arg_getter = NULL; static const char *arg_listen_raw = NULL; static const char *arg_listen_http = NULL; static const char *arg_listen_https = NULL; +static CompressionArgs arg_compression = {}; static char **arg_files = NULL; /* Do not free this. */ static bool arg_compress = true; static bool arg_seal = false; @@ -65,6 +67,7 @@ STATIC_DESTRUCTOR_REGISTER(arg_key, freep); STATIC_DESTRUCTOR_REGISTER(arg_cert, freep); STATIC_DESTRUCTOR_REGISTER(arg_trust, freep); STATIC_DESTRUCTOR_REGISTER(arg_output, freep); +STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear); static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = { [JOURNAL_WRITE_SPLIT_NONE] = "none", @@ -152,6 +155,22 @@ static int dispatch_http_event(sd_event_source *event, uint32_t revents, void *userdata); +static int build_accept_encoding(char **ret) { + assert(ret); + + float q = 1.0, step = 1.0 / arg_compression.size; + _cleanup_free_ char *buf = NULL; + FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) { + const char *c = compression_lowercase_to_string(opt->algorithm); + if (strextendf_with_separator(&buf, ",", "%s;q=%.1f", c, q) < 0) + return -ENOMEM; + q -= step; + } + + *ret = TAKE_PTR(buf); + return 0; +} + static int request_meta(void **connection_cls, int fd, char *hostname) { RemoteSource *source; Writer *writer; @@ -174,6 +193,11 @@ static int request_meta(void **connection_cls, int fd, char *hostname) { log_debug("Added RemoteSource as connection metadata %p", source); + r = build_accept_encoding(&source->encoding); + if (r < 0) + return log_oom(); + + source->compression = COMPRESSION_NONE; *connection_cls = source; return 0; } @@ -212,8 +236,17 @@ static int process_http_upload( if (*upload_data_size) { log_trace("Received %zu bytes", *upload_data_size); - r = journal_importer_push_data(&source->importer, - upload_data, *upload_data_size); + if (source->compression != COMPRESSION_NONE) { + _cleanup_free_ char *buf = NULL; + size_t buf_size; + + r = decompress_blob(source->compression, upload_data, *upload_data_size, (void **) &buf, &buf_size, 0); + if (r < 0) + return mhd_respondf(connection, r, MHD_HTTP_BAD_REQUEST, "Decompression of received blob falied."); + + r = journal_importer_push_data(&source->importer, buf, buf_size); + } else + r = journal_importer_push_data(&source->importer, upload_data, *upload_data_size); if (r < 0) return mhd_respond_oom(connection); @@ -253,7 +286,7 @@ static int process_http_upload( remaining); } - return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK."); + return mhd_respond_with_encoding(connection, MHD_HTTP_ACCEPTED, source->encoding, "OK."); }; static mhd_result request_handler( @@ -278,10 +311,20 @@ static mhd_result request_handler( log_trace("Handling a connection %s %s %s", method, url, version); - if (*connection_cls) + if (*connection_cls) { + RemoteSource *source = *connection_cls; + header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Encoding"); + if (header) { + Compression c = compression_lowercase_from_string(header); + if (c < 0 || !compression_supported(c)) + return mhd_respondf(connection, 0, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE, + "Unsupported Content-Encoding type: %s", header); + source->compression = c; + } return process_http_upload(connection, upload_data, upload_data_size, - *connection_cls); + source); + } if (!streq(method, "POST")) return mhd_respond(connection, MHD_HTTP_NOT_ACCEPTABLE, "Unsupported method."); @@ -722,6 +765,7 @@ static int parse_config(void) { { "Remote", "MaxFileSize", config_parse_iec_uint64, 0, &arg_max_size }, { "Remote", "MaxFiles", config_parse_uint64, 0, &arg_n_max_files }, { "Remote", "KeepFree", config_parse_iec_uint64, 0, &arg_keep_free }, + { "Remote", "Compression", config_parse_compression, 0, &arg_compression }, {} }; diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c index e23012c472..d743b217b0 100644 --- a/src/journal-remote/journal-remote-parse.c +++ b/src/journal-remote/journal-remote-parse.c @@ -18,6 +18,7 @@ void source_free(RemoteSource *source) { sd_event_source_unref(source->event); sd_event_source_unref(source->buffer_event); + free(source->encoding); free(source); } diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h index 703035b1ec..89d30b8721 100644 --- a/src/journal-remote/journal-remote-parse.h +++ b/src/journal-remote/journal-remote-parse.h @@ -3,6 +3,7 @@ #include "sd-event.h" +#include "compress.h" #include "journal-importer.h" #include "journal-remote-write.h" @@ -13,6 +14,8 @@ typedef struct RemoteSource { sd_event_source *event; sd_event_source *buffer_event; + Compression compression; + char *encoding; } RemoteSource; RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer); diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c index 23ad3b2d31..ecb323b217 100644 --- a/src/journal-remote/journal-upload-journal.c +++ b/src/journal-remote/journal-upload-journal.c @@ -251,6 +251,7 @@ static void check_update_watchdog(Uploader *u) { static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { Uploader *u = ASSERT_PTR(userp); + _cleanup_free_ char *compression_buffer = NULL; int r; sd_journal *j; size_t filled = 0; @@ -262,6 +263,14 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void j = u->journal; + if (u->compression.algorithm != COMPRESSION_NONE) { + compression_buffer = malloc_multiply(nmemb, size); + if (!compression_buffer) { + log_oom(); + return CURL_READFUNC_ABORT; + } + } + while (j && filled < size * nmemb) { if (u->entry_state == ENTRY_DONE) { r = sd_journal_next(j); @@ -284,7 +293,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void u->entry_state = ENTRY_CURSOR; } - w = write_entry((char*)buf + filled, size * nmemb - filled, u); + w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u); if (w < 0) return CURL_READFUNC_ABORT; filled += w; @@ -300,6 +309,19 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void u->entries_sent, u->current_cursor); } + if (filled > 0 && u->compression.algorithm != COMPRESSION_NONE) { + size_t compressed_size; + r = compress_blob(u->compression.algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression.level); + if (r < 0) { + log_error_errno(r, "Failed to compress %zu bytes (Compression=%s, Level=%d): %m", + filled, compression_lowercase_to_string(u->compression.algorithm), u->compression.level); + return CURL_READFUNC_ABORT; + } + + assert(compressed_size <= size * nmemb); + return compressed_size; + } + return filled; } diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index c702b00806..ca891a4e00 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -58,8 +58,11 @@ static bool arg_merge = false; static int arg_follow = -1; static const char *arg_save_state = NULL; static usec_t arg_network_timeout_usec = USEC_INFINITY; +static CompressionArgs arg_compression = {}; +static bool arg_force_compression = false; STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep); +STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear); static void close_fd_input(Uploader *u); @@ -203,6 +206,17 @@ int start_upload(Uploader *u, return log_oom(); h = l; + if (u->compression.algorithm != COMPRESSION_NONE) { + _cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm)); + if (!header) + return log_oom(); + + l = curl_slist_append(h, header); + if (!l) + return log_oom(); + h = l; + } + u->header = TAKE_PTR(h); } @@ -292,8 +306,10 @@ int start_upload(Uploader *u, } static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + _cleanup_free_ char *compression_buffer = NULL; Uploader *u = ASSERT_PTR(userp); ssize_t n; + int r; assert(nmemb < SSIZE_MAX / size); @@ -302,17 +318,35 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user assert(!size_multiply_overflow(size, nmemb)); - n = read(u->input, buf, size * nmemb); - log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n); - if (n > 0) - return n; + if (u->compression.algorithm != COMPRESSION_NONE) { + compression_buffer = malloc_multiply(nmemb, size); + if (!compression_buffer) { + log_oom(); + return CURL_READFUNC_ABORT; + } + } - u->uploading = false; - if (n < 0) { + n = read(u->input, compression_buffer ?: buf, size * nmemb); + if (n > 0) { + log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n); + if (u->compression.algorithm == COMPRESSION_NONE) + return n; + + size_t compressed_size; + r = compress_blob(u->compression.algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression.level); + if (r < 0) { + log_error_errno(r, "Failed to compress %zd bytes using (Compression=%s, Level=%d): %m", + n, compression_lowercase_to_string(u->compression.algorithm), u->compression.level); + return CURL_READFUNC_ABORT; + } + assert(compressed_size <= size * nmemb); + return compressed_size; + } else if (n < 0) { log_error_errno(errno, "Aborting transfer after read error on input: %m."); return CURL_READFUNC_ABORT; } + u->uploading = false; log_debug("Reached EOF"); close_fd_input(u); return 0; @@ -389,8 +423,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file) *u = (Uploader) { .input = -1, + .compression.algorithm = COMPRESSION_NONE, + .compression.level = -1, }; + if (arg_force_compression && arg_compression.size > 0) + u->compression = arg_compression.opts[0]; + host = STARTSWITH_SET(url, "http://", "https://"); if (!host) { host = url; @@ -448,6 +487,66 @@ static void destroy_uploader(Uploader *u) { sd_event_unref(u->event); } +#if LIBCURL_VERSION_NUM >= 0x075300 +static int update_content_encoding(Uploader *u, const char *accept_encoding) { + int r; + + assert(u); + + for (const char *p = accept_encoding;;) { + _cleanup_free_ char *encoding_value = NULL, *alg = NULL; + Compression algorithm; + CURLcode code; + + r = extract_first_word(&p, &encoding_value, ",", 0); + if (r < 0) + return log_error_errno(r, "Failed to extract Accept-Encoding header value: %m"); + if (r == 0) + return 0; + + const char *q = encoding_value; + r = extract_first_word(&q, &alg, ";", 0); + if (r < 0) + return log_error_errno(r, "Failed to extract compression algorithm from Accept-Encoding header: %m"); + + algorithm = compression_lowercase_from_string(alg); + if (algorithm <= 0 || !compression_supported(algorithm)) { + continue; + } + + FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) { + if (opt->algorithm != algorithm) + continue; + + _cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm)); + if (!header) + return log_oom(); + + /* First, update existing Content-Encoding header. */ + bool found = false; + for (struct curl_slist *l = u->header; l; l = l->next) + if (startswith(l->data, "Content-Encoding:")) { + free_and_replace(l->data, header); + found = true; + break; + } + + /* If Content-Encoding header is not found, append new one. */ + if (!found) { + struct curl_slist *l = curl_slist_append(u->header, header); + if (!l) + return log_oom(); + u->header = l; + } + + easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL); + u->compression = *opt; + return 0; + } + } +} +#endif + static int perform_upload(Uploader *u) { CURLcode code; long status; @@ -480,9 +579,25 @@ static int perform_upload(Uploader *u) { return log_error_errno(SYNTHETIC_ERRNO(EIO), "Upload to %s finished with unexpected code %ld: %s", u->url, status, strna(u->answer)); - else + else { +#if LIBCURL_VERSION_NUM >= 0x075300 + int r; + if (u->compression.algorithm == COMPRESSION_NONE) { + struct curl_header *encoding_header; + CURLHcode hcode; + + hcode = curl_easy_header(u->easy, "Accept-Encoding", 0, CURLH_HEADER, -1, &encoding_header); + if (hcode == CURLHE_OK && encoding_header && encoding_header->value) { + r = update_content_encoding(u, encoding_header->value); + if (r < 0) + return r; + } + } +#endif + log_debug("Upload finished successfully with code %ld: %s", status, strna(u->answer)); + } free_and_replace(u->last_cursor, u->current_cursor); @@ -496,6 +611,8 @@ static int parse_config(void) { { "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert }, { "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust }, { "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec }, + { "Upload", "Compression", config_parse_compression, true, &arg_compression }, + { "Upload", "ForceCompression", config_parse_bool, 0, &arg_force_compression }, {} }; diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h index fe6abb75a9..95f79a2638 100644 --- a/src/journal-remote/journal-upload.h +++ b/src/journal-remote/journal-upload.h @@ -7,6 +7,7 @@ #include "sd-event.h" #include "sd-journal.h" +#include "journal-compression-util.h" #include "time-util.h" typedef enum { @@ -53,6 +54,7 @@ typedef struct Uploader { char *last_cursor, *current_cursor; usec_t watchdog_timestamp; usec_t watchdog_usec; + CompressionOpts compression; } Uploader; #define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC) diff --git a/src/journal-remote/meson.build b/src/journal-remote/meson.build index 10a82751d7..0f3a91a621 100644 --- a/src/journal-remote/meson.build +++ b/src/journal-remote/meson.build @@ -1,11 +1,13 @@ # SPDX-License-Identifier: LGPL-2.1-or-later systemd_journal_upload_sources = files( + 'journal-compression-util.c', 'journal-upload-journal.c', 'journal-upload.c', ) libsystemd_journal_remote_sources = files( + 'journal-compression-util.c', 'journal-remote-parse.c', 'journal-remote-write.c', 'journal-remote.c', diff --git a/src/journal-remote/microhttpd-util.c b/src/journal-remote/microhttpd-util.c index 0d5997a03f..9f68cc65ca 100644 --- a/src/journal-remote/microhttpd-util.c +++ b/src/journal-remote/microhttpd-util.c @@ -28,6 +28,7 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) { int mhd_respond_internal( struct MHD_Connection *connection, enum MHD_RequestTerminationCode code, + const char *encoding, const char *buffer, size_t size, enum MHD_ResponseMemoryMode mode) { @@ -40,6 +41,10 @@ int mhd_respond_internal( return MHD_NO; log_debug("Queueing response %u: %s", code, buffer); + if (encoding) + if (MHD_add_response_header(response, "Accept-Encoding", encoding) == MHD_NO) + return MHD_NO; + if (MHD_add_response_header(response, "Content-Type", "text/plain") == MHD_NO) return MHD_NO; return MHD_queue_response(connection, code, response); @@ -53,6 +58,7 @@ int mhd_respondf_internal( struct MHD_Connection *connection, int error, enum MHD_RequestTerminationCode code, + const char *encoding, const char *format, ...) { char *m; @@ -72,7 +78,7 @@ int mhd_respondf_internal( if (r < 0) return respond_oom(connection); - return mhd_respond_internal(connection, code, m, r, MHD_RESPMEM_MUST_FREE); + return mhd_respond_internal(connection, code, encoding, m, r, MHD_RESPMEM_MUST_FREE); } #if HAVE_GNUTLS diff --git a/src/journal-remote/microhttpd-util.h b/src/journal-remote/microhttpd-util.h index 309c39aab0..bb888e8aed 100644 --- a/src/journal-remote/microhttpd-util.h +++ b/src/journal-remote/microhttpd-util.h @@ -65,29 +65,34 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) _printf_(2, 0); int mhd_respond_internal( struct MHD_Connection *connection, enum MHD_RequestTerminationCode code, + const char *encoding, const char *buffer, size_t size, enum MHD_ResponseMemoryMode mode); -#define mhd_respond(connection, code, message) \ - mhd_respond_internal( \ - connection, code, \ - message "\n", \ - strlen(message) + 1, \ +#define mhd_respond_with_encoding(connection, code, encoding, message) \ + mhd_respond_internal( \ + (connection), (code), (encoding), \ + message "\n", \ + strlen(message) + 1, \ MHD_RESPMEM_PERSISTENT) +#define mhd_respond(connection, code, message) \ + mhd_respond_with_encoding(connection, code, NULL, message) \ + int mhd_respond_oom(struct MHD_Connection *connection); int mhd_respondf_internal( struct MHD_Connection *connection, int error, enum MHD_RequestTerminationCode code, - const char *format, ...) _printf_(4,5); + const char *encoding, + const char *format, ...) _printf_(5,6); -#define mhd_respondf(connection, error, code, format, ...) \ - mhd_respondf_internal( \ - connection, error, code, \ - format "\n", \ +#define mhd_respondf(connection, error, code, format, ...) \ + mhd_respondf_internal( \ + connection, error, code, NULL, \ + format "\n", \ ##__VA_ARGS__) int check_permissions(struct MHD_Connection *connection, int *code, char **hostname); diff --git a/src/libsystemd/sd-journal/journal-file.c b/src/libsystemd/sd-journal/journal-file.c index 508b47cbe7..97f65c561c 100644 --- a/src/libsystemd/sd-journal/journal-file.c +++ b/src/libsystemd/sd-journal/journal-file.c @@ -1810,7 +1810,7 @@ static int maybe_compress_payload(JournalFile *f, uint8_t *dst, const uint8_t *s if (c == COMPRESSION_NONE || size < f->compress_threshold_bytes) return 0; - r = compress_blob(c, src, size, dst, size - 1, rsize); + r = compress_blob(c, src, size, dst, size - 1, rsize, /* level = */ -1); if (r < 0) return log_debug_errno(r, "Failed to compress data object using %s, ignoring: %m", compression_to_string(c)); diff --git a/src/test/test-compress-benchmark.c b/src/test/test-compress-benchmark.c index 1727db8134..c4e19f9256 100644 --- a/src/test/test-compress-benchmark.c +++ b/src/test/test-compress-benchmark.c @@ -13,7 +13,7 @@ #include "tests.h" typedef int (compress_t)(const void *src, uint64_t src_size, void *dst, - size_t dst_alloc_size, size_t *dst_size); + size_t dst_alloc_size, size_t *dst_size, int level); typedef int (decompress_t)(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -100,7 +100,7 @@ static void test_compress_decompress(const char* label, const char* type, memzero(buf, MIN(size + 1000, MAX_SIZE)); - r = compress(text, size, buf, size, &j); + r = compress(text, size, buf, size, &j, /* level = */ -1); /* assume compression must be successful except for small or random inputs */ assert_se(r >= 0 || (size < 2048 && r == -ENOBUFS) || streq(type, "random")); diff --git a/src/test/test-compress.c b/src/test/test-compress.c index 86311c6217..9688e10df4 100644 --- a/src/test/test-compress.c +++ b/src/test/test-compress.c @@ -33,7 +33,7 @@ #define HUGE_SIZE (4096*1024) typedef int (compress_blob_t)(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); typedef int (decompress_blob_t)(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -62,7 +62,7 @@ _unused_ static void test_compress_decompress( log_info("/* testing %s %s blob compression/decompression */", compression, data); - r = compress(data, data_len, compressed, sizeof(compressed), &csize); + r = compress(data, data_len, compressed, sizeof(compressed), &csize, /* level = */ -1); if (r == -ENOBUFS) { log_info_errno(r, "compression failed: %m"); assert_se(may_fail); @@ -111,14 +111,14 @@ _unused_ static void test_decompress_startswith(const char *compression, compressed = compressed1 = malloc(BUFSIZE_1); assert_se(compressed1); - r = compress(data, data_len, compressed, BUFSIZE_1, &csize); + r = compress(data, data_len, compressed, BUFSIZE_1, &csize, /* level = */ -1); if (r == -ENOBUFS) { log_info_errno(r, "compression failed: %m"); assert_se(may_fail); compressed = compressed2 = malloc(BUFSIZE_2); assert_se(compressed2); - r = compress(data, data_len, compressed, BUFSIZE_2, &csize); + r = compress(data, data_len, compressed, BUFSIZE_2, &csize, /* level = */ -1); } assert_se(r >= 0); @@ -150,7 +150,7 @@ _unused_ static void test_decompress_startswith_short(const char *compression, log_info("/* %s with %s */", __func__, compression); - r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize); + r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize, /* level = */ -1); assert_se(r >= 0); for (size_t i = 1; i < strlen(TEXT); i++) { diff --git a/test/units/TEST-04-JOURNAL.journal-remote.sh b/test/units/TEST-04-JOURNAL.journal-remote.sh index c7b99b11fb..df39a50b04 100755 --- a/test/units/TEST-04-JOURNAL.journal-remote.sh +++ b/test/units/TEST-04-JOURNAL.journal-remote.sh @@ -97,7 +97,7 @@ rm -rf /var/log/journal/remote/* echo "$TEST_MESSAGE" | systemd-cat -t "$TEST_TAG" journalctl --sync -mkdir /run/systemd/remote-pki +mkdir -p /run/systemd/remote-pki cat >/run/systemd/remote-pki/ca.conf </run/systemd/journal-remote.conf.d/99-test.conf </run/systemd/journal-upload.conf.d/99-test.conf <