/*
 * Copyright (C) 2018 Red Hat Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 *
 */

#include "config.h"

#include "grd-vnc-pipewire-stream.h"

/*
 * NOTE(review): the names of these five system headers were blank in the text
 * this file was restored from. Four are reconstructed from symbols used below
 * (DRM_FORMAT_MOD_INVALID, pw_*, spa_format_video_raw_parse, mmap);
 * <linux/dma-buf.h> is a best guess for the remaining one. Confirm against
 * the build before merging.
 */
#include <drm_fourcc.h>
#include <linux/dma-buf.h>
#include <pipewire/pipewire.h>
#include <spa/param/video/format-utils.h>
#include <sys/mman.h>

#include "grd-context.h"
#include "grd-egl-thread.h"
#include "grd-pipewire-utils.h"
#include "grd-utils.h"
#include "grd-vnc-cursor.h"

/* Upper bound on the number of EnumFormat pods add_format_params() emits. */
#define MAX_FORMAT_PARAMS 2

enum
{
  CLOSED,

  N_SIGNALS
};

static guint signals[N_SIGNALS];

typedef struct _GrdVncFrame GrdVncFrame;

/* Invoked once a frame has been filled (success) or could not be (failure). */
typedef void (* GrdVncFrameReadyCallback) (GrdVncPipeWireStream *stream,
                                           GrdVncFrame          *frame,
                                           gboolean              success,
                                           gpointer              user_data);

/* Per-pw_buffer state stored as the value in stream->pipewire_buffers. */
typedef struct
{
  GMutex buffer_mutex;
  /* TRUE while buffer_mutex is held via acquire_pipewire_buffer_lock(). */
  gboolean is_locked;
} BufferContext;

struct _GrdVncFrame
{
  gatomicrefcount refcount;

  /* Pixel data owned by the frame; freed with g_free() on last unref. */
  void *data;

  GrdVncPipeWireStream *stream;
  GrdVncFrameReadyCallback callback;
  gpointer callback_user_data;
};

/* Cursor update waiting to be handed to the VNC session. */
typedef struct
{
  rfbCursorPtr rfb_cursor;
  gboolean cursor_moved;
  int cursor_x;
  int cursor_y;
} VncPointer;

struct _GrdVncPipeWireStream
{
  GObject parent;

  GrdSessionVnc *session;
  GrdEglThreadSlot egl_slot;

  /* Guards buffer dequeuing and the dequeuing_disallowed flag. */
  GMutex dequeue_mutex;
  gboolean dequeuing_disallowed;

  GSource *pipewire_source;
  struct pw_context *pipewire_context;
  struct pw_core *pipewire_core;

  struct spa_hook pipewire_core_listener;

  /* Guards pending_frame. */
  GMutex frame_mutex;
  GrdVncFrame *pending_frame;
  GSource *pending_frame_source;

  /* Guards pending_pointer. */
  GMutex pointer_mutex;
  VncPointer *pending_pointer;
  GSource *pending_pointer_source;

  struct pw_stream *pipewire_stream;
  struct spa_hook pipewire_stream_listener;

  /* struct pw_buffer * -> BufferContext * (values freed by the table). */
  GHashTable *pipewire_buffers;

  uint32_t src_node_id;

  struct spa_video_info_raw spa_format;
};

G_DEFINE_TYPE (GrdVncPipeWireStream, grd_vnc_pipewire_stream,
               G_TYPE_OBJECT)

static void grd_vnc_frame_unref (GrdVncFrame *frame);

G_DEFINE_AUTOPTR_CLEANUP_FUNC (GrdVncFrame, grd_vnc_frame_unref)

/* Allocate a zeroed BufferContext with an initialised mutex. */
static BufferContext *
buffer_context_new (void)
{
  BufferContext *buffer_context;

  buffer_context = g_new0 (BufferContext, 1);
  g_mutex_init (&buffer_context->buffer_mutex);

  return buffer_context;
}

/* Release a BufferContext created by buffer_context_new(). */
static void
buffer_context_free (BufferContext *buffer_context)
{
  g_mutex_clear (&buffer_context->buffer_mutex);

  g_free (buffer_context);
}

/*
 * Lock the mutex belonging to @buffer and mark it as locked.
 * The buffer must be registered in stream->pipewire_buffers and must not
 * already be marked as locked; both conditions are asserted.
 */
static void
acquire_pipewire_buffer_lock (GrdVncPipeWireStream *stream,
                              struct pw_buffer     *buffer)
{
  BufferContext *buffer_context = NULL;

