libnice/tests/test-io-stream-common.c
Ole André Vadla Ravnås 216909011e test-io-stream-common: Avoid CPU starvation by yielding
Busy-looping is not a good idea, especially not when run under Valgrind,
where such a thread may result in well-behaved threads running thousands
of times slower. While passing --fair-sched=yes to Valgrind avoids the
issue, it's still better to make our busy-looping less aggressive.
2021-11-22 21:53:08 +00:00

606 lines
17 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* This file is part of the Nice GLib ICE library.
*
* (C) 2014 Collabora Ltd.
* Contact: Philip Withnall
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is the Nice GLib ICE library.
*
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
* Corporation. All Rights Reserved.
*
* Contributors:
* Philip Withnall, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
* case the provisions of LGPL are applicable instead of those above. If you
* wish to allow use of your version of this file only under the terms of the
* LGPL and not to allow others to use your version of this file under the
* MPL, indicate your decision by deleting the provisions above and replace
* them with the notice and other provisions required by the LGPL. If you do
* not delete the provisions above, a recipient may use your version of this
* file under either the MPL or the LGPL.
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "agent.h"
#include "test-io-stream-common.h"
#include <stdlib.h>
#include <string.h>
#ifndef G_OS_WIN32
#include <unistd.h>
#endif
/* File-scope synchronisation state with external linkage.
 * NOTE(review): none of these three objects is referenced by the code visible
 * in this file, which passes a per-run mutex/cond through
 * TestIOStreamThreadData instead -- presumably legacy or used by another
 * translation unit; confirm before removing. */
GMutex start_mutex;
GCond start_cond;
gboolean started;
/* Waits for @var to become NULL/FALSE, then asserts that it did.
 *
 * If @var is set, makes up to 13 attempts; each attempt sleeps for
 * 1000 * 2^i microseconds (1 ms, 2 ms, ... 4096 ms, i.e. roughly 8.2 seconds
 * in total) and then performs one non-blocking iteration of @context. The
 * loop stops as soon as @var evaluates to false.
 *
 * @var is evaluated several times, so it must be free of side effects.
 *
 * The body is wrapped in do { } while (0) so that the expansion behaves as a
 * single statement and can be used safely in unbraced if/else bodies. */
#define WAIT_UNTIL_UNSET(var, context) \
  do \
    { \
      if (var) \
        { \
          int wait_iter_; \
          \
          for (wait_iter_ = 0; wait_iter_ < 13 && (var); wait_iter_++) \
            { \
              g_usleep (1000 * (1 << wait_iter_)); \
              g_main_context_iteration ((context), FALSE); \
            } \
          \
          g_assert_true (!(var)); \
        } \
    } \
  while (0)
/* Timeout callback used as a deadlock watchdog: it is installed with
 * g_timeout_add_seconds() by run_io_stream_test() and is only expected to
 * fire when a test has stalled. It logs two debug messages and then
 * terminates the process with abort(); it never returns a value. */
static gboolean timer_cb (gpointer pointer)
{
g_debug ("test-thread:%s: %p", G_STRFUNC, pointer);
/* note: should not be reached, abort */
g_debug ("ERROR: test has got stuck, aborting...");
abort();
/* Not reached: abort() does not return. */
exit (-1);
}
/* Start barrier shared by every thread of a test run.
 *
 * Under the start mutex, decrements the shared start counter, wakes all
 * waiters on the start condition, and then blocks until the counter is no
 * longer greater than zero. */
static void
wait_for_start (TestIOStreamThreadData *data)
{
  g_mutex_lock (data->start_mutex);

  /* Check in and let the other participants re-evaluate the counter. */
  --(*data->start_count);
  g_cond_broadcast (data->start_cond);

  for (;;) {
    if (!(*data->start_count > 0))
      break;

    g_cond_wait (data->start_cond, data->start_mutex);
  }

  g_mutex_unlock (data->start_mutex);
}
/* Thread entry point for the writer side of one agent.
 *
 * Runs with a private thread-default GMainContext. After passing the start
 * barrier it blocks on write_cond until both stream_open and stream_ready are
 * set, then invokes the test's write_thread callback. The callback receives
 * the GIOStream's output stream in reliable mode and NULL otherwise.
 *
 * @user_data: the TestIOStreamThreadData for this side.
 * Returns: always NULL. */
