1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/gst-gmyth/multiqueue/gstdataqueue.c Tue Sep 18 18:19:39 2007 +0100
1.3 @@ -0,0 +1,628 @@
1.4 +/* GStreamer
1.5 + * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
1.6 + *
1.7 + * gstdataqueue.c:
1.8 + *
1.9 + * This library is free software; you can redistribute it and/or
1.10 + * modify it under the terms of the GNU Library General Public
1.11 + * License as published by the Free Software Foundation; either
1.12 + * version 2 of the License, or (at your option) any later version.
1.13 + *
1.14 + * This library is distributed in the hope that it will be useful,
1.15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1.17 + * Library General Public License for more details.
1.18 + *
1.19 + * You should have received a copy of the GNU Library General Public
1.20 + * License along with this library; if not, write to the
1.21 + * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
1.22 + * Boston, MA 02111-1307, USA.
1.23 + */
1.24 +
1.25 +/**
1.26 + * SECTION:gstdataqueue
1.27 + * @short_description: Threadsafe queueing object
1.28 + *
1.29 + * #GstDataQueue is an object that handles threadsafe queueing of objects. It
1.30 + * also provides size-related functionality. This object should be used for
1.31 + * any #GstElement that wishes to provide some sort of queueing functionality.
1.32 + */
1.33 +
1.34 +#include <gst/gst.h>
1.35 +#include "gstdataqueue.h"
1.36 +
1.37 +GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
1.38 +#define GST_CAT_DEFAULT (data_queue_debug)
1.39 +GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
1.40 +
1.41 +
1.42 +/* Queue signals and args */
1.43 +enum
1.44 +{
1.45 + SIGNAL_EMPTY,
1.46 + SIGNAL_FULL,
1.47 + LAST_SIGNAL
1.48 +};
1.49 +
1.50 +enum
1.51 +{
1.52 + ARG_0,
1.53 + ARG_CUR_LEVEL_VISIBLE,
1.54 + ARG_CUR_LEVEL_BYTES,
1.55 + ARG_CUR_LEVEL_TIME
1.56 + /* FILL ME */
1.57 +};
1.58 +
1.59 +#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
1.60 + GST_CAT_LOG (data_queue_dataflow, \
1.61 + "locking qlock from thread %p", \
1.62 + g_thread_self ()); \
1.63 + g_mutex_lock (q->qlock); \
1.64 + GST_CAT_LOG (data_queue_dataflow, \
1.65 + "locked qlock from thread %p", \
1.66 + g_thread_self ()); \
1.67 +} G_STMT_END
1.68 +
1.69 +#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \
1.70 + GST_DATA_QUEUE_MUTEX_LOCK (q); \
1.71 + if (q->flushing) \
1.72 + goto label; \
1.73 + } G_STMT_END
1.74 +
1.75 +#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
1.76 + GST_CAT_LOG (data_queue_dataflow, \
1.77 + "unlocking qlock from thread %p", \
1.78 + g_thread_self ()); \
1.79 + g_mutex_unlock (q->qlock); \
1.80 +} G_STMT_END
1.81 +
1.82 +#define STATUS(q, msg) \
1.83 + GST_CAT_LOG (data_queue_dataflow, \
1.84 + "queue:%p " msg ": %u visible items, %u " \
1.85 + "bytes, %"G_GUINT64_FORMAT \
1.86 + " ns, %u elements", \
1.87 + queue, \
1.88 + q->cur_level.visible, \
1.89 + q->cur_level.bytes, \
1.90 + q->cur_level.time, \
1.91 + q->queue->length)
1.92 +
1.93 +static void gst_data_queue_base_init (GstDataQueueClass * klass);
1.94 +static void gst_data_queue_class_init (GstDataQueueClass * klass);
1.95 +static void gst_data_queue_init (GstDataQueue * queue);
1.96 +static void gst_data_queue_finalize (GObject * object);
1.97 +
1.98 +static void gst_data_queue_set_property (GObject * object,
1.99 + guint prop_id, const GValue * value, GParamSpec * pspec);
1.100 +static void gst_data_queue_get_property (GObject * object,
1.101 + guint prop_id, GValue * value, GParamSpec * pspec);
1.102 +
1.103 +static GObjectClass *parent_class = NULL;
1.104 +static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
1.105 +
1.106 +GType
1.107 +gst_data_queue_get_type (void)
1.108 +{
1.109 + static GType queue_type = 0;
1.110 +
1.111 + if (!queue_type) {
1.112 + static const GTypeInfo queue_info = {
1.113 + sizeof (GstDataQueueClass),
1.114 + (GBaseInitFunc) gst_data_queue_base_init,
1.115 + NULL,
1.116 + (GClassInitFunc) gst_data_queue_class_init,
1.117 + NULL,
1.118 + NULL,
1.119 + sizeof (GstDataQueue),
1.120 + 0,
1.121 + (GInstanceInitFunc) gst_data_queue_init,
1.122 + NULL
1.123 + };
1.124 +
1.125 + queue_type = g_type_register_static (G_TYPE_OBJECT,
1.126 + "GstDataQueue", &queue_info, 0);
1.127 + GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
1.128 + "data queue object");
1.129 + GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
1.130 + "dataflow inside the data queue object");
1.131 + }
1.132 +
1.133 + return queue_type;
1.134 +}
1.135 +
1.136 +static void
1.137 +gst_data_queue_base_init (GstDataQueueClass * klass)
1.138 +{
1.139 + /* Do we need anything here ?? */
1.140 + return;
1.141 +}
1.142 +
1.143 +static void
1.144 +gst_data_queue_class_init (GstDataQueueClass * klass)
1.145 +{
1.146 + GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1.147 +
1.148 + parent_class = g_type_class_peek_parent (klass);
1.149 +
1.150 + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
1.151 + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
1.152 +
1.153 + /* signals */
1.154 + /**
1.155 + * GstDataQueue::empty:
1.156 + * @queue: the queue instance
1.157 + *
1.158 + * Reports that the queue became empty (empty).
1.159 + * A queue is empty if the total amount of visible items inside it (num-visible, time,
1.160 + * size) is lower than the boundary values which can be set through the GObject
1.161 + * properties.
1.162 + */
1.163 + gst_data_queue_signals[SIGNAL_EMPTY] =
1.164 + g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
1.165 + G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
1.166 + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
1.167 +
1.168 + /**
1.169 + * GstDataQueue::full:
1.170 + * @queue: the queue instance
1.171 + *
1.172 + * Reports that the queue became full (full).
1.173 + * A queue is full if the total amount of data inside it (num-visible, time,
1.174 + * size) is higher than the boundary values which can be set through the GObject
1.175 + * properties.
1.176 + */
1.177 + gst_data_queue_signals[SIGNAL_FULL] =
1.178 + g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
1.179 + G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
1.180 + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
1.181 +
1.182 + /* properties */
1.183 + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
1.184 + g_param_spec_uint ("current-level-bytes", "Current level (kB)",
1.185 + "Current amount of data in the queue (bytes)",
1.186 + 0, G_MAXUINT, 0, G_PARAM_READABLE));
1.187 + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
1.188 + g_param_spec_uint ("current-level-visible",
1.189 + "Current level (visible items)",
1.190 + "Current number of visible items in the queue", 0, G_MAXUINT, 0,
1.191 + G_PARAM_READABLE));
1.192 + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
1.193 + g_param_spec_uint64 ("current-level-time", "Current level (ns)",
1.194 + "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
1.195 + G_PARAM_READABLE));
1.196 +
1.197 + /* set several parent class virtual functions */
1.198 + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
1.199 +
1.200 +}
1.201 +
1.202 +static void
1.203 +gst_data_queue_init (GstDataQueue * queue)
1.204 +{
1.205 + queue->cur_level.visible = 0; /* no content */
1.206 + queue->cur_level.bytes = 0; /* no content */
1.207 + queue->cur_level.time = 0; /* no content */
1.208 +
1.209 + queue->checkfull = NULL;
1.210 +
1.211 + queue->qlock = g_mutex_new ();
1.212 + queue->item_add = g_cond_new ();
1.213 + queue->item_del = g_cond_new ();
1.214 + queue->queue = g_queue_new ();
1.215 +
1.216 + GST_DEBUG ("initialized queue's not_empty & not_full conditions");
1.217 +}
1.218 +
1.219 +/**
1.220 + * gst_data_queue_new:
1.221 + * @checkfull: the callback used to tell if the element considers the queue full
1.222 + * or not.
1.223 + * @checkdata: a #gpointer that will be given in the @checkfull callback.
1.224 + *
1.225 + * Returns: a new #GstDataQueue.
1.226 + */
1.227 +
1.228 +GstDataQueue *
1.229 +gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
1.230 +{
1.231 + GstDataQueue *ret;
1.232 +
1.233 + g_return_val_if_fail (checkfull != NULL, NULL);
1.234 +
1.235 + ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
1.236 + ret->checkfull = checkfull;
1.237 + ret->checkdata = checkdata;
1.238 +
1.239 + return ret;
1.240 +}
1.241 +
1.242 +static void
1.243 +gst_data_queue_cleanup (GstDataQueue * queue)
1.244 +{
1.245 + while (!g_queue_is_empty (queue->queue)) {
1.246 + GstDataQueueItem *item = g_queue_pop_head (queue->queue);
1.247 +
1.248 + /* Just call the destroy notify on the item */
1.249 + item->destroy (item);
1.250 + }
1.251 + queue->cur_level.visible = 0;
1.252 + queue->cur_level.bytes = 0;
1.253 + queue->cur_level.time = 0;
1.254 +}
1.255 +
1.256 +/* called only once, as opposed to dispose */
1.257 +static void
1.258 +gst_data_queue_finalize (GObject * object)
1.259 +{
1.260 + GstDataQueue *queue = GST_DATA_QUEUE (object);
1.261 +
1.262 + GST_DEBUG ("finalizing queue");
1.263 +
1.264 + gst_data_queue_cleanup (queue);
1.265 + g_queue_free (queue->queue);
1.266 +
1.267 + GST_DEBUG ("free mutex");
1.268 + g_mutex_free (queue->qlock);
1.269 + GST_DEBUG ("done free mutex");
1.270 +
1.271 + g_cond_free (queue->item_add);
1.272 + g_cond_free (queue->item_del);
1.273 +
1.274 + G_OBJECT_CLASS (parent_class)->finalize (object);
1.275 +}
1.276 +
1.277 +static void
1.278 +gst_data_queue_locked_flush (GstDataQueue * queue)
1.279 +{
1.280 + STATUS (queue, "before flushing");
1.281 + gst_data_queue_cleanup (queue);
1.282 + STATUS (queue, "after flushing");
1.283 + /* we deleted something... */
1.284 + g_cond_signal (queue->item_del);
1.285 +}
1.286 +
1.287 +static gboolean
1.288 +gst_data_queue_locked_is_empty (GstDataQueue * queue)
1.289 +{
1.290 + return (queue->queue->length == 0);
1.291 +}
1.292 +
1.293 +static gboolean
1.294 +gst_data_queue_locked_is_full (GstDataQueue * queue)
1.295 +{
1.296 + return queue->checkfull (queue, queue->cur_level.visible,
1.297 + queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
1.298 +}
1.299 +
1.300 +/**
1.301 + * gst_data_queue_flush:
1.302 + * @queue: a #GstDataQueue.
1.303 + *
1.304 + * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
1.305 + * #gst_data_queue_pop will be released.
1.306 + * MT safe.
1.307 + */
1.308 +void
1.309 +gst_data_queue_flush (GstDataQueue * queue)
1.310 +{
1.311 + GST_DEBUG ("queue:%p", queue);
1.312 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.313 + gst_data_queue_locked_flush (queue);
1.314 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.315 +}
1.316 +
1.317 +/**
1.318 + * gst_data_queue_is_empty:
1.319 + * @queue: a #GstDataQueue.
1.320 + *
1.321 + * Queries if there are any items in the @queue.
1.322 + * MT safe.
1.323 + *
1.324 + * Returns: #TRUE if @queue is empty.
1.325 + */
1.326 +gboolean
1.327 +gst_data_queue_is_empty (GstDataQueue * queue)
1.328 +{
1.329 + gboolean res;
1.330 +
1.331 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.332 + res = gst_data_queue_locked_is_empty (queue);
1.333 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.334 +
1.335 + return res;
1.336 +}
1.337 +
1.338 +/**
1.339 + * gst_data_queue_is_full:
1.340 + * @queue: a #GstDataQueue.
1.341 + *
1.342 + * Queries if @queue is full. This check will be done using the
1.343 + * #GstDataQueueCheckFullCallback registered with @queue.
1.344 + * MT safe.
1.345 + *
1.346 + * Returns: #TRUE if @queue is full.
1.347 + */
1.348 +gboolean
1.349 +gst_data_queue_is_full (GstDataQueue * queue)
1.350 +{
1.351 + gboolean res;
1.352 +
1.353 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.354 + res = gst_data_queue_locked_is_full (queue);
1.355 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.356 +
1.357 + return res;
1.358 +}
1.359 +
1.360 +/**
1.361 + * gst_data_queue_set_flushing:
1.362 + * @queue: a #GstDataQueue.
1.363 + * @flushing: a #gboolean stating if the queue will be flushing or not.
1.364 + *
1.365 + * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
1.366 + * state, any incoming data on the @queue will be discarded. Any call currently
1.367 + * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
1.368 + * away with a return value of #FALSE. While the @queue is in flushing state,
1.369 + * all calls to those two functions will return #FALSE.
1.370 + *
1.371 + * MT Safe.
1.372 + */
1.373 +void
1.374 +gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
1.375 +{
1.376 + GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
1.377 +
1.378 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.379 + queue->flushing = flushing;
1.380 + if (flushing) {
1.381 + /* release push/pop functions */
1.382 + g_cond_signal (queue->item_add);
1.383 + g_cond_signal (queue->item_del);
1.384 + }
1.385 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.386 +}
1.387 +
1.388 +/**
1.389 + * gst_data_queue_push:
1.390 + * @queue: a #GstDataQueue.
1.391 + * @item: a #GstDataQueueItem.
1.392 + *
1.393 + * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
1.394 + * on the @queue. If the @queue is full, the call will block until space is
1.395 + * available, OR the @queue is set to flushing state.
1.396 + * MT safe.
1.397 + *
1.398 + * Note that this function has slightly different semantics than gst_pad_push()
1.399 + * and gst_pad_push_event(): this function only takes ownership of @item and
1.400 + * the #GstMiniObject contained in @item if the push was successful. If FALSE
1.401 + * is returned, the caller is responsible for freeing @item and its contents.
1.402 + *
1.403 + * Returns: #TRUE if the @item was successfully pushed on the @queue.
1.404 + */
1.405 +gboolean
1.406 +gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
1.407 +{
1.408 + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
1.409 + g_return_val_if_fail (item != NULL, FALSE);
1.410 +
1.411 + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
1.412 +
1.413 + STATUS (queue, "before pushing");
1.414 +
1.415 + /* We ALWAYS need to check for queue fillness */
1.416 + if (gst_data_queue_locked_is_full (queue)) {
1.417 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.418 + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
1.419 + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
1.420 +
1.421 + /* signal might have removed some items */
1.422 + while (gst_data_queue_locked_is_full (queue)) {
1.423 + g_cond_wait (queue->item_del, queue->qlock);
1.424 + if (queue->flushing)
1.425 + goto flushing;
1.426 + }
1.427 + }
1.428 +
1.429 + g_queue_push_tail (queue->queue, item);
1.430 +
1.431 + if (item->visible)
1.432 + queue->cur_level.visible++;
1.433 + queue->cur_level.bytes += item->size;
1.434 + queue->cur_level.time += item->duration;
1.435 +
1.436 + STATUS (queue, "after pushing");
1.437 + g_cond_signal (queue->item_add);
1.438 +
1.439 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.440 +
1.441 + return TRUE;
1.442 +
1.443 + /* ERRORS */
1.444 +flushing:
1.445 + {
1.446 + GST_DEBUG ("queue:%p, we are flushing", queue);
1.447 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.448 + return FALSE;
1.449 + }
1.450 +}
1.451 +
1.452 +/**
1.453 + * gst_data_queue_pop:
1.454 + * @queue: a #GstDataQueue.
1.455 + * @item: pointer to store the returned #GstDataQueueItem.
1.456 + *
1.457 + * Retrieves the first @item available on the @queue. If the queue is currently
1.458 + * empty, the call will block until at least one item is available, OR the
1.459 + * @queue is set to the flushing state.
1.460 + * MT safe.
1.461 + *
1.462 + * Returns: #TRUE if an @item was successfully retrieved from the @queue.
1.463 + */
1.464 +gboolean
1.465 +gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
1.466 +{
1.467 + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
1.468 + g_return_val_if_fail (item != NULL, FALSE);
1.469 +
1.470 + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
1.471 +
1.472 + STATUS (queue, "before popping");
1.473 +
1.474 + if (gst_data_queue_locked_is_empty (queue)) {
1.475 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.476 + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
1.477 + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
1.478 +
1.479 + while (gst_data_queue_locked_is_empty (queue)) {
1.480 + g_cond_wait (queue->item_add, queue->qlock);
1.481 + if (queue->flushing)
1.482 + goto flushing;
1.483 + }
1.484 + }
1.485 +
1.486 + /* Get the item from the GQueue */
1.487 + *item = g_queue_pop_head (queue->queue);
1.488 +
1.489 + /* update current level counter */
1.490 + if ((*item)->visible)
1.491 + queue->cur_level.visible--;
1.492 + queue->cur_level.bytes -= (*item)->size;
1.493 + queue->cur_level.time -= (*item)->duration;
1.494 +
1.495 + STATUS (queue, "after popping");
1.496 + g_cond_signal (queue->item_del);
1.497 +
1.498 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.499 +
1.500 + return TRUE;
1.501 +
1.502 + /* ERRORS */
1.503 +flushing:
1.504 + {
1.505 + GST_DEBUG ("queue:%p, we are flushing", queue);
1.506 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.507 + return FALSE;
1.508 + }
1.509 +}
1.510 +
1.511 +/**
1.512 + * gst_data_queue_drop_head:
1.513 + * @queue: The #GstDataQueue to drop an item from.
1.514 + * @type: The #GType of the item to drop.
1.515 + *
1.516 + * Pop and unref the head-most #GstMiniObject with the given #GType.
1.517 + *
1.518 + * Returns: TRUE if an element was removed.
1.519 + */
1.520 +gboolean
1.521 +gst_data_queue_drop_head (GstDataQueue * queue, GType type)
1.522 +{
1.523 + gboolean res = FALSE;
1.524 + GList *item;
1.525 + GstDataQueueItem *leak = NULL;
1.526 +
1.527 + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
1.528 +
1.529 + GST_DEBUG ("queue:%p", queue);
1.530 +
1.531 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.532 + for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
1.533 + GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
1.534 +
1.535 + if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
1.536 + leak = tmp;
1.537 + break;
1.538 + }
1.539 + }
1.540 +
1.541 + if (!leak)
1.542 + goto done;
1.543 +
1.544 + g_queue_delete_link (queue->queue, item);
1.545 +
1.546 + if (leak->visible)
1.547 + queue->cur_level.visible--;
1.548 + queue->cur_level.bytes -= leak->size;
1.549 + queue->cur_level.time -= leak->duration;
1.550 +
1.551 + leak->destroy (leak);
1.552 +
1.553 + res = TRUE;
1.554 +
1.555 +done:
1.556 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.557 +
1.558 + GST_DEBUG ("queue:%p , res:%d", queue, res);
1.559 +
1.560 + return res;
1.561 +}
1.562 +
1.563 +/**
1.564 + * gst_data_queue_limits_changed:
1.565 + * @queue: The #GstDataQueue
1.566 + *
1.567 + * Inform the queue that the limits for the fullness check have changed and that
1.568 + * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
1.569 + */
1.570 +void
1.571 +gst_data_queue_limits_changed (GstDataQueue * queue)
1.572 +{
1.573 + g_return_if_fail (GST_IS_DATA_QUEUE (queue));
1.574 +
1.575 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.576 + GST_DEBUG ("signal del");
1.577 + g_cond_signal (queue->item_del);
1.578 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.579 +}
1.580 +
1.581 +/**
1.582 + * gst_data_queue_get_level:
1.583 + * @queue: The #GstDataQueue
1.584 + * @level: the location to store the result
1.585 + *
1.586 + * Get the current level of the queue.
1.587 + */
1.588 +void
1.589 +gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
1.590 +{
1.591 + level->visible = queue->cur_level.visible;
1.592 + level->bytes = queue->cur_level.bytes;
1.593 + level->time = queue->cur_level.time;
1.594 +}
1.595 +
1.596 +static void
1.597 +gst_data_queue_set_property (GObject * object,
1.598 + guint prop_id, const GValue * value, GParamSpec * pspec)
1.599 +{
1.600 + switch (prop_id) {
1.601 + default:
1.602 + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1.603 + break;
1.604 + }
1.605 +}
1.606 +
1.607 +static void
1.608 +gst_data_queue_get_property (GObject * object,
1.609 + guint prop_id, GValue * value, GParamSpec * pspec)
1.610 +{
1.611 + GstDataQueue *queue = GST_DATA_QUEUE (object);
1.612 +
1.613 + GST_DATA_QUEUE_MUTEX_LOCK (queue);
1.614 +
1.615 + switch (prop_id) {
1.616 + case ARG_CUR_LEVEL_BYTES:
1.617 + g_value_set_uint (value, queue->cur_level.bytes);
1.618 + break;
1.619 + case ARG_CUR_LEVEL_VISIBLE:
1.620 + g_value_set_uint (value, queue->cur_level.visible);
1.621 + break;
1.622 + case ARG_CUR_LEVEL_TIME:
1.623 + g_value_set_uint64 (value, queue->cur_level.time);
1.624 + break;
1.625 + default:
1.626 + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1.627 + break;
1.628 + }
1.629 +
1.630 + GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
1.631 +}