gst-gmyth/multiqueue/gstmultiqueue.c
branchtrunk
changeset 827 30368d31696e
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/gst-gmyth/multiqueue/gstmultiqueue.c	Tue Aug 28 08:16:13 2007 +0100
     1.3 @@ -0,0 +1,1397 @@
     1.4 +/* GStreamer
     1.5 + * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
     1.6 + * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
     1.7 + * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
     1.8 + *
     1.9 + * gstmultiqueue.c:
    1.10 + *
    1.11 + * This library is free software; you can redistribute it and/or
    1.12 + * modify it under the terms of the GNU Library General Public
    1.13 + * License as published by the Free Software Foundation; either
    1.14 + * version 2 of the License, or (at your option) any later version.
    1.15 + *
    1.16 + * This library is distributed in the hope that it will be useful,
    1.17 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
    1.18 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    1.19 + * Library General Public License for more details.
    1.20 + *
    1.21 + * You should have received a copy of the GNU Library General Public
    1.22 + * License along with this library; if not, write to the
    1.23 + * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
    1.24 + * Boston, MA 02111-1307, USA.
    1.25 + */
    1.26 +
    1.27 +#ifdef HAVE_CONFIG_H
    1.28 +#  include "config.h"
    1.29 +#endif
    1.30 +
    1.31 +#include <gst/gst.h>
    1.32 +#include "gstmultiqueue.h"
    1.33 +
    1.34 +/**
    1.35 + * GstSingleQueue:
    1.36 + * @sinkpad: associated sink #GstPad
    1.37 + * @srcpad: associated source #GstPad
    1.38 + *
    1.39 + * Structure containing all information and properties about
    1.40 + * a single queue.
    1.41 + */
    1.42 +typedef struct _GstSingleQueue GstSingleQueue;
    1.43 +
    1.44 +struct _GstSingleQueue
    1.45 +{
    1.46 +  /* unique identifier of the queue */
    1.47 +  guint id;
    1.48 +
    1.49 +  GstMultiQueue *mqueue;
    1.50 +
    1.51 +  GstPad *sinkpad;
    1.52 +  GstPad *srcpad;
    1.53 +
    1.54 +  /* flowreturn of previous srcpad push */
    1.55 +  GstFlowReturn srcresult;
    1.56 +  GstSegment sink_segment;
    1.57 +  GstSegment src_segment;
    1.58 +
    1.59 +  /* queue of data */
    1.60 +  GstDataQueue *queue;
    1.61 +  GstDataQueueSize max_size, extra_size;
    1.62 +  GstClockTime cur_time;
    1.63 +  gboolean is_eos;
    1.64 +  gboolean inextra;             /* TRUE if the queue is currently in extradata mode */
    1.65 +
    1.66 +  /* Protected by global lock */
    1.67 +  guint32 nextid;               /* ID of the next object waiting to be pushed */
    1.68 +  guint32 oldid;                /* ID of the last object pushed (last in a series) */
    1.69 +  GCond *turn;                  /* SingleQueue turn waiting conditional */
    1.70 +};
    1.71 +
    1.72 +
    1.73 +/* Extension of GstDataQueueItem structure for our usage */
    1.74 +typedef struct _GstMultiQueueItem GstMultiQueueItem;
    1.75 +
    1.76 +struct _GstMultiQueueItem
    1.77 +{
    1.78 +  GstMiniObject *object;
    1.79 +  guint size;
    1.80 +  guint64 duration;
    1.81 +  gboolean visible;
    1.82 +
    1.83 +  GDestroyNotify destroy;
    1.84 +  guint32 posid;
    1.85 +};
    1.86 +
    1.87 +static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
    1.88 +static void gst_single_queue_free (GstSingleQueue * squeue);
    1.89 +
    1.90 +static void wake_up_next_non_linked (GstMultiQueue * mq);
    1.91 +static void compute_high_id (GstMultiQueue * mq);
    1.92 +
    1.93 +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
    1.94 +    GST_PAD_SINK,
    1.95 +    GST_PAD_REQUEST,
    1.96 +    GST_STATIC_CAPS_ANY);
    1.97 +
    1.98 +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
    1.99 +    GST_PAD_SRC,
   1.100 +    GST_PAD_SOMETIMES,
   1.101 +    GST_STATIC_CAPS_ANY);
   1.102 +
   1.103 +GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
   1.104 +#define GST_CAT_DEFAULT (multi_queue_debug)
   1.105 +
   1.106 +/* default limits, we try to keep up to 2 seconds of data and if there is not
   1.107 + * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
   1.108 + * there is data in the queues. Normally, the byte and time limits are not hit
   1.109 + * in theses conditions. */
   1.110 +#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
   1.111 +#define DEFAULT_MAX_SIZE_BUFFERS 5
   1.112 +#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
   1.113 +
   1.114 +/* second limits. When we hit one of the above limits we are probably dealing
   1.115 + * with a badly muxed file and we scale the limits to these emergency values.
   1.116 + * This is currently not yet implemented. */
   1.117 +#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
   1.118 +#define DEFAULT_EXTRA_SIZE_BUFFERS 5
   1.119 +#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
   1.120 +
   1.121 +/* Signals and args */
   1.122 +enum
   1.123 +{
   1.124 +  SIGNAL_UNDERRUN,
   1.125 +  SIGNAL_OVERRUN,
   1.126 +  LAST_SIGNAL
   1.127 +};
   1.128 +
   1.129 +enum
   1.130 +{
   1.131 +  ARG_0,
   1.132 +  ARG_EXTRA_SIZE_BYTES,
   1.133 +  ARG_EXTRA_SIZE_BUFFERS,
   1.134 +  ARG_EXTRA_SIZE_TIME,
   1.135 +  ARG_MAX_SIZE_BYTES,
   1.136 +  ARG_MAX_SIZE_BUFFERS,
   1.137 +  ARG_MAX_SIZE_TIME,
   1.138 +};
   1.139 +
   1.140 +
   1.141 +static const GstElementDetails  gst_multiqueue_details = 
   1.142 +GST_ELEMENT_DETAILS ("MultiQueue",
   1.143 +      "Generic",
   1.144 +      "Multiple data queue",
   1.145 +      "Edward Hervey <edward@fluendo.com>");
   1.146 +
   1.147 +
   1.148 +#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
   1.149 +  g_mutex_lock (q->qlock);                                              \
   1.150 +} G_STMT_END
   1.151 +
   1.152 +#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
   1.153 +  g_mutex_unlock (q->qlock);                                            \
   1.154 +} G_STMT_END
   1.155 +
   1.156 +static void gst_multi_queue_finalize (GObject * object);
   1.157 +static void gst_multi_queue_set_property (GObject * object,
   1.158 +    guint prop_id, const GValue * value, GParamSpec * pspec);
   1.159 +static void gst_multi_queue_get_property (GObject * object,
   1.160 +    guint prop_id, GValue * value, GParamSpec * pspec);
   1.161 +
   1.162 +static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
   1.163 +    GstPadTemplate * temp, const gchar * name);
   1.164 +static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
   1.165 +
   1.166 +static void gst_multi_queue_loop (GstPad * pad);
   1.167 +
   1.168 +#define _do_init(bla) \
   1.169 +  GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
   1.170 +
   1.171 +GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
   1.172 +    GST_TYPE_ELEMENT, _do_init);
   1.173 +
   1.174 +static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
   1.175 +
   1.176 +
   1.177 +
   1.178 +static void
   1.179 +gst_multi_queue_base_init (gpointer g_class)
   1.180 +{
   1.181 +  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
   1.182 +
   1.183 +  gst_element_class_set_details (gstelement_class, &gst_multiqueue_details);
   1.184 +  gst_element_class_add_pad_template (gstelement_class,
   1.185 +      gst_static_pad_template_get (&sinktemplate));
   1.186 +  gst_element_class_add_pad_template (gstelement_class,
   1.187 +      gst_static_pad_template_get (&srctemplate));
   1.188 +}
   1.189 +
   1.190 +static void
   1.191 +gst_multi_queue_class_init (GstMultiQueueClass * klass)
   1.192 +{
   1.193 +  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   1.194 +  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
   1.195 +
   1.196 +  gobject_class->set_property =
   1.197 +      GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
   1.198 +  gobject_class->get_property =
   1.199 +      GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
   1.200 +
   1.201 +  /* SIGNALS */
   1.202 +  gst_multi_queue_signals[SIGNAL_UNDERRUN] =
   1.203 +      g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   1.204 +      G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
   1.205 +      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   1.206 +
   1.207 +  gst_multi_queue_signals[SIGNAL_OVERRUN] =
   1.208 +      g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   1.209 +      G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
   1.210 +      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   1.211 +
   1.212 +  /* PROPERTIES */
   1.213 +
   1.214 +  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   1.215 +      g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   1.216 +          "Max. amount of data in the queue (bytes, 0=disable)",
   1.217 +          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   1.218 +  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   1.219 +      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   1.220 +          "Max. number of buffers in the queue (0=disable)",
   1.221 +          0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   1.222 +  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   1.223 +      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   1.224 +          "Max. amount of data in the queue (in ns, 0=disable)",
   1.225 +          0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
   1.226 +
   1.227 +  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
   1.228 +      g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
   1.229 +          "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
   1.230 +          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
   1.231 +  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
   1.232 +      g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
   1.233 +          "Amount of buffers the queues can grow if one of them is empty (0=disable)",
   1.234 +          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
   1.235 +  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
   1.236 +      g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
   1.237 +          "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
   1.238 +          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
   1.239 +
   1.240 +  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
   1.241 +
   1.242 +  gstelement_class->request_new_pad =
   1.243 +      GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
   1.244 +  gstelement_class->release_pad =
   1.245 +      GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
   1.246 +}
   1.247 +
   1.248 +static void
   1.249 +gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
   1.250 +{
   1.251 +  mqueue->nbqueues = 0;
   1.252 +  mqueue->queues = NULL;
   1.253 +
   1.254 +  mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
   1.255 +  mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
   1.256 +  mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
   1.257 +
   1.258 +  mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
   1.259 +  mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
   1.260 +  mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
   1.261 +
   1.262 +  mqueue->counter = 1;
   1.263 +  mqueue->highid = -1;
   1.264 +  mqueue->nextnotlinked = -1;
   1.265 +
   1.266 +  mqueue->qlock = g_mutex_new ();
   1.267 +}
   1.268 +
   1.269 +static void
   1.270 +gst_multi_queue_finalize (GObject * object)
   1.271 +{
   1.272 +  GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
   1.273 +
   1.274 +  g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
   1.275 +  g_list_free (mqueue->queues);
   1.276 +  mqueue->queues = NULL;
   1.277 +
   1.278 +  /* free/unref instance data */
   1.279 +  g_mutex_free (mqueue->qlock);
   1.280 +
   1.281 +  G_OBJECT_CLASS (parent_class)->finalize (object);
   1.282 +}
   1.283 +
   1.284 +
   1.285 +
   1.286 +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {	        \
   1.287 +    GList * tmp = mq->queues;					\
   1.288 +    while (tmp) {						\
   1.289 +      GstSingleQueue *q = (GstSingleQueue*)tmp->data;		\
   1.290 +      q->max_size.format = mq->max_size.format;                 \
   1.291 +      tmp = g_list_next(tmp);					\
   1.292 +    };								\
   1.293 +} G_STMT_END
   1.294 +
   1.295 +static void
   1.296 +gst_multi_queue_set_property (GObject * object, guint prop_id,
   1.297 +    const GValue * value, GParamSpec * pspec)
   1.298 +{
   1.299 +  GstMultiQueue *mq = GST_MULTI_QUEUE (object);
   1.300 +
   1.301 +  switch (prop_id) {
   1.302 +    case ARG_MAX_SIZE_BYTES:
   1.303 +      mq->max_size.bytes = g_value_get_uint (value);
   1.304 +      SET_CHILD_PROPERTY (mq, bytes);
   1.305 +      break;
   1.306 +    case ARG_MAX_SIZE_BUFFERS:
   1.307 +      mq->max_size.visible = g_value_get_uint (value);
   1.308 +      SET_CHILD_PROPERTY (mq, visible);
   1.309 +      break;
   1.310 +    case ARG_MAX_SIZE_TIME:
   1.311 +      mq->max_size.time = g_value_get_uint64 (value);
   1.312 +      SET_CHILD_PROPERTY (mq, time);
   1.313 +      break;
   1.314 +    case ARG_EXTRA_SIZE_BYTES:
   1.315 +      mq->extra_size.bytes = g_value_get_uint (value);
   1.316 +      break;
   1.317 +    case ARG_EXTRA_SIZE_BUFFERS:
   1.318 +      mq->extra_size.visible = g_value_get_uint (value);
   1.319 +      break;
   1.320 +    case ARG_EXTRA_SIZE_TIME:
   1.321 +      mq->extra_size.time = g_value_get_uint64 (value);
   1.322 +      break;
   1.323 +    default:
   1.324 +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   1.325 +      break;
   1.326 +  }
   1.327 +}
   1.328 +
   1.329 +static void
   1.330 +gst_multi_queue_get_property (GObject * object, guint prop_id,
   1.331 +    GValue * value, GParamSpec * pspec)
   1.332 +{
   1.333 +  GstMultiQueue *mq = GST_MULTI_QUEUE (object);
   1.334 +
   1.335 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.336 +
   1.337 +  switch (prop_id) {
   1.338 +    case ARG_EXTRA_SIZE_BYTES:
   1.339 +      g_value_set_uint (value, mq->extra_size.bytes);
   1.340 +      break;
   1.341 +    case ARG_EXTRA_SIZE_BUFFERS:
   1.342 +      g_value_set_uint (value, mq->extra_size.visible);
   1.343 +      break;
   1.344 +    case ARG_EXTRA_SIZE_TIME:
   1.345 +      g_value_set_uint64 (value, mq->extra_size.time);
   1.346 +      break;
   1.347 +    case ARG_MAX_SIZE_BYTES:
   1.348 +      g_value_set_uint (value, mq->max_size.bytes);
   1.349 +      break;
   1.350 +    case ARG_MAX_SIZE_BUFFERS:
   1.351 +      g_value_set_uint (value, mq->max_size.visible);
   1.352 +      break;
   1.353 +    case ARG_MAX_SIZE_TIME:
   1.354 +      g_value_set_uint64 (value, mq->max_size.time);
   1.355 +      break;
   1.356 +    default:
   1.357 +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   1.358 +      break;
   1.359 +  }
   1.360 +
   1.361 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.362 +}
   1.363 +
   1.364 +GList *
   1.365 +gst_multi_queue_get_internal_links (GstPad * pad)
   1.366 +{
   1.367 +  GList *res = NULL;
   1.368 +  GstMultiQueue *mqueue;
   1.369 +  GstSingleQueue *sq = NULL;
   1.370 +  GList *tmp;
   1.371 +
   1.372 +  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
   1.373 +
   1.374 +  mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad));
   1.375 +  if (!mqueue)
   1.376 +    goto no_parent;
   1.377 +
   1.378 +  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
   1.379 +  /* Find which single queue it belongs to */
   1.380 +  for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) {
   1.381 +    sq = (GstSingleQueue *) tmp->data;
   1.382 +
   1.383 +    if (sq->sinkpad == pad)
   1.384 +      res = g_list_prepend (res, sq->srcpad);
   1.385 +    if (sq->srcpad == pad)
   1.386 +      res = g_list_prepend (res, sq->sinkpad);
   1.387 +  }
   1.388 +
   1.389 +  if (!res)
   1.390 +    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
   1.391 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
   1.392 +
   1.393 +  return res;
   1.394 +
   1.395 +no_parent:
   1.396 +  {
   1.397 +    GST_DEBUG_OBJECT (pad, "no parent");
   1.398 +    return NULL;
   1.399 +  }
   1.400 +}
   1.401 +
   1.402 +
   1.403 +/*
   1.404 + * GstElement methods
   1.405 + */
   1.406 +
   1.407 +static GstPad *
   1.408 +gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
   1.409 +    const gchar * name)
   1.410 +{
   1.411 +  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
   1.412 +  GstSingleQueue *squeue;
   1.413 +
   1.414 +  GST_LOG_OBJECT (element, "name : %s", name);
   1.415 +
   1.416 +  /* Create a new single queue, add the sink and source pad and return the sink pad */
   1.417 +  squeue = gst_single_queue_new (mqueue);
   1.418 +
   1.419 +  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
   1.420 +  mqueue->queues = g_list_append (mqueue->queues, squeue);
   1.421 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
   1.422 +
   1.423 +  /*
   1.424 +  GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
   1.425 +      GST_DEBUG_PAD_NAME (squeue->sinkpad));
   1.426 +    */
   1.427 +  return squeue->sinkpad;
   1.428 +}
   1.429 +
   1.430 +static void
   1.431 +gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
   1.432 +{
   1.433 +  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
   1.434 +  GstSingleQueue *sq = NULL;
   1.435 +  GList *tmp;
   1.436 +
   1.437 +//  GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
   1.438 +
   1.439 +  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
   1.440 +  /* Find which single queue it belongs to, knowing that it should be a sinkpad */
   1.441 +  for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
   1.442 +    sq = (GstSingleQueue *) tmp->data;
   1.443 +
   1.444 +    if (sq->sinkpad == pad)
   1.445 +      break;
   1.446 +  }
   1.447 +
   1.448 +  if (!tmp) {
   1.449 +    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
   1.450 +    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
   1.451 +    return;
   1.452 +  }
   1.453 +
   1.454 +  /* FIXME: The removal of the singlequeue should probably not happen until it
   1.455 +   * finishes draining */
   1.456 +
   1.457 +  /* remove it from the list */
   1.458 +  mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
   1.459 +
   1.460 +  /* FIXME : recompute next-non-linked */
   1.461 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
   1.462 +
   1.463 +  /* delete SingleQueue */
   1.464 +  gst_data_queue_set_flushing (sq->queue, TRUE);
   1.465 +
   1.466 +  gst_pad_set_active (sq->srcpad, FALSE);
   1.467 +  gst_pad_set_active (sq->sinkpad, FALSE);
   1.468 +  gst_element_remove_pad (element, sq->srcpad);
   1.469 +  gst_element_remove_pad (element, sq->sinkpad);
   1.470 +  gst_single_queue_free (sq);
   1.471 +}
   1.472 +
   1.473 +static gboolean
   1.474 +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
   1.475 +{
   1.476 +  gboolean result;
   1.477 +
   1.478 +  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
   1.479 +      sq->id);
   1.480 +
   1.481 +  if (flush) {
   1.482 +    sq->srcresult = GST_FLOW_WRONG_STATE;
   1.483 +    gst_data_queue_set_flushing (sq->queue, TRUE);
   1.484 +
   1.485 +    /* wake up non-linked task */
   1.486 +    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
   1.487 +        sq->id);
   1.488 +    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.489 +    g_cond_signal (sq->turn);
   1.490 +    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.491 +
   1.492 +    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
   1.493 +    result = gst_pad_pause_task (sq->srcpad);
   1.494 +  } else {
   1.495 +    gst_data_queue_flush (sq->queue);
   1.496 +    gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
   1.497 +    gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
   1.498 +    /* All pads start off not-linked for a smooth kick-off */
   1.499 +    sq->srcresult = GST_FLOW_NOT_LINKED;
   1.500 +    sq->cur_time = 0;
   1.501 +    sq->max_size.visible = mq->max_size.visible;
   1.502 +    sq->is_eos = FALSE;
   1.503 +    sq->inextra = FALSE;
   1.504 +    sq->nextid = 0;
   1.505 +    sq->oldid = 0;
   1.506 +    gst_data_queue_set_flushing (sq->queue, FALSE);
   1.507 +
   1.508 +    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
   1.509 +    result =
   1.510 +        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
   1.511 +        sq->srcpad);
   1.512 +  }
   1.513 +  return result;
   1.514 +}
   1.515 +
   1.516 +/* calculate the diff between running time on the sink and src of the queue.
   1.517 + * This is the total amount of time in the queue. 
   1.518 + * WITH LOCK TAKEN */
   1.519 +static void
   1.520 +update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
   1.521 +{
   1.522 +  gint64 sink_time, src_time;
   1.523 +
   1.524 +  sink_time =
   1.525 +      gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
   1.526 +      sq->sink_segment.last_stop);
   1.527 +
   1.528 +  src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
   1.529 +      sq->src_segment.last_stop);
   1.530 +
   1.531 +  GST_DEBUG_OBJECT (mq,
   1.532 +      "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
   1.533 +      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
   1.534 +
   1.535 +  /* This allows for streams with out of order timestamping - sometimes the 
   1.536 +   * emerging timestamp is later than the arriving one(s) */
   1.537 +  if (sink_time >= src_time)
   1.538 +    sq->cur_time = sink_time - src_time;
   1.539 +  else
   1.540 +    sq->cur_time = 0;
   1.541 +}
   1.542 +
   1.543 +/* take a NEWSEGMENT event and apply the values to segment, updating the time
   1.544 + * level of queue. */
   1.545 +static void
   1.546 +apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
   1.547 +    GstSegment * segment)
   1.548 +{
   1.549 +  gboolean update;
   1.550 +  GstFormat format;
   1.551 +  gdouble rate, arate;
   1.552 +  gint64 start, stop, time;
   1.553 +
   1.554 +  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
   1.555 +      &format, &start, &stop, &time);
   1.556 +
   1.557 +  /* now configure the values, we use these to track timestamps on the
   1.558 +   * sinkpad. */
   1.559 +  if (format != GST_FORMAT_TIME) {
   1.560 +    /* non-time format, pretent the current time segment is closed with a
   1.561 +     * 0 start and unknown stop time. */
   1.562 +    update = FALSE;
   1.563 +    format = GST_FORMAT_TIME;
   1.564 +    start = 0;
   1.565 +    stop = -1;
   1.566 +    time = 0;
   1.567 +  }
   1.568 +
   1.569 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.570 +
   1.571 +  gst_segment_set_newsegment_full (segment, update,
   1.572 +      rate, arate, format, start, stop, time);
   1.573 +
   1.574 +  GST_DEBUG_OBJECT (mq,
   1.575 +      "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
   1.576 +
   1.577 +  /* segment can update the time level of the queue */
   1.578 +  update_time_level (mq, sq);
   1.579 +
   1.580 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.581 +}
   1.582 +
   1.583 +/* take a buffer and update segment, updating the time level of the queue. */
   1.584 +static void
   1.585 +apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
   1.586 +    GstClockTime duration, GstSegment * segment)
   1.587 +{
   1.588 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.589 +
   1.590 +  /* if no timestamp is set, assume it's continuous with the previous 
   1.591 +   * time */
   1.592 +  if (timestamp == GST_CLOCK_TIME_NONE)
   1.593 +    timestamp = segment->last_stop;
   1.594 +
   1.595 +  /* add duration */
   1.596 +  if (duration != GST_CLOCK_TIME_NONE)
   1.597 +    timestamp += duration;
   1.598 +
   1.599 +  GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
   1.600 +      sq->id, GST_TIME_ARGS (timestamp));
   1.601 +
   1.602 +  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
   1.603 +
   1.604 +  /* calc diff with other end */
   1.605 +  update_time_level (mq, sq);
   1.606 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.607 +}
   1.608 +
   1.609 +static GstFlowReturn
   1.610 +gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
   1.611 +    GstMiniObject * object)
   1.612 +{
   1.613 +  GstFlowReturn result = GST_FLOW_OK;
   1.614 +
   1.615 +  if (GST_IS_BUFFER (object)) {
   1.616 +    GstBuffer *buffer;
   1.617 +    GstClockTime timestamp, duration;
   1.618 +
   1.619 +    buffer = GST_BUFFER_CAST (object);
   1.620 +    timestamp = GST_BUFFER_TIMESTAMP (buffer);
   1.621 +    duration = GST_BUFFER_DURATION (buffer);
   1.622 +
   1.623 +    apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
   1.624 +
   1.625 +    /* Applying the buffer may have made the queue non-full again, unblock it if needed */
   1.626 +    gst_data_queue_limits_changed (sq->queue);
   1.627 +
   1.628 +    GST_DEBUG_OBJECT (mq,
   1.629 +        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
   1.630 +        sq->id, buffer, GST_TIME_ARGS (timestamp));
   1.631 +
   1.632 +    result = gst_pad_push (sq->srcpad, buffer);
   1.633 +  } else if (GST_IS_EVENT (object)) {
   1.634 +    GstEvent *event;
   1.635 +
   1.636 +    event = GST_EVENT_CAST (object);
   1.637 +
   1.638 +    switch (GST_EVENT_TYPE (event)) {
   1.639 +      case GST_EVENT_EOS:
   1.640 +        result = GST_FLOW_UNEXPECTED;
   1.641 +        break;
   1.642 +      case GST_EVENT_NEWSEGMENT:
   1.643 +        apply_segment (mq, sq, event, &sq->src_segment);
   1.644 +        /* Applying the segment may have made the queue non-full again, unblock it if needed */
   1.645 +        gst_data_queue_limits_changed (sq->queue);
   1.646 +        break;
   1.647 +      default:
   1.648 +        break;
   1.649 +    }
   1.650 +
   1.651 +    GST_DEBUG_OBJECT (mq,
   1.652 +        "SingleQueue %d : Pushing event %p of type %s",
   1.653 +        sq->id, event, GST_EVENT_TYPE_NAME (event));
   1.654 +
   1.655 +    gst_pad_push_event (sq->srcpad, event);
   1.656 +  } else {
   1.657 +    g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
   1.658 +        sq->id);
   1.659 +  }
   1.660 +  return result;
   1.661 +
   1.662 +  /* ERRORS */
   1.663 +}
   1.664 +
   1.665 +static GstMiniObject *
   1.666 +gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
   1.667 +{
   1.668 +  GstMiniObject *res;
   1.669 +
   1.670 +  res = item->object;
   1.671 +  item->object = NULL;
   1.672 +
   1.673 +  return res;
   1.674 +}
   1.675 +
   1.676 +static void
   1.677 +gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   1.678 +{
   1.679 +  if (item->object)
   1.680 +    gst_mini_object_unref (item->object);
   1.681 +  g_free (item);
   1.682 +}
   1.683 +
   1.684 +/* takes ownership of passed mini object! */
   1.685 +static GstMultiQueueItem *
   1.686 +gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
   1.687 +{
   1.688 +  GstMultiQueueItem *item;
   1.689 +
   1.690 +  item = g_new (GstMultiQueueItem, 1);
   1.691 +  item->object = object;
   1.692 +  item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   1.693 +  item->posid = curid;
   1.694 +
   1.695 +  if (GST_IS_BUFFER (object)) {
   1.696 +    item->size = GST_BUFFER_SIZE (object);
   1.697 +    item->duration = GST_BUFFER_DURATION (object);
   1.698 +    if (item->duration == GST_CLOCK_TIME_NONE)
   1.699 +      item->duration = 0;
   1.700 +    item->visible = TRUE;
   1.701 +  } else {
   1.702 +    item->size = 0;
   1.703 +    item->duration = 0;
   1.704 +    item->visible = FALSE;
   1.705 +  }
   1.706 +  return item;
   1.707 +}
   1.708 +
   1.709 +/* Each main loop attempts to push buffers until the return value
   1.710 + * is not-linked. not-linked pads are not allowed to push data beyond
   1.711 + * any linked pads, so they don't 'rush ahead of the pack'.
   1.712 + */
   1.713 +static void
   1.714 +gst_multi_queue_loop (GstPad * pad)
   1.715 +{
   1.716 +  GstSingleQueue *sq;
   1.717 +  GstMultiQueueItem *item;
   1.718 +  GstDataQueueItem *sitem;
   1.719 +  GstMultiQueue *mq;
   1.720 +  GstMiniObject *object;
   1.721 +  guint32 newid;
   1.722 +  guint32 oldid = -1;
   1.723 +  GstFlowReturn result;
   1.724 +
   1.725 +  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
   1.726 +  mq = sq->mqueue;
   1.727 +
   1.728 +  do {
   1.729 +    GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
   1.730 +
   1.731 +    /* Get something from the queue, blocking until that happens, or we get
   1.732 +     * flushed */
   1.733 +    if (!(gst_data_queue_pop (sq->queue, &sitem)))
   1.734 +      goto out_flushing;
   1.735 +
   1.736 +    item = (GstMultiQueueItem *) sitem;
   1.737 +    newid = item->posid;
   1.738 +
   1.739 +    /* steal the object and destroy the item */
   1.740 +    object = gst_multi_queue_item_steal_object (item);
   1.741 +    gst_multi_queue_item_destroy (item);
   1.742 +
   1.743 +    GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
   1.744 +        sq->id, newid, oldid);
   1.745 +
   1.746 +    /* If we're not-linked, we do some extra work because we might need to
   1.747 +     * wait before pushing. If we're linked but there's a gap in the IDs,
   1.748 +     * or it's the first loop, or we just passed the previous highid, 
   1.749 +     * we might need to wake some sleeping pad up, so there's extra work 
   1.750 +     * there too */
   1.751 +    if (sq->srcresult == GST_FLOW_NOT_LINKED ||
   1.752 +        (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) {
   1.753 +      GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
   1.754 +          gst_flow_get_name (sq->srcresult));
   1.755 +
   1.756 +      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.757 +
   1.758 +      /* Update the nextid so other threads know when to wake us up */
   1.759 +      sq->nextid = newid;
   1.760 +
   1.761 +      /* Update the oldid (the last ID we output) for highid tracking */
   1.762 +      if (oldid != -1)
   1.763 +        sq->oldid = oldid;
   1.764 +
   1.765 +      if (sq->srcresult == GST_FLOW_NOT_LINKED) {
   1.766 +        /* Go to sleep until it's time to push this buffer */
   1.767 +
   1.768 +        /* Recompute the highid */
   1.769 +        compute_high_id (mq);
   1.770 +        while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
   1.771 +          GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
   1.772 +              "newid %u and highid %u", sq->id, newid, mq->highid);
   1.773 +
   1.774 +
   1.775 +          /* Wake up all non-linked pads before we sleep */
   1.776 +          wake_up_next_non_linked (mq);
   1.777 +
   1.778 +          mq->numwaiting++;
   1.779 +          g_cond_wait (sq->turn, mq->qlock);
   1.780 +          mq->numwaiting--;
   1.781 +
   1.782 +          GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
   1.783 +              "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
   1.784 +        }
   1.785 +
   1.786 +        /* Re-compute the high_id in case someone else pushed */
   1.787 +        compute_high_id (mq);
   1.788 +      } else {
   1.789 +        compute_high_id (mq);
   1.790 +        /* Wake up all non-linked pads */
   1.791 +        wake_up_next_non_linked (mq);
   1.792 +      }
   1.793 +      /* We're done waiting, we can clear the nextid */
   1.794 +      sq->nextid = 0;
   1.795 +
   1.796 +      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.797 +    }
   1.798 +
   1.799 +    GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
   1.800 +        gst_flow_get_name (sq->srcresult));
   1.801 +
   1.802 +    /* Try to push out the new object */
   1.803 +    result = gst_single_queue_push_one (mq, sq, object);
   1.804 +    sq->srcresult = result;
   1.805 +
   1.806 +    if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
   1.807 +      goto out_flushing;
   1.808 +
   1.809 +    GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
   1.810 +        gst_flow_get_name (sq->srcresult));
   1.811 +
   1.812 +    oldid = newid;
   1.813 +  }
   1.814 +  while (TRUE);
   1.815 +
   1.816 +out_flushing:
   1.817 +  {
   1.818 +    /* Need to make sure wake up any sleeping pads when we exit */
   1.819 +    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.820 +    compute_high_id (mq);
   1.821 +    wake_up_next_non_linked (mq);
   1.822 +    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.823 +
   1.824 +    gst_data_queue_set_flushing (sq->queue, TRUE);
   1.825 +    gst_pad_pause_task (sq->srcpad);
   1.826 +    GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
   1.827 +        "SingleQueue[%d] task paused, reason:%s",
   1.828 +        sq->id, gst_flow_get_name (sq->srcresult));
   1.829 +    return;
   1.830 +  }
   1.831 +}
   1.832 +
   1.833 +/**
   1.834 + * gst_multi_queue_chain:
   1.835 + *
   1.836 + * This is similar to GstQueue's chain function, except:
   1.837 + * _ we don't have leak behavioures,
   1.838 + * _ we push with a unique id (curid)
   1.839 + */
   1.840 +static GstFlowReturn
   1.841 +gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
   1.842 +{
   1.843 +  GstSingleQueue *sq;
   1.844 +  GstMultiQueue *mq;
   1.845 +  GstMultiQueueItem *item;
   1.846 +  GstFlowReturn ret = GST_FLOW_OK;
   1.847 +  guint32 curid;
   1.848 +  GstClockTime timestamp, duration;
   1.849 +
   1.850 +  sq = gst_pad_get_element_private (pad);
   1.851 +  mq = (GstMultiQueue *) gst_pad_get_parent (pad);
   1.852 +
   1.853 +  /* Get a unique incrementing id */
   1.854 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.855 +  curid = mq->counter++;
   1.856 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.857 +
   1.858 +  GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
   1.859 +      sq->id, buffer, curid);
   1.860 +
   1.861 +  item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
   1.862 +
   1.863 +  timestamp = GST_BUFFER_TIMESTAMP (buffer);
   1.864 +  duration = GST_BUFFER_DURATION (buffer);
   1.865 +
   1.866 +  if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
   1.867 +    goto flushing;
   1.868 +
   1.869 +  /* update time level, we must do this after pushing the data in the queue so
   1.870 +   * that we never end up filling the queue first. */
   1.871 +  apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
   1.872 +
   1.873 +done:
   1.874 +  gst_object_unref (mq);
   1.875 +
   1.876 +  return ret;
   1.877 +
   1.878 +  /* ERRORS */
   1.879 +flushing:
   1.880 +  {
   1.881 +    ret = sq->srcresult;
   1.882 +    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
   1.883 +        sq->id, gst_flow_get_name (ret));
   1.884 +    gst_multi_queue_item_destroy (item);
   1.885 +    goto done;
   1.886 +  }
   1.887 +}
   1.888 +
   1.889 +static gboolean
   1.890 +gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
   1.891 +{
   1.892 +  GstSingleQueue *sq;
   1.893 +
   1.894 +  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
   1.895 +
   1.896 +  if (active) {
   1.897 +    /* All pads start off not-linked for a smooth kick-off */
   1.898 +    sq->srcresult = GST_FLOW_NOT_LINKED;
   1.899 +  } else {
   1.900 +    sq->srcresult = GST_FLOW_WRONG_STATE;
   1.901 +    gst_data_queue_flush (sq->queue);
   1.902 +  }
   1.903 +  return TRUE;
   1.904 +}
   1.905 +
   1.906 +static gboolean
   1.907 +gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
   1.908 +{
   1.909 +  GstSingleQueue *sq;
   1.910 +  GstMultiQueue *mq;
   1.911 +  guint32 curid;
   1.912 +  GstMultiQueueItem *item;
   1.913 +  gboolean res;
   1.914 +  GstEventType type;
   1.915 +  GstEvent *sref = NULL;
   1.916 +
   1.917 +  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
   1.918 +  mq = (GstMultiQueue *) gst_pad_get_parent (pad);
   1.919 +
   1.920 +  type = GST_EVENT_TYPE (event);
   1.921 +
   1.922 +  switch (type) {
   1.923 +    case GST_EVENT_FLUSH_START:
   1.924 +      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
   1.925 +          sq->id);
   1.926 +
   1.927 +      res = gst_pad_push_event (sq->srcpad, event);
   1.928 +
   1.929 +      gst_single_queue_flush (mq, sq, TRUE);
   1.930 +      goto done;
   1.931 +
   1.932 +    case GST_EVENT_FLUSH_STOP:
   1.933 +      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
   1.934 +          sq->id);
   1.935 +
   1.936 +      res = gst_pad_push_event (sq->srcpad, event);
   1.937 +
   1.938 +      gst_single_queue_flush (mq, sq, FALSE);
   1.939 +      goto done;
   1.940 +    case GST_EVENT_NEWSEGMENT:
   1.941 +      /* take ref because the queue will take ownership and we need the event
   1.942 +       * afterwards to update the segment */
   1.943 +      sref = gst_event_ref (event);
   1.944 +      break;
   1.945 +
   1.946 +    default:
   1.947 +      if (!(GST_EVENT_IS_SERIALIZED (event))) {
   1.948 +        res = gst_pad_push_event (sq->srcpad, event);
   1.949 +        goto done;
   1.950 +      }
   1.951 +      break;
   1.952 +  }
   1.953 +
   1.954 +  /* Get an unique incrementing id */
   1.955 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   1.956 +  curid = mq->counter++;
   1.957 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   1.958 +
   1.959 +  item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
   1.960 +
   1.961 +  GST_DEBUG_OBJECT (mq,
   1.962 +      "SingleQueue %d : Enqueuing event %p of type %s with id %d",
   1.963 +      sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
   1.964 +
   1.965 +  if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
   1.966 +    goto flushing;
   1.967 +
   1.968 +  /* mark EOS when we received one, we must do that after putting the
   1.969 +   * buffer in the queue because EOS marks the buffer as filled. No need to take
   1.970 +   * a lock, the _check_full happens from this thread only, right before pushing
   1.971 +   * into dataqueue. */
   1.972 +  switch (type) {
   1.973 +    case GST_EVENT_EOS:
   1.974 +      sq->is_eos = TRUE;
   1.975 +      break;
   1.976 +    case GST_EVENT_NEWSEGMENT:
   1.977 +      apply_segment (mq, sq, sref, &sq->sink_segment);
   1.978 +      gst_event_unref (sref);
   1.979 +      break;
   1.980 +    default:
   1.981 +      break;
   1.982 +  }
   1.983 +done:
   1.984 +  gst_object_unref (mq);
   1.985 +  return res;
   1.986 +
   1.987 +flushing:
   1.988 +  {
   1.989 +    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
   1.990 +        sq->id, gst_flow_get_name (sq->srcresult));
   1.991 +    if (sref)
   1.992 +      gst_event_unref (sref);
   1.993 +    gst_multi_queue_item_destroy (item);
   1.994 +    goto done;
   1.995 +  }
   1.996 +}
   1.997 +
   1.998 +static GstCaps *
   1.999 +gst_multi_queue_getcaps (GstPad * pad)
  1.1000 +{
  1.1001 +  GstSingleQueue *sq = gst_pad_get_element_private (pad);
  1.1002 +  GstPad *otherpad;
  1.1003 +  GstCaps *result;
  1.1004 +
  1.1005 +  otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
  1.1006 +
  1.1007 +  GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
  1.1008 +
  1.1009 +  result = gst_pad_peer_get_caps (otherpad);
  1.1010 +  if (result == NULL)
  1.1011 +    result = gst_caps_new_any ();
  1.1012 +
  1.1013 +  return result;
  1.1014 +}
  1.1015 +
  1.1016 +static GstFlowReturn
  1.1017 +gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
  1.1018 +    GstCaps * caps, GstBuffer ** buf)
  1.1019 +{
  1.1020 +  GstSingleQueue *sq = gst_pad_get_element_private (pad);
  1.1021 +
  1.1022 +  return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
  1.1023 +}
  1.1024 +
  1.1025 +static gboolean
  1.1026 +gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
  1.1027 +{
  1.1028 +  GstMultiQueue *mq;
  1.1029 +  GstSingleQueue *sq;
  1.1030 +  gboolean result = FALSE;
  1.1031 +
  1.1032 +  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
  1.1033 +  mq = sq->mqueue;
  1.1034 +
  1.1035 +  GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
  1.1036 +
  1.1037 +  if (active) {
  1.1038 +    result = gst_single_queue_flush (mq, sq, FALSE);
  1.1039 +  } else {
  1.1040 +    result = gst_single_queue_flush (mq, sq, TRUE);
  1.1041 +    /* make sure streaming finishes */
  1.1042 +    result |= gst_pad_stop_task (pad);
  1.1043 +  }
  1.1044 +  return result;
  1.1045 +}
  1.1046 +
  1.1047 +static gboolean
  1.1048 +gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
  1.1049 +{
  1.1050 +  return TRUE;
  1.1051 +}
  1.1052 +
  1.1053 +static gboolean
  1.1054 +gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
  1.1055 +{
  1.1056 +  GstSingleQueue *sq = gst_pad_get_element_private (pad);
  1.1057 +
  1.1058 +  return gst_pad_push_event (sq->sinkpad, event);
  1.1059 +}
  1.1060 +
  1.1061 +static gboolean
  1.1062 +gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
  1.1063 +{
  1.1064 +  GstSingleQueue *sq = gst_pad_get_element_private (pad);
  1.1065 +  GstPad *peerpad;
  1.1066 +  gboolean res;
  1.1067 +
  1.1068 +  /* FIXME, Handle position offset depending on queue size */
  1.1069 +
  1.1070 +  /* default handling */
  1.1071 +  if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
  1.1072 +    goto no_peer;
  1.1073 +
  1.1074 +  res = gst_pad_query (peerpad, query);
  1.1075 +
  1.1076 +  gst_object_unref (peerpad);
  1.1077 +
  1.1078 +  return res;
  1.1079 +
  1.1080 +  /* ERRORS */
  1.1081 +no_peer:
  1.1082 +  {
  1.1083 +    GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
  1.1084 +    return FALSE;
  1.1085 +  }
  1.1086 +}
  1.1087 +
  1.1088 +/*
  1.1089 + * Next-non-linked functions
  1.1090 + */
  1.1091 +
  1.1092 +/* WITH LOCK TAKEN */
  1.1093 +static void
  1.1094 +wake_up_next_non_linked (GstMultiQueue * mq)
  1.1095 +{
  1.1096 +  GList *tmp;
  1.1097 +
  1.1098 +  /* maybe no-one is waiting */
  1.1099 +  if (mq->numwaiting < 1)
  1.1100 +    return;
  1.1101 +
  1.1102 +  /* Else figure out which singlequeue(s) need waking up */
  1.1103 +  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
  1.1104 +    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
  1.1105 +
  1.1106 +    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
  1.1107 +      if (sq->nextid != 0 && sq->nextid <= mq->highid) {
  1.1108 +        GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
  1.1109 +        g_cond_signal (sq->turn);
  1.1110 +      }
  1.1111 +    }
  1.1112 +  }
  1.1113 +}
  1.1114 +
  1.1115 +/* WITH LOCK TAKEN */
  1.1116 +static void
  1.1117 +compute_high_id (GstMultiQueue * mq)
  1.1118 +{
  1.1119 +  /* The high-id is either the highest id among the linked pads, or if all
  1.1120 +   * pads are not-linked, it's the lowest not-linked pad */
  1.1121 +  GList *tmp;
  1.1122 +  guint32 lowest = G_MAXUINT32;
  1.1123 +  guint32 highid = G_MAXUINT32;
  1.1124 +
  1.1125 +  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
  1.1126 +    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
  1.1127 +
  1.1128 +    GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
  1.1129 +        sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
  1.1130 +
  1.1131 +    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
  1.1132 +      /* No need to consider queues which are not waiting */
  1.1133 +      if (sq->nextid == 0) {
  1.1134 +        GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
  1.1135 +        continue;
  1.1136 +      }
  1.1137 +
  1.1138 +      if (sq->nextid < lowest)
  1.1139 +        lowest = sq->nextid;
  1.1140 +    } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
  1.1141 +      /* If we don't have a global highid, or the global highid is lower than
  1.1142 +       * this single queue's last outputted id, store the queue's one, 
  1.1143 +       * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
  1.1144 +      if ((highid == G_MAXUINT32) || (sq->oldid > highid))
  1.1145 +        highid = sq->oldid;
  1.1146 +    }
  1.1147 +  }
  1.1148 +
  1.1149 +  if (highid == G_MAXUINT32 || lowest < highid)
  1.1150 +    mq->highid = lowest;
  1.1151 +  else
  1.1152 +    mq->highid = highid;
  1.1153 +
  1.1154 +  GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
  1.1155 +      lowest);
  1.1156 +}
  1.1157 +
  1.1158 +#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
  1.1159 +     (sq->max_size.format) <= (value))
  1.1160 +
  1.1161 +/*
  1.1162 + * GstSingleQueue functions
  1.1163 + */
  1.1164 +static void
  1.1165 +single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
  1.1166 +{
  1.1167 +  GstMultiQueue *mq = sq->mqueue;
  1.1168 +  GList *tmp;
  1.1169 +  GstDataQueueSize size;
  1.1170 +  gboolean filled = FALSE;
  1.1171 +
  1.1172 +  gst_data_queue_get_level (sq->queue, &size);
  1.1173 +
  1.1174 +  GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
  1.1175 +
  1.1176 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  1.1177 +  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
  1.1178 +    GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
  1.1179 +    GstDataQueueSize ssize;
  1.1180 +
  1.1181 +    GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
  1.1182 +
  1.1183 +    if (gst_data_queue_is_empty (ssq->queue)) {
  1.1184 +      GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
  1.1185 +      if (IS_FILLED (visible, size.visible)) {
  1.1186 +        sq->max_size.visible++;
  1.1187 +        GST_DEBUG_OBJECT (mq,
  1.1188 +            "Another queue is empty, bumping single queue %d max visible to %d",
  1.1189 +            sq->id, sq->max_size.visible);
  1.1190 +      }
  1.1191 +      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1.1192 +      goto beach;
  1.1193 +    }
  1.1194 +    /* check if we reached the hard time/bytes limits */
  1.1195 +    gst_data_queue_get_level (ssq->queue, &ssize);
  1.1196 +
  1.1197 +    GST_DEBUG_OBJECT (mq,
  1.1198 +        "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
  1.1199 +        G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
  1.1200 +        ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1.1201 +
  1.1202 +    /* if this queue is filled completely we must signal overrun */
  1.1203 +    if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
  1.1204 +      GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
  1.1205 +      filled = TRUE;
  1.1206 +    }
  1.1207 +  }
  1.1208 +  /* no queues were empty */
  1.1209 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1.1210 +
  1.1211 +  /* Overrun is always forwarded, since this is blocking the upstream element */
  1.1212 +  if (filled) {
  1.1213 +    GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
  1.1214 +    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
  1.1215 +  }
  1.1216 +
  1.1217 +beach:
  1.1218 +  return;
  1.1219 +}
  1.1220 +
  1.1221 +static void
  1.1222 +single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
  1.1223 +{
  1.1224 +  gboolean empty = TRUE;
  1.1225 +  GstMultiQueue *mq = sq->mqueue;
  1.1226 +  GList *tmp;
  1.1227 +
  1.1228 +  GST_LOG_OBJECT (mq,
  1.1229 +      "Single Queue %d is empty, Checking other single queues", sq->id);
  1.1230 +
  1.1231 +  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  1.1232 +  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
  1.1233 +    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
  1.1234 +
  1.1235 +    if (gst_data_queue_is_full (sq->queue)) {
  1.1236 +      GstDataQueueSize size;
  1.1237 +
  1.1238 +      gst_data_queue_get_level (sq->queue, &size);
  1.1239 +      if (IS_FILLED (visible, size.visible)) {
  1.1240 +        sq->max_size.visible++;
  1.1241 +        GST_DEBUG_OBJECT (mq,
  1.1242 +            "queue %d is filled, bumping its max visible to %d", sq->id,
  1.1243 +            sq->max_size.visible);
  1.1244 +        gst_data_queue_limits_changed (sq->queue);
  1.1245 +      }
  1.1246 +    }
  1.1247 +    if (!gst_data_queue_is_empty (sq->queue))
  1.1248 +      empty = FALSE;
  1.1249 +  }
  1.1250 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1.1251 +
  1.1252 +  if (empty) {
  1.1253 +    GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
  1.1254 +    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
  1.1255 +  }
  1.1256 +}
  1.1257 +
  1.1258 +static gboolean
  1.1259 +single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
  1.1260 +    guint64 time, GstSingleQueue * sq)
  1.1261 +{
  1.1262 +  gboolean res;
  1.1263 +
  1.1264 +  GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
  1.1265 +      "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
  1.1266 +      sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1.1267 +
  1.1268 +  /* we are always filled on EOS */
  1.1269 +  if (sq->is_eos)
  1.1270 +    return TRUE;
  1.1271 +
  1.1272 +  /* we never go past the max visible items */
  1.1273 +  if (IS_FILLED (visible, visible))
  1.1274 +    return TRUE;
  1.1275 +
  1.1276 +  if (sq->cur_time != 0) {
  1.1277 +    /* if we have valid time in the queue, check */
  1.1278 +    res = IS_FILLED (time, sq->cur_time);
  1.1279 +  } else {
  1.1280 +    /* no valid time, check bytes */
  1.1281 +    res = IS_FILLED (bytes, bytes);
  1.1282 +  }
  1.1283 +  return res;
  1.1284 +}
  1.1285 +
  1.1286 +static void
  1.1287 +gst_single_queue_free (GstSingleQueue * sq)
  1.1288 +{
  1.1289 +  /* DRAIN QUEUE */
  1.1290 +  gst_data_queue_flush (sq->queue);
  1.1291 +  g_object_unref (sq->queue);
  1.1292 +  g_cond_free (sq->turn);
  1.1293 +  g_free (sq);
  1.1294 +}
  1.1295 +
  1.1296 +static GstSingleQueue *
  1.1297 +gst_single_queue_new (GstMultiQueue * mqueue)
  1.1298 +{
  1.1299 +  GstSingleQueue *sq;
  1.1300 +  gchar *tmp;
  1.1301 +
  1.1302 +  sq = g_new0 (GstSingleQueue, 1);
  1.1303 +
  1.1304 +  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  1.1305 +  sq->id = mqueue->nbqueues++;
  1.1306 +
  1.1307 +  /* copy over max_size and extra_size so we don't need to take the lock
  1.1308 +   * any longer when checking if the queue is full. */
  1.1309 +  sq->max_size.visible = mqueue->max_size.visible;
  1.1310 +  sq->max_size.bytes = mqueue->max_size.bytes;
  1.1311 +  sq->max_size.time = mqueue->max_size.time;
  1.1312 +
  1.1313 +  sq->extra_size.visible = mqueue->extra_size.visible;
  1.1314 +  sq->extra_size.bytes = mqueue->extra_size.bytes;
  1.1315 +  sq->extra_size.time = mqueue->extra_size.time;
  1.1316 +
  1.1317 +  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
  1.1318 +
  1.1319 +  GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
  1.1320 +
  1.1321 +  sq->mqueue = mqueue;
  1.1322 +  sq->srcresult = GST_FLOW_WRONG_STATE;
  1.1323 +  sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
  1.1324 +      single_queue_check_full, sq);
  1.1325 +  sq->is_eos = FALSE;
  1.1326 +  gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
  1.1327 +  gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
  1.1328 +
  1.1329 +  sq->nextid = 0;
  1.1330 +  sq->oldid = 0;
  1.1331 +  sq->turn = g_cond_new ();
  1.1332 +
  1.1333 +  /* attach to underrun/overrun signals to handle non-starvation  */
  1.1334 +  g_signal_connect (G_OBJECT (sq->queue), "full",
  1.1335 +      G_CALLBACK (single_queue_overrun_cb), sq);
  1.1336 +  g_signal_connect (G_OBJECT (sq->queue), "empty",
  1.1337 +      G_CALLBACK (single_queue_underrun_cb), sq);
  1.1338 +
  1.1339 +  tmp = g_strdup_printf ("sink%d", sq->id);
  1.1340 +  sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
  1.1341 +  g_free (tmp);
  1.1342 +
  1.1343 +  gst_pad_set_chain_function (sq->sinkpad,
  1.1344 +      GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
  1.1345 +  gst_pad_set_activatepush_function (sq->sinkpad,
  1.1346 +      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
  1.1347 +  gst_pad_set_event_function (sq->sinkpad,
  1.1348 +      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
  1.1349 +  gst_pad_set_getcaps_function (sq->sinkpad,
  1.1350 +      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
  1.1351 +  gst_pad_set_bufferalloc_function (sq->sinkpad,
  1.1352 +      GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
  1.1353 +  gst_pad_set_internal_link_function (sq->sinkpad,
  1.1354 +      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1.1355 +
  1.1356 +  tmp = g_strdup_printf ("src%d", sq->id);
  1.1357 +  sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
  1.1358 +  g_free (tmp);
  1.1359 +
  1.1360 +  gst_pad_set_activatepush_function (sq->srcpad,
  1.1361 +      GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
  1.1362 +  gst_pad_set_acceptcaps_function (sq->srcpad,
  1.1363 +      GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
  1.1364 +  gst_pad_set_getcaps_function (sq->srcpad,
  1.1365 +      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
  1.1366 +  gst_pad_set_event_function (sq->srcpad,
  1.1367 +      GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
  1.1368 +  gst_pad_set_query_function (sq->srcpad,
  1.1369 +      GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
  1.1370 +  gst_pad_set_internal_link_function (sq->srcpad,
  1.1371 +      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1.1372 +
  1.1373 +  gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  1.1374 +  gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
  1.1375 +
  1.1376 +  gst_pad_set_active (sq->srcpad, TRUE);
  1.1377 +  gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
  1.1378 +
  1.1379 +  gst_pad_set_active (sq->sinkpad, TRUE);
  1.1380 +  gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
  1.1381 +
  1.1382 +  GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
  1.1383 +      sq->id);
  1.1384 +
  1.1385 +  return sq;
  1.1386 +}
  1.1387 +
  1.1388 +static gboolean
  1.1389 +plugin_init (GstPlugin * plugin)
  1.1390 +{
  1.1391 +  return gst_element_register (plugin, "multiqueue", GST_RANK_NONE,
  1.1392 +      GST_TYPE_MULTI_QUEUE);
  1.1393 +}
  1.1394 +
  1.1395 +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
  1.1396 +    GST_VERSION_MINOR,
  1.1397 +    "multiqueue",
  1.1398 +    "multiqueue", plugin_init, VERSION, GST_LICENSE,
  1.1399 +    GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
  1.1400 +