static gpointer
write_thread_cb (gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;
  GMainContext *thread_context = g_main_context_new ();
  GOutputStream *ostream;

  g_main_context_push_thread_default (thread_context);

  /* Line up with the other threads before doing anything. */
  wait_for_start (data);

  /* Block until the stream is both open and ready. */
  g_mutex_lock (&data->write_mutex);
  while (!data->stream_open || !data->stream_ready)
    g_cond_wait (&data->write_cond, &data->write_mutex);
  g_mutex_unlock (&data->write_mutex);

  ostream = data->reliable ?
      g_io_stream_get_output_stream (data->io_stream) : NULL;

  data->callbacks->write_thread (ostream, data);

  g_main_context_pop_thread_default (thread_context);
  g_main_context_unref (thread_context);

  return NULL;
}
/* Thread entry point for the reader side of one agent.
 *
 * Runs with a private thread-default GMainContext. After passing the start
 * barrier it invokes the test's read_thread callback, handing it the
 * GIOStream's input stream in reliable mode and NULL otherwise.
 *
 * @user_data: the TestIOStreamThreadData for this side.
 * Returns: always NULL. */
static gpointer
read_thread_cb (gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;
  GMainContext *thread_context = g_main_context_new ();
  GInputStream *istream;

  g_main_context_push_thread_default (thread_context);

  /* Line up with the other threads before doing anything. */
  wait_for_start (data);

  istream = data->reliable ?
      g_io_stream_get_input_stream (data->io_stream) : NULL;

  data->callbacks->read_thread (istream, data);

  g_main_context_pop_thread_default (thread_context);
  g_main_context_unref (thread_context);

  return NULL;
}
/* Thread entry point that drives one agent's main loop.
 *
 * Makes the agent's main context the thread default, passes the start
 * barrier, and then runs data->main_loop until it is quit.
 *
 * @user_data: the TestIOStreamThreadData for this side.
 * Returns: always NULL. */
static gpointer
main_thread_cb (gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;
  GMainContext *agent_context = data->main_context;

  g_main_context_push_thread_default (agent_context);

  /* Line up with the other threads, then service the agent. */
  wait_for_start (data);
  g_main_loop_run (data->main_loop);

  g_main_context_pop_thread_default (data->main_context);

  return NULL;
}
/* "candidate-gathering-done" handler.
 *
 * Copies this agent's local credentials and its component-1 local candidates
 * over to the peer agent stored under the "other-agent" object data key.
 * Stream ids are taken from each agent's "stream-id" object data rather than
 * from the @stream_id signal argument; @user_data is unused. */
static void
candidate_gathering_done_cb (NiceAgent *agent, guint stream_id,
    gpointer user_data)
{
  NiceAgent *other = g_object_get_data (G_OBJECT (agent), "other-agent");
  guint id = GPOINTER_TO_UINT (
      g_object_get_data (G_OBJECT (agent), "stream-id"));
  guint other_id = GPOINTER_TO_UINT (
      g_object_get_data (G_OBJECT (other), "stream-id"));
  gchar *ufrag = NULL, *password = NULL;
  GSList *cands;

  /* Hand our credentials to the peer. */
  nice_agent_get_local_credentials (agent, id, &ufrag, &password);
  nice_agent_set_remote_credentials (other, other_id, ufrag, password);
  g_free (ufrag);
  g_free (password);

  /* Hand our candidates for component 1 to the peer. */
  cands = nice_agent_get_local_candidates (agent, id, 1);
  g_assert_true (cands != NULL);
  nice_agent_set_remote_candidates (other, other_id, 1, cands);

  g_slist_free_full (cands, (GDestroyNotify) nice_candidate_free);
}
/* "reliable-transport-writable" handler (only connected in reliable mode).
 *
 * Marks the stream as open and wakes anything waiting on write_cond, then
 * forwards the event to the test's optional reliable_transport_writable
 * callback together with the output stream of the GIOStream stored under the
 * agent's "io-stream" object data key. */
