diff options
Diffstat (limited to 'libs/gst/net/gstnettimeprovider.c')
-rw-r--r-- | libs/gst/net/gstnettimeprovider.c | 361 |
1 files changed, 131 insertions, 230 deletions
diff --git a/libs/gst/net/gstnettimeprovider.c b/libs/gst/net/gstnettimeprovider.c index c1b4cac..f008c00 100644 --- a/libs/gst/net/gstnettimeprovider.c +++ b/libs/gst/net/gstnettimeprovider.c @@ -42,39 +42,14 @@ #include "gstnettimeprovider.h" #include "gstnettimepacket.h" -#include <glib.h> - -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif - -#if defined (_MSC_VER) && _MSC_VER >= 1400 -#include <io.h> -#endif - -#ifndef G_OS_WIN32 -#include <sys/ioctl.h> -#endif - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include <sys/filio.h> -#endif - GST_DEBUG_CATEGORY_STATIC (ntp_debug); #define GST_CAT_DEFAULT (ntp_debug) -#ifdef G_OS_WIN32 -#define close(sock) closesocket(sock) -#endif - #define DEFAULT_ADDRESS "0.0.0.0" #define DEFAULT_PORT 5637 -#define IS_ACTIVE(self) (g_atomic_int_get (&((self)->active.active))) +#define IS_ACTIVE(self) (g_atomic_int_get (&((self)->priv->active))) -#ifdef G_OS_WIN32 -#define setsockopt(sock, sol_flags, reuse_flags, ru, sizeofru) setsockopt (sock, sol_flags, reuse_flags, (char *)ru, sizeofru) -#endif enum { PROP_0, @@ -82,7 +57,6 @@ enum PROP_ADDRESS, PROP_CLOCK, PROP_ACTIVE - /* FILL ME */ }; #define GST_NET_TIME_PROVIDER_GET_PRIVATE(obj) \ @@ -90,8 +64,17 @@ enum struct _GstNetTimeProviderPrivate { - GstPollFD sock; - GstPoll *fdset; + gchar *address; + int port; + + GThread *thread; + + GstClock *clock; + + gboolean active; /* ATOMIC */ + + GSocket *socket; + GCancellable *cancel; }; static gboolean gst_net_time_provider_start (GstNetTimeProvider * bself); @@ -112,18 +95,6 @@ static void gst_net_time_provider_get_property (GObject * object, guint prop_id, G_DEFINE_TYPE_WITH_CODE (GstNetTimeProvider, gst_net_time_provider, GST_TYPE_OBJECT, _do_init); -#ifdef G_OS_WIN32 -static int -inet_aton (const char *c, struct in_addr *paddr) -{ - paddr->s_addr = inet_addr (c); - if (paddr->s_addr == INADDR_NONE) - return 0; - - return 1; -} -#endif - static void gst_net_time_provider_class_init (GstNetTimeProviderClass * klass) { @@ -160,24 +131,12 @@ gst_net_time_provider_class_init (GstNetTimeProviderClass * klass) static void gst_net_time_provider_init (GstNetTimeProvider * self) { -#ifdef G_OS_WIN32 - WSADATA w; - int error = WSAStartup (0x0202, &w); - - if (error) { - GST_DEBUG_OBJECT (self, "Error on WSAStartup"); - } - if (w.wVersion != 0x0202) { - WSACleanup (); - } -#endif self->priv = GST_NET_TIME_PROVIDER_GET_PRIVATE (self); - self->port = DEFAULT_PORT; - self->priv->sock.fd = -1; - self->address = g_strdup (DEFAULT_ADDRESS); - self->thread = NULL; - self->active.active = TRUE; + self->priv->port = DEFAULT_PORT; + self->priv->address = g_strdup (DEFAULT_ADDRESS); + self->priv->thread = NULL; + self->priv->active = TRUE; } static void @@ -185,26 +144,17 @@ gst_net_time_provider_finalize (GObject * object) { GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object); - if (self->thread) { + if (self->priv->thread) { gst_net_time_provider_stop (self); - g_assert (self->thread == NULL); - } - - if (self->priv->fdset) { - gst_poll_free (self->priv->fdset); - self->priv->fdset = NULL; + g_assert (self->priv->thread == NULL); } - g_free (self->address); - self->address = NULL; - - if (self->clock) - gst_object_unref (self->clock); - self->clock = NULL; + g_free (self->priv->address); + self->priv->address = NULL; -#ifdef G_OS_WIN32 - WSACleanup (); -#endif + if (self->priv->clock) + gst_object_unref (self->priv->clock); + self->priv->clock = NULL; G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -213,75 +163,56 @@ static gpointer gst_net_time_provider_thread (gpointer data) { GstNetTimeProvider *self = data; - struct sockaddr_in tmpaddr; - socklen_t len; + GCancellable *cancel = self->priv->cancel; + GSocket *socket = self->priv->socket; GstNetTimePacket *packet; - gint ret; - - while (TRUE) { - GST_LOG_OBJECT (self, "doing select"); - ret = gst_poll_wait (self->priv->fdset, GST_CLOCK_TIME_NONE); - GST_LOG_OBJECT (self, "select returned %d", ret); - - if (ret <= 0) { - if (errno == EBUSY) { - GST_LOG_OBJECT (self, "stop"); - goto stopped; - } else if (errno != EAGAIN && errno != EINTR) - goto select_error; - else - continue; - } else { - /* got data in */ - len = sizeof (struct sockaddr); + GError *err = NULL; - packet = gst_net_time_packet_receive (self->priv->sock.fd, - (struct sockaddr *) &tmpaddr, &len); + GST_INFO_OBJECT (self, "time provider thread is running"); - if (!packet) - goto receive_error; - - if (IS_ACTIVE (self)) { - /* do what we were asked to and send the packet back */ - packet->remote_time = gst_clock_get_time (self->clock); + while (TRUE) { + GSocketAddress *sender_addr = NULL; - /* ignore errors */ - gst_net_time_packet_send (packet, self->priv->sock.fd, - (struct sockaddr *) &tmpaddr, len); - } + GST_LOG_OBJECT (self, "waiting on socket"); + if (!g_socket_condition_wait (socket, G_IO_IN, cancel, &err)) { + GST_INFO_OBJECT (self, "socket error: %s", err->message); - g_free (packet); + if (err->code == G_IO_ERROR_CANCELLED) + break; + /* try again */ + g_usleep (G_USEC_PER_SEC / 10); + g_error_free (err); + err = NULL; continue; } - g_assert_not_reached (); + /* got data in */ + packet = gst_net_time_packet_receive (socket, &sender_addr, &err); - /* log errors and keep going */ - select_error: - { - GST_DEBUG_OBJECT (self, "select error %d: %s (%d)", ret, - g_strerror (errno), errno); - continue; - } - stopped: - { - GST_DEBUG_OBJECT (self, "shutting down"); - /* close socket */ - return NULL; - } - receive_error: - { - GST_DEBUG_OBJECT (self, "receive error"); + if (err != NULL) { + GST_DEBUG_OBJECT (self, "receive error: %s", err->message); + g_usleep (G_USEC_PER_SEC / 10); + g_error_free (err); + err = NULL; continue; } - g_assert_not_reached (); + if (IS_ACTIVE (self)) { + /* do what we were asked to and send the packet back */ + packet->remote_time = gst_clock_get_time (self->priv->clock); + /* ignore errors */ + gst_net_time_packet_send (packet, socket, sender_addr, NULL); + g_object_unref (sender_addr); + g_free (packet); + } } - g_assert_not_reached (); + if (err != NULL) + g_error_free (err); + GST_INFO_OBJECT (self, "time provider thread is stopping"); return NULL; } @@ -290,25 +221,25 @@ gst_net_time_provider_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object); - GstClock **clock_p = &self->clock; + GstClock **clock_p = &self->priv->clock; switch (prop_id) { case PROP_PORT: - self->port = g_value_get_int (value); + self->priv->port = g_value_get_int (value); break; case PROP_ADDRESS: - g_free (self->address); + g_free (self->priv->address); if (g_value_get_string (value) == NULL) - self->address = g_strdup (DEFAULT_ADDRESS); + self->priv->address = g_strdup (DEFAULT_ADDRESS); else - self->address = g_strdup (g_value_get_string (value)); + self->priv->address = g_strdup (g_value_get_string (value)); break; case PROP_CLOCK: gst_object_replace ((GstObject **) clock_p, (GstObject *) g_value_get_object (value)); break; case PROP_ACTIVE: - g_atomic_int_set (&self->active.active, g_value_get_boolean (value)); + g_atomic_int_set (&self->priv->active, g_value_get_boolean (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -324,13 +255,13 @@ gst_net_time_provider_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PORT: - g_value_set_int (value, self->port); + g_value_set_int (value, self->priv->port); break; case PROP_ADDRESS: - g_value_set_string (value, self->address); + g_value_set_string (value, self->priv->address); break; case PROP_CLOCK: - g_value_set_object (value, self->clock); + g_value_set_object (value, self->priv->clock); break; case PROP_ACTIVE: g_value_set_boolean (value, IS_ACTIVE (self)); @@ -344,112 +275,86 @@ gst_net_time_provider_get_property (GObject * object, guint prop_id, static gboolean gst_net_time_provider_start (GstNetTimeProvider * self) { - gint ru; - struct sockaddr_in my_addr; - socklen_t len; + GSocketAddress *socket_addr, *bound_addr; + GInetAddress *inet_addr; + GSocket *socket; + GError *err = NULL; int port; - gint ret; - GError *error; - if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0) + if (self->priv->address) { + inet_addr = g_inet_address_new_from_string (self->priv->address); + if (inet_addr == NULL) + goto invalid_address; + } else { + inet_addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4); + } + + GST_LOG_OBJECT (self, "creating socket"); + socket = g_socket_new (g_inet_address_get_family (inet_addr), + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err); + + if (err != NULL) goto no_socket; - self->priv->sock.fd = ret; - - ru = 1; - ret = - setsockopt (self->priv->sock.fd, SOL_SOCKET, SO_REUSEADDR, &ru, - sizeof (ru)); - if (ret < 0) - goto setsockopt_error; - - memset (&my_addr, 0, sizeof (my_addr)); - my_addr.sin_family = AF_INET; /* host byte order */ - my_addr.sin_port = htons ((gint16) self->port); /* short, network byte order */ - my_addr.sin_addr.s_addr = INADDR_ANY; - if (self->address) { - ret = inet_aton (self->address, &my_addr.sin_addr); - if (ret == 0) - goto invalid_address_error; - } + GST_DEBUG_OBJECT (self, "binding on port %d", self->priv->port); + socket_addr = g_inet_socket_address_new (inet_addr, self->priv->port); + g_socket_bind (socket, socket_addr, TRUE, &err); + g_object_unref (socket_addr); + g_object_unref (inet_addr); - GST_DEBUG_OBJECT (self, "binding on port %d", self->port); - ret = - bind (self->priv->sock.fd, (struct sockaddr *) &my_addr, - sizeof (my_addr)); - if (ret < 0) + if (err != NULL) goto bind_error; - len = sizeof (my_addr); - ret = getsockname (self->priv->sock.fd, (struct sockaddr *) &my_addr, &len); - if (ret < 0) - goto getsockname_error; - - port = ntohs (my_addr.sin_port); - GST_DEBUG_OBJECT (self, "bound, on port %d", port); + bound_addr = g_socket_get_local_address (socket, NULL); + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (bound_addr)); + GST_DEBUG_OBJECT (self, "bound on UDP port %d", port); + g_object_unref (bound_addr); - if (port != self->port) { - self->port = port; - GST_DEBUG_OBJECT (self, "notifying %d", port); + if (port != self->priv->port) { + self->priv->port = port; + GST_DEBUG_OBJECT (self, "notifying port %d", port); g_object_notify (G_OBJECT (self), "port"); } - gst_poll_add_fd (self->priv->fdset, &self->priv->sock); - gst_poll_fd_ctl_read (self->priv->fdset, &self->priv->sock, TRUE); + self->priv->socket = socket; + self->priv->cancel = g_cancellable_new (); - self->thread = g_thread_create (gst_net_time_provider_thread, self, TRUE, - &error); - if (!self->thread) + self->priv->thread = g_thread_try_new ("GstNetTimeProvider", + gst_net_time_provider_thread, self, &err); + + if (err != NULL) goto no_thread; return TRUE; /* ERRORS */ -no_socket: +invalid_address: { - GST_ERROR_OBJECT (self, "socket failed %d: %s (%d)", ret, - g_strerror (errno), errno); + GST_ERROR_OBJECT (self, "invalid address: %s", self->priv->address); return FALSE; } -setsockopt_error: - { - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - GST_ERROR_OBJECT (self, "setsockopt failed %d: %s (%d)", ret, - g_strerror (errno), errno); - return FALSE; - } -invalid_address_error: +no_socket: { - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - GST_ERROR_OBJECT (self, "invalid network address %s: %s (%d)", - self->address, g_strerror (errno), errno); + GST_ERROR_OBJECT (self, "could not create socket: %s", err->message); + g_error_free (err); + g_object_unref (inet_addr); return FALSE; } bind_error: { - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - GST_ERROR_OBJECT (self, "bind failed %d: %s (%d)", ret, - g_strerror (errno), errno); - return FALSE; - } -getsockname_error: - { - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret, - g_strerror (errno), errno); + GST_ERROR_OBJECT (self, "bind failed: %s", err->message); + g_error_free (err); + g_object_unref (socket); return FALSE; } no_thread: { - gst_poll_remove_fd (self->priv->fdset, &self->priv->sock); - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - GST_ERROR_OBJECT (self, "could not create thread: %s", error->message); - g_error_free (error); + GST_ERROR_OBJECT (self, "could not create thread: %s", err->message); + g_error_free (err); + g_object_unref (self->priv->socket); + self->priv->socket = NULL; + g_object_unref (self->priv->cancel); + self->priv->cancel = NULL; return FALSE; } } @@ -457,15 +362,21 @@ no_thread: static void gst_net_time_provider_stop (GstNetTimeProvider * self) { - gst_poll_set_flushing (self->priv->fdset, TRUE); - g_thread_join (self->thread); - self->thread = NULL; - - if (self->priv->sock.fd != -1) { - gst_poll_remove_fd (self->priv->fdset, &self->priv->sock); - close (self->priv->sock.fd); - self->priv->sock.fd = -1; - } + g_return_if_fail (self->priv->thread != NULL); + + GST_INFO_OBJECT (self, "stopping.."); + g_cancellable_cancel (self->priv->cancel); + + g_thread_join (self->priv->thread); + self->priv->thread = NULL; + + g_object_unref (self->priv->cancel); + self->priv->cancel = NULL; + + g_object_unref (self->priv->socket); + self->priv->socket = NULL; + + GST_INFO_OBJECT (self, "stopped"); } /** @@ -490,22 +401,12 @@ gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port) ret = g_object_new (GST_TYPE_NET_TIME_PROVIDER, "clock", clock, "address", address, "port", port, NULL); - if ((ret->priv->fdset = gst_poll_new (TRUE)) == NULL) - goto no_fdset; - if (!gst_net_time_provider_start (ret)) goto failed_start; /* all systems go, cap'n */ return ret; -no_fdset: - { - GST_ERROR_OBJECT (ret, "could not create an fdset: %s (%d)", - g_strerror (errno), errno); - gst_object_unref (ret); - return NULL; - } failed_start: { /* already printed a nice error */ |