  if (!g_hash_table_lookup_extended (stream->pipewire_buffers, buffer,
                                     NULL, (gpointer *) &buffer_context))
    g_assert_not_reached ();

  g_mutex_lock (&buffer_context->buffer_mutex);
  g_assert (!buffer_context->is_locked);
  buffer_context->is_locked = TRUE;
}

/*
 * Undo acquire_pipewire_buffer_lock() for @buffer if, and only if, the buffer
 * is currently marked as locked. A no-op otherwise.
 */
static void
maybe_release_pipewire_buffer_lock (GrdVncPipeWireStream *stream,
                                    struct pw_buffer     *buffer)
{
  BufferContext *buffer_context = NULL;

  if (!g_hash_table_lookup_extended (stream->pipewire_buffers, buffer,
                                     NULL, (gpointer *) &buffer_context))
    g_assert_not_reached ();

  if (!buffer_context->is_locked)
    return;

  buffer_context->is_locked = FALSE;
  g_mutex_unlock (&buffer_context->buffer_mutex);
}

/* Free a VncPointer together with any cursor it still owns. */
static void
vnc_pointer_free (VncPointer *vnc_pointer)
{
  g_clear_pointer (&vnc_pointer->rfb_cursor, rfbFreeCursor);
  g_free (vnc_pointer);
}

/*
 * Append the format properties shared by every EnumFormat pod: media type,
 * subtype, pixel format, size and frame rates.
 *
 * When @virtual_monitor is non-NULL its exact width/height is requested;
 * otherwise any size from 1x1 up to INT32_MAX x INT32_MAX is accepted.
 * The caller is responsible for pushing/popping the enclosing object frame.
 */
static void
add_common_format_params (struct spa_pod_builder     *pod_builder,
                          enum spa_video_format       spa_format,
                          const GrdVncVirtualMonitor *virtual_monitor)
{
  struct spa_rectangle min_rect;
  struct spa_rectangle max_rect;
  struct spa_fraction min_framerate;
  struct spa_fraction max_framerate;

  min_rect = SPA_RECTANGLE (1, 1);
  max_rect = SPA_RECTANGLE (INT32_MAX, INT32_MAX);
  min_framerate = SPA_FRACTION (1, 1);
  max_framerate = SPA_FRACTION (30, 1);

  spa_pod_builder_add (pod_builder,
                       SPA_FORMAT_mediaType,
                       SPA_POD_Id (SPA_MEDIA_TYPE_video), 0);
  spa_pod_builder_add (pod_builder,
                       SPA_FORMAT_mediaSubtype,
                       SPA_POD_Id (SPA_MEDIA_SUBTYPE_raw), 0);
  spa_pod_builder_add (pod_builder,
                       SPA_FORMAT_VIDEO_format,
                       SPA_POD_Id (spa_format), 0);

  if (virtual_monitor)
    {
      struct spa_rectangle virtual_monitor_rect;

      virtual_monitor_rect = SPA_RECTANGLE (virtual_monitor->width,
                                            virtual_monitor->height);

      spa_pod_builder_add (pod_builder,
                           SPA_FORMAT_VIDEO_size,
                           SPA_POD_Rectangle (&virtual_monitor_rect), 0);
    }
  else
    {
      spa_pod_builder_add (pod_builder,
                           SPA_FORMAT_VIDEO_size,
                           SPA_POD_CHOICE_RANGE_Rectangle (&min_rect,
                                                           &min_rect,
                                                           &max_rect), 0);
    }

  spa_pod_builder_add (pod_builder,
                       SPA_FORMAT_VIDEO_framerate,
                       SPA_POD_Fraction (&SPA_FRACTION (0, 1)), 0);
  spa_pod_builder_add (pod_builder,
                       SPA_FORMAT_VIDEO_maxFramerate,
                       SPA_POD_CHOICE_RANGE_Fraction (&max_framerate,
                                                      &min_framerate,
                                                      &max_framerate), 0);
}

/*
 * Build the EnumFormat pods offered to PipeWire and store them in @params.
 *
 * The first pod always describes BGRx. If an EGL thread exists and reports at
 * least one DRM modifier for that format, the first pod additionally carries
 * a mandatory modifier choice, and a second, modifier-less pod is appended as
 * a fallback.
 *
 * @n_available_params must be at least 2 (asserted).
 * Returns the number of entries written to @params (1 or 2).
 */