static void
reliable_transport_writable_cb (NiceAgent *agent, guint stream_id,
    guint component_id, gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;
  GIOStream *io_stream;

  g_assert_true (data->reliable);

  /* Signal writeability. */
  g_mutex_lock (&data->write_mutex);
  data->stream_open = TRUE;
  g_cond_broadcast (&data->write_cond);
  g_mutex_unlock (&data->write_mutex);

  /* The per-test hook is optional. */
  if (data->callbacks->reliable_transport_writable == NULL)
    return;

  io_stream = g_object_get_data (G_OBJECT (agent), "io-stream");
  g_assert_true (io_stream != NULL);

  data->callbacks->reliable_transport_writable (
      g_io_stream_get_output_stream (io_stream), agent, stream_id,
      component_id, data);
}
/* "component-state-changed" handler.
 *
 * When a component reaches NICE_COMPONENT_STATE_READY, sets stream_ready and
 * wakes anything waiting on write_cond. Every other state is ignored. */
static void
component_state_changed_cb (NiceAgent *agent, guint stream_id,
    guint component_id, guint state, gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;

  if (state == NICE_COMPONENT_STATE_READY) {
    /* Signal stream state. */
    g_mutex_lock (&data->write_mutex);
    data->stream_ready = TRUE;
    g_cond_broadcast (&data->write_cond);
    g_mutex_unlock (&data->write_mutex);
  }
}
/* "new-selected-pair" handler: forwards the signal arguments, plus the
 * thread data, to the test's optional new_selected_pair callback. */
static void
new_selected_pair_cb (NiceAgent *agent, guint stream_id, guint component_id,
    gchar *lfoundation, gchar *rfoundation, gpointer user_data)
{
  TestIOStreamThreadData *data = user_data;

  /* Nothing to do when the test did not install a hook. */
  if (data->callbacks->new_selected_pair == NULL)
    return;

  data->callbacks->new_selected_pair (agent, stream_id, component_id,
      lfoundation, rfoundation, data);
}
/* Creates and configures one NiceAgent for a test run.
 *
 * @controlling_mode: value for the agent's "controlling-mode" property.
 * @data: per-side thread data; data->reliable selects a reliable or a plain
 *   agent, and data->stream_open is set to TRUE immediately for non-reliable
 *   agents (reliable ones set it from the "reliable-transport-writable"
 *   signal instead).
 * @main_context: (out) receives a newly created main context for the agent.
 * @main_loop: (out) receives a newly created main loop on @main_context.
 * @flags: TEST_IO_STREAM_OPTION_TCP_ONLY disables ICE-UDP and enables
 *   ICE-TCP; TEST_IO_STREAM_OPTION_BYTESTREAM_TCP enables "bytestream-tcp".
 *
 * The agent is bound to 127.0.0.1, has UPnP disabled, and gets the test's
 * signal handlers connected. If the NICE_STUN_SERVER environment variable is
 * set, the STUN server is configured from it; the port comes from
 * NICE_STUN_SERVER_PORT, falling back to 3478 when that variable is unset.
 *
 * Returns: the new agent; the caller owns the reference. */
