pipewire: add patch to fix random sendmsg errors 44/22544/2
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Tue, 1 Oct 2019 09:05:15 +0000 (12:05 +0300)
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Tue, 1 Oct 2019 11:42:11 +0000 (14:42 +0300)
Sometimes we get an error from sendmsg when too many objects
exist in the graph and when this happens, clients start to
silently fail to do things in a weird way. These patches fix
that by handling the situation where sendmsg returns EAGAIN
and trying again when the socket is unblocked. Previously,
data would silently be dropped, which is what caused the weird
behaviors.

See also https://github.com/PipeWire/pipewire/issues/111
Bug-AGL: SPEC-2837

Change-Id: Ie30083545629114f10a28e628f54d85e22d13058
Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
meta-pipewire/recipes-multimedia/pipewire/pipewire/0015-connection-move-remaining-data-and-fds.patch [new file with mode: 0644]
meta-pipewire/recipes-multimedia/pipewire/pipewire/0016-protocol-improve-flushing.patch [new file with mode: 0644]
meta-pipewire/recipes-multimedia/pipewire/pipewire_git.bb

diff --git a/meta-pipewire/recipes-multimedia/pipewire/pipewire/0015-connection-move-remaining-data-and-fds.patch b/meta-pipewire/recipes-multimedia/pipewire/pipewire/0015-connection-move-remaining-data-and-fds.patch
new file mode 100644 (file)
index 0000000..925dff7
--- /dev/null
@@ -0,0 +1,61 @@
+From 1b739247dcab62d1da45109cf805b8a840307ccc Mon Sep 17 00:00:00 2001
+From: Wim Taymans <wtaymans@redhat.com>
+Date: Tue, 1 Oct 2019 10:43:48 +0200
+Subject: [PATCH] connection: move remaining data and fds
+
+If we can't send all of the data, move the remaining data to the
+start of the buffer so that we can send it again later.
+
+See #111
+
+Upstream-Status: Backport [3d48ba8394396fc8d8cadb1bff3514217ddd70e6]
+---
+ .../module-protocol-native/connection.c       | 23 +++++++++++--------
+ 1 file changed, 14 insertions(+), 9 deletions(-)
+
+diff --git a/src/modules/module-protocol-native/connection.c b/src/modules/module-protocol-native/connection.c
+index f51129df..7b6cf112 100644
+--- a/src/modules/module-protocol-native/connection.c
++++ b/src/modules/module-protocol-native/connection.c
+@@ -500,8 +500,12 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co
+                       if (sent < 0) {
+                               if (errno == EINTR)
+                                       continue;
+-                              else
+-                                      goto send_error;
++                              else {
++                                      res = -errno;
++                                      pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s",
++                                                      conn->fd, n_fds, spa_strerror(res));
++                                      goto exit;
++                              }
+                       }
+                       break;
+               }
+@@ -513,15 +517,16 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co
+               n_fds -= outfds;
+               fds += outfds;
+       }
+-      buf->buffer_size = size;
+-      buf->n_fds = n_fds;
+-      return 0;
++      res = 0;
+-      /* ERRORS */
+-send_error:
+-      res = -errno;
+-      pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s", conn->fd, n_fds, strerror(errno));
++exit:
++      if (size > 0)
++              memmove(buf->buffer_data, data, size);
++      buf->buffer_size = size;
++      if (n_fds > 0)
++              memmove(buf->fds, fds, n_fds * sizeof(int));
++      buf->n_fds = n_fds;
+       return res;
+ }
+-- 
+2.23.0
+
diff --git a/meta-pipewire/recipes-multimedia/pipewire/pipewire/0016-protocol-improve-flushing.patch b/meta-pipewire/recipes-multimedia/pipewire/pipewire/0016-protocol-improve-flushing.patch
new file mode 100644 (file)
index 0000000..124434e
--- /dev/null
@@ -0,0 +1,243 @@
+From dffeeeb74f5b0a0385c4131b7ff349ff04fcebce Mon Sep 17 00:00:00 2001
+From: Wim Taymans <wtaymans@redhat.com>
+Date: Tue, 1 Oct 2019 12:53:56 +0200
+Subject: [PATCH] protocol: improve flushing
+
+Use the IO_OUT flag to schedule flushing instead of a flush_event.
+
+Handle EGAIN and wait for IO_OUT to try again.
+
+Fixes #111
+
+Upstream-Status: Backport [cc8e992cd155b4f19312a5036c7b744fc547410f]
+---
+ src/modules/module-protocol-native.c          | 99 ++++++++++++-------
+ .../module-protocol-native/connection.c       |  2 -
+ 2 files changed, 62 insertions(+), 39 deletions(-)
+
+diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c
+index cc8501d5..be2d4f57 100644
+--- a/src/modules/module-protocol-native.c
++++ b/src/modules/module-protocol-native.c
+@@ -82,9 +82,8 @@ struct client {
+       struct pw_protocol_native_connection *connection;
+       struct spa_hook conn_listener;
+-      struct spa_source *flush_event;
+       unsigned int disconnecting:1;
+-      unsigned int flush_signaled:1;
++      unsigned int flushing:1;
+ };
+ struct server {
+@@ -106,6 +105,7 @@ struct client_data {
+       struct spa_source *source;
+       struct pw_protocol_native_connection *connection;
+       unsigned int busy:1;
++      unsigned int need_flush:1;
+ };
+ static void
+@@ -200,12 +200,14 @@ client_busy_changed(void *data, bool busy)
+ {
+       struct client_data *c = data;
+       struct pw_client *client = c->client;
+-      uint32_t mask = SPA_IO_ERR | SPA_IO_HUP;
++      uint32_t mask = c->source->mask;
+       c->busy = busy;
+-      if (!busy)
+-              mask |= SPA_IO_IN;
++      if (busy)
++              SPA_FLAG_UNSET(mask, SPA_IO_IN);
++      else
++              SPA_FLAG_SET(mask, SPA_IO_IN);
+       pw_log_debug(NAME" %p: busy changed %d", client->protocol, busy);
+       pw_loop_update_io(client->core->main_loop, c->source, mask);
+@@ -220,13 +222,32 @@ connection_data(void *data, int fd, uint32_t mask)
+ {
+       struct client_data *this = data;
+       struct pw_client *client = this->client;
++      int res;
+-      if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
++      if (mask & SPA_IO_HUP) {
+               pw_log_info(NAME" %p: client %p disconnected", client->protocol, client);
+               pw_client_destroy(client);
+               return;
+       }
+-
++      if (mask & SPA_IO_ERR) {
++              pw_log_error(NAME" %p: client %p error", client->protocol, client);
++              pw_client_destroy(client);
++              return;
++      }
++      if (mask & SPA_IO_OUT) {
++              res = pw_protocol_native_connection_flush(this->connection);
++              if (res >= 0) {
++                      int mask = this->source->mask;
++                      SPA_FLAG_UNSET(mask, SPA_IO_OUT);
++                      pw_loop_update_io(client->protocol->core->main_loop,
++                                      this->source, mask);
++              } else if (res != EAGAIN) {
++                      pw_log_error("client %p: could not flush: %s",
++                                      client, spa_strerror(res));
++                      pw_client_destroy(client);
++                      return;
++              }
++      }
+       if (mask & SPA_IO_IN)
+               process_messages(this);
+ }
+@@ -296,7 +317,8 @@ static struct pw_client *client_new(struct server *s, int fd)
+       this->client = client;
+       this->source = pw_loop_add_io(pw_core_get_main_loop(core),
+-                                    fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this);
++                                    fd, SPA_IO_ERR | SPA_IO_HUP, true,
++                                    connection_data, this);
+       if (this->source == NULL)
+               goto cleanup_client;
+@@ -408,7 +430,7 @@ socket_data(void *data, int fd, uint32_t mask)
+       if (!client->busy)
+               pw_loop_update_io(client->protocol->core->main_loop,
+-                        c->source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
++                              c->source, c->source->mask | SPA_IO_IN);
+ }
+ static int add_socket(struct pw_protocol *protocol, struct server *s)
+@@ -514,6 +536,17 @@ on_remote_data(void *data, int fd, uint32_t mask)
+               res = -EPIPE;
+               goto error;
+       }
++      if (mask & SPA_IO_OUT) {
++              res = pw_protocol_native_connection_flush(conn);
++              if (res >= 0) {
++                      int mask = impl->source->mask;
++                      SPA_FLAG_UNSET(mask, SPA_IO_OUT);
++                      pw_loop_update_io(core->main_loop,
++                                      impl->source, mask);
++                      impl->flushing = false;
++              } else if (res != EAGAIN)
++                      goto error;
++      }
+         if (mask & SPA_IO_IN) {
+               const struct pw_protocol_native_message *msg;
+@@ -588,23 +621,17 @@ error:
+ }
+-static void do_flush_event(void *data, uint64_t count)
+-{
+-      struct client *impl = data;
+-      impl->flush_signaled = false;
+-      if (impl->connection)
+-              if (pw_protocol_native_connection_flush(impl->connection) < 0)
+-                      impl->this.disconnect(&impl->this);
+-}
+-
+ static void on_need_flush(void *data)
+ {
+         struct client *impl = data;
+         struct pw_remote *remote = impl->this.remote;
+-      if (!impl->flush_signaled) {
+-              impl->flush_signaled = true;
+-              pw_loop_signal_event(remote->core->main_loop, impl->flush_event);
++      if (!impl->flushing) {
++              int mask = impl->source->mask;
++              impl->flushing = true;
++              SPA_FLAG_SET(mask, SPA_IO_OUT);
++              pw_loop_update_io(remote->core->main_loop,
++                                      impl->source, mask);
+       }
+ }
+@@ -669,12 +696,9 @@ static void impl_disconnect(struct pw_protocol_client *client)
+ static void impl_destroy(struct pw_protocol_client *client)
+ {
+       struct client *impl = SPA_CONTAINER_OF(client, struct client, this);
+-      struct pw_remote *remote = client->remote;
+       impl_disconnect(client);
+-      pw_loop_destroy_source(remote->core->main_loop, impl->flush_event);
+-
+       spa_list_remove(&client->link);
+       free(impl);
+ }
+@@ -687,7 +711,6 @@ impl_new_client(struct pw_protocol *protocol,
+       struct client *impl;
+       struct pw_protocol_client *this;
+       const char *str = NULL;
+-      int res;
+       if ((impl = calloc(1, sizeof(struct client))) == NULL)
+               return NULL;
+@@ -711,20 +734,9 @@ impl_new_client(struct pw_protocol *protocol,
+       this->disconnect = impl_disconnect;
+       this->destroy = impl_destroy;
+-      impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl);
+-      if (impl->flush_event == NULL) {
+-              res = -errno;
+-              goto error_cleanup;
+-      }
+-
+       spa_list_append(&protocol->client_list, &this->link);
+       return this;
+-
+-error_cleanup:
+-      free(impl);
+-      errno = -res;
+-      return NULL;
+ }
+ static void destroy_server(struct pw_protocol_server *server)
+@@ -757,10 +769,23 @@ static void on_before_hook(void *_data)
+       struct pw_protocol_server *this = &server->this;
+       struct pw_client *client, *tmp;
+       struct client_data *data;
++      int res;
+       spa_list_for_each_safe(client, tmp, &this->client_list, protocol_link) {
+               data = client->user_data;
+-              pw_protocol_native_connection_flush(data->connection);
++
++              res = pw_protocol_native_connection_flush(data->connection);
++              if (res == -EAGAIN) {
++                      int mask = data->source->mask;
++                      SPA_FLAG_SET(mask, SPA_IO_OUT);
++                      pw_loop_update_io(client->protocol->core->main_loop,
++                                      data->source, mask);
++              } else if (res < 0) {
++                      pw_log_warn("client %p: could not flush: %s",
++                                      data->client, spa_strerror(res));
++                      pw_client_destroy(client);
++              }
++
+       }
+ }
+diff --git a/src/modules/module-protocol-native/connection.c b/src/modules/module-protocol-native/connection.c
+index 7b6cf112..116457e3 100644
+--- a/src/modules/module-protocol-native/connection.c
++++ b/src/modules/module-protocol-native/connection.c
+@@ -502,8 +502,6 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co
+                                       continue;
+                               else {
+                                       res = -errno;
+-                                      pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s",
+-                                                      conn->fd, n_fds, spa_strerror(res));
+                                       goto exit;
+                               }
+                       }
+-- 
+2.23.0
+
index 54ec92b..e006f14 100644 (file)
@@ -15,6 +15,8 @@ SRC_URI = "gitsm://github.com/PipeWire/pipewire;protocol=https;branch=work \
     file://0012-extensions-implement-new-session-manager-extension.patch \
     file://0013-pipewire-cli-add-support-for-printing-endpoint-info-.patch \
     file://0014-daemon-config-remote-load-module-session-manager-by-.patch \
+    file://0015-connection-move-remaining-data-and-fds.patch \
+    file://0016-protocol-improve-flushing.patch \
     "
 
 SRCREV = "d3c7acb137134bddff3bc8a8964600252d3fb674"