gst-gmyth/multiqueue/gstdataqueue.c
branchtrunk
changeset 791 cdafc5e948b8
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/gst-gmyth/multiqueue/gstdataqueue.c	Thu Aug 02 14:49:26 2007 +0100
     1.3 @@ -0,0 +1,628 @@
     1.4 +/* GStreamer
     1.5 + * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
     1.6 + *
     1.7 + * gstdataqueue.c:
     1.8 + *
     1.9 + * This library is free software; you can redistribute it and/or
    1.10 + * modify it under the terms of the GNU Library General Public
    1.11 + * License as published by the Free Software Foundation; either
    1.12 + * version 2 of the License, or (at your option) any later version.
    1.13 + *
    1.14 + * This library is distributed in the hope that it will be useful,
    1.15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
    1.16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    1.17 + * Library General Public License for more details.
    1.18 + *
    1.19 + * You should have received a copy of the GNU Library General Public
    1.20 + * License along with this library; if not, write to the
    1.21 + * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
    1.22 + * Boston, MA 02111-1307, USA.
    1.23 + */
    1.24 +
    1.25 +/**
    1.26 + * SECTION:gstdataqueue
    1.27 + * @short_description: Threadsafe queueing object
    1.28 + *
    1.29 + * #GstDataQueue is an object that handles threadsafe queueing of objects. It
    1.30 + * also provides size-related functionality. This object should be used for
    1.31 + * any #GstElement that wishes to provide some sort of queueing functionality.
    1.32 + */
    1.33 +
    1.34 +#include <gst/gst.h>
    1.35 +#include "gstdataqueue.h"
    1.36 +
    1.37 +GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
    1.38 +#define GST_CAT_DEFAULT (data_queue_debug)
    1.39 +GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
    1.40 +
    1.41 +
    1.42 +/* Queue signals and args */
    1.43 +enum
    1.44 +{
    1.45 +  SIGNAL_EMPTY,
    1.46 +  SIGNAL_FULL,
    1.47 +  LAST_SIGNAL
    1.48 +};
    1.49 +
    1.50 +enum
    1.51 +{
    1.52 +  ARG_0,
    1.53 +  ARG_CUR_LEVEL_VISIBLE,
    1.54 +  ARG_CUR_LEVEL_BYTES,
    1.55 +  ARG_CUR_LEVEL_TIME
    1.56 +      /* FILL ME */
    1.57 +};
    1.58 +
    1.59 +#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
    1.60 +    GST_CAT_LOG (data_queue_dataflow,                                   \
    1.61 +      "locking qlock from thread %p",                                   \
    1.62 +      g_thread_self ());                                                \
    1.63 +  g_mutex_lock (q->qlock);                                              \
    1.64 +  GST_CAT_LOG (data_queue_dataflow,                                     \
    1.65 +      "locked qlock from thread %p",                                    \
    1.66 +      g_thread_self ());                                                \
    1.67 +} G_STMT_END
    1.68 +
    1.69 +#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
    1.70 +    GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
    1.71 +    if (q->flushing)                                                    \
    1.72 +      goto label;                                                       \
    1.73 +  } G_STMT_END
    1.74 +
    1.75 +#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
    1.76 +    GST_CAT_LOG (data_queue_dataflow,                                   \
    1.77 +      "unlocking qlock from thread %p",                                 \
    1.78 +      g_thread_self ());                                                \
    1.79 +  g_mutex_unlock (q->qlock);                                            \
    1.80 +} G_STMT_END
    1.81 +
    1.82 +#define STATUS(q, msg)                                                  \
    1.83 +  GST_CAT_LOG (data_queue_dataflow,                                     \
    1.84 +               "queue:%p " msg ": %u visible items, %u "                \
    1.85 +               "bytes, %"G_GUINT64_FORMAT                               \
    1.86 +               " ns, %u elements",                                      \
    1.87 +               queue,                                                   \
    1.88 +               q->cur_level.visible,                                    \
    1.89 +               q->cur_level.bytes,                                      \
    1.90 +               q->cur_level.time,                                       \
    1.91 +               q->queue->length)
    1.92 +
    1.93 +static void gst_data_queue_base_init (GstDataQueueClass * klass);
    1.94 +static void gst_data_queue_class_init (GstDataQueueClass * klass);
    1.95 +static void gst_data_queue_init (GstDataQueue * queue);
    1.96 +static void gst_data_queue_finalize (GObject * object);
    1.97 +
    1.98 +static void gst_data_queue_set_property (GObject * object,
    1.99 +    guint prop_id, const GValue * value, GParamSpec * pspec);
   1.100 +static void gst_data_queue_get_property (GObject * object,
   1.101 +    guint prop_id, GValue * value, GParamSpec * pspec);
   1.102 +
   1.103 +static GObjectClass *parent_class = NULL;
   1.104 +static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
   1.105 +
   1.106 +GType
   1.107 +gst_data_queue_get_type (void)
   1.108 +{
   1.109 +  static GType queue_type = 0;
   1.110 +
   1.111 +  if (!queue_type) {
   1.112 +    static const GTypeInfo queue_info = {
   1.113 +      sizeof (GstDataQueueClass),
   1.114 +      (GBaseInitFunc) gst_data_queue_base_init,
   1.115 +      NULL,
   1.116 +      (GClassInitFunc) gst_data_queue_class_init,
   1.117 +      NULL,
   1.118 +      NULL,
   1.119 +      sizeof (GstDataQueue),
   1.120 +      0,
   1.121 +      (GInstanceInitFunc) gst_data_queue_init,
   1.122 +      NULL
   1.123 +    };
   1.124 +
   1.125 +    queue_type = g_type_register_static (G_TYPE_OBJECT,
   1.126 +        "GstDataQueue", &queue_info, 0);
   1.127 +    GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
   1.128 +        "data queue object");
   1.129 +    GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
   1.130 +        "dataflow inside the data queue object");
   1.131 +  }
   1.132 +
   1.133 +  return queue_type;
   1.134 +}
   1.135 +
   1.136 +static void
   1.137 +gst_data_queue_base_init (GstDataQueueClass * klass)
   1.138 +{
   1.139 +  /* Do we need anything here ?? */
   1.140 +  return;
   1.141 +}
   1.142 +
   1.143 +static void
   1.144 +gst_data_queue_class_init (GstDataQueueClass * klass)
   1.145 +{
   1.146 +  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   1.147 +
   1.148 +  parent_class = g_type_class_peek_parent (klass);
   1.149 +
   1.150 +  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
   1.151 +  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
   1.152 +
   1.153 +  /* signals */
   1.154 +  /**
   1.155 +   * GstDataQueue::empty:
   1.156 +   * @queue: the queue instance
   1.157 +   *
   1.158 +   * Reports that the queue became empty (empty).
   1.159 +   * A queue is empty if the total amount of visible items inside it (num-visible, time,
   1.160 +   * size) is lower than the boundary values which can be set through the GObject
   1.161 +   * properties.
   1.162 +   */
   1.163 +  gst_data_queue_signals[SIGNAL_EMPTY] =
   1.164 +      g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   1.165 +      G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
   1.166 +      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   1.167 +
   1.168 +  /**
   1.169 +   * GstDataQueue::full:
   1.170 +   * @queue: the queue instance
   1.171 +   *
   1.172 +   * Reports that the queue became full (full).
   1.173 +   * A queue is full if the total amount of data inside it (num-visible, time,
   1.174 +   * size) is higher than the boundary values which can be set through the GObject
   1.175 +   * properties.
   1.176 +   */
   1.177 +  gst_data_queue_signals[SIGNAL_FULL] =
   1.178 +      g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   1.179 +      G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
   1.180 +      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   1.181 +
   1.182 +  /* properties */
   1.183 +  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
   1.184 +      g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   1.185 +          "Current amount of data in the queue (bytes)",
   1.186 +          0, G_MAXUINT, 0, G_PARAM_READABLE));
   1.187 +  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
   1.188 +      g_param_spec_uint ("current-level-visible",
   1.189 +          "Current level (visible items)",
   1.190 +          "Current number of visible items in the queue", 0, G_MAXUINT, 0,
   1.191 +          G_PARAM_READABLE));
   1.192 +  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
   1.193 +      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   1.194 +          "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
   1.195 +          G_PARAM_READABLE));
   1.196 +
   1.197 +  /* set several parent class virtual functions */
   1.198 +  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
   1.199 +
   1.200 +}
   1.201 +
   1.202 +static void
   1.203 +gst_data_queue_init (GstDataQueue * queue)
   1.204 +{
   1.205 +  queue->cur_level.visible = 0; /* no content */
   1.206 +  queue->cur_level.bytes = 0;   /* no content */
   1.207 +  queue->cur_level.time = 0;    /* no content */
   1.208 +
   1.209 +  queue->checkfull = NULL;
   1.210 +
   1.211 +  queue->qlock = g_mutex_new ();
   1.212 +  queue->item_add = g_cond_new ();
   1.213 +  queue->item_del = g_cond_new ();
   1.214 +  queue->queue = g_queue_new ();
   1.215 +
   1.216 +  GST_DEBUG ("initialized queue's not_empty & not_full conditions");
   1.217 +}
   1.218 +
   1.219 +/**
   1.220 + * gst_data_queue_new:
   1.221 + * @checkfull: the callback used to tell if the element considers the queue full
   1.222 + * or not.
   1.223 + * @checkdata: a #gpointer that will be given in the @checkfull callback.
   1.224 + *
   1.225 + * Returns: a new #GstDataQueue.
   1.226 + */
   1.227 +
   1.228 +GstDataQueue *
   1.229 +gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
   1.230 +{
   1.231 +  GstDataQueue *ret;
   1.232 +
   1.233 +  g_return_val_if_fail (checkfull != NULL, NULL);
   1.234 +
   1.235 +  ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
   1.236 +  ret->checkfull = checkfull;
   1.237 +  ret->checkdata = checkdata;
   1.238 +
   1.239 +  return ret;
   1.240 +}
   1.241 +
   1.242 +static void
   1.243 +gst_data_queue_cleanup (GstDataQueue * queue)
   1.244 +{
   1.245 +  while (!g_queue_is_empty (queue->queue)) {
   1.246 +    GstDataQueueItem *item = g_queue_pop_head (queue->queue);
   1.247 +
   1.248 +    /* Just call the destroy notify on the item */
   1.249 +    item->destroy (item);
   1.250 +  }
   1.251 +  queue->cur_level.visible = 0;
   1.252 +  queue->cur_level.bytes = 0;
   1.253 +  queue->cur_level.time = 0;
   1.254 +}
   1.255 +
   1.256 +/* called only once, as opposed to dispose */
   1.257 +static void
   1.258 +gst_data_queue_finalize (GObject * object)
   1.259 +{
   1.260 +  GstDataQueue *queue = GST_DATA_QUEUE (object);
   1.261 +
   1.262 +  GST_DEBUG ("finalizing queue");
   1.263 +
   1.264 +  gst_data_queue_cleanup (queue);
   1.265 +  g_queue_free (queue->queue);
   1.266 +
   1.267 +  GST_DEBUG ("free mutex");
   1.268 +  g_mutex_free (queue->qlock);
   1.269 +  GST_DEBUG ("done free mutex");
   1.270 +
   1.271 +  g_cond_free (queue->item_add);
   1.272 +  g_cond_free (queue->item_del);
   1.273 +
   1.274 +  G_OBJECT_CLASS (parent_class)->finalize (object);
   1.275 +}
   1.276 +
   1.277 +static void
   1.278 +gst_data_queue_locked_flush (GstDataQueue * queue)
   1.279 +{
   1.280 +  STATUS (queue, "before flushing");
   1.281 +  gst_data_queue_cleanup (queue);
   1.282 +  STATUS (queue, "after flushing");
   1.283 +  /* we deleted something... */
   1.284 +  g_cond_signal (queue->item_del);
   1.285 +}
   1.286 +
   1.287 +static gboolean
   1.288 +gst_data_queue_locked_is_empty (GstDataQueue * queue)
   1.289 +{
   1.290 +  return (queue->queue->length == 0);
   1.291 +}
   1.292 +
   1.293 +static gboolean
   1.294 +gst_data_queue_locked_is_full (GstDataQueue * queue)
   1.295 +{
   1.296 +  return queue->checkfull (queue, queue->cur_level.visible,
   1.297 +      queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
   1.298 +}
   1.299 +
   1.300 +/**
   1.301 + * gst_data_queue_flush:
   1.302 + * @queue: a #GstDataQueue.
   1.303 + *
   1.304 + * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
   1.305 + * #gst_data_queue_pop will be released.
   1.306 + * MT safe.
   1.307 + */
   1.308 +void
   1.309 +gst_data_queue_flush (GstDataQueue * queue)
   1.310 +{
   1.311 +  GST_DEBUG ("queue:%p", queue);
   1.312 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.313 +  gst_data_queue_locked_flush (queue);
   1.314 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.315 +}
   1.316 +
   1.317 +/**
   1.318 + * gst_data_queue_is_empty:
   1.319 + * @queue: a #GstDataQueue.
   1.320 + *
   1.321 + * Queries if there are any items in the @queue.
   1.322 + * MT safe.
   1.323 + *
   1.324 + * Returns: #TRUE if @queue is empty.
   1.325 + */
   1.326 +gboolean
   1.327 +gst_data_queue_is_empty (GstDataQueue * queue)
   1.328 +{
   1.329 +  gboolean res;
   1.330 +
   1.331 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.332 +  res = gst_data_queue_locked_is_empty (queue);
   1.333 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.334 +
   1.335 +  return res;
   1.336 +}
   1.337 +
   1.338 +/**
   1.339 + * gst_data_queue_is_full:
   1.340 + * @queue: a #GstDataQueue.
   1.341 + *
   1.342 + * Queries if @queue is full. This check will be done using the
   1.343 + * #GstDataQueueCheckFullCallback registered with @queue.
   1.344 + * MT safe.
   1.345 + *
   1.346 + * Returns: #TRUE if @queue is full.
   1.347 + */
   1.348 +gboolean
   1.349 +gst_data_queue_is_full (GstDataQueue * queue)
   1.350 +{
   1.351 +  gboolean res;
   1.352 +
   1.353 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.354 +  res = gst_data_queue_locked_is_full (queue);
   1.355 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.356 +
   1.357 +  return res;
   1.358 +}
   1.359 +
   1.360 +/**
   1.361 + * gst_data_queue_set_flushing:
   1.362 + * @queue: a #GstDataQueue.
   1.363 + * @flushing: a #gboolean stating if the queue will be flushing or not.
   1.364 + *
   1.365 + * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
   1.366 + * state, any incoming data on the @queue will be discarded. Any call currently
   1.367 + * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
   1.368 + * away with a return value of #FALSE. While the @queue is in flushing state, 
   1.369 + * all calls to those two functions will return #FALSE.
   1.370 + *
   1.371 + * MT Safe.
   1.372 + */
   1.373 +void
   1.374 +gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
   1.375 +{
   1.376 +  GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
   1.377 +
   1.378 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.379 +  queue->flushing = flushing;
   1.380 +  if (flushing) {
   1.381 +    /* release push/pop functions */
   1.382 +    g_cond_signal (queue->item_add);
   1.383 +    g_cond_signal (queue->item_del);
   1.384 +  }
   1.385 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.386 +}
   1.387 +
   1.388 +/**
   1.389 + * gst_data_queue_push:
   1.390 + * @queue: a #GstDataQueue.
   1.391 + * @item: a #GstDataQueueItem.
   1.392 + *
   1.393 + * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
   1.394 + * on the @queue. If the @queue is full, the call will block until space is
   1.395 + * available, OR the @queue is set to flushing state.
   1.396 + * MT safe.
   1.397 + *
   1.398 + * Note that this function has slightly different semantics than gst_pad_push()
   1.399 + * and gst_pad_push_event(): this function only takes ownership of @item and
   1.400 + * the #GstMiniObject contained in @item if the push was successful. If FALSE
   1.401 + * is returned, the caller is responsible for freeing @item and its contents.
   1.402 + *
   1.403 + * Returns: #TRUE if the @item was successfully pushed on the @queue.
   1.404 + */
   1.405 +gboolean
   1.406 +gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
   1.407 +{
   1.408 +  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   1.409 +  g_return_val_if_fail (item != NULL, FALSE);
   1.410 +
   1.411 +  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   1.412 +
   1.413 +  STATUS (queue, "before pushing");
   1.414 +
   1.415 +  /* We ALWAYS need to check for queue fillness */
   1.416 +  if (gst_data_queue_locked_is_full (queue)) {
   1.417 +    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.418 +    g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
   1.419 +    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   1.420 +
   1.421 +    /* signal might have removed some items */
   1.422 +    while (gst_data_queue_locked_is_full (queue)) {
   1.423 +      g_cond_wait (queue->item_del, queue->qlock);
   1.424 +      if (queue->flushing)
   1.425 +        goto flushing;
   1.426 +    }
   1.427 +  }
   1.428 +
   1.429 +  g_queue_push_tail (queue->queue, item);
   1.430 +
   1.431 +  if (item->visible)
   1.432 +    queue->cur_level.visible++;
   1.433 +  queue->cur_level.bytes += item->size;
   1.434 +  queue->cur_level.time += item->duration;
   1.435 +
   1.436 +  STATUS (queue, "after pushing");
   1.437 +  g_cond_signal (queue->item_add);
   1.438 +
   1.439 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.440 +
   1.441 +  return TRUE;
   1.442 +
   1.443 +  /* ERRORS */
   1.444 +flushing:
   1.445 +  {
   1.446 +    GST_DEBUG ("queue:%p, we are flushing", queue);
   1.447 +    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.448 +    return FALSE;
   1.449 +  }
   1.450 +}
   1.451 +
   1.452 +/**
   1.453 + * gst_data_queue_pop:
   1.454 + * @queue: a #GstDataQueue.
   1.455 + * @item: pointer to store the returned #GstDataQueueItem.
   1.456 + *
   1.457 + * Retrieves the first @item available on the @queue. If the queue is currently
   1.458 + * empty, the call will block until at least one item is available, OR the
   1.459 + * @queue is set to the flushing state.
   1.460 + * MT safe.
   1.461 + *
   1.462 + * Returns: #TRUE if an @item was successfully retrieved from the @queue.
   1.463 + */
   1.464 +gboolean
   1.465 +gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
   1.466 +{
   1.467 +  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   1.468 +  g_return_val_if_fail (item != NULL, FALSE);
   1.469 +
   1.470 +  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   1.471 +
   1.472 +  STATUS (queue, "before popping");
   1.473 +
   1.474 +  if (gst_data_queue_locked_is_empty (queue)) {
   1.475 +    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.476 +    g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
   1.477 +    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   1.478 +
   1.479 +    while (gst_data_queue_locked_is_empty (queue)) {
   1.480 +      g_cond_wait (queue->item_add, queue->qlock);
   1.481 +      if (queue->flushing)
   1.482 +        goto flushing;
   1.483 +    }
   1.484 +  }
   1.485 +
   1.486 +  /* Get the item from the GQueue */
   1.487 +  *item = g_queue_pop_head (queue->queue);
   1.488 +
   1.489 +  /* update current level counter */
   1.490 +  if ((*item)->visible)
   1.491 +    queue->cur_level.visible--;
   1.492 +  queue->cur_level.bytes -= (*item)->size;
   1.493 +  queue->cur_level.time -= (*item)->duration;
   1.494 +
   1.495 +  STATUS (queue, "after popping");
   1.496 +  g_cond_signal (queue->item_del);
   1.497 +
   1.498 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.499 +
   1.500 +  return TRUE;
   1.501 +
   1.502 +  /* ERRORS */
   1.503 +flushing:
   1.504 +  {
   1.505 +    GST_DEBUG ("queue:%p, we are flushing", queue);
   1.506 +    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.507 +    return FALSE;
   1.508 +  }
   1.509 +}
   1.510 +
   1.511 +/**
   1.512 + * gst_data_queue_drop_head:
   1.513 + * @queue: The #GstDataQueue to drop an item from.
   1.514 + * @type: The #GType of the item to drop.
   1.515 + *
   1.516 + * Pop and unref the head-most #GstMiniObject with the given #GType.
   1.517 + *
   1.518 + * Returns: TRUE if an element was removed.
   1.519 + */
   1.520 +gboolean
   1.521 +gst_data_queue_drop_head (GstDataQueue * queue, GType type)
   1.522 +{
   1.523 +  gboolean res = FALSE;
   1.524 +  GList *item;
   1.525 +  GstDataQueueItem *leak = NULL;
   1.526 +
   1.527 +  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   1.528 +
   1.529 +  GST_DEBUG ("queue:%p", queue);
   1.530 +
   1.531 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.532 +  for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
   1.533 +    GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
   1.534 +
   1.535 +    if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
   1.536 +      leak = tmp;
   1.537 +      break;
   1.538 +    }
   1.539 +  }
   1.540 +
   1.541 +  if (!leak)
   1.542 +    goto done;
   1.543 +
   1.544 +  g_queue_delete_link (queue->queue, item);
   1.545 +
   1.546 +  if (leak->visible)
   1.547 +    queue->cur_level.visible--;
   1.548 +  queue->cur_level.bytes -= leak->size;
   1.549 +  queue->cur_level.time -= leak->duration;
   1.550 +
   1.551 +  leak->destroy (leak);
   1.552 +
   1.553 +  res = TRUE;
   1.554 +
   1.555 +done:
   1.556 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.557 +
   1.558 +  GST_DEBUG ("queue:%p , res:%d", queue, res);
   1.559 +
   1.560 +  return res;
   1.561 +}
   1.562 +
   1.563 +/**
   1.564 + * gst_data_queue_limits_changed:
   1.565 + * @queue: The #GstDataQueue 
   1.566 + *
   1.567 + * Inform the queue that the limits for the fullness check have changed and that
   1.568 + * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
   1.569 + */
   1.570 +void
   1.571 +gst_data_queue_limits_changed (GstDataQueue * queue)
   1.572 +{
   1.573 +  g_return_if_fail (GST_IS_DATA_QUEUE (queue));
   1.574 +
   1.575 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.576 +  GST_DEBUG ("signal del");
   1.577 +  g_cond_signal (queue->item_del);
   1.578 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.579 +}
   1.580 +
   1.581 +/**
   1.582 + * gst_data_queue_get_level:
   1.583 + * @queue: The #GstDataQueue
   1.584 + * @level: the location to store the result
   1.585 + *
   1.586 + * Get the current level of the queue.
   1.587 + */
   1.588 +void
   1.589 +gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
   1.590 +{
   1.591 +  level->visible = queue->cur_level.visible;
   1.592 +  level->bytes = queue->cur_level.bytes;
   1.593 +  level->time = queue->cur_level.time;
   1.594 +}
   1.595 +
   1.596 +static void
   1.597 +gst_data_queue_set_property (GObject * object,
   1.598 +    guint prop_id, const GValue * value, GParamSpec * pspec)
   1.599 +{
   1.600 +  switch (prop_id) {
   1.601 +    default:
   1.602 +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   1.603 +      break;
   1.604 +  }
   1.605 +}
   1.606 +
   1.607 +static void
   1.608 +gst_data_queue_get_property (GObject * object,
   1.609 +    guint prop_id, GValue * value, GParamSpec * pspec)
   1.610 +{
   1.611 +  GstDataQueue *queue = GST_DATA_QUEUE (object);
   1.612 +
   1.613 +  GST_DATA_QUEUE_MUTEX_LOCK (queue);
   1.614 +
   1.615 +  switch (prop_id) {
   1.616 +    case ARG_CUR_LEVEL_BYTES:
   1.617 +      g_value_set_uint (value, queue->cur_level.bytes);
   1.618 +      break;
   1.619 +    case ARG_CUR_LEVEL_VISIBLE:
   1.620 +      g_value_set_uint (value, queue->cur_level.visible);
   1.621 +      break;
   1.622 +    case ARG_CUR_LEVEL_TIME:
   1.623 +      g_value_set_uint64 (value, queue->cur_level.time);
   1.624 +      break;
   1.625 +    default:
   1.626 +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   1.627 +      break;
   1.628 +  }
   1.629 +
   1.630 +  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   1.631 +}