1From e1e2d8d58c1e09e065849cdb1f6466c0537a7c51 Mon Sep 17 00:00:00 2001
2From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= <sebastian@centricular.com>
3Date: Tue, 21 Jun 2022 11:51:35 +0300
4Subject: [PATCH] bin: Fix race conditions in tests
5
6The latency messages are non-deterministic and can arrive before/after
7async-done or during state-changes as they are posted by e.g. sinks from
8their streaming thread but bins are finishing asynchronous state changes
9from a secondary helper thread.
10
11To solve this, expect latency messages at any time and assert that we
12receive one at some point during the test.
13
14Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2643>
15
16Upstream-Status: Backport [https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2643]
17Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
18---
19 .../gstreamer/tests/check/gst/gstbin.c        | 132 ++++++++++++------
20 1 file changed, 92 insertions(+), 40 deletions(-)
21
22diff --git a/subprojects/gstreamer/tests/check/gst/gstbin.c b/subprojects/gstreamer/tests/check/gst/gstbin.c
23index e366d5fe20f..88ff44db0c3 100644
24--- a/subprojects/gstreamer/tests/check/gst/gstbin.c
25+++ b/subprojects/gstreamer/tests/check/gst/gstbin.c
26@@ -27,50 +27,95 @@
27 #include <gst/base/gstbasesrc.h>
28
29 static void
30-pop_async_done (GstBus * bus)
31+pop_async_done (GstBus * bus, gboolean * had_latency)
32 {
33   GstMessage *message;
34+  GstMessageType types = GST_MESSAGE_ASYNC_DONE;
35+
36+  if (!*had_latency)
37+    types |= GST_MESSAGE_LATENCY;
38
39   GST_DEBUG ("popping async-done message");
40-  message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
41
42-  fail_unless (message && GST_MESSAGE_TYPE (message)
43-      == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
44+  do {
45+    message = gst_bus_poll (bus, types, -1);
46
47-  gst_message_unref (message);
48-  GST_DEBUG ("popped message");
49+    fail_unless (message);
50+    GST_DEBUG ("popped message %s",
51+        gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
52+
53+    if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
54+      fail_unless (*had_latency == FALSE);
55+      *had_latency = TRUE;
56+      gst_clear_message (&message);
57+      types &= ~GST_MESSAGE_LATENCY;
58+      continue;
59+    }
60+
61+    fail_unless (GST_MESSAGE_TYPE (message)
62+        == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
63+
64+    gst_clear_message (&message);
65+    break;
66+  } while (TRUE);
67 }
68
69 static void
70-pop_latency (GstBus * bus)
71+pop_latency (GstBus * bus, gboolean * had_latency)
72 {
73   GstMessage *message;
74
75-  GST_DEBUG ("popping async-done message");
76+  if (*had_latency)
77+    return;
78+
79+  GST_DEBUG ("popping latency message");
80   message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1);
81
82-  fail_unless (message && GST_MESSAGE_TYPE (message)
83+  fail_unless (message);
84+  fail_unless (GST_MESSAGE_TYPE (message)
85       == GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY");
86
87-  gst_message_unref (message);
88-  GST_DEBUG ("popped message");
89+  GST_DEBUG ("popped message %s",
90+      gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
91+  gst_clear_message (&message);
92+
93+  *had_latency = TRUE;
94 }
95
96 static void
97-pop_state_changed (GstBus * bus, int count)
98+pop_state_changed (GstBus * bus, int count, gboolean * had_latency)
99 {
100   GstMessage *message;
101-
102+  GstMessageType types = GST_MESSAGE_STATE_CHANGED;
103   int i;
104
105+  if (!*had_latency)
106+    types |= GST_MESSAGE_LATENCY;
107+
108   GST_DEBUG ("popping %d messages", count);
109   for (i = 0; i < count; ++i) {
110-    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
111-
112-    fail_unless (message && GST_MESSAGE_TYPE (message)
113-        == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
114-
115-    gst_message_unref (message);
116+    do {
117+      message = gst_bus_poll (bus, types, -1);
118+
119+      fail_unless (message);
120+      GST_DEBUG ("popped message %s",
121+          gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
122+
123+      if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
124+        fail_unless (*had_latency == FALSE);
125+        *had_latency = TRUE;
126+        gst_clear_message (&message);
127+        types &= ~GST_MESSAGE_LATENCY;
128+        continue;
129+      }
130+
131+      fail_unless (GST_MESSAGE_TYPE (message)
132+          == GST_MESSAGE_STATE_CHANGED,
133+          "did not get GST_MESSAGE_STATE_CHANGED");
134+
135+      gst_message_unref (message);
136+      break;
137+    } while (TRUE);
138   }
139   GST_DEBUG ("popped %d messages", count);
140 }
141@@ -538,6 +583,7 @@ GST_START_TEST (test_message_state_changed_children)
142   GstBus *bus;
143   GstStateChangeReturn ret;
144   GstState current, pending;
145+  gboolean had_latency = FALSE;
146
147   pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
148   fail_unless (pipeline != NULL, "Could not create pipeline");
149@@ -576,7 +622,7 @@ GST_START_TEST (test_message_state_changed_children)
150   ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
151   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
152
153-  pop_state_changed (bus, 3);
154+  pop_state_changed (bus, 3, &had_latency);
155   fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
156
157   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
158@@ -619,9 +665,9 @@ GST_START_TEST (test_message_state_changed_children)
159    * its state_change message */
160   ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4);
161
162-  pop_state_changed (bus, 3);
163-  pop_async_done (bus);
164-  pop_latency (bus);
165+  pop_state_changed (bus, 3, &had_latency);
166+  pop_async_done (bus, &had_latency);
167+  pop_latency (bus, &had_latency);
168   fail_if ((gst_bus_pop (bus)) != NULL);
169
170   ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3);
171@@ -648,7 +694,7 @@ GST_START_TEST (test_message_state_changed_children)
172   ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
173   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
174
175-  pop_state_changed (bus, 3);
176+  pop_state_changed (bus, 3, &had_latency);
177   fail_if ((gst_bus_pop (bus)) != NULL);
178
179   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
180@@ -669,7 +715,7 @@ GST_START_TEST (test_message_state_changed_children)
181   ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4);
182   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
183
184-  pop_state_changed (bus, 6);
185+  pop_state_changed (bus, 6, &had_latency);
186   fail_if ((gst_bus_pop (bus)) != NULL);
187
188   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
189@@ -696,6 +742,7 @@ GST_START_TEST (test_watch_for_state_change)
190   GstElement *src, *sink, *bin;
191   GstBus *bus;
192   GstStateChangeReturn ret;
193+  gboolean had_latency = FALSE;
194
195   bin = gst_element_factory_make ("bin", NULL);
196   fail_unless (bin != NULL, "Could not create bin");
197@@ -722,9 +769,9 @@ GST_START_TEST (test_watch_for_state_change)
198       GST_CLOCK_TIME_NONE);
199   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
200
201-  pop_state_changed (bus, 6);
202-  pop_async_done (bus);
203-  pop_latency (bus);
204+  pop_state_changed (bus, 6, &had_latency);
205+  pop_async_done (bus, &had_latency);
206+  pop_latency (bus, &had_latency);
207
208   fail_unless (gst_bus_have_pending (bus) == FALSE,
209       "Unexpected messages on bus");
210@@ -732,16 +779,17 @@ GST_START_TEST (test_watch_for_state_change)
211   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING);
212   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
213
214-  pop_state_changed (bus, 3);
215+  pop_state_changed (bus, 3, &had_latency);
216
217+  had_latency = FALSE;
218   /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
219   ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
220   gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
221
222-  pop_state_changed (bus, 3);
223+  pop_state_changed (bus, 3, &had_latency);
224   if (ret == GST_STATE_CHANGE_ASYNC) {
225-    pop_async_done (bus);
226-    pop_latency (bus);
227+    pop_async_done (bus, &had_latency);
228+    pop_latency (bus, &had_latency);
229   }
230
231   fail_unless (gst_bus_have_pending (bus) == FALSE,
232@@ -898,6 +946,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
233   GstStateChangeReturn ret;
234   GstState current, pending;
235   GstBus *bus;
236+  gboolean had_latency = FALSE;
237
238   pipeline = gst_pipeline_new (NULL);
239   fail_unless (pipeline != NULL, "Could not create pipeline");
240@@ -951,10 +1000,11 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
241   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
242 #else
243
244-  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
245+  pop_state_changed (bus, 2, &had_latency);     /* pop remaining ready => paused messages off the bus */
246   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
247       108);
248-  pop_async_done (bus);
249+  pop_async_done (bus, &had_latency);
250+  pop_latency (bus, &had_latency);
251 #endif
252   /* PAUSED => PLAYING */
253   GST_DEBUG ("popping PAUSED -> PLAYING messages");
254@@ -972,8 +1022,8 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
255   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
256
257   /* TODO: do we need to check downwards state change order as well? */
258-  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
259-  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
260+  pop_state_changed (bus, 4, &had_latency);     /* pop playing => paused messages off the bus */
261+  pop_state_changed (bus, 4, &had_latency);     /* pop paused => ready messages off the bus */
262
263   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
264     THREAD_SWITCH ();
265@@ -1002,6 +1052,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
266   GstStateChangeReturn ret;
267   GstState current, pending;
268   GstBus *bus;
269+  gboolean had_latency = FALSE;
270
271   /* (2) Now again, but check other code path where we don't have
272    *     a proper sink correctly flagged as such, but a 'semi-sink' */
273@@ -1056,10 +1107,11 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
274   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
275   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
276 #else
277-  pop_state_changed (bus, 2);   /* pop remaining ready => paused messages off the bus */
278+  pop_state_changed (bus, 2, &had_latency);     /* pop remaining ready => paused messages off the bus */
279   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
280       208);
281-  pop_async_done (bus);
282+  pop_async_done (bus, &had_latency);
283+  pop_latency (bus, &had_latency);
284
285   /* PAUSED => PLAYING */
286   GST_DEBUG ("popping PAUSED -> PLAYING messages");
287@@ -1076,8 +1128,8 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
288   fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
289
290   /* TODO: do we need to check downwards state change order as well? */
291-  pop_state_changed (bus, 4);   /* pop playing => paused messages off the bus */
292-  pop_state_changed (bus, 4);   /* pop paused => ready messages off the bus */
293+  pop_state_changed (bus, 4, &had_latency);     /* pop playing => paused messages off the bus */
294+  pop_state_changed (bus, 4, &had_latency);     /* pop paused => ready messages off the bus */
295
296   GST_DEBUG ("waiting for pipeline to reach refcount 1");
297   while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
298--
299GitLab
300
301