static NiceAgent *
create_agent (gboolean controlling_mode, TestIOStreamThreadData *data,
    GMainContext **main_context, GMainLoop **main_loop,
    TestIOStreamOption flags)
{
  NiceAgent *agent;
  NiceAddress base_addr;
  const gchar *stun_server, *stun_server_port;

  /* Create main contexts. */
  *main_context = g_main_context_new ();
  *main_loop = g_main_loop_new (*main_context, FALSE);

  if (data->reliable)
    agent = nice_agent_new_reliable (*main_context, NICE_COMPATIBILITY_RFC5245);
  else
    agent = nice_agent_new (*main_context, NICE_COMPATIBILITY_RFC5245);

  g_object_set (G_OBJECT (agent),
      "controlling-mode", controlling_mode,
      "upnp", FALSE,
      NULL);

  if (flags & TEST_IO_STREAM_OPTION_TCP_ONLY) {
    g_object_set (G_OBJECT (agent),
        "ice-udp", FALSE,
        "ice-tcp", TRUE,
        NULL);
  }

  if (flags & TEST_IO_STREAM_OPTION_BYTESTREAM_TCP) {
    g_object_set (G_OBJECT (agent),
        "bytestream-tcp", TRUE,
        NULL);
  }

  /* Specify which local interface to use. */
  g_assert_true (nice_address_set_from_string (&base_addr, "127.0.0.1"));
  nice_agent_add_local_address (agent, &base_addr);

  /* Hook up signals. */
  g_signal_connect (G_OBJECT (agent), "candidate-gathering-done",
      (GCallback) candidate_gathering_done_cb,
      GUINT_TO_POINTER (controlling_mode));
  g_signal_connect (G_OBJECT (agent), "new-selected-pair",
      (GCallback) new_selected_pair_cb, data);
  g_signal_connect (G_OBJECT (agent), "component-state-changed",
      (GCallback) component_state_changed_cb, data);

  if (data->reliable) {
    g_signal_connect (G_OBJECT (agent), "reliable-transport-writable",
        (GCallback) reliable_transport_writable_cb, data);
  } else {
    data->stream_open = TRUE;
  }

  /* Configure the STUN server. */
  stun_server = g_getenv ("NICE_STUN_SERVER");
  stun_server_port = g_getenv ("NICE_STUN_SERVER_PORT");

  if (stun_server != NULL) {
    /* The port variable is optional: passing NULL to atoi() would crash, so
     * fall back to the standard STUN port when it is not set. */
    guint stun_port = 3478;

    if (stun_server_port != NULL)
      stun_port = (guint) atoi (stun_server_port);

    g_object_set (G_OBJECT (agent),
        "stun-server", stun_server,
        "stun-server-port", stun_port,
        NULL);
  }

  return agent;
}
/* Adds a two-component stream to @agent, asserts that a non-zero stream id
 * was returned, and records that id on the agent under the "stream-id" object
 * data key. */
static void
add_stream (NiceAgent *agent)
{
  guint stream_id = nice_agent_add_stream (agent, 2);

  g_assert_cmpuint (stream_id, >, 0);

  g_object_set_data (G_OBJECT (agent), "stream-id",
      GUINT_TO_POINTER (stream_id));
}
/* Copies @agent's local ICE credentials to its peer.
 *
 * The peer is the agent stored under @agent's "other-agent" object data key;
 * both stream ids are read from the agents' "stream-id" object data, so
 * add_stream() must have been called on both agents first.
 *
 * The credential pointers start out NULL and the lookup result is asserted,
 * so a failed lookup aborts the test instead of passing uninitialised
 * pointers to nice_agent_set_remote_credentials() and g_free(). */
static void
swap_credentials (NiceAgent *agent)
{
  guint stream_id;
  gchar *ufrag = NULL, *password = NULL;
  NiceAgent *other_agent;
  guint other_stream_id;
  gboolean got_credentials;

  stream_id = GPOINTER_TO_UINT (
      g_object_get_data (G_OBJECT (agent), "stream-id"));

  got_credentials = nice_agent_get_local_credentials (agent, stream_id,
      &ufrag, &password);
  g_assert_true (got_credentials);

  other_agent = g_object_get_data (G_OBJECT (agent), "other-agent");
  other_stream_id = GPOINTER_TO_UINT (
      g_object_get_data (G_OBJECT (other_agent), "stream-id"));

  nice_agent_set_remote_credentials (other_agent, other_stream_id, ufrag,
      password);

  g_free (ufrag);
  g_free (password);
}
/* Starts candidate gathering on @agent's stream (id taken from the
 * "stream-id" object data) and sets up data->io_stream.
 *
 * In reliable mode the GIOStream for component 1 is fetched, stored in
 * data->io_stream and also attached to the agent under the "io-stream" object
 * data key; otherwise data->io_stream is set to NULL. */
