1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/gst-gmyth/multiqueue/gstmultiqueue.c Fri Sep 14 13:46:26 2007 +0100
1.3 @@ -0,0 +1,1397 @@
1.4 +/* GStreamer
1.5 + * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
1.6 + * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
1.7 + * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
1.8 + *
1.9 + * gstmultiqueue.c:
1.10 + *
1.11 + * This library is free software; you can redistribute it and/or
1.12 + * modify it under the terms of the GNU Library General Public
1.13 + * License as published by the Free Software Foundation; either
1.14 + * version 2 of the License, or (at your option) any later version.
1.15 + *
1.16 + * This library is distributed in the hope that it will be useful,
1.17 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.18 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1.19 + * Library General Public License for more details.
1.20 + *
1.21 + * You should have received a copy of the GNU Library General Public
1.22 + * License along with this library; if not, write to the
1.23 + * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
1.24 + * Boston, MA 02111-1307, USA.
1.25 + */
1.26 +
1.27 +#ifdef HAVE_CONFIG_H
1.28 +# include "config.h"
1.29 +#endif
1.30 +
1.31 +#include <gst/gst.h>
1.32 +#include "gstmultiqueue.h"
1.33 +
1.34 +/**
1.35 + * GstSingleQueue:
1.36 + * @sinkpad: associated sink #GstPad
1.37 + * @srcpad: associated source #GstPad
1.38 + *
1.39 + * Structure containing all information and properties about
1.40 + * a single queue.
1.41 + */
1.42 +typedef struct _GstSingleQueue GstSingleQueue;
1.43 +
1.44 +struct _GstSingleQueue
1.45 +{
1.46 + /* unique identifier of the queue */
1.47 + guint id;
1.48 +
1.49 + GstMultiQueue *mqueue;
1.50 +
1.51 + GstPad *sinkpad;
1.52 + GstPad *srcpad;
1.53 +
1.54 + /* flowreturn of previous srcpad push */
1.55 + GstFlowReturn srcresult;
1.56 + GstSegment sink_segment;
1.57 + GstSegment src_segment;
1.58 +
1.59 + /* queue of data */
1.60 + GstDataQueue *queue;
1.61 + GstDataQueueSize max_size, extra_size;
1.62 + GstClockTime cur_time;
1.63 + gboolean is_eos;
1.64 + gboolean inextra; /* TRUE if the queue is currently in extradata mode */
1.65 +
1.66 + /* Protected by global lock */
1.67 + guint32 nextid; /* ID of the next object waiting to be pushed */
1.68 + guint32 oldid; /* ID of the last object pushed (last in a series) */
1.69 + GCond *turn; /* SingleQueue turn waiting conditional */
1.70 +};
1.71 +
1.72 +
1.73 +/* Extension of GstDataQueueItem structure for our usage */
1.74 +typedef struct _GstMultiQueueItem GstMultiQueueItem;
1.75 +
1.76 +struct _GstMultiQueueItem
1.77 +{
1.78 + GstMiniObject *object;
1.79 + guint size;
1.80 + guint64 duration;
1.81 + gboolean visible;
1.82 +
1.83 + GDestroyNotify destroy;
1.84 + guint32 posid;
1.85 +};
1.86 +
1.87 +static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
1.88 +static void gst_single_queue_free (GstSingleQueue * squeue);
1.89 +
1.90 +static void wake_up_next_non_linked (GstMultiQueue * mq);
1.91 +static void compute_high_id (GstMultiQueue * mq);
1.92 +
1.93 +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
1.94 + GST_PAD_SINK,
1.95 + GST_PAD_REQUEST,
1.96 + GST_STATIC_CAPS_ANY);
1.97 +
1.98 +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
1.99 + GST_PAD_SRC,
1.100 + GST_PAD_SOMETIMES,
1.101 + GST_STATIC_CAPS_ANY);
1.102 +
1.103 +GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
1.104 +#define GST_CAT_DEFAULT (multi_queue_debug)
1.105 +
1.106 +/* default limits, we try to keep up to 2 seconds of data and if there is not
1.107 + * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
1.108 + * there is data in the queues. Normally, the byte and time limits are not hit
1.109 + * in theses conditions. */
1.110 +#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
1.111 +#define DEFAULT_MAX_SIZE_BUFFERS 5
1.112 +#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
1.113 +
1.114 +/* second limits. When we hit one of the above limits we are probably dealing
1.115 + * with a badly muxed file and we scale the limits to these emergency values.
1.116 + * This is currently not yet implemented. */
1.117 +#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
1.118 +#define DEFAULT_EXTRA_SIZE_BUFFERS 5
1.119 +#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
1.120 +
1.121 +/* Signals and args */
1.122 +enum
1.123 +{
1.124 + SIGNAL_UNDERRUN,
1.125 + SIGNAL_OVERRUN,
1.126 + LAST_SIGNAL
1.127 +};
1.128 +
1.129 +enum
1.130 +{
1.131 + ARG_0,
1.132 + ARG_EXTRA_SIZE_BYTES,
1.133 + ARG_EXTRA_SIZE_BUFFERS,
1.134 + ARG_EXTRA_SIZE_TIME,
1.135 + ARG_MAX_SIZE_BYTES,
1.136 + ARG_MAX_SIZE_BUFFERS,
1.137 + ARG_MAX_SIZE_TIME,
1.138 +};
1.139 +
1.140 +
1.141 +static const GstElementDetails gst_multiqueue_details =
1.142 +GST_ELEMENT_DETAILS ("MultiQueue",
1.143 + "Generic",
1.144 + "Multiple data queue",
1.145 + "Edward Hervey <edward@fluendo.com>");
1.146 +
1.147 +
1.148 +#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
1.149 + g_mutex_lock (q->qlock); \
1.150 +} G_STMT_END
1.151 +
1.152 +#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
1.153 + g_mutex_unlock (q->qlock); \
1.154 +} G_STMT_END
1.155 +
1.156 +static void gst_multi_queue_finalize (GObject * object);
1.157 +static void gst_multi_queue_set_property (GObject * object,
1.158 + guint prop_id, const GValue * value, GParamSpec * pspec);
1.159 +static void gst_multi_queue_get_property (GObject * object,
1.160 + guint prop_id, GValue * value, GParamSpec * pspec);
1.161 +
1.162 +static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
1.163 + GstPadTemplate * temp, const gchar * name);
1.164 +static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
1.165 +
1.166 +static void gst_multi_queue_loop (GstPad * pad);
1.167 +
1.168 +#define _do_init(bla) \
1.169 + GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
1.170 +
1.171 +GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
1.172 + GST_TYPE_ELEMENT, _do_init);
1.173 +
1.174 +static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
1.175 +
1.176 +
1.177 +
1.178 +static void
1.179 +gst_multi_queue_base_init (gpointer g_class)
1.180 +{
1.181 + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
1.182 +
1.183 + gst_element_class_set_details (gstelement_class, &gst_multiqueue_details);
1.184 + gst_element_class_add_pad_template (gstelement_class,
1.185 + gst_static_pad_template_get (&sinktemplate));
1.186 + gst_element_class_add_pad_template (gstelement_class,
1.187 + gst_static_pad_template_get (&srctemplate));
1.188 +}
1.189 +
1.190 +static void
1.191 +gst_multi_queue_class_init (GstMultiQueueClass * klass)
1.192 +{
1.193 + GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1.194 + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1.195 +
1.196 + gobject_class->set_property =
1.197 + GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
1.198 + gobject_class->get_property =
1.199 + GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
1.200 +
1.201 + /* SIGNALS */
1.202 + gst_multi_queue_signals[SIGNAL_UNDERRUN] =
1.203 + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
1.204 + G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
1.205 + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
1.206 +
1.207 + gst_multi_queue_signals[SIGNAL_OVERRUN] =
1.208 + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
1.209 + G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
1.210 + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
1.211 +
1.212 + /* PROPERTIES */
1.213 +
1.214 + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
1.215 + g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
1.216 + "Max. amount of data in the queue (bytes, 0=disable)",
1.217 + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
1.218 + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
1.219 + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
1.220 + "Max. number of buffers in the queue (0=disable)",
1.221 + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
1.222 + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
1.223 + g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
1.224 + "Max. amount of data in the queue (in ns, 0=disable)",
1.225 + 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
1.226 +
1.227 + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
1.228 + g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
1.229 + "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
1.230 + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
1.231 + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
1.232 + g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
1.233 + "Amount of buffers the queues can grow if one of them is empty (0=disable)",
1.234 + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
1.235 + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
1.236 + g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
1.237 + "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
1.238 + 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
1.239 +
1.240 + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
1.241 +
1.242 + gstelement_class->request_new_pad =
1.243 + GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
1.244 + gstelement_class->release_pad =
1.245 + GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
1.246 +}
1.247 +
1.248 +static void
1.249 +gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
1.250 +{
1.251 + mqueue->nbqueues = 0;
1.252 + mqueue->queues = NULL;
1.253 +
1.254 + mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
1.255 + mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
1.256 + mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
1.257 +
1.258 + mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
1.259 + mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
1.260 + mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
1.261 +
1.262 + mqueue->counter = 1;
1.263 + mqueue->highid = -1;
1.264 + mqueue->nextnotlinked = -1;
1.265 +
1.266 + mqueue->qlock = g_mutex_new ();
1.267 +}
1.268 +
1.269 +static void
1.270 +gst_multi_queue_finalize (GObject * object)
1.271 +{
1.272 + GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
1.273 +
1.274 + g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
1.275 + g_list_free (mqueue->queues);
1.276 + mqueue->queues = NULL;
1.277 +
1.278 + /* free/unref instance data */
1.279 + g_mutex_free (mqueue->qlock);
1.280 +
1.281 + G_OBJECT_CLASS (parent_class)->finalize (object);
1.282 +}
1.283 +
1.284 +
1.285 +
1.286 +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
1.287 + GList * tmp = mq->queues; \
1.288 + while (tmp) { \
1.289 + GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
1.290 + q->max_size.format = mq->max_size.format; \
1.291 + tmp = g_list_next(tmp); \
1.292 + }; \
1.293 +} G_STMT_END
1.294 +
1.295 +static void
1.296 +gst_multi_queue_set_property (GObject * object, guint prop_id,
1.297 + const GValue * value, GParamSpec * pspec)
1.298 +{
1.299 + GstMultiQueue *mq = GST_MULTI_QUEUE (object);
1.300 +
1.301 + switch (prop_id) {
1.302 + case ARG_MAX_SIZE_BYTES:
1.303 + mq->max_size.bytes = g_value_get_uint (value);
1.304 + SET_CHILD_PROPERTY (mq, bytes);
1.305 + break;
1.306 + case ARG_MAX_SIZE_BUFFERS:
1.307 + mq->max_size.visible = g_value_get_uint (value);
1.308 + SET_CHILD_PROPERTY (mq, visible);
1.309 + break;
1.310 + case ARG_MAX_SIZE_TIME:
1.311 + mq->max_size.time = g_value_get_uint64 (value);
1.312 + SET_CHILD_PROPERTY (mq, time);
1.313 + break;
1.314 + case ARG_EXTRA_SIZE_BYTES:
1.315 + mq->extra_size.bytes = g_value_get_uint (value);
1.316 + break;
1.317 + case ARG_EXTRA_SIZE_BUFFERS:
1.318 + mq->extra_size.visible = g_value_get_uint (value);
1.319 + break;
1.320 + case ARG_EXTRA_SIZE_TIME:
1.321 + mq->extra_size.time = g_value_get_uint64 (value);
1.322 + break;
1.323 + default:
1.324 + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1.325 + break;
1.326 + }
1.327 +}
1.328 +
1.329 +static void
1.330 +gst_multi_queue_get_property (GObject * object, guint prop_id,
1.331 + GValue * value, GParamSpec * pspec)
1.332 +{
1.333 + GstMultiQueue *mq = GST_MULTI_QUEUE (object);
1.334 +
1.335 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.336 +
1.337 + switch (prop_id) {
1.338 + case ARG_EXTRA_SIZE_BYTES:
1.339 + g_value_set_uint (value, mq->extra_size.bytes);
1.340 + break;
1.341 + case ARG_EXTRA_SIZE_BUFFERS:
1.342 + g_value_set_uint (value, mq->extra_size.visible);
1.343 + break;
1.344 + case ARG_EXTRA_SIZE_TIME:
1.345 + g_value_set_uint64 (value, mq->extra_size.time);
1.346 + break;
1.347 + case ARG_MAX_SIZE_BYTES:
1.348 + g_value_set_uint (value, mq->max_size.bytes);
1.349 + break;
1.350 + case ARG_MAX_SIZE_BUFFERS:
1.351 + g_value_set_uint (value, mq->max_size.visible);
1.352 + break;
1.353 + case ARG_MAX_SIZE_TIME:
1.354 + g_value_set_uint64 (value, mq->max_size.time);
1.355 + break;
1.356 + default:
1.357 + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1.358 + break;
1.359 + }
1.360 +
1.361 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.362 +}
1.363 +
1.364 +GList *
1.365 +gst_multi_queue_get_internal_links (GstPad * pad)
1.366 +{
1.367 + GList *res = NULL;
1.368 + GstMultiQueue *mqueue;
1.369 + GstSingleQueue *sq = NULL;
1.370 + GList *tmp;
1.371 +
1.372 + g_return_val_if_fail (GST_IS_PAD (pad), NULL);
1.373 +
1.374 + mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad));
1.375 + if (!mqueue)
1.376 + goto no_parent;
1.377 +
1.378 + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1.379 + /* Find which single queue it belongs to */
1.380 + for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) {
1.381 + sq = (GstSingleQueue *) tmp->data;
1.382 +
1.383 + if (sq->sinkpad == pad)
1.384 + res = g_list_prepend (res, sq->srcpad);
1.385 + if (sq->srcpad == pad)
1.386 + res = g_list_prepend (res, sq->sinkpad);
1.387 + }
1.388 +
1.389 + if (!res)
1.390 + GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
1.391 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1.392 +
1.393 + return res;
1.394 +
1.395 +no_parent:
1.396 + {
1.397 + GST_DEBUG_OBJECT (pad, "no parent");
1.398 + return NULL;
1.399 + }
1.400 +}
1.401 +
1.402 +
1.403 +/*
1.404 + * GstElement methods
1.405 + */
1.406 +
1.407 +static GstPad *
1.408 +gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
1.409 + const gchar * name)
1.410 +{
1.411 + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
1.412 + GstSingleQueue *squeue;
1.413 +
1.414 + GST_LOG_OBJECT (element, "name : %s", name);
1.415 +
1.416 + /* Create a new single queue, add the sink and source pad and return the sink pad */
1.417 + squeue = gst_single_queue_new (mqueue);
1.418 +
1.419 + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1.420 + mqueue->queues = g_list_append (mqueue->queues, squeue);
1.421 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1.422 +
1.423 + /*
1.424 + GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
1.425 + GST_DEBUG_PAD_NAME (squeue->sinkpad));
1.426 + */
1.427 + return squeue->sinkpad;
1.428 +}
1.429 +
1.430 +static void
1.431 +gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
1.432 +{
1.433 + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
1.434 + GstSingleQueue *sq = NULL;
1.435 + GList *tmp;
1.436 +
1.437 +// GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1.438 +
1.439 + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1.440 + /* Find which single queue it belongs to, knowing that it should be a sinkpad */
1.441 + for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
1.442 + sq = (GstSingleQueue *) tmp->data;
1.443 +
1.444 + if (sq->sinkpad == pad)
1.445 + break;
1.446 + }
1.447 +
1.448 + if (!tmp) {
1.449 + GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
1.450 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1.451 + return;
1.452 + }
1.453 +
1.454 + /* FIXME: The removal of the singlequeue should probably not happen until it
1.455 + * finishes draining */
1.456 +
1.457 + /* remove it from the list */
1.458 + mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
1.459 +
1.460 + /* FIXME : recompute next-non-linked */
1.461 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1.462 +
1.463 + /* delete SingleQueue */
1.464 + gst_data_queue_set_flushing (sq->queue, TRUE);
1.465 +
1.466 + gst_pad_set_active (sq->srcpad, FALSE);
1.467 + gst_pad_set_active (sq->sinkpad, FALSE);
1.468 + gst_element_remove_pad (element, sq->srcpad);
1.469 + gst_element_remove_pad (element, sq->sinkpad);
1.470 + gst_single_queue_free (sq);
1.471 +}
1.472 +
1.473 +static gboolean
1.474 +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
1.475 +{
1.476 + gboolean result;
1.477 +
1.478 + GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
1.479 + sq->id);
1.480 +
1.481 + if (flush) {
1.482 + sq->srcresult = GST_FLOW_WRONG_STATE;
1.483 + gst_data_queue_set_flushing (sq->queue, TRUE);
1.484 +
1.485 + /* wake up non-linked task */
1.486 + GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
1.487 + sq->id);
1.488 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.489 + g_cond_signal (sq->turn);
1.490 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.491 +
1.492 + GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
1.493 + result = gst_pad_pause_task (sq->srcpad);
1.494 + } else {
1.495 + gst_data_queue_flush (sq->queue);
1.496 + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1.497 + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1.498 + /* All pads start off not-linked for a smooth kick-off */
1.499 + sq->srcresult = GST_FLOW_NOT_LINKED;
1.500 + sq->cur_time = 0;
1.501 + sq->max_size.visible = mq->max_size.visible;
1.502 + sq->is_eos = FALSE;
1.503 + sq->inextra = FALSE;
1.504 + sq->nextid = 0;
1.505 + sq->oldid = 0;
1.506 + gst_data_queue_set_flushing (sq->queue, FALSE);
1.507 +
1.508 + GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
1.509 + result =
1.510 + gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
1.511 + sq->srcpad);
1.512 + }
1.513 + return result;
1.514 +}
1.515 +
1.516 +/* calculate the diff between running time on the sink and src of the queue.
1.517 + * This is the total amount of time in the queue.
1.518 + * WITH LOCK TAKEN */
1.519 +static void
1.520 +update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
1.521 +{
1.522 + gint64 sink_time, src_time;
1.523 +
1.524 + sink_time =
1.525 + gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
1.526 + sq->sink_segment.last_stop);
1.527 +
1.528 + src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
1.529 + sq->src_segment.last_stop);
1.530 +
1.531 + GST_DEBUG_OBJECT (mq,
1.532 + "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
1.533 + GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
1.534 +
1.535 + /* This allows for streams with out of order timestamping - sometimes the
1.536 + * emerging timestamp is later than the arriving one(s) */
1.537 + if (sink_time >= src_time)
1.538 + sq->cur_time = sink_time - src_time;
1.539 + else
1.540 + sq->cur_time = 0;
1.541 +}
1.542 +
1.543 +/* take a NEWSEGMENT event and apply the values to segment, updating the time
1.544 + * level of queue. */
1.545 +static void
1.546 +apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1.547 + GstSegment * segment)
1.548 +{
1.549 + gboolean update;
1.550 + GstFormat format;
1.551 + gdouble rate, arate;
1.552 + gint64 start, stop, time;
1.553 +
1.554 + gst_event_parse_new_segment_full (event, &update, &rate, &arate,
1.555 + &format, &start, &stop, &time);
1.556 +
1.557 + /* now configure the values, we use these to track timestamps on the
1.558 + * sinkpad. */
1.559 + if (format != GST_FORMAT_TIME) {
1.560 + /* non-time format, pretent the current time segment is closed with a
1.561 + * 0 start and unknown stop time. */
1.562 + update = FALSE;
1.563 + format = GST_FORMAT_TIME;
1.564 + start = 0;
1.565 + stop = -1;
1.566 + time = 0;
1.567 + }
1.568 +
1.569 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.570 +
1.571 + gst_segment_set_newsegment_full (segment, update,
1.572 + rate, arate, format, start, stop, time);
1.573 +
1.574 + GST_DEBUG_OBJECT (mq,
1.575 + "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
1.576 +
1.577 + /* segment can update the time level of the queue */
1.578 + update_time_level (mq, sq);
1.579 +
1.580 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.581 +}
1.582 +
1.583 +/* take a buffer and update segment, updating the time level of the queue. */
1.584 +static void
1.585 +apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
1.586 + GstClockTime duration, GstSegment * segment)
1.587 +{
1.588 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.589 +
1.590 + /* if no timestamp is set, assume it's continuous with the previous
1.591 + * time */
1.592 + if (timestamp == GST_CLOCK_TIME_NONE)
1.593 + timestamp = segment->last_stop;
1.594 +
1.595 + /* add duration */
1.596 + if (duration != GST_CLOCK_TIME_NONE)
1.597 + timestamp += duration;
1.598 +
1.599 + GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
1.600 + sq->id, GST_TIME_ARGS (timestamp));
1.601 +
1.602 + gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
1.603 +
1.604 + /* calc diff with other end */
1.605 + update_time_level (mq, sq);
1.606 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.607 +}
1.608 +
1.609 +static GstFlowReturn
1.610 +gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
1.611 + GstMiniObject * object)
1.612 +{
1.613 + GstFlowReturn result = GST_FLOW_OK;
1.614 +
1.615 + if (GST_IS_BUFFER (object)) {
1.616 + GstBuffer *buffer;
1.617 + GstClockTime timestamp, duration;
1.618 +
1.619 + buffer = GST_BUFFER_CAST (object);
1.620 + timestamp = GST_BUFFER_TIMESTAMP (buffer);
1.621 + duration = GST_BUFFER_DURATION (buffer);
1.622 +
1.623 + apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1.624 +
1.625 + /* Applying the buffer may have made the queue non-full again, unblock it if needed */
1.626 + gst_data_queue_limits_changed (sq->queue);
1.627 +
1.628 + GST_DEBUG_OBJECT (mq,
1.629 + "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
1.630 + sq->id, buffer, GST_TIME_ARGS (timestamp));
1.631 +
1.632 + result = gst_pad_push (sq->srcpad, buffer);
1.633 + } else if (GST_IS_EVENT (object)) {
1.634 + GstEvent *event;
1.635 +
1.636 + event = GST_EVENT_CAST (object);
1.637 +
1.638 + switch (GST_EVENT_TYPE (event)) {
1.639 + case GST_EVENT_EOS:
1.640 + result = GST_FLOW_UNEXPECTED;
1.641 + break;
1.642 + case GST_EVENT_NEWSEGMENT:
1.643 + apply_segment (mq, sq, event, &sq->src_segment);
1.644 + /* Applying the segment may have made the queue non-full again, unblock it if needed */
1.645 + gst_data_queue_limits_changed (sq->queue);
1.646 + break;
1.647 + default:
1.648 + break;
1.649 + }
1.650 +
1.651 + GST_DEBUG_OBJECT (mq,
1.652 + "SingleQueue %d : Pushing event %p of type %s",
1.653 + sq->id, event, GST_EVENT_TYPE_NAME (event));
1.654 +
1.655 + gst_pad_push_event (sq->srcpad, event);
1.656 + } else {
1.657 + g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
1.658 + sq->id);
1.659 + }
1.660 + return result;
1.661 +
1.662 + /* ERRORS */
1.663 +}
1.664 +
1.665 +static GstMiniObject *
1.666 +gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
1.667 +{
1.668 + GstMiniObject *res;
1.669 +
1.670 + res = item->object;
1.671 + item->object = NULL;
1.672 +
1.673 + return res;
1.674 +}
1.675 +
1.676 +static void
1.677 +gst_multi_queue_item_destroy (GstMultiQueueItem * item)
1.678 +{
1.679 + if (item->object)
1.680 + gst_mini_object_unref (item->object);
1.681 + g_free (item);
1.682 +}
1.683 +
1.684 +/* takes ownership of passed mini object! */
1.685 +static GstMultiQueueItem *
1.686 +gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
1.687 +{
1.688 + GstMultiQueueItem *item;
1.689 +
1.690 + item = g_new (GstMultiQueueItem, 1);
1.691 + item->object = object;
1.692 + item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1.693 + item->posid = curid;
1.694 +
1.695 + if (GST_IS_BUFFER (object)) {
1.696 + item->size = GST_BUFFER_SIZE (object);
1.697 + item->duration = GST_BUFFER_DURATION (object);
1.698 + if (item->duration == GST_CLOCK_TIME_NONE)
1.699 + item->duration = 0;
1.700 + item->visible = TRUE;
1.701 + } else {
1.702 + item->size = 0;
1.703 + item->duration = 0;
1.704 + item->visible = FALSE;
1.705 + }
1.706 + return item;
1.707 +}
1.708 +
1.709 +/* Each main loop attempts to push buffers until the return value
1.710 + * is not-linked. not-linked pads are not allowed to push data beyond
1.711 + * any linked pads, so they don't 'rush ahead of the pack'.
1.712 + */
1.713 +static void
1.714 +gst_multi_queue_loop (GstPad * pad)
1.715 +{
1.716 + GstSingleQueue *sq;
1.717 + GstMultiQueueItem *item;
1.718 + GstDataQueueItem *sitem;
1.719 + GstMultiQueue *mq;
1.720 + GstMiniObject *object;
1.721 + guint32 newid;
1.722 + guint32 oldid = -1;
1.723 + GstFlowReturn result;
1.724 +
1.725 + sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1.726 + mq = sq->mqueue;
1.727 +
1.728 + do {
1.729 + GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
1.730 +
1.731 + /* Get something from the queue, blocking until that happens, or we get
1.732 + * flushed */
1.733 + if (!(gst_data_queue_pop (sq->queue, &sitem)))
1.734 + goto out_flushing;
1.735 +
1.736 + item = (GstMultiQueueItem *) sitem;
1.737 + newid = item->posid;
1.738 +
1.739 + /* steal the object and destroy the item */
1.740 + object = gst_multi_queue_item_steal_object (item);
1.741 + gst_multi_queue_item_destroy (item);
1.742 +
1.743 + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
1.744 + sq->id, newid, oldid);
1.745 +
1.746 + /* If we're not-linked, we do some extra work because we might need to
1.747 + * wait before pushing. If we're linked but there's a gap in the IDs,
1.748 + * or it's the first loop, or we just passed the previous highid,
1.749 + * we might need to wake some sleeping pad up, so there's extra work
1.750 + * there too */
1.751 + if (sq->srcresult == GST_FLOW_NOT_LINKED ||
1.752 + (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) {
1.753 + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
1.754 + gst_flow_get_name (sq->srcresult));
1.755 +
1.756 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.757 +
1.758 + /* Update the nextid so other threads know when to wake us up */
1.759 + sq->nextid = newid;
1.760 +
1.761 + /* Update the oldid (the last ID we output) for highid tracking */
1.762 + if (oldid != -1)
1.763 + sq->oldid = oldid;
1.764 +
1.765 + if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1.766 + /* Go to sleep until it's time to push this buffer */
1.767 +
1.768 + /* Recompute the highid */
1.769 + compute_high_id (mq);
1.770 + while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
1.771 + GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
1.772 + "newid %u and highid %u", sq->id, newid, mq->highid);
1.773 +
1.774 +
1.775 + /* Wake up all non-linked pads before we sleep */
1.776 + wake_up_next_non_linked (mq);
1.777 +
1.778 + mq->numwaiting++;
1.779 + g_cond_wait (sq->turn, mq->qlock);
1.780 + mq->numwaiting--;
1.781 +
1.782 + GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
1.783 + "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
1.784 + }
1.785 +
1.786 + /* Re-compute the high_id in case someone else pushed */
1.787 + compute_high_id (mq);
1.788 + } else {
1.789 + compute_high_id (mq);
1.790 + /* Wake up all non-linked pads */
1.791 + wake_up_next_non_linked (mq);
1.792 + }
1.793 + /* We're done waiting, we can clear the nextid */
1.794 + sq->nextid = 0;
1.795 +
1.796 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.797 + }
1.798 +
1.799 + GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
1.800 + gst_flow_get_name (sq->srcresult));
1.801 +
1.802 + /* Try to push out the new object */
1.803 + result = gst_single_queue_push_one (mq, sq, object);
1.804 + sq->srcresult = result;
1.805 +
1.806 + if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
1.807 + goto out_flushing;
1.808 +
1.809 + GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
1.810 + gst_flow_get_name (sq->srcresult));
1.811 +
1.812 + oldid = newid;
1.813 + }
1.814 + while (TRUE);
1.815 +
1.816 +out_flushing:
1.817 + {
1.818 + /* Need to make sure wake up any sleeping pads when we exit */
1.819 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.820 + compute_high_id (mq);
1.821 + wake_up_next_non_linked (mq);
1.822 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.823 +
1.824 + gst_data_queue_set_flushing (sq->queue, TRUE);
1.825 + gst_pad_pause_task (sq->srcpad);
1.826 + GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
1.827 + "SingleQueue[%d] task paused, reason:%s",
1.828 + sq->id, gst_flow_get_name (sq->srcresult));
1.829 + return;
1.830 + }
1.831 +}
1.832 +
1.833 +/**
1.834 + * gst_multi_queue_chain:
1.835 + *
1.836 + * This is similar to GstQueue's chain function, except:
1.837 + * _ we don't have leak behavioures,
1.838 + * _ we push with a unique id (curid)
1.839 + */
1.840 +static GstFlowReturn
1.841 +gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
1.842 +{
1.843 + GstSingleQueue *sq;
1.844 + GstMultiQueue *mq;
1.845 + GstMultiQueueItem *item;
1.846 + GstFlowReturn ret = GST_FLOW_OK;
1.847 + guint32 curid;
1.848 + GstClockTime timestamp, duration;
1.849 +
1.850 + sq = gst_pad_get_element_private (pad);
1.851 + mq = (GstMultiQueue *) gst_pad_get_parent (pad);
1.852 +
1.853 + /* Get a unique incrementing id */
1.854 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.855 + curid = mq->counter++;
1.856 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.857 +
1.858 + GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
1.859 + sq->id, buffer, curid);
1.860 +
1.861 + item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
1.862 +
1.863 + timestamp = GST_BUFFER_TIMESTAMP (buffer);
1.864 + duration = GST_BUFFER_DURATION (buffer);
1.865 +
1.866 + if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1.867 + goto flushing;
1.868 +
1.869 + /* update time level, we must do this after pushing the data in the queue so
1.870 + * that we never end up filling the queue first. */
1.871 + apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
1.872 +
1.873 +done:
1.874 + gst_object_unref (mq);
1.875 +
1.876 + return ret;
1.877 +
1.878 + /* ERRORS */
1.879 +flushing:
1.880 + {
1.881 + ret = sq->srcresult;
1.882 + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1.883 + sq->id, gst_flow_get_name (ret));
1.884 + gst_multi_queue_item_destroy (item);
1.885 + goto done;
1.886 + }
1.887 +}
1.888 +
1.889 +static gboolean
1.890 +gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
1.891 +{
1.892 + GstSingleQueue *sq;
1.893 +
1.894 + sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1.895 +
1.896 + if (active) {
1.897 + /* All pads start off not-linked for a smooth kick-off */
1.898 + sq->srcresult = GST_FLOW_NOT_LINKED;
1.899 + } else {
1.900 + sq->srcresult = GST_FLOW_WRONG_STATE;
1.901 + gst_data_queue_flush (sq->queue);
1.902 + }
1.903 + return TRUE;
1.904 +}
1.905 +
1.906 +static gboolean
1.907 +gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
1.908 +{
1.909 + GstSingleQueue *sq;
1.910 + GstMultiQueue *mq;
1.911 + guint32 curid;
1.912 + GstMultiQueueItem *item;
1.913 + gboolean res;
1.914 + GstEventType type;
1.915 + GstEvent *sref = NULL;
1.916 +
1.917 + sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1.918 + mq = (GstMultiQueue *) gst_pad_get_parent (pad);
1.919 +
1.920 + type = GST_EVENT_TYPE (event);
1.921 +
1.922 + switch (type) {
1.923 + case GST_EVENT_FLUSH_START:
1.924 + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
1.925 + sq->id);
1.926 +
1.927 + res = gst_pad_push_event (sq->srcpad, event);
1.928 +
1.929 + gst_single_queue_flush (mq, sq, TRUE);
1.930 + goto done;
1.931 +
1.932 + case GST_EVENT_FLUSH_STOP:
1.933 + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
1.934 + sq->id);
1.935 +
1.936 + res = gst_pad_push_event (sq->srcpad, event);
1.937 +
1.938 + gst_single_queue_flush (mq, sq, FALSE);
1.939 + goto done;
1.940 + case GST_EVENT_NEWSEGMENT:
1.941 + /* take ref because the queue will take ownership and we need the event
1.942 + * afterwards to update the segment */
1.943 + sref = gst_event_ref (event);
1.944 + break;
1.945 +
1.946 + default:
1.947 + if (!(GST_EVENT_IS_SERIALIZED (event))) {
1.948 + res = gst_pad_push_event (sq->srcpad, event);
1.949 + goto done;
1.950 + }
1.951 + break;
1.952 + }
1.953 +
1.954 + /* Get an unique incrementing id */
1.955 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.956 + curid = mq->counter++;
1.957 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.958 +
1.959 + item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
1.960 +
1.961 + GST_DEBUG_OBJECT (mq,
1.962 + "SingleQueue %d : Enqueuing event %p of type %s with id %d",
1.963 + sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
1.964 +
1.965 + if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1.966 + goto flushing;
1.967 +
1.968 + /* mark EOS when we received one, we must do that after putting the
1.969 + * buffer in the queue because EOS marks the buffer as filled. No need to take
1.970 + * a lock, the _check_full happens from this thread only, right before pushing
1.971 + * into dataqueue. */
1.972 + switch (type) {
1.973 + case GST_EVENT_EOS:
1.974 + sq->is_eos = TRUE;
1.975 + break;
1.976 + case GST_EVENT_NEWSEGMENT:
1.977 + apply_segment (mq, sq, sref, &sq->sink_segment);
1.978 + gst_event_unref (sref);
1.979 + break;
1.980 + default:
1.981 + break;
1.982 + }
1.983 +done:
1.984 + gst_object_unref (mq);
1.985 + return res;
1.986 +
1.987 +flushing:
1.988 + {
1.989 + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1.990 + sq->id, gst_flow_get_name (sq->srcresult));
1.991 + if (sref)
1.992 + gst_event_unref (sref);
1.993 + gst_multi_queue_item_destroy (item);
1.994 + goto done;
1.995 + }
1.996 +}
1.997 +
1.998 +static GstCaps *
1.999 +gst_multi_queue_getcaps (GstPad * pad)
1.1000 +{
1.1001 + GstSingleQueue *sq = gst_pad_get_element_private (pad);
1.1002 + GstPad *otherpad;
1.1003 + GstCaps *result;
1.1004 +
1.1005 + otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
1.1006 +
1.1007 + GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
1.1008 +
1.1009 + result = gst_pad_peer_get_caps (otherpad);
1.1010 + if (result == NULL)
1.1011 + result = gst_caps_new_any ();
1.1012 +
1.1013 + return result;
1.1014 +}
1.1015 +
1.1016 +static GstFlowReturn
1.1017 +gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
1.1018 + GstCaps * caps, GstBuffer ** buf)
1.1019 +{
1.1020 + GstSingleQueue *sq = gst_pad_get_element_private (pad);
1.1021 +
1.1022 + return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
1.1023 +}
1.1024 +
1.1025 +static gboolean
1.1026 +gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
1.1027 +{
1.1028 + GstMultiQueue *mq;
1.1029 + GstSingleQueue *sq;
1.1030 + gboolean result = FALSE;
1.1031 +
1.1032 + sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1.1033 + mq = sq->mqueue;
1.1034 +
1.1035 + GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
1.1036 +
1.1037 + if (active) {
1.1038 + result = gst_single_queue_flush (mq, sq, FALSE);
1.1039 + } else {
1.1040 + result = gst_single_queue_flush (mq, sq, TRUE);
1.1041 + /* make sure streaming finishes */
1.1042 + result |= gst_pad_stop_task (pad);
1.1043 + }
1.1044 + return result;
1.1045 +}
1.1046 +
1.1047 +static gboolean
1.1048 +gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
1.1049 +{
1.1050 + return TRUE;
1.1051 +}
1.1052 +
1.1053 +static gboolean
1.1054 +gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
1.1055 +{
1.1056 + GstSingleQueue *sq = gst_pad_get_element_private (pad);
1.1057 +
1.1058 + return gst_pad_push_event (sq->sinkpad, event);
1.1059 +}
1.1060 +
1.1061 +static gboolean
1.1062 +gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
1.1063 +{
1.1064 + GstSingleQueue *sq = gst_pad_get_element_private (pad);
1.1065 + GstPad *peerpad;
1.1066 + gboolean res;
1.1067 +
1.1068 + /* FIXME, Handle position offset depending on queue size */
1.1069 +
1.1070 + /* default handling */
1.1071 + if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
1.1072 + goto no_peer;
1.1073 +
1.1074 + res = gst_pad_query (peerpad, query);
1.1075 +
1.1076 + gst_object_unref (peerpad);
1.1077 +
1.1078 + return res;
1.1079 +
1.1080 + /* ERRORS */
1.1081 +no_peer:
1.1082 + {
1.1083 + GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
1.1084 + return FALSE;
1.1085 + }
1.1086 +}
1.1087 +
1.1088 +/*
1.1089 + * Next-non-linked functions
1.1090 + */
1.1091 +
1.1092 +/* WITH LOCK TAKEN */
1.1093 +static void
1.1094 +wake_up_next_non_linked (GstMultiQueue * mq)
1.1095 +{
1.1096 + GList *tmp;
1.1097 +
1.1098 + /* maybe no-one is waiting */
1.1099 + if (mq->numwaiting < 1)
1.1100 + return;
1.1101 +
1.1102 + /* Else figure out which singlequeue(s) need waking up */
1.1103 + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1.1104 + GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1.1105 +
1.1106 + if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1.1107 + if (sq->nextid != 0 && sq->nextid <= mq->highid) {
1.1108 + GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
1.1109 + g_cond_signal (sq->turn);
1.1110 + }
1.1111 + }
1.1112 + }
1.1113 +}
1.1114 +
1.1115 +/* WITH LOCK TAKEN */
1.1116 +static void
1.1117 +compute_high_id (GstMultiQueue * mq)
1.1118 +{
1.1119 + /* The high-id is either the highest id among the linked pads, or if all
1.1120 + * pads are not-linked, it's the lowest not-linked pad */
1.1121 + GList *tmp;
1.1122 + guint32 lowest = G_MAXUINT32;
1.1123 + guint32 highid = G_MAXUINT32;
1.1124 +
1.1125 + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1.1126 + GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1.1127 +
1.1128 + GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
1.1129 + sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
1.1130 +
1.1131 + if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1.1132 + /* No need to consider queues which are not waiting */
1.1133 + if (sq->nextid == 0) {
1.1134 + GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
1.1135 + continue;
1.1136 + }
1.1137 +
1.1138 + if (sq->nextid < lowest)
1.1139 + lowest = sq->nextid;
1.1140 + } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
1.1141 + /* If we don't have a global highid, or the global highid is lower than
1.1142 + * this single queue's last outputted id, store the queue's one,
1.1143 + * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
1.1144 + if ((highid == G_MAXUINT32) || (sq->oldid > highid))
1.1145 + highid = sq->oldid;
1.1146 + }
1.1147 + }
1.1148 +
1.1149 + if (highid == G_MAXUINT32 || lowest < highid)
1.1150 + mq->highid = lowest;
1.1151 + else
1.1152 + mq->highid = highid;
1.1153 +
1.1154 + GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
1.1155 + lowest);
1.1156 +}
1.1157 +
1.1158 +#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
1.1159 + (sq->max_size.format) <= (value))
1.1160 +
1.1161 +/*
1.1162 + * GstSingleQueue functions
1.1163 + */
1.1164 +static void
1.1165 +single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1.1166 +{
1.1167 + GstMultiQueue *mq = sq->mqueue;
1.1168 + GList *tmp;
1.1169 + GstDataQueueSize size;
1.1170 + gboolean filled = FALSE;
1.1171 +
1.1172 + gst_data_queue_get_level (sq->queue, &size);
1.1173 +
1.1174 + GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
1.1175 +
1.1176 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.1177 + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1.1178 + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
1.1179 + GstDataQueueSize ssize;
1.1180 +
1.1181 + GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
1.1182 +
1.1183 + if (gst_data_queue_is_empty (ssq->queue)) {
1.1184 + GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
1.1185 + if (IS_FILLED (visible, size.visible)) {
1.1186 + sq->max_size.visible++;
1.1187 + GST_DEBUG_OBJECT (mq,
1.1188 + "Another queue is empty, bumping single queue %d max visible to %d",
1.1189 + sq->id, sq->max_size.visible);
1.1190 + }
1.1191 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.1192 + goto beach;
1.1193 + }
1.1194 + /* check if we reached the hard time/bytes limits */
1.1195 + gst_data_queue_get_level (ssq->queue, &ssize);
1.1196 +
1.1197 + GST_DEBUG_OBJECT (mq,
1.1198 + "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1.1199 + G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
1.1200 + ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1.1201 +
1.1202 + /* if this queue is filled completely we must signal overrun */
1.1203 + if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
1.1204 + GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
1.1205 + filled = TRUE;
1.1206 + }
1.1207 + }
1.1208 + /* no queues were empty */
1.1209 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.1210 +
1.1211 + /* Overrun is always forwarded, since this is blocking the upstream element */
1.1212 + if (filled) {
1.1213 + GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
1.1214 + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
1.1215 + }
1.1216 +
1.1217 +beach:
1.1218 + return;
1.1219 +}
1.1220 +
1.1221 +static void
1.1222 +single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1.1223 +{
1.1224 + gboolean empty = TRUE;
1.1225 + GstMultiQueue *mq = sq->mqueue;
1.1226 + GList *tmp;
1.1227 +
1.1228 + GST_LOG_OBJECT (mq,
1.1229 + "Single Queue %d is empty, Checking other single queues", sq->id);
1.1230 +
1.1231 + GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1.1232 + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1.1233 + GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1.1234 +
1.1235 + if (gst_data_queue_is_full (sq->queue)) {
1.1236 + GstDataQueueSize size;
1.1237 +
1.1238 + gst_data_queue_get_level (sq->queue, &size);
1.1239 + if (IS_FILLED (visible, size.visible)) {
1.1240 + sq->max_size.visible++;
1.1241 + GST_DEBUG_OBJECT (mq,
1.1242 + "queue %d is filled, bumping its max visible to %d", sq->id,
1.1243 + sq->max_size.visible);
1.1244 + gst_data_queue_limits_changed (sq->queue);
1.1245 + }
1.1246 + }
1.1247 + if (!gst_data_queue_is_empty (sq->queue))
1.1248 + empty = FALSE;
1.1249 + }
1.1250 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1.1251 +
1.1252 + if (empty) {
1.1253 + GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
1.1254 + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
1.1255 + }
1.1256 +}
1.1257 +
1.1258 +static gboolean
1.1259 +single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
1.1260 + guint64 time, GstSingleQueue * sq)
1.1261 +{
1.1262 + gboolean res;
1.1263 +
1.1264 + GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
1.1265 + "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
1.1266 + sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1.1267 +
1.1268 + /* we are always filled on EOS */
1.1269 + if (sq->is_eos)
1.1270 + return TRUE;
1.1271 +
1.1272 + /* we never go past the max visible items */
1.1273 + if (IS_FILLED (visible, visible))
1.1274 + return TRUE;
1.1275 +
1.1276 + if (sq->cur_time != 0) {
1.1277 + /* if we have valid time in the queue, check */
1.1278 + res = IS_FILLED (time, sq->cur_time);
1.1279 + } else {
1.1280 + /* no valid time, check bytes */
1.1281 + res = IS_FILLED (bytes, bytes);
1.1282 + }
1.1283 + return res;
1.1284 +}
1.1285 +
1.1286 +static void
1.1287 +gst_single_queue_free (GstSingleQueue * sq)
1.1288 +{
1.1289 + /* DRAIN QUEUE */
1.1290 + gst_data_queue_flush (sq->queue);
1.1291 + g_object_unref (sq->queue);
1.1292 + g_cond_free (sq->turn);
1.1293 + g_free (sq);
1.1294 +}
1.1295 +
1.1296 +static GstSingleQueue *
1.1297 +gst_single_queue_new (GstMultiQueue * mqueue)
1.1298 +{
1.1299 + GstSingleQueue *sq;
1.1300 + gchar *tmp;
1.1301 +
1.1302 + sq = g_new0 (GstSingleQueue, 1);
1.1303 +
1.1304 + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1.1305 + sq->id = mqueue->nbqueues++;
1.1306 +
1.1307 + /* copy over max_size and extra_size so we don't need to take the lock
1.1308 + * any longer when checking if the queue is full. */
1.1309 + sq->max_size.visible = mqueue->max_size.visible;
1.1310 + sq->max_size.bytes = mqueue->max_size.bytes;
1.1311 + sq->max_size.time = mqueue->max_size.time;
1.1312 +
1.1313 + sq->extra_size.visible = mqueue->extra_size.visible;
1.1314 + sq->extra_size.bytes = mqueue->extra_size.bytes;
1.1315 + sq->extra_size.time = mqueue->extra_size.time;
1.1316 +
1.1317 + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1.1318 +
1.1319 + GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
1.1320 +
1.1321 + sq->mqueue = mqueue;
1.1322 + sq->srcresult = GST_FLOW_WRONG_STATE;
1.1323 + sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
1.1324 + single_queue_check_full, sq);
1.1325 + sq->is_eos = FALSE;
1.1326 + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1.1327 + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1.1328 +
1.1329 + sq->nextid = 0;
1.1330 + sq->oldid = 0;
1.1331 + sq->turn = g_cond_new ();
1.1332 +
1.1333 + /* attach to underrun/overrun signals to handle non-starvation */
1.1334 + g_signal_connect (G_OBJECT (sq->queue), "full",
1.1335 + G_CALLBACK (single_queue_overrun_cb), sq);
1.1336 + g_signal_connect (G_OBJECT (sq->queue), "empty",
1.1337 + G_CALLBACK (single_queue_underrun_cb), sq);
1.1338 +
1.1339 + tmp = g_strdup_printf ("sink%d", sq->id);
1.1340 + sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
1.1341 + g_free (tmp);
1.1342 +
1.1343 + gst_pad_set_chain_function (sq->sinkpad,
1.1344 + GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
1.1345 + gst_pad_set_activatepush_function (sq->sinkpad,
1.1346 + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
1.1347 + gst_pad_set_event_function (sq->sinkpad,
1.1348 + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
1.1349 + gst_pad_set_getcaps_function (sq->sinkpad,
1.1350 + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1.1351 + gst_pad_set_bufferalloc_function (sq->sinkpad,
1.1352 + GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
1.1353 + gst_pad_set_internal_link_function (sq->sinkpad,
1.1354 + GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
1.1355 +
1.1356 + tmp = g_strdup_printf ("src%d", sq->id);
1.1357 + sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
1.1358 + g_free (tmp);
1.1359 +
1.1360 + gst_pad_set_activatepush_function (sq->srcpad,
1.1361 + GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
1.1362 + gst_pad_set_acceptcaps_function (sq->srcpad,
1.1363 + GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
1.1364 + gst_pad_set_getcaps_function (sq->srcpad,
1.1365 + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1.1366 + gst_pad_set_event_function (sq->srcpad,
1.1367 + GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
1.1368 + gst_pad_set_query_function (sq->srcpad,
1.1369 + GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
1.1370 + gst_pad_set_internal_link_function (sq->srcpad,
1.1371 + GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
1.1372 +
1.1373 + gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
1.1374 + gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
1.1375 +
1.1376 + gst_pad_set_active (sq->srcpad, TRUE);
1.1377 + gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
1.1378 +
1.1379 + gst_pad_set_active (sq->sinkpad, TRUE);
1.1380 + gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
1.1381 +
1.1382 + GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
1.1383 + sq->id);
1.1384 +
1.1385 + return sq;
1.1386 +}
1.1387 +
1.1388 +static gboolean
1.1389 +plugin_init (GstPlugin * plugin)
1.1390 +{
1.1391 + return gst_element_register (plugin, "multiqueue", GST_RANK_NONE,
1.1392 + GST_TYPE_MULTI_QUEUE);
1.1393 +}
1.1394 +
1.1395 +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1.1396 + GST_VERSION_MINOR,
1.1397 + "multiqueue",
1.1398 + "multiqueue", plugin_init, VERSION, GST_LICENSE,
1.1399 + GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
1.1400 +