static uint32_t
add_format_params (GrdVncPipeWireStream        *stream,
                   const GrdVncVirtualMonitor  *virtual_monitor,
                   struct spa_pod_builder      *pod_builder,
                   const struct spa_pod       **params,
                   uint32_t                     n_available_params)
{
  GrdSession *session = GRD_SESSION (stream->session);
  GrdContext *context = grd_session_get_context (session);
  GrdEglThread *egl_thread = grd_context_get_egl_thread (context);
  struct spa_pod_frame format_frame;
  enum spa_video_format spa_format = SPA_VIDEO_FORMAT_BGRx;
  gboolean need_fallback_format = FALSE;
  uint32_t n_params = 0;

  g_assert (n_available_params >= 2);

  spa_pod_builder_push_object (pod_builder, &format_frame,
                               SPA_TYPE_OBJECT_Format,
                               SPA_PARAM_EnumFormat);
  add_common_format_params (pod_builder, spa_format, virtual_monitor);

  if (egl_thread)
    {
      uint32_t drm_format;
      int n_modifiers = 0;
      g_autofree uint64_t *modifiers = NULL;

      grd_get_spa_format_details (spa_format, &drm_format, NULL);

      /*
       * Only advertise modifiers when the list is non-empty: modifiers[0] is
       * read unconditionally below, so an empty (or NULL) list must be
       * treated like "no modifiers available".
       */
      if (grd_egl_thread_get_modifiers_for_format (egl_thread, drm_format,
                                                   &n_modifiers,
                                                   &modifiers) &&
          n_modifiers > 0 && modifiers)
        {
          struct spa_pod_frame modifier_frame;
          int i;

          spa_pod_builder_prop (pod_builder,
                                SPA_FORMAT_VIDEO_modifier,
                                (SPA_POD_PROP_FLAG_MANDATORY |
                                 SPA_POD_PROP_FLAG_DONT_FIXATE));

          spa_pod_builder_push_choice (pod_builder, &modifier_frame,
                                       SPA_CHOICE_Enum, 0);

          /*
           * The first value is written once on its own and then again as
           * part of the full list (presumably as the default entry of the
           * Enum choice - see the SPA POD documentation).
           */
          spa_pod_builder_long (pod_builder, modifiers[0]);

          for (i = 0; i < n_modifiers; i++)
            {
              uint64_t modifier = modifiers[i];

              spa_pod_builder_long (pod_builder, modifier);
            }
          spa_pod_builder_long (pod_builder, DRM_FORMAT_MOD_INVALID);
          spa_pod_builder_pop (pod_builder, &modifier_frame);

          need_fallback_format = TRUE;
        }
    }

  params[n_params++] = spa_pod_builder_pop (pod_builder, &format_frame);

  if (need_fallback_format)
    {
      /* Same format again, but without the modifier property. */
      spa_pod_builder_push_object (pod_builder, &format_frame,
                                   SPA_TYPE_OBJECT_Format,
                                   SPA_PARAM_EnumFormat);
      add_common_format_params (pod_builder, spa_format, virtual_monitor);
      params[n_params++] = spa_pod_builder_pop (pod_builder, &format_frame);
    }

  return n_params;
}

/*
 * Re-announce the stream's format parameters using the size of
 * @virtual_monitor, via pw_stream_update_params().
 */
void
grd_vnc_pipewire_stream_resize (GrdVncPipeWireStream *stream,
                                GrdVncVirtualMonitor *virtual_monitor)
{
  uint8_t params_buffer[1024];
  struct spa_pod_builder pod_builder;
  const struct spa_pod *params[MAX_FORMAT_PARAMS] = {};
  uint32_t n_params = 0;

  pod_builder = SPA_POD_BUILDER_INIT (params_buffer, sizeof (params_buffer));

  n_params += add_format_params (stream, virtual_monitor, &pod_builder,
                                 params, MAX_FORMAT_PARAMS);

  g_assert (n_params > 0);
  pw_stream_update_params (stream->pipewire_stream, params, n_params);
}

/* Stream state listener: logs every transition and warns on errors. */
static void
on_stream_state_changed (void                 *user_data,
                         enum pw_stream_state  old,
                         enum pw_stream_state  state,
                         const char           *error)
{
  g_debug ("Pipewire stream state changed from %s to %s",
           pw_stream_state_as_string (old),
           pw_stream_state_as_string (state));

  switch (state)
    {
    case PW_STREAM_STATE_ERROR:
      g_warning ("PipeWire stream error: %s", error);
      break;
    case PW_STREAM_STATE_PAUSED:
    case PW_STREAM_STATE_STREAMING:
    case PW_STREAM_STATE_UNCONNECTED:
    case PW_STREAM_STATE_CONNECTING:
      break;
    }
}