static void
run_agent (TestIOStreamThreadData *data, NiceAgent *agent)
{
  guint stream_id = GPOINTER_TO_UINT (
      g_object_get_data (G_OBJECT (agent), "stream-id"));

  nice_agent_gather_candidates (agent, stream_id);

  /* Non-reliable agents have no GIOStream. */
  if (!data->reliable) {
    data->io_stream = NULL;
    return;
  }

  data->io_stream =
      G_IO_STREAM (nice_agent_get_io_stream (agent, stream_id, 1));
  g_object_set_data (G_OBJECT (agent), "io-stream", data->io_stream);
}
/* Starts a new thread called @thread_name running @thread_func with
 * @user_data, asserts that a thread handle was obtained, and returns it.
 * The caller is expected to join the returned thread. */
GThread *
spawn_thread (const gchar *thread_name, GThreadFunc thread_func,
    gpointer user_data)
{
  GThread *thread = g_thread_new (thread_name, thread_func, user_data);

  g_assert_true (thread);

  return thread;
}
/* Runs one complete I/O stream test between two agents, "L" (controlling)
 * and "R" (controlled).
 *
 * @deadlock_timeout: seconds after which the watchdog timer aborts the
 *   process if this run has not finished.
 * @reliable: whether to create reliable agents (with GIOStreams).
 * @callbacks: per-test hooks; read_thread is always used, write_thread may be
 *   NULL, in which case no write threads are spawned.
 * @l_user_data / @r_user_data: opaque per-side data stored in the thread
 *   data for the callbacks.
 * @l_user_data_free / @r_user_data_free: optional destructors, called on the
 *   respective user data during cleanup.
 * @flags: agent options forwarded to create_agent().
 *
 * Up to six threads take part (a main-loop, a read and optionally a write
 * thread per side); they are released together through a shared start
 * counter. The calling thread then runs an "error loop" on the default main
 * context, which hosts the watchdog timer and is quit once both sides have
 * finished. Afterwards everything is torn down, and the function asserts that
 * both agents are actually finalised.
 *
 * The watchdog source is removed as soon as the error loop returns, so that
 * it cannot outlive this run and fire during a later call that shares the
 * default main context. */
