renatofilho@787: /* GStreamer renatofilho@787: * Copyright (C) 2006 Edward Hervey renatofilho@787: * Copyright (C) 2007 Jan Schmidt renatofilho@787: * Copyright (C) 2007 Wim Taymans renatofilho@787: * renatofilho@787: * gstmultiqueue.c: renatofilho@787: * renatofilho@787: * This library is free software; you can redistribute it and/or renatofilho@787: * modify it under the terms of the GNU Library General Public renatofilho@787: * License as published by the Free Software Foundation; either renatofilho@787: * version 2 of the License, or (at your option) any later version. renatofilho@787: * renatofilho@787: * This library is distributed in the hope that it will be useful, renatofilho@787: * but WITHOUT ANY WARRANTY; without even the implied warranty of renatofilho@787: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU renatofilho@787: * Library General Public License for more details. renatofilho@787: * renatofilho@787: * You should have received a copy of the GNU Library General Public renatofilho@787: * License along with this library; if not, write to the renatofilho@787: * Free Software Foundation, Inc., 59 Temple Place - Suite 330, renatofilho@787: * Boston, MA 02111-1307, USA. renatofilho@787: */ renatofilho@787: renatofilho@787: #ifdef HAVE_CONFIG_H renatofilho@787: # include "config.h" renatofilho@787: #endif renatofilho@787: renatofilho@787: #include renatofilho@787: #include "gstmultiqueue.h" renatofilho@787: renatofilho@787: /** renatofilho@787: * GstSingleQueue: renatofilho@787: * @sinkpad: associated sink #GstPad renatofilho@787: * @srcpad: associated source #GstPad renatofilho@787: * renatofilho@787: * Structure containing all information and properties about renatofilho@787: * a single queue. renatofilho@787: */ renatofilho@787: typedef struct _GstSingleQueue GstSingleQueue; renatofilho@787: renatofilho@787: struct _GstSingleQueue renatofilho@787: { renatofilho@787: /* unique identifier of the queue */ renatofilho@787: guint id; renatofilho@787: renatofilho@787: GstMultiQueue *mqueue; renatofilho@787: renatofilho@787: GstPad *sinkpad; renatofilho@787: GstPad *srcpad; renatofilho@787: renatofilho@787: /* flowreturn of previous srcpad push */ renatofilho@787: GstFlowReturn srcresult; renatofilho@787: GstSegment sink_segment; renatofilho@787: GstSegment src_segment; renatofilho@787: renatofilho@787: /* queue of data */ renatofilho@787: GstDataQueue *queue; renatofilho@787: GstDataQueueSize max_size, extra_size; renatofilho@787: GstClockTime cur_time; renatofilho@787: gboolean is_eos; renatofilho@787: gboolean inextra; /* TRUE if the queue is currently in extradata mode */ renatofilho@787: renatofilho@787: /* Protected by global lock */ renatofilho@787: guint32 nextid; /* ID of the next object waiting to be pushed */ renatofilho@787: guint32 oldid; /* ID of the last object pushed (last in a series) */ renatofilho@787: GCond *turn; /* SingleQueue turn waiting conditional */ renatofilho@787: }; renatofilho@787: renatofilho@787: renatofilho@787: /* Extension of GstDataQueueItem structure for our usage */ renatofilho@787: typedef struct _GstMultiQueueItem GstMultiQueueItem; renatofilho@787: renatofilho@787: struct _GstMultiQueueItem renatofilho@787: { renatofilho@787: GstMiniObject *object; renatofilho@787: guint size; renatofilho@787: guint64 duration; renatofilho@787: gboolean visible; renatofilho@787: renatofilho@787: GDestroyNotify destroy; renatofilho@787: guint32 posid; renatofilho@787: }; renatofilho@787: renatofilho@787: static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); renatofilho@787: static void gst_single_queue_free (GstSingleQueue * squeue); renatofilho@787: renatofilho@787: static void wake_up_next_non_linked (GstMultiQueue * mq); renatofilho@787: static void compute_high_id (GstMultiQueue * mq); renatofilho@787: renatofilho@787: static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", renatofilho@787: GST_PAD_SINK, renatofilho@787: GST_PAD_REQUEST, renatofilho@787: GST_STATIC_CAPS_ANY); renatofilho@787: renatofilho@787: static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d", renatofilho@787: GST_PAD_SRC, renatofilho@787: GST_PAD_SOMETIMES, renatofilho@787: GST_STATIC_CAPS_ANY); renatofilho@787: renatofilho@787: GST_DEBUG_CATEGORY_STATIC (multi_queue_debug); renatofilho@787: #define GST_CAT_DEFAULT (multi_queue_debug) renatofilho@787: renatofilho@787: /* default limits, we try to keep up to 2 seconds of data and if there is not renatofilho@787: * time, up to 10 MB. The number of buffers is dynamically scaled to make sure renatofilho@787: * there is data in the queues. Normally, the byte and time limits are not hit renatofilho@787: * in theses conditions. */ renatofilho@787: #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ renatofilho@787: #define DEFAULT_MAX_SIZE_BUFFERS 5 renatofilho@787: #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND renatofilho@787: renatofilho@787: /* second limits. When we hit one of the above limits we are probably dealing renatofilho@787: * with a badly muxed file and we scale the limits to these emergency values. renatofilho@787: * This is currently not yet implemented. */ renatofilho@787: #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ renatofilho@787: #define DEFAULT_EXTRA_SIZE_BUFFERS 5 renatofilho@787: #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND renatofilho@787: renatofilho@787: /* Signals and args */ renatofilho@787: enum renatofilho@787: { renatofilho@787: SIGNAL_UNDERRUN, renatofilho@787: SIGNAL_OVERRUN, renatofilho@787: LAST_SIGNAL renatofilho@787: }; renatofilho@787: renatofilho@787: enum renatofilho@787: { renatofilho@787: ARG_0, renatofilho@787: ARG_EXTRA_SIZE_BYTES, renatofilho@787: ARG_EXTRA_SIZE_BUFFERS, renatofilho@787: ARG_EXTRA_SIZE_TIME, renatofilho@787: ARG_MAX_SIZE_BYTES, renatofilho@787: ARG_MAX_SIZE_BUFFERS, renatofilho@787: ARG_MAX_SIZE_TIME, renatofilho@787: }; renatofilho@787: renatofilho@787: renatofilho@787: static const GstElementDetails gst_multiqueue_details = renatofilho@787: GST_ELEMENT_DETAILS ("MultiQueue", renatofilho@787: "Generic", renatofilho@787: "Multiple data queue", renatofilho@787: "Edward Hervey "); renatofilho@787: renatofilho@787: renatofilho@787: #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ renatofilho@787: g_mutex_lock (q->qlock); \ renatofilho@787: } G_STMT_END renatofilho@787: renatofilho@787: #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ renatofilho@787: g_mutex_unlock (q->qlock); \ renatofilho@787: } G_STMT_END renatofilho@787: renatofilho@787: static void gst_multi_queue_finalize (GObject * object); renatofilho@787: static void gst_multi_queue_set_property (GObject * object, renatofilho@787: guint prop_id, const GValue * value, GParamSpec * pspec); renatofilho@787: static void gst_multi_queue_get_property (GObject * object, renatofilho@787: guint prop_id, GValue * value, GParamSpec * pspec); renatofilho@787: renatofilho@787: static GstPad *gst_multi_queue_request_new_pad (GstElement * element, renatofilho@787: GstPadTemplate * temp, const gchar * name); renatofilho@787: static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad); renatofilho@787: renatofilho@787: static void gst_multi_queue_loop (GstPad * pad); renatofilho@787: renatofilho@787: #define _do_init(bla) \ renatofilho@787: GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element"); renatofilho@787: renatofilho@787: GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement, renatofilho@787: GST_TYPE_ELEMENT, _do_init); renatofilho@787: renatofilho@787: static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 }; renatofilho@787: renatofilho@787: renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_base_init (gpointer g_class) renatofilho@787: { renatofilho@787: GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); renatofilho@787: renatofilho@787: gst_element_class_set_details (gstelement_class, &gst_multiqueue_details); renatofilho@787: gst_element_class_add_pad_template (gstelement_class, renatofilho@787: gst_static_pad_template_get (&sinktemplate)); renatofilho@787: gst_element_class_add_pad_template (gstelement_class, renatofilho@787: gst_static_pad_template_get (&srctemplate)); renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_class_init (GstMultiQueueClass * klass) renatofilho@787: { renatofilho@787: GObjectClass *gobject_class = G_OBJECT_CLASS (klass); renatofilho@787: GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); renatofilho@787: renatofilho@787: gobject_class->set_property = renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); renatofilho@787: gobject_class->get_property = renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); renatofilho@787: renatofilho@787: /* SIGNALS */ renatofilho@787: gst_multi_queue_signals[SIGNAL_UNDERRUN] = renatofilho@787: g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, renatofilho@787: G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, renatofilho@787: g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); renatofilho@787: renatofilho@787: gst_multi_queue_signals[SIGNAL_OVERRUN] = renatofilho@787: g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, renatofilho@787: G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, renatofilho@787: g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); renatofilho@787: renatofilho@787: /* PROPERTIES */ renatofilho@787: renatofilho@787: g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, renatofilho@787: g_param_spec_uint ("max-size-bytes", "Max. size (kB)", renatofilho@787: "Max. amount of data in the queue (bytes, 0=disable)", renatofilho@787: 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); renatofilho@787: g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, renatofilho@787: g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", renatofilho@787: "Max. number of buffers in the queue (0=disable)", renatofilho@787: 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); renatofilho@787: g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, renatofilho@787: g_param_spec_uint64 ("max-size-time", "Max. size (ns)", renatofilho@787: "Max. amount of data in the queue (in ns, 0=disable)", renatofilho@787: 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); renatofilho@787: renatofilho@787: g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, renatofilho@787: g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", renatofilho@787: "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", renatofilho@787: 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE)); renatofilho@787: g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, renatofilho@787: g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", renatofilho@787: "Amount of buffers the queues can grow if one of them is empty (0=disable)", renatofilho@787: 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE)); renatofilho@787: g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, renatofilho@787: g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", renatofilho@787: "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", renatofilho@787: 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE)); renatofilho@787: renatofilho@787: gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); renatofilho@787: renatofilho@787: gstelement_class->request_new_pad = renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); renatofilho@787: gstelement_class->release_pad = renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad); renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) renatofilho@787: { renatofilho@787: mqueue->nbqueues = 0; renatofilho@787: mqueue->queues = NULL; renatofilho@787: renatofilho@787: mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES; renatofilho@787: mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS; renatofilho@787: mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME; renatofilho@787: renatofilho@787: mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES; renatofilho@787: mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS; renatofilho@787: mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME; renatofilho@787: renatofilho@787: mqueue->counter = 1; renatofilho@787: mqueue->highid = -1; renatofilho@787: mqueue->nextnotlinked = -1; renatofilho@787: renatofilho@787: mqueue->qlock = g_mutex_new (); renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_finalize (GObject * object) renatofilho@787: { renatofilho@787: GstMultiQueue *mqueue = GST_MULTI_QUEUE (object); renatofilho@787: renatofilho@787: g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL); renatofilho@787: g_list_free (mqueue->queues); renatofilho@787: mqueue->queues = NULL; renatofilho@787: renatofilho@787: /* free/unref instance data */ renatofilho@787: g_mutex_free (mqueue->qlock); renatofilho@787: renatofilho@787: G_OBJECT_CLASS (parent_class)->finalize (object); renatofilho@787: } renatofilho@787: renatofilho@787: renatofilho@787: renatofilho@787: #define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ renatofilho@787: GList * tmp = mq->queues; \ renatofilho@787: while (tmp) { \ renatofilho@787: GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ renatofilho@787: q->max_size.format = mq->max_size.format; \ renatofilho@787: tmp = g_list_next(tmp); \ renatofilho@787: }; \ renatofilho@787: } G_STMT_END renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_set_property (GObject * object, guint prop_id, renatofilho@787: const GValue * value, GParamSpec * pspec) renatofilho@787: { renatofilho@787: GstMultiQueue *mq = GST_MULTI_QUEUE (object); renatofilho@787: renatofilho@787: switch (prop_id) { renatofilho@787: case ARG_MAX_SIZE_BYTES: renatofilho@787: mq->max_size.bytes = g_value_get_uint (value); renatofilho@787: SET_CHILD_PROPERTY (mq, bytes); renatofilho@787: break; renatofilho@787: case ARG_MAX_SIZE_BUFFERS: renatofilho@787: mq->max_size.visible = g_value_get_uint (value); renatofilho@787: SET_CHILD_PROPERTY (mq, visible); renatofilho@787: break; renatofilho@787: case ARG_MAX_SIZE_TIME: renatofilho@787: mq->max_size.time = g_value_get_uint64 (value); renatofilho@787: SET_CHILD_PROPERTY (mq, time); renatofilho@787: break; renatofilho@787: case ARG_EXTRA_SIZE_BYTES: renatofilho@787: mq->extra_size.bytes = g_value_get_uint (value); renatofilho@787: break; renatofilho@787: case ARG_EXTRA_SIZE_BUFFERS: renatofilho@787: mq->extra_size.visible = g_value_get_uint (value); renatofilho@787: break; renatofilho@787: case ARG_EXTRA_SIZE_TIME: renatofilho@787: mq->extra_size.time = g_value_get_uint64 (value); renatofilho@787: break; renatofilho@787: default: renatofilho@787: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); renatofilho@787: break; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_get_property (GObject * object, guint prop_id, renatofilho@787: GValue * value, GParamSpec * pspec) renatofilho@787: { renatofilho@787: GstMultiQueue *mq = GST_MULTI_QUEUE (object); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: renatofilho@787: switch (prop_id) { renatofilho@787: case ARG_EXTRA_SIZE_BYTES: renatofilho@787: g_value_set_uint (value, mq->extra_size.bytes); renatofilho@787: break; renatofilho@787: case ARG_EXTRA_SIZE_BUFFERS: renatofilho@787: g_value_set_uint (value, mq->extra_size.visible); renatofilho@787: break; renatofilho@787: case ARG_EXTRA_SIZE_TIME: renatofilho@787: g_value_set_uint64 (value, mq->extra_size.time); renatofilho@787: break; renatofilho@787: case ARG_MAX_SIZE_BYTES: renatofilho@787: g_value_set_uint (value, mq->max_size.bytes); renatofilho@787: break; renatofilho@787: case ARG_MAX_SIZE_BUFFERS: renatofilho@787: g_value_set_uint (value, mq->max_size.visible); renatofilho@787: break; renatofilho@787: case ARG_MAX_SIZE_TIME: renatofilho@787: g_value_set_uint64 (value, mq->max_size.time); renatofilho@787: break; renatofilho@787: default: renatofilho@787: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); renatofilho@787: break; renatofilho@787: } renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: } renatofilho@787: renatofilho@787: GList * renatofilho@787: gst_multi_queue_get_internal_links (GstPad * pad) renatofilho@787: { renatofilho@787: GList *res = NULL; renatofilho@787: GstMultiQueue *mqueue; renatofilho@787: GstSingleQueue *sq = NULL; renatofilho@787: GList *tmp; renatofilho@787: renatofilho@787: g_return_val_if_fail (GST_IS_PAD (pad), NULL); renatofilho@787: renatofilho@787: mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad)); renatofilho@787: if (!mqueue) renatofilho@787: goto no_parent; renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); renatofilho@787: /* Find which single queue it belongs to */ renatofilho@787: for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) { renatofilho@787: sq = (GstSingleQueue *) tmp->data; renatofilho@787: renatofilho@787: if (sq->sinkpad == pad) renatofilho@787: res = g_list_prepend (res, sq->srcpad); renatofilho@787: if (sq->srcpad == pad) renatofilho@787: res = g_list_prepend (res, sq->sinkpad); renatofilho@787: } renatofilho@787: renatofilho@787: if (!res) renatofilho@787: GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); renatofilho@787: renatofilho@787: return res; renatofilho@787: renatofilho@787: no_parent: renatofilho@787: { renatofilho@787: GST_DEBUG_OBJECT (pad, "no parent"); renatofilho@787: return NULL; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: renatofilho@787: /* renatofilho@787: * GstElement methods renatofilho@787: */ renatofilho@787: renatofilho@787: static GstPad * renatofilho@787: gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, renatofilho@787: const gchar * name) renatofilho@787: { renatofilho@787: GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); renatofilho@787: GstSingleQueue *squeue; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (element, "name : %s", name); renatofilho@787: renatofilho@787: /* Create a new single queue, add the sink and source pad and return the sink pad */ renatofilho@787: squeue = gst_single_queue_new (mqueue); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); renatofilho@787: mqueue->queues = g_list_append (mqueue->queues, squeue); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); renatofilho@787: renatofilho@787: /* renatofilho@787: GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s", renatofilho@787: GST_DEBUG_PAD_NAME (squeue->sinkpad)); renatofilho@787: */ renatofilho@787: return squeue->sinkpad; renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_release_pad (GstElement * element, GstPad * pad) renatofilho@787: { renatofilho@787: GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); renatofilho@787: GstSingleQueue *sq = NULL; renatofilho@787: GList *tmp; renatofilho@787: renatofilho@787: // GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad)); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); renatofilho@787: /* Find which single queue it belongs to, knowing that it should be a sinkpad */ renatofilho@787: for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) { renatofilho@787: sq = (GstSingleQueue *) tmp->data; renatofilho@787: renatofilho@787: if (sq->sinkpad == pad) renatofilho@787: break; renatofilho@787: } renatofilho@787: renatofilho@787: if (!tmp) { renatofilho@787: GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); renatofilho@787: return; renatofilho@787: } renatofilho@787: renatofilho@787: /* FIXME: The removal of the singlequeue should probably not happen until it renatofilho@787: * finishes draining */ renatofilho@787: renatofilho@787: /* remove it from the list */ renatofilho@787: mqueue->queues = g_list_delete_link (mqueue->queues, tmp); renatofilho@787: renatofilho@787: /* FIXME : recompute next-non-linked */ renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); renatofilho@787: renatofilho@787: /* delete SingleQueue */ renatofilho@787: gst_data_queue_set_flushing (sq->queue, TRUE); renatofilho@787: renatofilho@787: gst_pad_set_active (sq->srcpad, FALSE); renatofilho@787: gst_pad_set_active (sq->sinkpad, FALSE); renatofilho@787: gst_element_remove_pad (element, sq->srcpad); renatofilho@787: gst_element_remove_pad (element, sq->sinkpad); renatofilho@787: gst_single_queue_free (sq); renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) renatofilho@787: { renatofilho@787: gboolean result; renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"), renatofilho@787: sq->id); renatofilho@787: renatofilho@787: if (flush) { renatofilho@787: sq->srcresult = GST_FLOW_WRONG_STATE; renatofilho@787: gst_data_queue_set_flushing (sq->queue, TRUE); renatofilho@787: renatofilho@787: /* wake up non-linked task */ renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", renatofilho@787: sq->id); renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: g_cond_signal (sq->turn); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); renatofilho@787: result = gst_pad_pause_task (sq->srcpad); renatofilho@787: } else { renatofilho@787: gst_data_queue_flush (sq->queue); renatofilho@787: gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); renatofilho@787: gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); renatofilho@787: /* All pads start off not-linked for a smooth kick-off */ renatofilho@787: sq->srcresult = GST_FLOW_NOT_LINKED; renatofilho@787: sq->cur_time = 0; renatofilho@787: sq->max_size.visible = mq->max_size.visible; renatofilho@787: sq->is_eos = FALSE; renatofilho@787: sq->inextra = FALSE; renatofilho@787: sq->nextid = 0; renatofilho@787: sq->oldid = 0; renatofilho@787: gst_data_queue_set_flushing (sq->queue, FALSE); renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); renatofilho@787: result = renatofilho@787: gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, renatofilho@787: sq->srcpad); renatofilho@787: } renatofilho@787: return result; renatofilho@787: } renatofilho@787: renatofilho@787: /* calculate the diff between running time on the sink and src of the queue. renatofilho@787: * This is the total amount of time in the queue. renatofilho@787: * WITH LOCK TAKEN */ renatofilho@787: static void renatofilho@787: update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) renatofilho@787: { renatofilho@787: gint64 sink_time, src_time; renatofilho@787: renatofilho@787: sink_time = renatofilho@787: gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, renatofilho@787: sq->sink_segment.last_stop); renatofilho@787: renatofilho@787: src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, renatofilho@787: sq->src_segment.last_stop); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, renatofilho@787: GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); renatofilho@787: renatofilho@787: /* This allows for streams with out of order timestamping - sometimes the renatofilho@787: * emerging timestamp is later than the arriving one(s) */ renatofilho@787: if (sink_time >= src_time) renatofilho@787: sq->cur_time = sink_time - src_time; renatofilho@787: else renatofilho@787: sq->cur_time = 0; renatofilho@787: } renatofilho@787: renatofilho@787: /* take a NEWSEGMENT event and apply the values to segment, updating the time renatofilho@787: * level of queue. */ renatofilho@787: static void renatofilho@787: apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, renatofilho@787: GstSegment * segment) renatofilho@787: { renatofilho@787: gboolean update; renatofilho@787: GstFormat format; renatofilho@787: gdouble rate, arate; renatofilho@787: gint64 start, stop, time; renatofilho@787: renatofilho@787: gst_event_parse_new_segment_full (event, &update, &rate, &arate, renatofilho@787: &format, &start, &stop, &time); renatofilho@787: renatofilho@787: /* now configure the values, we use these to track timestamps on the renatofilho@787: * sinkpad. */ renatofilho@787: if (format != GST_FORMAT_TIME) { renatofilho@787: /* non-time format, pretent the current time segment is closed with a renatofilho@787: * 0 start and unknown stop time. */ renatofilho@787: update = FALSE; renatofilho@787: format = GST_FORMAT_TIME; renatofilho@787: start = 0; renatofilho@787: stop = -1; renatofilho@787: time = 0; renatofilho@787: } renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: renatofilho@787: gst_segment_set_newsegment_full (segment, update, renatofilho@787: rate, arate, format, start, stop, time); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment); renatofilho@787: renatofilho@787: /* segment can update the time level of the queue */ renatofilho@787: update_time_level (mq, sq); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: } renatofilho@787: renatofilho@787: /* take a buffer and update segment, updating the time level of the queue. */ renatofilho@787: static void renatofilho@787: apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, renatofilho@787: GstClockTime duration, GstSegment * segment) renatofilho@787: { renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: renatofilho@787: /* if no timestamp is set, assume it's continuous with the previous renatofilho@787: * time */ renatofilho@787: if (timestamp == GST_CLOCK_TIME_NONE) renatofilho@787: timestamp = segment->last_stop; renatofilho@787: renatofilho@787: /* add duration */ renatofilho@787: if (duration != GST_CLOCK_TIME_NONE) renatofilho@787: timestamp += duration; renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT, renatofilho@787: sq->id, GST_TIME_ARGS (timestamp)); renatofilho@787: renatofilho@787: gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp); renatofilho@787: renatofilho@787: /* calc diff with other end */ renatofilho@787: update_time_level (mq, sq); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: } renatofilho@787: renatofilho@787: static GstFlowReturn renatofilho@787: gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, renatofilho@787: GstMiniObject * object) renatofilho@787: { renatofilho@787: GstFlowReturn result = GST_FLOW_OK; renatofilho@787: renatofilho@787: if (GST_IS_BUFFER (object)) { renatofilho@787: GstBuffer *buffer; renatofilho@787: GstClockTime timestamp, duration; renatofilho@787: renatofilho@787: buffer = GST_BUFFER_CAST (object); renatofilho@787: timestamp = GST_BUFFER_TIMESTAMP (buffer); renatofilho@787: duration = GST_BUFFER_DURATION (buffer); renatofilho@787: renatofilho@787: apply_buffer (mq, sq, timestamp, duration, &sq->src_segment); renatofilho@787: renatofilho@787: /* Applying the buffer may have made the queue non-full again, unblock it if needed */ renatofilho@787: gst_data_queue_limits_changed (sq->queue); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, renatofilho@787: sq->id, buffer, GST_TIME_ARGS (timestamp)); renatofilho@787: renatofilho@787: result = gst_pad_push (sq->srcpad, buffer); renatofilho@787: } else if (GST_IS_EVENT (object)) { renatofilho@787: GstEvent *event; renatofilho@787: renatofilho@787: event = GST_EVENT_CAST (object); renatofilho@787: renatofilho@787: switch (GST_EVENT_TYPE (event)) { renatofilho@787: case GST_EVENT_EOS: renatofilho@787: result = GST_FLOW_UNEXPECTED; renatofilho@787: break; renatofilho@787: case GST_EVENT_NEWSEGMENT: renatofilho@787: apply_segment (mq, sq, event, &sq->src_segment); renatofilho@787: /* Applying the segment may have made the queue non-full again, unblock it if needed */ renatofilho@787: gst_data_queue_limits_changed (sq->queue); renatofilho@787: break; renatofilho@787: default: renatofilho@787: break; renatofilho@787: } renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "SingleQueue %d : Pushing event %p of type %s", renatofilho@787: sq->id, event, GST_EVENT_TYPE_NAME (event)); renatofilho@787: renatofilho@787: gst_pad_push_event (sq->srcpad, event); renatofilho@787: } else { renatofilho@787: g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", renatofilho@787: sq->id); renatofilho@787: } renatofilho@787: return result; renatofilho@787: renatofilho@787: /* ERRORS */ renatofilho@787: } renatofilho@787: renatofilho@787: static GstMiniObject * renatofilho@787: gst_multi_queue_item_steal_object (GstMultiQueueItem * item) renatofilho@787: { renatofilho@787: GstMiniObject *res; renatofilho@787: renatofilho@787: res = item->object; renatofilho@787: item->object = NULL; renatofilho@787: renatofilho@787: return res; renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_multi_queue_item_destroy (GstMultiQueueItem * item) renatofilho@787: { renatofilho@787: if (item->object) renatofilho@787: gst_mini_object_unref (item->object); renatofilho@787: g_free (item); renatofilho@787: } renatofilho@787: renatofilho@787: /* takes ownership of passed mini object! */ renatofilho@787: static GstMultiQueueItem * renatofilho@787: gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) renatofilho@787: { renatofilho@787: GstMultiQueueItem *item; renatofilho@787: renatofilho@787: item = g_new (GstMultiQueueItem, 1); renatofilho@787: item->object = object; renatofilho@787: item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; renatofilho@787: item->posid = curid; renatofilho@787: renatofilho@787: if (GST_IS_BUFFER (object)) { renatofilho@787: item->size = GST_BUFFER_SIZE (object); renatofilho@787: item->duration = GST_BUFFER_DURATION (object); renatofilho@787: if (item->duration == GST_CLOCK_TIME_NONE) renatofilho@787: item->duration = 0; renatofilho@787: item->visible = TRUE; renatofilho@787: } else { renatofilho@787: item->size = 0; renatofilho@787: item->duration = 0; renatofilho@787: item->visible = FALSE; renatofilho@787: } renatofilho@787: return item; renatofilho@787: } renatofilho@787: renatofilho@787: /* Each main loop attempts to push buffers until the return value renatofilho@787: * is not-linked. not-linked pads are not allowed to push data beyond renatofilho@787: * any linked pads, so they don't 'rush ahead of the pack'. renatofilho@787: */ renatofilho@787: static void renatofilho@787: gst_multi_queue_loop (GstPad * pad) renatofilho@787: { renatofilho@787: GstSingleQueue *sq; renatofilho@787: GstMultiQueueItem *item; renatofilho@787: GstDataQueueItem *sitem; renatofilho@787: GstMultiQueue *mq; renatofilho@787: GstMiniObject *object; renatofilho@787: guint32 newid; renatofilho@787: guint32 oldid = -1; renatofilho@787: GstFlowReturn result; renatofilho@787: renatofilho@787: sq = (GstSingleQueue *) gst_pad_get_element_private (pad); renatofilho@787: mq = sq->mqueue; renatofilho@787: renatofilho@787: do { renatofilho@787: GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); renatofilho@787: renatofilho@787: /* Get something from the queue, blocking until that happens, or we get renatofilho@787: * flushed */ renatofilho@787: if (!(gst_data_queue_pop (sq->queue, &sitem))) renatofilho@787: goto out_flushing; renatofilho@787: renatofilho@787: item = (GstMultiQueueItem *) sitem; renatofilho@787: newid = item->posid; renatofilho@787: renatofilho@787: /* steal the object and destroy the item */ renatofilho@787: object = gst_multi_queue_item_steal_object (item); renatofilho@787: gst_multi_queue_item_destroy (item); renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", renatofilho@787: sq->id, newid, oldid); renatofilho@787: renatofilho@787: /* If we're not-linked, we do some extra work because we might need to renatofilho@787: * wait before pushing. If we're linked but there's a gap in the IDs, renatofilho@787: * or it's the first loop, or we just passed the previous highid, renatofilho@787: * we might need to wake some sleeping pad up, so there's extra work renatofilho@787: * there too */ renatofilho@787: if (sq->srcresult == GST_FLOW_NOT_LINKED || renatofilho@787: (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) { renatofilho@787: GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", renatofilho@787: gst_flow_get_name (sq->srcresult)); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: renatofilho@787: /* Update the nextid so other threads know when to wake us up */ renatofilho@787: sq->nextid = newid; renatofilho@787: renatofilho@787: /* Update the oldid (the last ID we output) for highid tracking */ renatofilho@787: if (oldid != -1) renatofilho@787: sq->oldid = oldid; renatofilho@787: renatofilho@787: if (sq->srcresult == GST_FLOW_NOT_LINKED) { renatofilho@787: /* Go to sleep until it's time to push this buffer */ renatofilho@787: renatofilho@787: /* Recompute the highid */ renatofilho@787: compute_high_id (mq); renatofilho@787: while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { renatofilho@787: GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " renatofilho@787: "newid %u and highid %u", sq->id, newid, mq->highid); renatofilho@787: renatofilho@787: renatofilho@787: /* Wake up all non-linked pads before we sleep */ renatofilho@787: wake_up_next_non_linked (mq); renatofilho@787: renatofilho@787: mq->numwaiting++; renatofilho@787: g_cond_wait (sq->turn, mq->qlock); renatofilho@787: mq->numwaiting--; renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " renatofilho@787: "wakeup with newid %u and highid %u", sq->id, newid, mq->highid); renatofilho@787: } renatofilho@787: renatofilho@787: /* Re-compute the high_id in case someone else pushed */ renatofilho@787: compute_high_id (mq); renatofilho@787: } else { renatofilho@787: compute_high_id (mq); renatofilho@787: /* Wake up all non-linked pads */ renatofilho@787: wake_up_next_non_linked (mq); renatofilho@787: } renatofilho@787: /* We're done waiting, we can clear the nextid */ renatofilho@787: sq->nextid = 0; renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: } renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", renatofilho@787: gst_flow_get_name (sq->srcresult)); renatofilho@787: renatofilho@787: /* Try to push out the new object */ renatofilho@787: result = gst_single_queue_push_one (mq, sq, object); renatofilho@787: sq->srcresult = result; renatofilho@787: renatofilho@787: if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED) renatofilho@787: goto out_flushing; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", renatofilho@787: gst_flow_get_name (sq->srcresult)); renatofilho@787: renatofilho@787: oldid = newid; renatofilho@787: } renatofilho@787: while (TRUE); renatofilho@787: renatofilho@787: out_flushing: renatofilho@787: { renatofilho@787: /* Need to make sure wake up any sleeping pads when we exit */ renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: compute_high_id (mq); renatofilho@787: wake_up_next_non_linked (mq); renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: gst_data_queue_set_flushing (sq->queue, TRUE); renatofilho@787: gst_pad_pause_task (sq->srcpad); renatofilho@787: GST_CAT_LOG_OBJECT (multi_queue_debug, mq, renatofilho@787: "SingleQueue[%d] task paused, reason:%s", renatofilho@787: sq->id, gst_flow_get_name (sq->srcresult)); renatofilho@787: return; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: /** renatofilho@787: * gst_multi_queue_chain: renatofilho@787: * renatofilho@787: * This is similar to GstQueue's chain function, except: renatofilho@787: * _ we don't have leak behavioures, renatofilho@787: * _ we push with a unique id (curid) renatofilho@787: */ renatofilho@787: static GstFlowReturn renatofilho@787: gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) renatofilho@787: { renatofilho@787: GstSingleQueue *sq; renatofilho@787: GstMultiQueue *mq; renatofilho@787: GstMultiQueueItem *item; renatofilho@787: GstFlowReturn ret = GST_FLOW_OK; renatofilho@787: guint32 curid; renatofilho@787: GstClockTime timestamp, duration; renatofilho@787: renatofilho@787: sq = gst_pad_get_element_private (pad); renatofilho@787: mq = (GstMultiQueue *) gst_pad_get_parent (pad); renatofilho@787: renatofilho@787: /* Get a unique incrementing id */ renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: curid = mq->counter++; renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d", renatofilho@787: sq->id, buffer, curid); renatofilho@787: renatofilho@787: item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid); renatofilho@787: renatofilho@787: timestamp = GST_BUFFER_TIMESTAMP (buffer); renatofilho@787: duration = GST_BUFFER_DURATION (buffer); renatofilho@787: renatofilho@787: if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) renatofilho@787: goto flushing; renatofilho@787: renatofilho@787: /* update time level, we must do this after pushing the data in the queue so renatofilho@787: * that we never end up filling the queue first. */ renatofilho@787: apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment); renatofilho@787: renatofilho@787: done: renatofilho@787: gst_object_unref (mq); renatofilho@787: renatofilho@787: return ret; renatofilho@787: renatofilho@787: /* ERRORS */ renatofilho@787: flushing: renatofilho@787: { renatofilho@787: ret = sq->srcresult; renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", renatofilho@787: sq->id, gst_flow_get_name (ret)); renatofilho@787: gst_multi_queue_item_destroy (item); renatofilho@787: goto done; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) renatofilho@787: { renatofilho@787: GstSingleQueue *sq; renatofilho@787: renatofilho@787: sq = (GstSingleQueue *) gst_pad_get_element_private (pad); renatofilho@787: renatofilho@787: if (active) { renatofilho@787: /* All pads start off not-linked for a smooth kick-off */ renatofilho@787: sq->srcresult = GST_FLOW_NOT_LINKED; renatofilho@787: } else { renatofilho@787: sq->srcresult = GST_FLOW_WRONG_STATE; renatofilho@787: gst_data_queue_flush (sq->queue); renatofilho@787: } renatofilho@787: return TRUE; renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) renatofilho@787: { renatofilho@787: GstSingleQueue *sq; renatofilho@787: GstMultiQueue *mq; renatofilho@787: guint32 curid; renatofilho@787: GstMultiQueueItem *item; renatofilho@787: gboolean res; renatofilho@787: GstEventType type; renatofilho@787: GstEvent *sref = NULL; renatofilho@787: renatofilho@787: sq = (GstSingleQueue *) gst_pad_get_element_private (pad); renatofilho@787: mq = (GstMultiQueue *) gst_pad_get_parent (pad); renatofilho@787: renatofilho@787: type = GST_EVENT_TYPE (event); renatofilho@787: renatofilho@787: switch (type) { renatofilho@787: case GST_EVENT_FLUSH_START: renatofilho@787: GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", renatofilho@787: sq->id); renatofilho@787: renatofilho@787: res = gst_pad_push_event (sq->srcpad, event); renatofilho@787: renatofilho@787: gst_single_queue_flush (mq, sq, TRUE); renatofilho@787: goto done; renatofilho@787: renatofilho@787: case GST_EVENT_FLUSH_STOP: renatofilho@787: GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", renatofilho@787: sq->id); renatofilho@787: renatofilho@787: res = gst_pad_push_event (sq->srcpad, event); renatofilho@787: renatofilho@787: gst_single_queue_flush (mq, sq, FALSE); renatofilho@787: goto done; renatofilho@787: case GST_EVENT_NEWSEGMENT: renatofilho@787: /* take ref because the queue will take ownership and we need the event renatofilho@787: * afterwards to update the segment */ renatofilho@787: sref = gst_event_ref (event); renatofilho@787: break; renatofilho@787: renatofilho@787: default: renatofilho@787: if (!(GST_EVENT_IS_SERIALIZED (event))) { renatofilho@787: res = gst_pad_push_event (sq->srcpad, event); renatofilho@787: goto done; renatofilho@787: } renatofilho@787: break; renatofilho@787: } renatofilho@787: renatofilho@787: /* Get an unique incrementing id */ renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: curid = mq->counter++; renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: item = gst_multi_queue_item_new ((GstMiniObject *) event, curid); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "SingleQueue %d : Enqueuing event %p of type %s with id %d", renatofilho@787: sq->id, event, GST_EVENT_TYPE_NAME (event), curid); renatofilho@787: renatofilho@787: if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) renatofilho@787: goto flushing; renatofilho@787: renatofilho@787: /* mark EOS when we received one, we must do that after putting the renatofilho@787: * buffer in the queue because EOS marks the buffer as filled. No need to take renatofilho@787: * a lock, the _check_full happens from this thread only, right before pushing renatofilho@787: * into dataqueue. */ renatofilho@787: switch (type) { renatofilho@787: case GST_EVENT_EOS: renatofilho@787: sq->is_eos = TRUE; renatofilho@787: break; renatofilho@787: case GST_EVENT_NEWSEGMENT: renatofilho@787: apply_segment (mq, sq, sref, &sq->sink_segment); renatofilho@787: gst_event_unref (sref); renatofilho@787: break; renatofilho@787: default: renatofilho@787: break; renatofilho@787: } renatofilho@787: done: renatofilho@787: gst_object_unref (mq); renatofilho@787: return res; renatofilho@787: renatofilho@787: flushing: renatofilho@787: { renatofilho@787: GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", renatofilho@787: sq->id, gst_flow_get_name (sq->srcresult)); renatofilho@787: if (sref) renatofilho@787: gst_event_unref (sref); renatofilho@787: gst_multi_queue_item_destroy (item); renatofilho@787: goto done; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: static GstCaps * renatofilho@787: gst_multi_queue_getcaps (GstPad * pad) renatofilho@787: { renatofilho@787: GstSingleQueue *sq = gst_pad_get_element_private (pad); renatofilho@787: GstPad *otherpad; renatofilho@787: GstCaps *result; renatofilho@787: renatofilho@787: otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad"); renatofilho@787: renatofilho@787: result = gst_pad_peer_get_caps (otherpad); renatofilho@787: if (result == NULL) renatofilho@787: result = gst_caps_new_any (); renatofilho@787: renatofilho@787: return result; renatofilho@787: } renatofilho@787: renatofilho@787: static GstFlowReturn renatofilho@787: gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, renatofilho@787: GstCaps * caps, GstBuffer ** buf) renatofilho@787: { renatofilho@787: GstSingleQueue *sq = gst_pad_get_element_private (pad); renatofilho@787: renatofilho@787: return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf); renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_src_activate_push (GstPad * pad, gboolean active) renatofilho@787: { renatofilho@787: GstMultiQueue *mq; renatofilho@787: GstSingleQueue *sq; renatofilho@787: gboolean result = FALSE; renatofilho@787: renatofilho@787: sq = (GstSingleQueue *) gst_pad_get_element_private (pad); renatofilho@787: mq = sq->mqueue; renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id); renatofilho@787: renatofilho@787: if (active) { renatofilho@787: result = gst_single_queue_flush (mq, sq, FALSE); renatofilho@787: } else { renatofilho@787: result = gst_single_queue_flush (mq, sq, TRUE); renatofilho@787: /* make sure streaming finishes */ renatofilho@787: result |= gst_pad_stop_task (pad); renatofilho@787: } renatofilho@787: return result; renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps) renatofilho@787: { renatofilho@787: return TRUE; renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_src_event (GstPad * pad, GstEvent * event) renatofilho@787: { renatofilho@787: GstSingleQueue *sq = gst_pad_get_element_private (pad); renatofilho@787: renatofilho@787: return gst_pad_push_event (sq->sinkpad, event); renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: gst_multi_queue_src_query (GstPad * pad, GstQuery * query) renatofilho@787: { renatofilho@787: GstSingleQueue *sq = gst_pad_get_element_private (pad); renatofilho@787: GstPad *peerpad; renatofilho@787: gboolean res; renatofilho@787: renatofilho@787: /* FIXME, Handle position offset depending on queue size */ renatofilho@787: renatofilho@787: /* default handling */ renatofilho@787: if (!(peerpad = gst_pad_get_peer (sq->sinkpad))) renatofilho@787: goto no_peer; renatofilho@787: renatofilho@787: res = gst_pad_query (peerpad, query); renatofilho@787: renatofilho@787: gst_object_unref (peerpad); renatofilho@787: renatofilho@787: return res; renatofilho@787: renatofilho@787: /* ERRORS */ renatofilho@787: no_peer: renatofilho@787: { renatofilho@787: GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer"); renatofilho@787: return FALSE; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: /* renatofilho@787: * Next-non-linked functions renatofilho@787: */ renatofilho@787: renatofilho@787: /* WITH LOCK TAKEN */ renatofilho@787: static void renatofilho@787: wake_up_next_non_linked (GstMultiQueue * mq) renatofilho@787: { renatofilho@787: GList *tmp; renatofilho@787: renatofilho@787: /* maybe no-one is waiting */ renatofilho@787: if (mq->numwaiting < 1) renatofilho@787: return; renatofilho@787: renatofilho@787: /* Else figure out which singlequeue(s) need waking up */ renatofilho@787: for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { renatofilho@787: GstSingleQueue *sq = (GstSingleQueue *) tmp->data; renatofilho@787: renatofilho@787: if (sq->srcresult == GST_FLOW_NOT_LINKED) { renatofilho@787: if (sq->nextid != 0 && sq->nextid <= mq->highid) { renatofilho@787: GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); renatofilho@787: g_cond_signal (sq->turn); renatofilho@787: } renatofilho@787: } renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: /* WITH LOCK TAKEN */ renatofilho@787: static void renatofilho@787: compute_high_id (GstMultiQueue * mq) renatofilho@787: { renatofilho@787: /* The high-id is either the highest id among the linked pads, or if all renatofilho@787: * pads are not-linked, it's the lowest not-linked pad */ renatofilho@787: GList *tmp; renatofilho@787: guint32 lowest = G_MAXUINT32; renatofilho@787: guint32 highid = G_MAXUINT32; renatofilho@787: renatofilho@787: for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { renatofilho@787: GstSingleQueue *sq = (GstSingleQueue *) tmp->data; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", renatofilho@787: sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); renatofilho@787: renatofilho@787: if (sq->srcresult == GST_FLOW_NOT_LINKED) { renatofilho@787: /* No need to consider queues which are not waiting */ renatofilho@787: if (sq->nextid == 0) { renatofilho@787: GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); renatofilho@787: continue; renatofilho@787: } renatofilho@787: renatofilho@787: if (sq->nextid < lowest) renatofilho@787: lowest = sq->nextid; renatofilho@787: } else if (sq->srcresult != GST_FLOW_UNEXPECTED) { renatofilho@787: /* If we don't have a global highid, or the global highid is lower than renatofilho@787: * this single queue's last outputted id, store the queue's one, renatofilho@787: * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */ renatofilho@787: if ((highid == G_MAXUINT32) || (sq->oldid > highid)) renatofilho@787: highid = sq->oldid; renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: if (highid == G_MAXUINT32 || lowest < highid) renatofilho@787: mq->highid = lowest; renatofilho@787: else renatofilho@787: mq->highid = highid; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid, renatofilho@787: lowest); renatofilho@787: } renatofilho@787: renatofilho@787: #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ renatofilho@787: (sq->max_size.format) <= (value)) renatofilho@787: renatofilho@787: /* renatofilho@787: * GstSingleQueue functions renatofilho@787: */ renatofilho@787: static void renatofilho@787: single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) renatofilho@787: { renatofilho@787: GstMultiQueue *mq = sq->mqueue; renatofilho@787: GList *tmp; renatofilho@787: GstDataQueueSize size; renatofilho@787: gboolean filled = FALSE; renatofilho@787: renatofilho@787: gst_data_queue_get_level (sq->queue, &size); renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { renatofilho@787: GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; renatofilho@787: GstDataQueueSize ssize; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); renatofilho@787: renatofilho@787: if (gst_data_queue_is_empty (ssq->queue)) { renatofilho@787: GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); renatofilho@787: if (IS_FILLED (visible, size.visible)) { renatofilho@787: sq->max_size.visible++; renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "Another queue is empty, bumping single queue %d max visible to %d", renatofilho@787: sq->id, sq->max_size.visible); renatofilho@787: } renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: goto beach; renatofilho@787: } renatofilho@787: /* check if we reached the hard time/bytes limits */ renatofilho@787: gst_data_queue_get_level (ssq->queue, &ssize); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" renatofilho@787: G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, renatofilho@787: ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); renatofilho@787: renatofilho@787: /* if this queue is filled completely we must signal overrun */ renatofilho@787: if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { renatofilho@787: GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); renatofilho@787: filled = TRUE; renatofilho@787: } renatofilho@787: } renatofilho@787: /* no queues were empty */ renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: /* Overrun is always forwarded, since this is blocking the upstream element */ renatofilho@787: if (filled) { renatofilho@787: GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); renatofilho@787: g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); renatofilho@787: } renatofilho@787: renatofilho@787: beach: renatofilho@787: return; renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) renatofilho@787: { renatofilho@787: gboolean empty = TRUE; renatofilho@787: GstMultiQueue *mq = sq->mqueue; renatofilho@787: GList *tmp; renatofilho@787: renatofilho@787: GST_LOG_OBJECT (mq, renatofilho@787: "Single Queue %d is empty, Checking other single queues", sq->id); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mq); renatofilho@787: for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { renatofilho@787: GstSingleQueue *sq = (GstSingleQueue *) tmp->data; renatofilho@787: renatofilho@787: if (gst_data_queue_is_full (sq->queue)) { renatofilho@787: GstDataQueueSize size; renatofilho@787: renatofilho@787: gst_data_queue_get_level (sq->queue, &size); renatofilho@787: if (IS_FILLED (visible, size.visible)) { renatofilho@787: sq->max_size.visible++; renatofilho@787: GST_DEBUG_OBJECT (mq, renatofilho@787: "queue %d is filled, bumping its max visible to %d", sq->id, renatofilho@787: sq->max_size.visible); renatofilho@787: gst_data_queue_limits_changed (sq->queue); renatofilho@787: } renatofilho@787: } renatofilho@787: if (!gst_data_queue_is_empty (sq->queue)) renatofilho@787: empty = FALSE; renatofilho@787: } renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); renatofilho@787: renatofilho@787: if (empty) { renatofilho@787: GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); renatofilho@787: g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0); renatofilho@787: } renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, renatofilho@787: guint64 time, GstSingleQueue * sq) renatofilho@787: { renatofilho@787: gboolean res; renatofilho@787: renatofilho@787: GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT renatofilho@787: "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes, renatofilho@787: sq->max_size.bytes, sq->cur_time, sq->max_size.time); renatofilho@787: renatofilho@787: /* we are always filled on EOS */ renatofilho@787: if (sq->is_eos) renatofilho@787: return TRUE; renatofilho@787: renatofilho@787: /* we never go past the max visible items */ renatofilho@787: if (IS_FILLED (visible, visible)) renatofilho@787: return TRUE; renatofilho@787: renatofilho@787: if (sq->cur_time != 0) { renatofilho@787: /* if we have valid time in the queue, check */ renatofilho@787: res = IS_FILLED (time, sq->cur_time); renatofilho@787: } else { renatofilho@787: /* no valid time, check bytes */ renatofilho@787: res = IS_FILLED (bytes, bytes); renatofilho@787: } renatofilho@787: return res; renatofilho@787: } renatofilho@787: renatofilho@787: static void renatofilho@787: gst_single_queue_free (GstSingleQueue * sq) renatofilho@787: { renatofilho@787: /* DRAIN QUEUE */ renatofilho@787: gst_data_queue_flush (sq->queue); renatofilho@787: g_object_unref (sq->queue); renatofilho@787: g_cond_free (sq->turn); renatofilho@787: g_free (sq); renatofilho@787: } renatofilho@787: renatofilho@787: static GstSingleQueue * renatofilho@787: gst_single_queue_new (GstMultiQueue * mqueue) renatofilho@787: { renatofilho@787: GstSingleQueue *sq; renatofilho@787: gchar *tmp; renatofilho@787: renatofilho@787: sq = g_new0 (GstSingleQueue, 1); renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); renatofilho@787: sq->id = mqueue->nbqueues++; renatofilho@787: renatofilho@787: /* copy over max_size and extra_size so we don't need to take the lock renatofilho@787: * any longer when checking if the queue is full. */ renatofilho@787: sq->max_size.visible = mqueue->max_size.visible; renatofilho@787: sq->max_size.bytes = mqueue->max_size.bytes; renatofilho@787: sq->max_size.time = mqueue->max_size.time; renatofilho@787: renatofilho@787: sq->extra_size.visible = mqueue->extra_size.visible; renatofilho@787: sq->extra_size.bytes = mqueue->extra_size.bytes; renatofilho@787: sq->extra_size.time = mqueue->extra_size.time; renatofilho@787: renatofilho@787: GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); renatofilho@787: renatofilho@787: sq->mqueue = mqueue; renatofilho@787: sq->srcresult = GST_FLOW_WRONG_STATE; renatofilho@787: sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) renatofilho@787: single_queue_check_full, sq); renatofilho@787: sq->is_eos = FALSE; renatofilho@787: gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); renatofilho@787: gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); renatofilho@787: renatofilho@787: sq->nextid = 0; renatofilho@787: sq->oldid = 0; renatofilho@787: sq->turn = g_cond_new (); renatofilho@787: renatofilho@787: /* attach to underrun/overrun signals to handle non-starvation */ renatofilho@787: g_signal_connect (G_OBJECT (sq->queue), "full", renatofilho@787: G_CALLBACK (single_queue_overrun_cb), sq); renatofilho@787: g_signal_connect (G_OBJECT (sq->queue), "empty", renatofilho@787: G_CALLBACK (single_queue_underrun_cb), sq); renatofilho@787: renatofilho@787: tmp = g_strdup_printf ("sink%d", sq->id); renatofilho@787: sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); renatofilho@787: g_free (tmp); renatofilho@787: renatofilho@787: gst_pad_set_chain_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_chain)); renatofilho@787: gst_pad_set_activatepush_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push)); renatofilho@787: gst_pad_set_event_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event)); renatofilho@787: gst_pad_set_getcaps_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); renatofilho@787: gst_pad_set_bufferalloc_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc)); renatofilho@787: gst_pad_set_internal_link_function (sq->sinkpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); renatofilho@787: renatofilho@787: tmp = g_strdup_printf ("src%d", sq->id); renatofilho@787: sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp); renatofilho@787: g_free (tmp); renatofilho@787: renatofilho@787: gst_pad_set_activatepush_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push)); renatofilho@787: gst_pad_set_acceptcaps_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps)); renatofilho@787: gst_pad_set_getcaps_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); renatofilho@787: gst_pad_set_event_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_src_event)); renatofilho@787: gst_pad_set_query_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_src_query)); renatofilho@787: gst_pad_set_internal_link_function (sq->srcpad, renatofilho@787: GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); renatofilho@787: renatofilho@787: gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); renatofilho@787: gst_pad_set_element_private (sq->srcpad, (gpointer) sq); renatofilho@787: renatofilho@787: gst_pad_set_active (sq->srcpad, TRUE); renatofilho@787: gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); renatofilho@787: renatofilho@787: gst_pad_set_active (sq->sinkpad, TRUE); renatofilho@787: gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); renatofilho@787: renatofilho@787: GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", renatofilho@787: sq->id); renatofilho@787: renatofilho@787: return sq; renatofilho@787: } renatofilho@787: renatofilho@787: static gboolean renatofilho@787: plugin_init (GstPlugin * plugin) renatofilho@787: { renatofilho@787: return gst_element_register (plugin, "multiqueue", GST_RANK_NONE, renatofilho@787: GST_TYPE_MULTI_QUEUE); renatofilho@787: } renatofilho@787: renatofilho@787: GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, renatofilho@787: GST_VERSION_MINOR, renatofilho@787: "multiqueue", renatofilho@787: "multiqueue", plugin_init, VERSION, GST_LICENSE, renatofilho@787: GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) renatofilho@787: