1 From f2e6a0a324106b40195f88953e55a355875d2b1b Mon Sep 17 00:00:00 2001
2 From: George Kiagiadakis <george.kiagiadakis@collabora.com>
3 Date: Fri, 4 Oct 2019 20:51:24 +0300
4 Subject: [PATCH] utils: add a gstreamer helper application for interconnection
7 Unfortunately, the bluez-alsa PCM plugin does not work correctly
8 when it is used through pipewire (or gstreamer, or anywhere really...).
10 Thanfully, the bluez-alsa PCM plugin is only a simple client that
11 reads/writes on a file descriptor that was opened by bluealsa.
12 This allows us to use bluealsa without the PCM plugin, just like it
13 is done in the aplay.c util.
15 This one uses GStreamer to implement the plumbing between pipewire
16 and the file descriptor. On the reading side we are also doing some
17 tricks to ensure a smooth stream, which is not the case for the
18 stream that is coming out of bluealsa.
20 This helper is implemented as a patch to bluez-alsa so that it can
21 use its internal private API. In the future this needs some re-thinking.
23 Upstream-Status: Inappropriate
26 utils/Makefile.am | 20 +++
27 utils/gst-helper.c | 432 +++++++++++++++++++++++++++++++++++++++++++++
28 3 files changed, 459 insertions(+)
29 create mode 100644 utils/gst-helper.c
31 diff --git a/configure.ac b/configure.ac
32 index 4825afa..9125871 100644
35 @@ -141,6 +141,13 @@ AM_COND_IF([ENABLE_HCITOP], [
36 PKG_CHECK_MODULES([NCURSES], [ncurses])
39 +AC_ARG_ENABLE([gsthelper],
40 + [AS_HELP_STRING([--enable-gsthelper], [enable building of gsthelper tool])])
41 +AM_CONDITIONAL([ENABLE_GSTHELPER], [test "x$enable_gsthelper" = "xyes"])
42 +AM_COND_IF([ENABLE_GSTHELPER], [
43 + PKG_CHECK_MODULES([GST], [gstreamer-1.0 glib-2.0])
47 [AS_HELP_STRING([--enable-test], [enable unit test])])
48 AM_CONDITIONAL([ENABLE_TEST], [test "x$enable_test" = "xyes"])
49 diff --git a/utils/Makefile.am b/utils/Makefile.am
50 index 9057f2c..9790474 100644
51 --- a/utils/Makefile.am
52 +++ b/utils/Makefile.am
53 @@ -47,3 +47,23 @@ hcitop_LDADD = \
59 +bin_PROGRAMS += bluealsa-gst-helper
60 +bluealsa_gst_helper_SOURCES = \
61 + ../src/shared/dbus-client.c \
62 + ../src/shared/ffb.c \
63 + ../src/shared/log.c \
65 +bluealsa_gst_helper_CFLAGS = \
66 + -I$(top_srcdir)/src \
71 +bluealsa_gst_helper_LDADD = \
77 diff --git a/utils/gst-helper.c b/utils/gst-helper.c
79 index 0000000..de1d47c
81 +++ b/utils/gst-helper.c
83 +/* Bluez-Alsa PipeWire integration GStreamer helper
85 + * Copyright © 2016-2019 Arkadiusz Bokowy
86 + * Copyright © 2019 Collabora Ltd.
87 + * @author George Kiagiadakis <george.kiagiadakis@collabora.com>
89 + * SPDX-License-Identifier: MIT
101 +#include <stdbool.h>
108 +#include <bluetooth/bluetooth.h>
109 +#include <dbus/dbus.h>
110 +#include <gst/gst.h>
112 +#include "shared/dbus-client.h"
113 +#include "shared/defs.h"
114 +#include "shared/ffb.h"
115 +#include "shared/log.h"
118 + /* used BlueALSA PCM device */
119 + struct ba_pcm ba_pcm;
120 + /* file descriptor of PCM FIFO */
122 + /* file descriptor of PCM control */
123 + int ba_pcm_ctrl_fd;
124 + /* the gstreamer pipelines (sink & source) */
125 + GstElement *pipeline[2];
126 + /* the queue & pwaudiosink of the sink pipeline */
128 + GstElement *pwelem;
131 +static struct ba_dbus_ctx dbus_ctx;
132 +static GHashTable *workers;
133 +static bool main_loop_on = true;
136 +main_loop_stop(int sig)
138 + /* Call to this handler restores the default action, so on the
139 + * second call the program will be forcefully terminated. */
141 + struct sigaction sigact = { .sa_handler = SIG_DFL };
142 + sigaction(sig, &sigact, NULL);
144 + main_loop_on = false;
147 +static GstBusSyncReply
148 +bus_sync_handler(GstBus *bus, GstMessage *message, gpointer user_data)
150 + struct worker *w = user_data;
153 + switch (GST_MESSAGE_TYPE (message)) {
154 + case GST_MESSAGE_REQUEST_STATE:
155 + gst_message_parse_request_state (message, &s);
157 + debug ("corked: %d", (s == GST_STATE_PAUSED));
159 + /* drop queue data when corked */
160 + g_object_set (w->queue,
161 + "leaky", (s == GST_STATE_PAUSED) ? 2 /* downstream */ : 0 /* no */,
163 + gst_element_set_state (w->pwelem, s);
165 + /* flush the queue when resuming */
166 + if (s == GST_STATE_PLAYING) {
167 + gst_element_send_event (w->queue, gst_event_new_flush_start ());
168 + gst_element_send_event (w->queue, gst_event_new_flush_stop (FALSE));
175 + gst_message_unref (message);
176 + return GST_BUS_DROP;
180 +worker_start_pipeline(struct worker *w, int id, int mode, int profile)
182 + GError *gerr = NULL;
183 + DBusError err = DBUS_ERROR_INIT;
184 + const gchar * role = NULL;
186 + if (w->pipeline[id])
189 + if (!bluealsa_dbus_pcm_open(&dbus_ctx, w->ba_pcm.pcm_path, mode,
190 + &w->ba_pcm_fd, &w->ba_pcm_ctrl_fd, &err)) {
191 + error("Couldn't open PCM: %s", err.message);
192 + dbus_error_free(&err);
196 + if (mode == BA_PCM_FLAG_SINK) {
197 + debug("sink start");
198 + w->pipeline[id] = gst_parse_launch(
199 + /* add a silent live source to ensure a perfect live stream on the
200 + output, even when the bt device is not sending or has gaps;
201 + this also effectively changes the clock to be the system clock,
202 + which is the same clock used by bluez-alsa on the sending side */
203 + "audiotestsrc is-live=true wave=silence ! capsfilter name=capsf "
204 + "! audiomixer name=m "
205 + /* mix the input from bluez-alsa using fdsrc; rawaudioparse
206 + is necessary to convert bytes to time and align the buffers */
207 + "fdsrc name=fdelem do-timestamp=true ! capsfilter name=capsf2 "
208 + "! rawaudioparse use-sink-caps=true ! m. "
209 + /* take the mixer output, convert and push to pipewire */
210 + "m.src ! capsfilter name=capsf3 ! audioconvert ! audioresample "
211 + "! audio/x-raw,format=F32LE,rate=48000 ! identity sync=true "
212 + "! queue name=queue leaky=no max-size-time=0 max-size-buffers=0 max-size-bytes=192000 "
213 + "! pwaudiosink name=pwelem",
216 + /* a2dp is for music, sco is for calls */
217 + role = (profile == BA_PCM_FLAG_PROFILE_A2DP) ? "Multimedia" : "Communication";
219 + else if (mode == BA_PCM_FLAG_SOURCE && profile == BA_PCM_FLAG_PROFILE_SCO) {
220 + debug("source start");
221 + w->pipeline[id] = gst_parse_launch(
222 + /* read from pipewire and put the buffers on a leaky queue, which
223 + will essentially allow pwaudiosrc to continue working while
224 + the fdsink is blocked (when there is no phone call in progress).
225 + 9600 bytes = 50ms @ F32LE/1ch/48000
227 + "pwaudiosrc name=pwelem ! audio/x-raw,format=F32LE,rate=48000 "
228 + "! queue name=queue leaky=downstream max-size-time=0 max-size-buffers=0 max-size-bytes=9600 "
229 + "! audioconvert ! audioresample ! capsfilter name=capsf "
230 + "! fdsink name=fdelem", &gerr);
232 + role = "Communication";
236 + error("Failed to start pipeline: %s", gerr->message);
237 + g_error_free(gerr);
241 + if (w->pipeline[id]) {
242 + g_autofree gchar *capsstr = NULL;
243 + g_autoptr (GstElement) fdelem = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "fdelem");
244 + g_autoptr (GstElement) pwelem = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "pwelem");
245 + g_autoptr (GstElement) queue = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "queue");
246 + g_autoptr (GstElement) capsf = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "capsf");
247 + g_autoptr (GstElement) capsf2 = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "capsf2");
248 + g_autoptr (GstElement) capsf3 = gst_bin_get_by_name(GST_BIN(w->pipeline[id]), "capsf3");
249 + g_autoptr (GstCaps) caps = gst_caps_new_simple("audio/x-raw",
250 + "format", G_TYPE_STRING, "S16LE",
251 + "layout", G_TYPE_STRING, "interleaved",
252 + "channels", G_TYPE_INT, w->ba_pcm.channels,
253 + "rate", G_TYPE_INT, w->ba_pcm.sampling,
255 + g_autoptr (GstStructure) stream_props = gst_structure_new("props",
256 + "media.role", G_TYPE_STRING, role,
257 + "bluealsa.profile", G_TYPE_STRING,
258 + (profile == BA_PCM_FLAG_PROFILE_SCO) ? "sco" : "a2dp",
261 + g_object_set(capsf, "caps", caps, NULL);
263 + g_object_set(capsf2, "caps", caps, NULL);
265 + g_object_set(capsf3, "caps", caps, NULL);
267 + capsstr = gst_caps_to_string (caps);
268 + debug(" caps: %s", capsstr);
270 + g_object_set(fdelem, "fd", w->ba_pcm_fd, NULL);
271 + g_object_set(pwelem, "stream-properties", stream_props, NULL);
273 + if (mode == BA_PCM_FLAG_SINK) {
274 + g_autoptr (GstBus) bus = gst_pipeline_get_bus(GST_PIPELINE(w->pipeline[id]));
275 + gst_bus_set_sync_handler(bus, bus_sync_handler, w, NULL);
277 + w->pwelem = pwelem;
280 + gst_element_set_state(w->pipeline[id], GST_STATE_PLAYING);
285 + g_clear_object(&w->pipeline[id]);
290 +worker_start(struct worker *w)
292 + int mode = w->ba_pcm.flags & (BA_PCM_FLAG_SOURCE | BA_PCM_FLAG_SINK);
293 + int profile = w->ba_pcm.flags & (BA_PCM_FLAG_PROFILE_A2DP | BA_PCM_FLAG_PROFILE_SCO);
294 + /* human-readable BT address */
297 + g_return_val_if_fail (profile != 0 && profile != (BA_PCM_FLAG_PROFILE_A2DP | BA_PCM_FLAG_PROFILE_SCO), -1);
299 + ba2str(&w->ba_pcm.addr, addr);
300 + debug("%p: worker start addr:%s, mode:0x%x, profile:0x%x", w, addr, mode, profile);
302 + if (mode & BA_PCM_FLAG_SINK)
303 + worker_start_pipeline(w, 0, BA_PCM_FLAG_SINK, profile);
304 + if (mode & BA_PCM_FLAG_SOURCE)
305 + worker_start_pipeline(w, 1, BA_PCM_FLAG_SOURCE, profile);
309 +worker_stop(struct worker *w)
311 + debug("stop worker %p", w);
312 + if (w->pipeline[0]) {
313 + gst_element_set_state(w->pipeline[0], GST_STATE_NULL);
314 + g_clear_object(&w->pipeline[0]);
316 + if (w->pipeline[1]) {
317 + gst_element_set_state(w->pipeline[1], GST_STATE_NULL);
318 + g_clear_object(&w->pipeline[1]);
320 + if (w->ba_pcm_fd != -1) {
321 + close(w->ba_pcm_fd);
324 + if (w->ba_pcm_ctrl_fd != -1) {
325 + close(w->ba_pcm_ctrl_fd);
326 + w->ba_pcm_ctrl_fd = -1;
332 +supervise_pcm_worker(struct worker *worker)
334 + if (worker == NULL)
338 + if (worker->ba_pcm.flags & (BA_PCM_FLAG_SOURCE | BA_PCM_FLAG_SINK) == 0)
342 + if (worker->ba_pcm.flags & (BA_PCM_FLAG_PROFILE_A2DP | BA_PCM_FLAG_PROFILE_SCO) == 0)
345 + /* check whether SCO has selected codec */
346 + if (worker->ba_pcm.flags & BA_PCM_FLAG_PROFILE_SCO &&
347 + worker->ba_pcm.codec == 0) {
348 + debug("Skipping SCO with codec not selected");
353 + return worker_start(worker);
355 + return worker_stop(worker);
359 +worker_new(struct ba_pcm *pcm)
361 + struct worker *w = g_slice_new0 (struct worker);
362 + memcpy(&w->ba_pcm, pcm, sizeof(struct ba_pcm));
364 + w->ba_pcm_ctrl_fd = -1;
365 + g_hash_table_insert(workers, w->ba_pcm.pcm_path, w);
366 + supervise_pcm_worker(w);
369 +static DBusHandlerResult
370 +dbus_signal_handler(DBusConnection *conn, DBusMessage *message, void *data)
375 + const char *path = dbus_message_get_path(message);
376 + const char *interface = dbus_message_get_interface(message);
377 + const char *signal = dbus_message_get_member(message);
379 + DBusMessageIter iter;
380 + struct worker *worker;
382 + if (strcmp(interface, BLUEALSA_INTERFACE_MANAGER) == 0) {
384 + if (strcmp(signal, "PCMAdded") == 0) {
386 + if (!dbus_message_iter_init(message, &iter) ||
387 + !bluealsa_dbus_message_iter_get_pcm(&iter, NULL, &pcm)) {
388 + error("Couldn't add new PCM: %s", "Invalid signal signature");
392 + return DBUS_HANDLER_RESULT_HANDLED;
395 + if (strcmp(signal, "PCMRemoved") == 0) {
396 + if (!dbus_message_iter_init(message, &iter) ||
397 + dbus_message_iter_get_arg_type(&iter) != DBUS_TYPE_OBJECT_PATH) {
398 + error("Couldn't remove PCM: %s", "Invalid signal signature");
401 + dbus_message_iter_get_basic(&iter, &path);
402 + g_hash_table_remove(workers, path);
403 + return DBUS_HANDLER_RESULT_HANDLED;
408 + if (strcmp(interface, DBUS_INTERFACE_PROPERTIES) == 0) {
409 + worker = g_hash_table_lookup(workers, path);
412 + if (!dbus_message_iter_init(message, &iter) ||
413 + dbus_message_iter_get_arg_type(&iter) != DBUS_TYPE_STRING) {
414 + error("Couldn't update PCM: %s", "Invalid signal signature");
417 + dbus_message_iter_get_basic(&iter, &interface);
418 + dbus_message_iter_next(&iter);
419 + if (!bluealsa_dbus_message_iter_get_pcm_props(&iter, NULL, &worker->ba_pcm))
421 + supervise_pcm_worker(worker);
422 + return DBUS_HANDLER_RESULT_HANDLED;
426 + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
430 +destroy_worker(void *worker)
432 + struct worker *w = worker;
434 + g_slice_free(struct worker, w);
438 +main(int argc, char *argv[])
440 + int ret = EXIT_SUCCESS;
442 + log_open(argv[0], false, false);
443 + gst_init(&argc, &argv);
444 + dbus_threads_init_default();
446 + DBusError err = DBUS_ERROR_INIT;
447 + if (!bluealsa_dbus_connection_ctx_init(&dbus_ctx, BLUEALSA_SERVICE, &err)) {
448 + error("Couldn't initialize D-Bus context: %s", err.message);
449 + return EXIT_FAILURE;
452 + bluealsa_dbus_connection_signal_match_add(&dbus_ctx,
453 + BLUEALSA_SERVICE, NULL, BLUEALSA_INTERFACE_MANAGER, "PCMAdded", NULL);
454 + bluealsa_dbus_connection_signal_match_add(&dbus_ctx,
455 + BLUEALSA_SERVICE, NULL, BLUEALSA_INTERFACE_MANAGER, "PCMRemoved", NULL);
456 + bluealsa_dbus_connection_signal_match_add(&dbus_ctx,
457 + BLUEALSA_SERVICE, NULL, DBUS_INTERFACE_PROPERTIES, "PropertiesChanged",
458 + "arg0='"BLUEALSA_INTERFACE_PCM"'");
460 + if (!dbus_connection_add_filter(dbus_ctx.conn, dbus_signal_handler, NULL, NULL)) {
461 + error("Couldn't add D-Bus filter: %s", err.message);
462 + return EXIT_FAILURE;
465 + workers = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, destroy_worker);
468 + struct ba_pcm *pcms = NULL;
469 + size_t pcms_count = 0, i;
471 + if (!bluealsa_dbus_get_pcms(&dbus_ctx, &pcms, &pcms_count, &err))
472 + warn("Couldn't get BlueALSA PCM list: %s", err.message);
474 + for (i = 0; i < pcms_count; i++) {
475 + worker_new(&pcms[i]);
481 + struct sigaction sigact = { .sa_handler = main_loop_stop };
482 + sigaction(SIGTERM, &sigact, NULL);
483 + sigaction(SIGINT, &sigact, NULL);
485 + /* Ignore SIGPIPE, which may be received when writing to the bluealsa
486 + socket when it is closed on the remote end */
487 + signal(SIGPIPE, SIG_IGN);
489 + debug("Starting main loop");
490 + while (main_loop_on) {
492 + struct pollfd pfds[10];
493 + nfds_t pfds_len = ARRAYSIZE(pfds);
495 + if (!bluealsa_dbus_connection_poll_fds(&dbus_ctx, pfds, &pfds_len)) {
496 + error("Couldn't get D-Bus connection file descriptors");
497 + ret = EXIT_FAILURE;
501 + if (poll(pfds, pfds_len, -1) == -1 &&
505 + if (bluealsa_dbus_connection_poll_dispatch(&dbus_ctx, pfds, pfds_len))
506 + while (dbus_connection_dispatch(dbus_ctx.conn) == DBUS_DISPATCH_DATA_REMAINS)
512 + g_hash_table_unref(workers);