void
run_io_stream_test (guint deadlock_timeout, gboolean reliable,
    const TestIOStreamCallbacks *callbacks,
    gpointer l_user_data, GDestroyNotify l_user_data_free,
    gpointer r_user_data, GDestroyNotify r_user_data_free,
    TestIOStreamOption flags)
{
  GMainLoop *error_loop;
  GThread *l_main_thread, *r_main_thread;
  GThread *l_write_thread, *l_read_thread, *r_write_thread, *r_read_thread;
  TestIOStreamThreadData l_data = { NULL }, r_data = { NULL };
  GMutex mutex;
  GCond cond;
  /* Number of threads that still have to reach the start barrier. */
  guint start_count = 6;
  guint stream_id;
  guint deadlock_timer_id;

  g_mutex_init (&mutex);
  g_cond_init (&cond);

  error_loop = g_main_loop_new (NULL, FALSE);

  /* Set up data structures. */
  l_data.reliable = reliable;
  l_data.error_loop = error_loop;
  l_data.callbacks = callbacks;
  l_data.user_data = l_user_data;
  l_data.user_data_free = l_user_data_free;

  g_cond_init (&l_data.write_cond);
  g_mutex_init (&l_data.write_mutex);
  l_data.stream_open = FALSE;
  l_data.stream_ready = FALSE;
  l_data.start_mutex = &mutex;
  l_data.start_cond = &cond;
  l_data.start_count = &start_count;

  r_data.reliable = reliable;
  r_data.error_loop = error_loop;
  r_data.callbacks = callbacks;
  r_data.user_data = r_user_data;
  r_data.user_data_free = r_user_data_free;

  g_cond_init (&r_data.write_cond);
  g_mutex_init (&r_data.write_mutex);
  r_data.stream_open = FALSE;
  r_data.stream_ready = FALSE;
  r_data.start_mutex = &mutex;
  r_data.start_cond = &cond;
  r_data.start_count = &start_count;

  l_data.other = &r_data;
  r_data.other = &l_data;

  /* Create the L and R agents. */
  l_data.agent = create_agent (TRUE, &l_data,
      &l_data.main_context, &l_data.main_loop, flags);
  r_data.agent = create_agent (FALSE, &r_data,
      &r_data.main_context, &r_data.main_loop, flags);

  g_object_set_data (G_OBJECT (l_data.agent), "other-agent", r_data.agent);
  g_object_set_data (G_OBJECT (r_data.agent), "other-agent", l_data.agent);

  /* Add a timer to catch deadlocks. Keep its id so it can be removed once
   * this run has finished. */
  deadlock_timer_id =
      g_timeout_add_seconds (deadlock_timeout, timer_cb, NULL);

  l_main_thread = spawn_thread ("libnice L main", main_thread_cb, &l_data);
  r_main_thread = spawn_thread ("libnice R main", main_thread_cb, &r_data);

  add_stream (l_data.agent);
  add_stream (r_data.agent);
  swap_credentials (l_data.agent);
  swap_credentials (r_data.agent);
  run_agent (&l_data, l_data.agent);
  run_agent (&r_data, r_data.agent);

  l_read_thread = spawn_thread ("libnice L read", read_thread_cb, &l_data);
  r_read_thread = spawn_thread ("libnice R read", read_thread_cb, &r_data);

  if (callbacks->write_thread != NULL) {
    l_write_thread = spawn_thread ("libnice L write", write_thread_cb, &l_data);
    r_write_thread = spawn_thread ("libnice R write", write_thread_cb, &r_data);
  } else {
    /* No write threads will check in, so account for them here to let the
     * remaining threads through the start barrier. */
    g_mutex_lock (&mutex);
    start_count -= 2;
    g_cond_broadcast (&cond);
    g_mutex_unlock (&mutex);

    l_write_thread = NULL;
    r_write_thread = NULL;
  }

  /* Run loop for error timer */
  g_main_loop_run (error_loop);

  /* The run finished in time: drop the watchdog so it cannot fire later on
   * the shared default main context. */
  g_source_remove (deadlock_timer_id);

  /* Clean up the main loops and threads. */
  stop_main_loop (l_data.main_loop);
  stop_main_loop (r_data.main_loop);

  g_thread_join (l_read_thread);
  g_thread_join (r_read_thread);
  if (l_write_thread != NULL)
    g_thread_join (l_write_thread);
  if (r_write_thread != NULL)
    g_thread_join (r_write_thread);
  g_thread_join (l_main_thread);
  g_thread_join (r_main_thread);

  /* Free things. */
  if (r_data.user_data_free != NULL)
    r_data.user_data_free (r_data.user_data);

  if (l_data.user_data_free != NULL)
    l_data.user_data_free (l_data.user_data);

  g_cond_clear (&r_data.write_cond);
  g_mutex_clear (&r_data.write_mutex);
  g_cond_clear (&l_data.write_cond);
  g_mutex_clear (&l_data.write_mutex);

  if (r_data.io_stream != NULL)
    g_object_unref (r_data.io_stream);
  if (l_data.io_stream != NULL)
    g_object_unref (l_data.io_stream);

  /* A stream id of 0 means the stream was already removed by
   * check_for_termination(). */
  stream_id =
      GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (r_data.agent), "stream-id"));
  if (stream_id != 0)
    nice_agent_remove_stream (r_data.agent, stream_id);
  stream_id =
      GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (l_data.agent), "stream-id"));
  if (stream_id != 0)
    nice_agent_remove_stream (l_data.agent, stream_id);

  /* The weak pointers are reset to NULL when the agents are finalised, which
   * is what WAIT_UNTIL_UNSET() polls for below. */
  g_object_add_weak_pointer (G_OBJECT (r_data.agent),
      (gpointer *) &r_data.agent);
  g_object_add_weak_pointer (G_OBJECT (l_data.agent),
      (gpointer *) &l_data.agent);

  g_object_unref (r_data.agent);
  g_object_unref (l_data.agent);

  WAIT_UNTIL_UNSET (r_data.agent, r_data.main_context);
  WAIT_UNTIL_UNSET (l_data.agent, l_data.main_context);

  g_main_loop_unref (r_data.main_loop);
  g_main_loop_unref (l_data.main_loop);

  g_main_context_unref (r_data.main_context);
  g_main_context_unref (l_data.main_context);

  g_main_loop_unref (error_loop);

  g_mutex_clear (&mutex);
  g_cond_clear (&cond);
}
/* Once we've received all the expected bytes, wait to finish sending all bytes,
 * then send and wait for the close message. Finally, remove the stream.
 *
 * This must only be called from the read thread implementation.
 *
 * @data: thread data for this side.
 * @recv_count, @other_recv_count: not used by this function.
 * @send_count: counter updated by the sending side; polled until it reaches
 *   @expected_recv_count.
 * @expected_recv_count: number of bytes that must have been sent before the
 *   stream is shut down.
 *
 * While waiting, the test's wait_transmission_cb is called if provided;
 * otherwise the thread yields. When both sides are done, the error loop is
 * quit. */
