diff options
Diffstat (limited to 'ext/curl/gstcurlbasesink.c')
-rw-r--r-- | ext/curl/gstcurlbasesink.c | 1183 |
1 files changed, 1183 insertions, 0 deletions
diff --git a/ext/curl/gstcurlbasesink.c b/ext/curl/gstcurlbasesink.c new file mode 100644 index 00000000..680be023 --- /dev/null +++ b/ext/curl/gstcurlbasesink.c @@ -0,0 +1,1183 @@ +/* GStreamer + * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-curlsink + * @short_description: sink that uploads data to a server using libcurl + * @see_also: + * + * This is a network sink that uses libcurl as a client to upload data to + * a server (e.g. a HTTP/FTP server). + * + * <refsect2> + * <title>Example launch line (upload a JPEG file to an HTTP server)</title> + * |[ + * gst-launch filesrc location=image.jpg ! jpegparse ! curlsink \ + * file-name=image.jpg \ + * location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/ \ + * user=test passwd=test \ + * content-type=image/jpeg \ + * use-content-length=false + * ]| + * </refsect2> + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <curl/curl.h> +#include <string.h> +#include <stdio.h> + +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "gstcurlbasesink.h" + +/* Default values */ +#define GST_CAT_DEFAULT gst_curl_base_sink_debug +#define DEFAULT_URL "localhost:5555" +#define DEFAULT_TIMEOUT 30 +#define DEFAULT_QOS_DSCP 0 + +#define DSCP_MIN 0 +#define DSCP_MAX 63 + + +/* Plugin specific settings */ +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug); + +enum +{ + PROP_0, + PROP_LOCATION, + PROP_USER_NAME, + PROP_USER_PASSWD, + PROP_FILE_NAME, + PROP_TIMEOUT, + PROP_QOS_DSCP +}; + +/* Object class function declarations */ +static void gst_curl_base_sink_finalize (GObject * gobject); +static void gst_curl_base_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_curl_base_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +/* BaseSink class function declarations */ +static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink, + GstBuffer * buf); +static gboolean gst_curl_base_sink_event (GstBaseSink * bsink, + GstEvent * event); +static gboolean gst_curl_base_sink_start (GstBaseSink * bsink); +static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink); +static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink); +static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink); + +/* private functions */ + +static gboolean gst_curl_base_sink_transfer_setup_unlocked + (GstCurlBaseSink * sink); +static gboolean gst_curl_base_sink_transfer_start_unlocked + (GstCurlBaseSink * sink); +static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink); +static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size, + size_t nmemb, void *stream); +static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size, + size_t nmemb, void *stream); +static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink, + void *curl_ptr, size_t block_size, guint * last_chunk); +static int gst_curl_base_sink_transfer_socket_cb (void *clientp, + curl_socket_t curlfd, curlsocktype purpose); +static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data); +static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink); +static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink); + +static gboolean gst_curl_base_sink_wait_for_data_unlocked + (GstCurlBaseSink * sink); +static void gst_curl_base_sink_new_file_notify_unlocked + (GstCurlBaseSink * sink); +static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked + (GstCurlBaseSink * sink); +static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink); +static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink); +static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink); + +static void handle_transfer (GstCurlBaseSink * sink); +static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf, + size_t max_bytes_to_send, guint * last_chunk); + +#define parent_class gst_curl_base_sink_parent_class +G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK); + +static void +gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass; + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0, + "curl base sink element"); + GST_DEBUG_OBJECT (klass, "class_init"); + + gst_element_class_set_details_simple (element_class, + "Curl base sink", + "Sink/Network", + "Upload data over the network to a server using libcurl", + "Patricia Muscalu <patricia@axis.com>"); + + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event); + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render); + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock); + gstbasesink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop); + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize); + + gobject_class->set_property = gst_curl_base_sink_set_property; + gobject_class->get_property = gst_curl_base_sink_get_property; + + klass->handle_transfer = handle_transfer; + klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb; + klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer; + + /* FIXME: check against souphttpsrc and use same names for same properties */ + g_object_class_install_property (gobject_class, PROP_LOCATION, + g_param_spec_string ("location", "Location", + "URI location to write to", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USER_NAME, + g_param_spec_string ("user", "User name", + "User name to use for server authentication", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USER_PASSWD, + g_param_spec_string ("passwd", "User password", + "User password to use for server authentication", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FILE_NAME, + g_param_spec_string ("file-name", "Base file name", + "The base file name for the uploaded images", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_TIMEOUT, + g_param_spec_int ("timeout", "Timeout", + "Number of seconds waiting to write before timeout", + 0, G_MAXINT, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_QOS_DSCP, + g_param_spec_int ("qos-dscp", + "QoS diff srv code point", + "Quality of Service, differentiated services code point (0 default)", + DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sinktemplate)); +} + +static void +gst_curl_base_sink_init (GstCurlBaseSink * sink) +{ + sink->transfer_buf = g_malloc (sizeof (TransferBuffer)); + sink->transfer_cond = g_malloc (sizeof (TransferCondition)); + sink->transfer_cond->cond = g_cond_new (); + sink->transfer_cond->data_sent = FALSE; + sink->transfer_cond->data_available = FALSE; + sink->transfer_cond->wait_for_response = FALSE; + sink->timeout = DEFAULT_TIMEOUT; + sink->qos_dscp = DEFAULT_QOS_DSCP; + sink->url = g_strdup (DEFAULT_URL); + sink->transfer_thread_close = FALSE; + sink->new_file = TRUE; + sink->flow_ret = GST_FLOW_OK; + sink->is_live = FALSE; +} + +static void +gst_curl_base_sink_finalize (GObject * gobject) +{ + GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject); + + GST_DEBUG ("finalizing curlsink"); + if (this->transfer_thread != NULL) { + g_thread_join (this->transfer_thread); + } + + gst_curl_base_sink_transfer_cleanup (this); + g_cond_free (this->transfer_cond->cond); + g_free (this->transfer_cond); + g_free (this->transfer_buf); + + g_free (this->url); + g_free (this->user); + g_free (this->passwd); + g_free (this->file_name); + if (this->fdset != NULL) { + gst_poll_free (this->fdset); + this->fdset = NULL; + } + G_OBJECT_CLASS (parent_class)->finalize (gobject); +} + +void +gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink) +{ + GST_LOG ("more data to send"); + + sink->transfer_cond->data_available = TRUE; + sink->transfer_cond->data_sent = FALSE; + sink->transfer_cond->wait_for_response = TRUE; + g_cond_signal (sink->transfer_cond->cond); +} + +void +gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink) +{ + GST_OBJECT_LOCK (sink); + GST_LOG_OBJECT (sink, "setting transfer thread close flag"); + sink->transfer_thread_close = TRUE; + g_cond_signal (sink->transfer_cond->cond); + GST_OBJECT_UNLOCK (sink); + + if (sink->transfer_thread != NULL) { + GST_LOG_OBJECT (sink, "waiting for transfer thread to finish"); + g_thread_join (sink->transfer_thread); + sink->transfer_thread = NULL; + } +} + +void +gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live) +{ + g_return_if_fail (GST_IS_CURL_BASE_SINK (sink)); + + GST_OBJECT_LOCK (sink); + sink->is_live = live; + GST_OBJECT_UNLOCK (sink); +} + +gboolean +gst_curl_base_sink_is_live (GstCurlBaseSink * sink) +{ + gboolean result; + + g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE); + + GST_OBJECT_LOCK (sink); + result = sink->is_live; + GST_OBJECT_UNLOCK (sink); + + return result; +} + +static GstFlowReturn +gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf) +{ + GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); + GstMapInfo map; + guint8 *data; + size_t size; + GstFlowReturn ret; + + GST_LOG ("enter render"); + + sink = GST_CURL_BASE_SINK (bsink); + gst_buffer_map (buf, &map, GST_MAP_READ); + data = map.data; + size = map.size; + + GST_OBJECT_LOCK (sink); + + /* check if the transfer thread has encountered problems while the + * pipeline thread was working elsewhere */ + if (sink->flow_ret != GST_FLOW_OK) { + goto done; + } + + g_assert (sink->transfer_cond->data_available == FALSE); + + /* if there is no transfer thread created, lets create one */ + if (sink->transfer_thread == NULL) { + if (!gst_curl_base_sink_transfer_start_unlocked (sink)) { + sink->flow_ret = GST_FLOW_ERROR; + goto done; + } + } + + /* make data available for the transfer thread and notify */ + sink->transfer_buf->ptr = data; + sink->transfer_buf->len = size; + sink->transfer_buf->offset = 0; + gst_curl_base_sink_transfer_thread_notify_unlocked (sink); + + /* wait for the transfer thread to send the data. This will be notified + * either when transfer is completed by the curl read callback or by + * the thread function if an error has occured. */ + gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink); + +done: + ret = sink->flow_ret; + GST_OBJECT_UNLOCK (sink); + gst_buffer_unmap (buf, &map); + + GST_LOG ("exit render"); + + return ret; +} + +static gboolean +gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event) +{ + GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); + GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + + switch (event->type) { + case GST_EVENT_EOS: + GST_DEBUG_OBJECT (sink, "received EOS"); + gst_curl_base_sink_transfer_thread_close (sink); + gst_curl_base_sink_wait_for_response (sink); + break; + case GST_EVENT_CAPS: + if (klass->set_mime_type) { + GstCaps *caps; + gst_event_parse_caps (event, &caps); + klass->set_mime_type (sink, caps); + } + break; + default: + break; + } + return TRUE; +} + +static gboolean +gst_curl_base_sink_start (GstBaseSink * bsink) +{ + GstCurlBaseSink *sink; + + sink = GST_CURL_BASE_SINK (bsink); + + /* reset flags */ + sink->transfer_cond->data_sent = FALSE; + sink->transfer_cond->data_available = FALSE; + sink->transfer_cond->wait_for_response = FALSE; + sink->transfer_thread_close = FALSE; + sink->new_file = TRUE; + sink->flow_ret = GST_FLOW_OK; + + if ((sink->fdset = gst_poll_new (TRUE)) == NULL) { + GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, + ("gst_poll_new failed: %s", g_strerror (errno)), (NULL)); + return FALSE; + } + + return TRUE; +} + +static gboolean +gst_curl_base_sink_stop (GstBaseSink * bsink) +{ + GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); + + gst_curl_base_sink_transfer_thread_close (sink); + if (sink->fdset != NULL) { + gst_poll_free (sink->fdset); + sink->fdset = NULL; + } + + return TRUE; +} + +static gboolean +gst_curl_base_sink_unlock (GstBaseSink * bsink) +{ + GstCurlBaseSink *sink; + + sink = GST_CURL_BASE_SINK (bsink); + + GST_LOG_OBJECT (sink, "Flushing"); + gst_poll_set_flushing (sink->fdset, TRUE); + + return TRUE; +} + +static gboolean +gst_curl_base_sink_unlock_stop (GstBaseSink * bsink) +{ + GstCurlBaseSink *sink; + + sink = GST_CURL_BASE_SINK (bsink); + + GST_LOG_OBJECT (sink, "No longer flushing"); + gst_poll_set_flushing (sink->fdset, FALSE); + + return TRUE; +} + +static void +gst_curl_base_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstCurlBaseSink *sink; + GstState cur_state; + + g_return_if_fail (GST_IS_CURL_BASE_SINK (object)); + sink = GST_CURL_BASE_SINK (object); + + gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0); + if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) { + GST_OBJECT_LOCK (sink); + + switch (prop_id) { + case PROP_LOCATION: + g_free (sink->url); + sink->url = g_value_dup_string (value); + GST_DEBUG_OBJECT (sink, "url set to %s", sink->url); + break; + case PROP_USER_NAME: + g_free (sink->user); + sink->user = g_value_dup_string (value); + GST_DEBUG_OBJECT (sink, "user set to %s", sink->user); + break; + case PROP_USER_PASSWD: + g_free (sink->passwd); + sink->passwd = g_value_dup_string (value); + GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd); + break; + case PROP_FILE_NAME: + g_free (sink->file_name); + sink->file_name = g_value_dup_string (value); + GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name); + break; + case PROP_TIMEOUT: + sink->timeout = g_value_get_int (value); + GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout); + break; + case PROP_QOS_DSCP: + sink->qos_dscp = g_value_get_int (value); + gst_curl_base_sink_setup_dscp_unlocked (sink); + GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp); + break; + default: + GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id); + break; + } + + GST_OBJECT_UNLOCK (sink); + + return; + } + + /* in PLAYING or PAUSED state */ + GST_OBJECT_LOCK (sink); + + switch (prop_id) { + case PROP_FILE_NAME: + g_free (sink->file_name); + sink->file_name = g_value_dup_string (value); + GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name); + gst_curl_base_sink_new_file_notify_unlocked (sink); + break; + case PROP_TIMEOUT: + sink->timeout = g_value_get_int (value); + GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout); + break; + case PROP_QOS_DSCP: + sink->qos_dscp = g_value_get_int (value); + gst_curl_base_sink_setup_dscp_unlocked (sink); + GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp); + break; + default: + GST_WARNING_OBJECT (sink, "cannot set property when PLAYING"); + break; + } + + GST_OBJECT_UNLOCK (sink); +} + +static void +gst_curl_base_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstCurlBaseSink *sink; + + g_return_if_fail (GST_IS_CURL_BASE_SINK (object)); + sink = GST_CURL_BASE_SINK (object); + + switch (prop_id) { + case PROP_LOCATION: + g_value_set_string (value, sink->url); + break; + case PROP_USER_NAME: + g_value_set_string (value, sink->user); + break; + case PROP_USER_PASSWD: + g_value_set_string (value, sink->passwd); + break; + case PROP_FILE_NAME: + g_value_set_string (value, sink->file_name); + break; + case PROP_TIMEOUT: + g_value_set_int (value, sink->timeout); + break; + case PROP_QOS_DSCP: + g_value_set_int (value, sink->qos_dscp); + break; + default: + GST_DEBUG_OBJECT (sink, "invalid property id"); + break; + } +} + +static gboolean +gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink) +{ + GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + +#ifdef DEBUG + curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1); +#endif + + curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url); + + curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout); + + /* using signals in a multithreaded application is dangeous */ + curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1); + + /* socket settings */ + curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink); + curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION, + gst_curl_base_sink_transfer_socket_cb); + + curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION, klass->transfer_read_cb); + curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink); + curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION, + gst_curl_base_sink_transfer_write_cb); + curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink); + + return TRUE; +} + +static gboolean +gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink) +{ + gboolean res = FALSE; + GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + + gst_curl_base_sink_transfer_set_common_options_unlocked (sink); + + /* authentication settings */ + if (sink->user != NULL && strlen (sink->user)) { + curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user); + curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd); + } + + if (klass->set_options_unlocked) { + res = klass->set_options_unlocked (sink); + } + + return res; +} + +static size_t +transfer_data_buffer (void *curl_ptr, TransferBuffer * buf, + size_t max_bytes_to_send, guint * last_chunk) +{ + guint buf_len = buf->len; + size_t bytes_to_send = MIN (max_bytes_to_send, buf->len); + + memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send); + buf->offset = buf->offset + bytes_to_send; + buf->len = buf->len - bytes_to_send; + + /* the last data chunk */ + if (bytes_to_send == buf_len) { + buf->offset = 0; + buf->len = 0; + *last_chunk = 1; + } + + GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send); + + return bytes_to_send; +} + +static size_t +gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink, + void *curl_ptr, size_t block_size, guint * last_chunk) +{ + TransferBuffer *buffer; + size_t bytes_to_send; + + buffer = sink->transfer_buf; + GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT, + buffer->len, buffer->offset); + + if (buffer->len <= 0) { + GST_WARNING ("got zero- or negative-length buffer"); + + return 0; + } + + /* more data in buffer(s) */ + bytes_to_send = transfer_data_buffer (curl_ptr, sink->transfer_buf, + block_size, last_chunk); + + return bytes_to_send; +} + +static size_t +gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb, + void *stream) +{ + GstCurlBaseSink *sink; + GstCurlBaseSinkClass *klass; + size_t max_bytes_to_send; + size_t bytes_to_send; + guint last_chunk = 0; + + sink = (GstCurlBaseSink *) stream; + klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + + max_bytes_to_send = size * nmemb; + + /* wait for data to come available, if new file or thread close is set + * then zero will be returned to indicate end of current transfer */ + GST_OBJECT_LOCK (sink); + if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) { + if (klass->flush_data_unlocked) { + bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr, + max_bytes_to_send, sink->new_file); + + GST_OBJECT_UNLOCK (sink); + + return bytes_to_send; + } + + GST_OBJECT_UNLOCK (sink); + GST_LOG ("returning 0, no more data to send in this file"); + + return 0; + } + + GST_OBJECT_UNLOCK (sink); + + bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr, + max_bytes_to_send, &last_chunk); + + /* the last data chunk */ + if (last_chunk) { + gst_curl_base_sink_data_sent_notify (sink); + } + + return bytes_to_send; +} + +static size_t +gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size, + size_t nmemb, void G_GNUC_UNUSED * stream) +{ + GstCurlBaseSink *sink; + GstCurlBaseSinkClass *klass; + size_t realsize = size * nmemb; + + sink = (GstCurlBaseSink *) stream; + klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + + if (klass->transfer_verify_response_code) { + if (!klass->transfer_verify_response_code (sink)) { + GST_DEBUG_OBJECT (sink, "response error"); + GST_OBJECT_LOCK (sink); + sink->flow_ret = GST_FLOW_ERROR; + GST_OBJECT_UNLOCK (sink); + } + } + + GST_DEBUG ("response %s", (gchar *) ptr); + + return realsize; +} + +CURLcode +gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink) +{ + CURLcode code = CURLE_OK; + CURL *easy; + CURLMsg *msg; + gint msgs_left; + gchar *eff_url = NULL; + + do { + easy = NULL; + while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { + easy = msg->easy_handle; + code = msg->data.result; + break; + } + } + if (easy) { + curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url); + GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url, + curl_easy_strerror (code), code); + } + } while (easy); + + return code; +} + +static void +handle_transfer (GstCurlBaseSink * sink) +{ + GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + gint retval; + gint activated_fds; + gint running_handles; + gint timeout; + CURLMcode m_code; + CURLcode e_code; + + GST_OBJECT_LOCK (sink); + timeout = sink->timeout; + GST_OBJECT_UNLOCK (sink); + + /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data + available to send or receive - call simply curl_multi_perform before + poll() on more actions */ + do { + m_code = curl_multi_perform (sink->multi_handle, &running_handles); + } while (m_code == CURLM_CALL_MULTI_PERFORM); + + while (running_handles && (m_code == CURLM_OK)) { + if (klass->transfer_prepare_poll_wait) { + klass->transfer_prepare_poll_wait (sink); + } + + activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND); + if (G_UNLIKELY (activated_fds == -1)) { + if (errno == EAGAIN || errno == EINTR) { + GST_DEBUG_OBJECT (sink, "interrupted by signal"); + } else if (errno == EBUSY) { + GST_DEBUG_OBJECT (sink, "poll stopped"); + retval = GST_FLOW_EOS; + goto fail; + } else { + GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno)); + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL)); + retval = GST_FLOW_ERROR; + goto fail; + } + } else if (G_UNLIKELY (activated_fds == 0)) { + GST_DEBUG_OBJECT (sink, "poll timed out"); + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL)); + retval = GST_FLOW_ERROR; + goto fail; + } + + /* readable/writable sockets */ + do { + m_code = curl_multi_perform (sink->multi_handle, &running_handles); + } while (m_code == CURLM_CALL_MULTI_PERFORM); + } + + if (m_code != CURLM_OK) { + GST_DEBUG_OBJECT (sink, "curl multi error"); + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", + curl_multi_strerror (m_code)), (NULL)); + retval = GST_FLOW_ERROR; + goto fail; + } + + /* problems still might have occurred on individual transfers even when + * curl_multi_perform returns CURLM_OK */ + if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) { + GST_DEBUG_OBJECT (sink, "curl easy error"); + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", + curl_easy_strerror (e_code)), (NULL)); + retval = GST_FLOW_ERROR; + goto fail; + } + + gst_curl_base_sink_got_response_notify (sink); + + return; + +fail: + GST_OBJECT_LOCK (sink); + if (sink->flow_ret == GST_FLOW_OK) { + sink->flow_ret = retval; + } + GST_OBJECT_UNLOCK (sink); + return; +} + +/* This function gets called by libcurl after the socket() call but before + * the connect() call. */ +static int +gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd, + curlsocktype G_GNUC_UNUSED purpose) +{ + GstCurlBaseSink *sink; + gboolean ret = TRUE; + + sink = (GstCurlBaseSink *) clientp; + + g_assert (sink); + + if (curlfd < 0) { + /* signal an unrecoverable error to the library which will close the socket + and return CURLE_COULDNT_CONNECT + */ + return 1; + } + + gst_poll_fd_init (&sink->fd); + sink->fd.fd = curlfd; + + ret = ret && gst_poll_add_fd (sink->fdset, &sink->fd); + ret = ret && gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE); + ret = ret && gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE); + GST_DEBUG ("fd: %d", sink->fd.fd); + GST_OBJECT_LOCK (sink); + gst_curl_base_sink_setup_dscp_unlocked (sink); + GST_OBJECT_UNLOCK (sink); + + /* success */ + if (ret) { + return 0; + } else { + return 1; + } +} + +static gboolean +gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink) +{ + GError *error = NULL; + gboolean ret = TRUE; + + GST_LOG ("creating transfer thread"); + sink->transfer_thread_close = FALSE; + sink->new_file = TRUE; + sink->transfer_thread = + g_thread_create ((GThreadFunc) gst_curl_base_sink_transfer_thread_func, + sink, TRUE, &error); + + if (sink->transfer_thread == NULL || error != NULL) { + ret = FALSE; + if (error) { + GST_ERROR_OBJECT (sink, "could not create thread %s", error->message); + g_error_free (error); + } else { + GST_ERROR_OBJECT (sink, "could not create thread for unknown reason"); + } + } + + return ret; +} + +static gpointer +gst_curl_base_sink_transfer_thread_func (gpointer data) +{ + GstCurlBaseSink *sink = (GstCurlBaseSink *) data; + GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + GstFlowReturn ret; + gboolean data_available; + + GST_LOG ("transfer thread started"); + GST_OBJECT_LOCK (sink); + if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) { + GST_DEBUG_OBJECT (sink, "curl setup error"); + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("curl setup error"), (NULL)); + sink->flow_ret = GST_FLOW_ERROR; + goto done; + } + + while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) { + /* we are working on a new file, clearing flag and setting a new file + * name */ + sink->new_file = FALSE; + + /* wait for data to arrive for this new file, if we get a new file name + * again before getting data we will simply skip transfering anything + * for this file and go directly to the new file */ + data_available = gst_curl_base_sink_wait_for_data_unlocked (sink); + if (data_available) { + if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) { + sink->flow_ret = GST_FLOW_ERROR; + GST_OBJECT_UNLOCK (sink); + GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, ("Unexpected state."), + (NULL)); + GST_OBJECT_LOCK (sink); + goto done; + } + } + + /* stay unlocked while handling the actual transfer */ + GST_OBJECT_UNLOCK (sink); + + if (data_available) { + if (!gst_curl_base_sink_is_live (sink)) { + /* prepare transfer if needed */ + if (klass->prepare_transfer) { + GST_OBJECT_LOCK (sink); + if (!klass->prepare_transfer (sink)) { + sink->flow_ret = GST_FLOW_ERROR; + goto done; + } + GST_OBJECT_UNLOCK (sink); + } + curl_multi_add_handle (sink->multi_handle, sink->curl); + } + + /* Start driving the transfer. */ + klass->handle_transfer (sink); + + /* easy handle will be possibly re-used for next transfer, thus it needs to + * be removed from the multi stack and re-added again */ + if (!gst_curl_base_sink_is_live (sink)) { + curl_multi_remove_handle (sink->multi_handle, sink->curl); + } + } + + /* lock again before looping to check the thread closed flag */ + GST_OBJECT_LOCK (sink); + } + + if (sink->is_live) { + curl_multi_remove_handle (sink->multi_handle, sink->curl); + } + +done: + /* extract the error code so the lock does not have to be + * taken when calling the functions below that take the lock + * on their own */ + ret = sink->flow_ret; + GST_OBJECT_UNLOCK (sink); + + /* if there is a flow error, always notify the render function so it + * can return the flow error up along the pipeline. as an error has + * occurred there is no response to receive, so notify the event function + * so it doesn't block indefinitely waiting for a response. */ + if (ret != GST_FLOW_OK) { + gst_curl_base_sink_data_sent_notify (sink); + gst_curl_base_sink_got_response_notify (sink); + } + + GST_DEBUG ("exit thread func - transfer thread close flag: %d", + sink->transfer_thread_close); + + return NULL; +} + +static gboolean +gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink) +{ + g_assert (sink); + + if (sink->curl == NULL) { + /* curl_easy_init automatically calls curl_global_init(3) */ + if ((sink->curl = curl_easy_init ()) == NULL) { + g_warning ("Failed to init easy handle"); + return FALSE; + } + } + + if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) { + g_warning ("Failed to setup easy handle"); + GST_OBJECT_UNLOCK (sink); + return FALSE; + } + + /* init a multi stack (non-blocking interface to liburl) */ + if (sink->multi_handle == NULL) { + if ((sink->multi_handle = curl_multi_init ()) == NULL) { + return FALSE; + } + } + + return TRUE; +} + +static void +gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink) +{ + if (sink->curl != NULL) { + if (sink->multi_handle != NULL) { + curl_multi_remove_handle (sink->multi_handle, sink->curl); + } + curl_easy_cleanup (sink->curl); + sink->curl = NULL; + } + + if (sink->multi_handle != NULL) { + curl_multi_cleanup (sink->multi_handle); + sink->multi_handle = NULL; + } +} + +static gboolean +gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink) +{ + gboolean data_available = FALSE; + + GST_LOG ("waiting for data"); + while (!sink->transfer_cond->data_available && + !sink->transfer_thread_close && !sink->new_file) { + g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); + } + + if (sink->transfer_thread_close) { + GST_LOG ("wait for data aborted due to thread close"); + } else if (sink->new_file) { + GST_LOG ("wait for data aborted due to new file name"); + } else { + GST_LOG ("wait for data completed"); + data_available = TRUE; + } + + return data_available; +} + +static void +gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink) +{ + GST_LOG ("new file name"); + sink->new_file = TRUE; + g_cond_signal (sink->transfer_cond->cond); +} + +static void + gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked + (GstCurlBaseSink * sink) +{ + GST_LOG ("waiting for buffer send to complete"); + + /* this function should not check if the transfer thread is set to be closed + * since that flag only can be set by the EoS event (by the pipeline thread). + * This can therefore never happen while this function is running since this + * function also is called by the pipeline thread (in the render function) */ + while (!sink->transfer_cond->data_sent) { + g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); + } + GST_LOG ("buffer send completed"); +} + +static void +gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink) +{ + GST_LOG ("transfer completed"); + GST_OBJECT_LOCK (sink); + sink->transfer_cond->data_available = FALSE; + sink->transfer_cond->data_sent = TRUE; + g_cond_signal (sink->transfer_cond->cond); + GST_OBJECT_UNLOCK (sink); +} + +static void +gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink) +{ + GST_LOG ("waiting for remote to send response code"); + + GST_OBJECT_LOCK (sink); + while (sink->transfer_cond->wait_for_response) { + g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); + } + GST_OBJECT_UNLOCK (sink); + + GST_LOG ("response code received"); +} + +static void +gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink) +{ + GST_LOG ("got response code"); + + GST_OBJECT_LOCK (sink); + sink->transfer_cond->wait_for_response = FALSE; + g_cond_signal (sink->transfer_cond->cond); + GST_OBJECT_UNLOCK (sink); +} + +static gint +gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink) +{ + gint tos; + gint af; + gint ret = -1; + union + { + struct sockaddr sa; + struct sockaddr_in6 sa_in6; + struct sockaddr_storage sa_stor; + } sa; + socklen_t slen = sizeof (sa); + + if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) { + GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); + return ret; + } + af = sa.sa.sa_family; + + /* if this is an IPv4-mapped address then do IPv4 QoS */ + if (af == AF_INET6) { + GST_DEBUG_OBJECT (sink, "check IP6 socket"); + if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { + GST_DEBUG_OBJECT (sink, "mapped to IPV4"); + af = AF_INET; + } + } + /* extract and shift 6 bits of the DSCP */ + tos = (sink->qos_dscp & 0x3f) << 2; + + switch (af) { + case AF_INET: + ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); + break; + case AF_INET6: +#ifdef IPV6_TCLASS + ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, + sizeof (tos)); + break; +#endif + default: + GST_ERROR_OBJECT (sink, "unsupported AF"); + break; + } + if (ret) { + GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); + } + + return ret; +} |