static void on_stream_param_changed (void *user_data, uint32_t id, const struct spa_pod *format) { GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (user_data); GrdSession *session = GRD_SESSION (stream->session); GrdContext *context = grd_session_get_context (session); uint8_t params_buffer[1024]; struct spa_pod_builder pod_builder; int width; int height; enum spa_data_type allowed_buffer_types; const struct spa_pod *params[3]; if (grd_session_vnc_is_client_gone (stream->session)) return; if (!format || id != SPA_PARAM_Format) return; spa_format_video_raw_parse (format, &stream->spa_format); pod_builder = SPA_POD_BUILDER_INIT (params_buffer, sizeof (params_buffer)); width = stream->spa_format.size.width; height = stream->spa_format.size.height; g_debug ("[VNC] Stream parameters changed. New monitor size: [%u, %u]", width, height); grd_session_vnc_queue_resize_framebuffer (stream->session, width, height); allowed_buffer_types = 1 << SPA_DATA_MemFd; if (grd_context_get_egl_thread (context)) allowed_buffer_types |= 1 << SPA_DATA_DmaBuf; params[0] = spa_pod_builder_add_object ( &pod_builder, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int (8, 1, 8), SPA_PARAM_BUFFERS_dataType, SPA_POD_Int (allowed_buffer_types), 0); params[1] = spa_pod_builder_add_object ( &pod_builder, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_PARAM_META_type, SPA_POD_Id (SPA_META_Header), SPA_PARAM_META_size, SPA_POD_Int (sizeof (struct spa_meta_header)), 0); params[2] = spa_pod_builder_add_object( &pod_builder, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_PARAM_META_type, SPA_POD_Id (SPA_META_Cursor), SPA_PARAM_META_size, SPA_POD_CHOICE_RANGE_Int (CURSOR_META_SIZE (384, 384), CURSOR_META_SIZE (1,1), CURSOR_META_SIZE (384, 384)), 0); pw_stream_update_params (stream->pipewire_stream, params, G_N_ELEMENTS (params)); } static void on_stream_add_buffer (void *user_data, struct pw_buffer *buffer) { GrdVncPipeWireStream *stream = user_data; 
g_hash_table_insert (stream->pipewire_buffers, buffer, buffer_context_new ()); } static void on_stream_remove_buffer (void *user_data, struct pw_buffer *buffer) { GrdVncPipeWireStream *stream = user_data; BufferContext *buffer_context = NULL; g_autoptr (GMutexLocker) locker = NULL; if (!g_hash_table_lookup_extended (stream->pipewire_buffers, buffer, NULL, (gpointer *) &buffer_context)) g_assert_not_reached (); locker = g_mutex_locker_new (&stream->dequeue_mutex); /* Ensure buffer is not locked any more */ g_mutex_lock (&buffer_context->buffer_mutex); g_mutex_unlock (&buffer_context->buffer_mutex); g_hash_table_remove (stream->pipewire_buffers, buffer); } static GrdVncFrame * grd_vnc_frame_new (GrdVncPipeWireStream *stream, GrdVncFrameReadyCallback callback, gpointer callback_user_data) { GrdVncFrame *frame; frame = g_new0 (GrdVncFrame, 1); g_atomic_ref_count_init (&frame->refcount); frame->stream = stream; frame->callback = callback; frame->callback_user_data = callback_user_data; return frame; } static GrdVncFrame * grd_vnc_frame_ref (GrdVncFrame *frame) { g_atomic_ref_count_inc (&frame->refcount); return frame; } static void grd_vnc_frame_unref (GrdVncFrame *frame) { if (g_atomic_ref_count_dec (&frame->refcount)) { g_free (frame->data); g_free (frame); } } static gboolean render_frame (gpointer user_data) { GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (user_data); GrdVncFrame *frame; g_mutex_lock (&stream->frame_mutex); frame = g_steal_pointer (&stream->pending_frame); g_mutex_unlock (&stream->frame_mutex); if (!frame) return G_SOURCE_CONTINUE; if (grd_session_vnc_is_client_gone (stream->session)) { grd_vnc_frame_unref (frame); return G_SOURCE_CONTINUE; } if (frame->data) { grd_session_vnc_take_buffer (stream->session, g_steal_pointer (&frame->data)); } else { grd_session_vnc_flush (stream->session); } grd_vnc_frame_unref (frame); return G_SOURCE_CONTINUE; } static gboolean render_mouse_pointer (gpointer user_data) { GrdVncPipeWireStream *stream = 
user_data; g_autoptr (GMutexLocker) locker = NULL; VncPointer *vnc_pointer; locker = g_mutex_locker_new (&stream->pointer_mutex); if (!stream->pending_pointer) return G_SOURCE_CONTINUE; vnc_pointer = g_steal_pointer (&stream->pending_pointer); g_clear_pointer (&locker, g_mutex_locker_free); if (vnc_pointer->rfb_cursor) { grd_session_vnc_set_cursor (stream->session, g_steal_pointer (&vnc_pointer->rfb_cursor)); } if (vnc_pointer->cursor_moved) { grd_session_vnc_move_cursor (stream->session, vnc_pointer->cursor_x, vnc_pointer->cursor_y); } grd_session_vnc_flush (stream->session); vnc_pointer_free (vnc_pointer); return G_SOURCE_CONTINUE; } static void process_mouse_pointer_bitmap (GrdVncPipeWireStream *stream, struct spa_buffer *buffer, VncPointer **vnc_pointer) { struct spa_meta_cursor *spa_meta_cursor; struct spa_meta_bitmap *spa_meta_bitmap; GrdPixelFormat format; spa_meta_cursor = spa_buffer_find_meta_data (buffer, SPA_META_Cursor, sizeof *spa_meta_cursor); g_assert (spa_meta_cursor); g_assert (spa_meta_cursor_is_valid (spa_meta_cursor)); g_assert (spa_meta_cursor->bitmap_offset); spa_meta_bitmap = SPA_MEMBER (spa_meta_cursor, spa_meta_cursor->bitmap_offset, struct spa_meta_bitmap); if (spa_meta_bitmap && spa_meta_bitmap->size.width > 0 && spa_meta_bitmap->size.height > 0 && grd_spa_pixel_format_to_grd_pixel_format (spa_meta_bitmap->format, &format)) { uint8_t *buf; rfbCursorPtr rfb_cursor; buf = SPA_MEMBER (spa_meta_bitmap, spa_meta_bitmap->offset, uint8_t); rfb_cursor = grd_vnc_create_cursor (spa_meta_bitmap->size.width, spa_meta_bitmap->size.height, spa_meta_bitmap->stride, format, buf); rfb_cursor->xhot = spa_meta_cursor->hotspot.x; rfb_cursor->yhot = spa_meta_cursor->hotspot.y; if (!(*vnc_pointer)) *vnc_pointer = g_new0 (VncPointer, 1); (*vnc_pointer)->rfb_cursor = rfb_cursor; } else if (spa_meta_bitmap) { if (!(*vnc_pointer)) *vnc_pointer = g_new0 (VncPointer, 1); (*vnc_pointer)->rfb_cursor = grd_vnc_create_empty_cursor (1, 1); } } static void on_frame_ready 
(GrdVncPipeWireStream *stream, GrdVncFrame *frame, gboolean success, gpointer user_data) { struct pw_buffer *buffer = user_data; g_assert (frame); g_assert (buffer); if (!success) goto out; g_mutex_lock (&stream->frame_mutex); g_clear_pointer (&stream->pending_frame, grd_vnc_frame_unref); stream->pending_frame = g_steal_pointer (&frame); g_mutex_unlock (&stream->frame_mutex); g_source_set_ready_time (stream->pending_frame_source, 0); out: pw_stream_queue_buffer (stream->pipewire_stream, buffer); maybe_release_pipewire_buffer_lock (stream, buffer); g_clear_pointer (&frame, grd_vnc_frame_unref); } static void copy_frame_data (GrdVncFrame *frame, uint8_t *src_data, int width, int height, int dst_stride, int src_stride, int bpp) { int y; frame->data = g_malloc (height * dst_stride); for (y = 0; y < height; y++) { memcpy (((uint8_t *) frame->data) + y * dst_stride, ((uint8_t *) src_data) + y * src_stride, width * bpp); } } static void on_dma_buf_downloaded (gboolean success, gpointer user_data) { GrdVncFrame *frame = user_data; frame->callback (frame->stream, frame, success, frame->callback_user_data); } static void process_frame_data (GrdVncPipeWireStream *stream, struct pw_buffer *pw_buffer) { struct spa_buffer *buffer = pw_buffer->buffer; g_autoptr (GrdVncFrame) frame = NULL; GrdVncFrameReadyCallback callback; gpointer user_data; int dst_stride; uint32_t drm_format; int bpp; int width; int height; g_assert (buffer->datas[0].chunk->size > 0); height = stream->spa_format.size.height; width = stream->spa_format.size.width; dst_stride = grd_session_vnc_get_stride_for_width (stream->session, width); grd_get_spa_format_details (stream->spa_format.format, &drm_format, &bpp); frame = grd_vnc_frame_new (stream, on_frame_ready, pw_buffer); callback = frame->callback; user_data = frame->callback_user_data; if (buffer->datas[0].type == SPA_DATA_MemFd) { size_t size; uint8_t *map; int src_stride; uint8_t *src_data; size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset; map 
= mmap (NULL, size, PROT_READ, MAP_PRIVATE, buffer->datas[0].fd, 0); if (map == MAP_FAILED) { g_warning ("Failed to mmap buffer: %s", g_strerror (errno)); callback (stream, g_steal_pointer (&frame), FALSE, user_data); return; } src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t); src_stride = buffer->datas[0].chunk->stride; copy_frame_data (frame, src_data, width, height, dst_stride, src_stride, bpp); munmap (map, size); callback (stream, g_steal_pointer (&frame), TRUE, user_data); } else if (buffer->datas[0].type == SPA_DATA_DmaBuf) { GrdSession *session = GRD_SESSION (stream->session); GrdContext *context = grd_session_get_context (session); GrdEglThread *egl_thread = grd_context_get_egl_thread (context); int row_width; int *fds; uint32_t *offsets; uint32_t *strides; uint64_t *modifiers = NULL; uint32_t n_planes; unsigned int i; uint8_t *dst_data; row_width = dst_stride / bpp; n_planes = buffer->n_datas; fds = g_alloca (sizeof (int) * n_planes); offsets = g_alloca (sizeof (uint32_t) * n_planes); strides = g_alloca (sizeof (uint32_t) * n_planes); if (stream->spa_format.modifier != DRM_FORMAT_MOD_INVALID) modifiers = g_alloca (sizeof (uint64_t) * n_planes); for (i = 0; i < n_planes; i++) { fds[i] = buffer->datas[i].fd; offsets[i] = buffer->datas[i].chunk->offset; strides[i] = buffer->datas[i].chunk->stride; if (modifiers) modifiers[i] = stream->spa_format.modifier; } dst_data = g_malloc0 (height * dst_stride); frame->data = dst_data; acquire_pipewire_buffer_lock (stream, pw_buffer); grd_egl_thread_download (egl_thread, stream->egl_slot, dst_data, row_width, drm_format, width, height, n_planes, fds, strides, offsets, modifiers, on_dma_buf_downloaded, grd_vnc_frame_ref (g_steal_pointer (&frame)), (GDestroyNotify) grd_vnc_frame_unref); } else { g_assert_not_reached (); } } static gboolean render_source_dispatch (GSource *source, GSourceFunc callback, gpointer user_data) { g_source_set_ready_time (source, -1); return callback (user_data); } static 
GSourceFuncs render_source_funcs = { .dispatch = render_source_dispatch, }; static void maybe_consume_pointer_position (struct pw_buffer *buffer, gboolean *cursor_moved, int *cursor_x, int *cursor_y) { struct spa_meta_cursor *spa_meta_cursor; spa_meta_cursor = spa_buffer_find_meta_data (buffer->buffer, SPA_META_Cursor, sizeof *spa_meta_cursor); if (spa_meta_cursor && spa_meta_cursor_is_valid (spa_meta_cursor)) { *cursor_x = spa_meta_cursor->position.x; *cursor_y = spa_meta_cursor->position.y; *cursor_moved = TRUE; } } static void on_stream_process (void *user_data) { GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (user_data); g_autoptr (GMutexLocker) locker = NULL; g_autoptr (GrdVncFrame) frame = NULL; struct pw_buffer *last_pointer_buffer = NULL; struct pw_buffer *last_frame_buffer = NULL; struct pw_buffer *next_buffer; VncPointer *vnc_pointer = NULL; gboolean cursor_moved = FALSE; int cursor_x = 0; int cursor_y = 0; locker = g_mutex_locker_new (&stream->dequeue_mutex); if (stream->dequeuing_disallowed) return; while ((next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream))) { struct spa_meta_header *spa_meta_header; spa_meta_header = spa_buffer_find_meta_data (next_buffer->buffer, SPA_META_Header, sizeof (struct spa_meta_header)); if (spa_meta_header && spa_meta_header->flags & SPA_META_HEADER_FLAG_CORRUPTED) { pw_stream_queue_buffer (stream->pipewire_stream, next_buffer); continue; } maybe_consume_pointer_position (next_buffer, &cursor_moved, &cursor_x, &cursor_y); if (grd_pipewire_buffer_has_pointer_bitmap (next_buffer)) { if (last_pointer_buffer == last_frame_buffer) last_pointer_buffer = NULL; if (last_pointer_buffer) pw_stream_queue_buffer (stream->pipewire_stream, last_pointer_buffer); last_pointer_buffer = next_buffer; } if (grd_pipewire_buffer_has_frame_data (next_buffer)) { if (last_pointer_buffer == last_frame_buffer) last_frame_buffer = NULL; if (last_frame_buffer) pw_stream_queue_buffer (stream->pipewire_stream, last_frame_buffer); 
last_frame_buffer = next_buffer; } if (next_buffer != last_pointer_buffer && next_buffer != last_frame_buffer) pw_stream_queue_buffer (stream->pipewire_stream, next_buffer); } if (!last_pointer_buffer && !last_frame_buffer && !cursor_moved) return; if (cursor_moved) { vnc_pointer = g_new0 (VncPointer, 1); vnc_pointer->cursor_moved = cursor_moved; vnc_pointer->cursor_x = cursor_x; vnc_pointer->cursor_y = cursor_y; } if (last_pointer_buffer) { process_mouse_pointer_bitmap (stream, last_pointer_buffer->buffer, &vnc_pointer); if (last_pointer_buffer != last_frame_buffer) pw_stream_queue_buffer (stream->pipewire_stream, last_pointer_buffer); } if (vnc_pointer) { g_mutex_lock (&stream->pointer_mutex); g_clear_pointer (&stream->pending_pointer, vnc_pointer_free); stream->pending_pointer = vnc_pointer; g_mutex_unlock (&stream->pointer_mutex); g_source_set_ready_time (stream->pending_pointer_source, 0); } if (!last_frame_buffer) return; process_frame_data (stream, last_frame_buffer); } static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, .param_changed = on_stream_param_changed, .add_buffer = on_stream_add_buffer, .remove_buffer = on_stream_remove_buffer, .process = on_stream_process, }; static gboolean connect_to_stream (GrdVncPipeWireStream *stream, const GrdVncVirtualMonitor *virtual_monitor, GError **error) { struct pw_stream *pipewire_stream; uint8_t params_buffer[1024]; struct spa_pod_builder pod_builder; const struct spa_pod *params[MAX_FORMAT_PARAMS] = {}; uint32_t n_params = 0; int ret; pipewire_stream = pw_stream_new (stream->pipewire_core, "grd-vnc-pipewire-stream", NULL); pod_builder = SPA_POD_BUILDER_INIT (params_buffer, sizeof (params_buffer)); n_params += add_format_params (stream, virtual_monitor, &pod_builder, params, MAX_FORMAT_PARAMS); stream->pipewire_stream = pipewire_stream; pw_stream_add_listener (pipewire_stream, &stream->pipewire_stream_listener, &stream_events, stream); g_assert 
(n_params > 0); ret = pw_stream_connect (stream->pipewire_stream, PW_DIRECTION_INPUT, stream->src_node_id, (PW_STREAM_FLAG_RT_PROCESS | PW_STREAM_FLAG_AUTOCONNECT), params, n_params); if (ret < 0) { g_set_error_literal (error, G_IO_ERROR, g_io_error_from_errno (-ret), strerror (-ret)); return FALSE; } return TRUE; } static void on_core_error (void *user_data, uint32_t id, int seq, int res, const char *message) { GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (user_data); g_warning ("Pipewire core error: id:%u %s", id, message); if (id == PW_ID_CORE && res == -EPIPE) g_signal_emit (stream, signals[CLOSED], 0); } static const struct pw_core_events core_events = { PW_VERSION_CORE_EVENTS, .error = on_core_error, }; GrdVncPipeWireStream * grd_vnc_pipewire_stream_new (GrdSessionVnc *session_vnc, uint32_t src_node_id, const GrdVncVirtualMonitor *virtual_monitor, GError **error) { GrdSession *session = GRD_SESSION (session_vnc); GrdContext *context = grd_session_get_context (session); GrdEglThread *egl_thread = grd_context_get_egl_thread (context); g_autoptr (GrdVncPipeWireStream) stream = NULL; GrdPipeWireSource *pipewire_source; GSource *source; stream = g_object_new (GRD_TYPE_VNC_PIPEWIRE_STREAM, NULL); stream->session = session_vnc; stream->src_node_id = src_node_id; if (egl_thread) stream->egl_slot = grd_egl_thread_acquire_slot (egl_thread); pw_init (NULL, NULL); pipewire_source = grd_attached_pipewire_source_new ("VNC", error); if (!pipewire_source) return NULL; stream->pipewire_source = (GSource *) pipewire_source; stream->pipewire_context = pw_context_new (pipewire_source->pipewire_loop, NULL, 0); if (!stream->pipewire_context) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to create PipeWire context"); return NULL; } stream->pipewire_core = pw_context_connect (stream->pipewire_context, NULL, 0); if (!stream->pipewire_core) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to connect PipeWire context"); return NULL; } source = 
g_source_new (&render_source_funcs, sizeof (GSource)); stream->pending_frame_source = source; g_source_set_callback (source, render_frame, stream, NULL); g_source_attach (source, NULL); g_source_unref (source); source = g_source_new (&render_source_funcs, sizeof (GSource)); stream->pending_pointer_source = source; g_source_set_callback (source, render_mouse_pointer, stream, NULL); g_source_attach (source, NULL); g_source_unref (source); pw_core_add_listener (stream->pipewire_core, &stream->pipewire_core_listener, &core_events, stream); if (!connect_to_stream (stream, virtual_monitor, error)) return NULL; return g_steal_pointer (&stream); } static void on_sync_complete (gboolean success, gpointer user_data) { GrdSyncPoint *sync_point = user_data; grd_sync_point_complete (sync_point, success); } static void grd_vnc_pipewire_stream_finalize (GObject *object) { GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (object); GrdSession *session = GRD_SESSION (stream->session); GrdContext *context = grd_session_get_context (session); GrdEglThread *egl_thread; g_mutex_lock (&stream->dequeue_mutex); stream->dequeuing_disallowed = TRUE; g_mutex_unlock (&stream->dequeue_mutex); egl_thread = grd_context_get_egl_thread (context); if (egl_thread) { GrdSyncPoint sync_point = {}; grd_sync_point_init (&sync_point); grd_egl_thread_sync (egl_thread, on_sync_complete, &sync_point, NULL); grd_sync_point_wait_for_completion (&sync_point); grd_sync_point_clear (&sync_point); } g_clear_pointer (&stream->pipewire_stream, pw_stream_destroy); g_clear_pointer (&stream->pipewire_core, pw_core_disconnect); g_clear_pointer (&stream->pipewire_context, pw_context_destroy); g_clear_pointer (&stream->pending_pointer_source, g_source_destroy); g_clear_pointer (&stream->pending_frame_source, g_source_destroy); if (stream->pipewire_source) { g_source_destroy (stream->pipewire_source); g_clear_pointer (&stream->pipewire_source, g_source_unref); } g_clear_pointer (&stream->pending_pointer, 
vnc_pointer_free); g_clear_pointer (&stream->pending_frame, grd_vnc_frame_unref); g_mutex_clear (&stream->pointer_mutex); g_mutex_clear (&stream->frame_mutex); g_mutex_clear (&stream->dequeue_mutex); g_clear_pointer (&stream->pipewire_buffers, g_hash_table_unref); pw_deinit (); if (egl_thread) grd_egl_thread_release_slot (egl_thread, stream->egl_slot); G_OBJECT_CLASS (grd_vnc_pipewire_stream_parent_class)->finalize (object); } static void grd_vnc_pipewire_stream_init (GrdVncPipeWireStream *stream) { stream->pipewire_buffers = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) buffer_context_free); g_mutex_init (&stream->dequeue_mutex); g_mutex_init (&stream->frame_mutex); g_mutex_init (&stream->pointer_mutex); } static void grd_vnc_pipewire_stream_class_init (GrdVncPipeWireStreamClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->finalize = grd_vnc_pipewire_stream_finalize; signals[CLOSED] = g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0); }