void
check_for_termination (TestIOStreamThreadData *data, gsize *recv_count,
    gsize *other_recv_count, volatile gsize *send_count, gsize expected_recv_count)
{
  guint stream_id;
  gpointer tmp;
  GError *error = NULL;
  gboolean other_done;

  /* Wait for transmission to complete. */
  while (*send_count < expected_recv_count) {
    if (data->callbacks->wait_transmission_cb) {
      data->callbacks->wait_transmission_cb (data->agent);
    } else {
      g_thread_yield ();
    }
  }

  /* Send a close message. */
  tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
  stream_id = GPOINTER_TO_UINT (tmp);

  /* Can't be certain enough to test for termination on non-reliable streams.
   * There may be packet losses, etc
   */
  if (data->io_stream) {
    gssize len;

    g_output_stream_close (g_io_stream_get_output_stream (data->io_stream),
        NULL, &error);
    g_assert_no_error (error);

    /* Expect end-of-stream: the skip must consume nothing. */
    len = g_input_stream_skip (g_io_stream_get_input_stream (data->io_stream),
        1024 * 1024, NULL, &error);
    g_assert_no_error (error);
    g_assert_cmpint (len, ==, 0);
  }

  /* Remove the stream and run away. */
  nice_agent_remove_stream (data->agent, stream_id);
  g_object_set_data (G_OBJECT (data->agent), "stream-id", GUINT_TO_POINTER (0));
  g_clear_object (&data->io_stream);

  /* Both read threads run this store-own-flag / load-peer-flag sequence
   * concurrently. Without synchronisation each could miss the other's store,
   * in which case nobody would quit the error loop. The start mutex is shared
   * by both sides, so use it to make the update and the check atomic. */
  g_mutex_lock (data->start_mutex);
  data->done = TRUE;
  other_done = data->other->done;
  g_mutex_unlock (data->start_mutex);

  if (other_done)
    g_main_loop_quit (data->error_loop);
}
/* Idle callback: quits the GMainLoop passed as @data and asks for the source
 * to be removed so that it runs only once. */
static gboolean
stop_main_loop_when_idle (gpointer data)
{
  g_main_loop_quit ((GMainLoop *) data);

  return G_SOURCE_REMOVE;
}
/* Schedules @loop to be quit from within its own main context.
 *
 * An idle source is attached to the loop's context; when dispatched it calls
 * g_main_loop_quit(). The source holds its own reference on @loop, released
 * through the destroy notify when the source goes away. */
void
stop_main_loop (GMainLoop *loop)
{
  GMainContext *loop_context = g_main_loop_get_context (loop);
  GSource *idle_source = g_idle_source_new ();

  g_source_set_callback (idle_source, stop_main_loop_when_idle,
      g_main_loop_ref (loop), (GDestroyNotify) g_main_loop_unref);
  g_source_attach (idle_source, loop_context);

  /* The context now owns the source. */
  g_source_unref (idle_source);
}