2 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
23 * SECTION:gstdataqueue
24 * @short_description: Threadsafe queueing object
26 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
27 * also provides size-related functionality. This object should be used for
28 * any #GstElement that wishes to provide some sort of queueing functionality.
32 #include "gstdataqueue.h"
34 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
35 #define GST_CAT_DEFAULT (data_queue_debug)
36 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
39 /* Queue signals and args */
50 ARG_CUR_LEVEL_VISIBLE,
56 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
57 GST_CAT_LOG (data_queue_dataflow, \
58 "locking qlock from thread %p", \
60 g_mutex_lock (q->qlock); \
61 GST_CAT_LOG (data_queue_dataflow, \
62 "locked qlock from thread %p", \
66 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \
67 GST_DATA_QUEUE_MUTEX_LOCK (q); \
72 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
73 GST_CAT_LOG (data_queue_dataflow, \
74 "unlocking qlock from thread %p", \
76 g_mutex_unlock (q->qlock); \
79 #define STATUS(q, msg) \
80 GST_CAT_LOG (data_queue_dataflow, \
81 "queue:%p " msg ": %u visible items, %u " \
82 "bytes, %"G_GUINT64_FORMAT \
85 q->cur_level.visible, \
90 static void gst_data_queue_base_init (GstDataQueueClass * klass);
91 static void gst_data_queue_class_init (GstDataQueueClass * klass);
92 static void gst_data_queue_init (GstDataQueue * queue);
93 static void gst_data_queue_finalize (GObject * object);
95 static void gst_data_queue_set_property (GObject * object,
96 guint prop_id, const GValue * value, GParamSpec * pspec);
97 static void gst_data_queue_get_property (GObject * object,
98 guint prop_id, GValue * value, GParamSpec * pspec);
100 static GObjectClass *parent_class = NULL;
101 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
104 gst_data_queue_get_type (void)
106 static GType queue_type = 0;
109 static const GTypeInfo queue_info = {
110 sizeof (GstDataQueueClass),
111 (GBaseInitFunc) gst_data_queue_base_init,
113 (GClassInitFunc) gst_data_queue_class_init,
116 sizeof (GstDataQueue),
118 (GInstanceInitFunc) gst_data_queue_init,
122 queue_type = g_type_register_static (G_TYPE_OBJECT,
123 "GstDataQueue", &queue_info, 0);
124 GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
125 "data queue object");
126 GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
127 "dataflow inside the data queue object");
134 gst_data_queue_base_init (GstDataQueueClass * klass)
136 /* Do we need anything here ?? */
141 gst_data_queue_class_init (GstDataQueueClass * klass)
143 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
145 parent_class = g_type_class_peek_parent (klass);
147 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
148 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
152 * GstDataQueue::empty:
153 * @queue: the queue instance
155 * Reports that the queue became empty (empty).
156 * A queue is empty if the total amount of visible items inside it (num-visible, time,
157 * size) is lower than the boundary values which can be set through the GObject
160 gst_data_queue_signals[SIGNAL_EMPTY] =
161 g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
162 G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
163 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
166 * GstDataQueue::full:
167 * @queue: the queue instance
169 * Reports that the queue became full (full).
170 * A queue is full if the total amount of data inside it (num-visible, time,
171 * size) is higher than the boundary values which can be set through the GObject
174 gst_data_queue_signals[SIGNAL_FULL] =
175 g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
176 G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
177 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
180 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
181 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
182 "Current amount of data in the queue (bytes)",
183 0, G_MAXUINT, 0, G_PARAM_READABLE));
184 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
185 g_param_spec_uint ("current-level-visible",
186 "Current level (visible items)",
187 "Current number of visible items in the queue", 0, G_MAXUINT, 0,
189 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
190 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
191 "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
194 /* set several parent class virtual functions */
195 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
200 gst_data_queue_init (GstDataQueue * queue)
202 queue->cur_level.visible = 0; /* no content */
203 queue->cur_level.bytes = 0; /* no content */
204 queue->cur_level.time = 0; /* no content */
206 queue->checkfull = NULL;
208 queue->qlock = g_mutex_new ();
209 queue->item_add = g_cond_new ();
210 queue->item_del = g_cond_new ();
211 queue->queue = g_queue_new ();
213 GST_DEBUG ("initialized queue's not_empty & not_full conditions");
217 * gst_data_queue_new:
218 * @checkfull: the callback used to tell if the element considers the queue full
220 * @checkdata: a #gpointer that will be given in the @checkfull callback.
222 * Returns: a new #GstDataQueue.
226 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
230 g_return_val_if_fail (checkfull != NULL, NULL);
232 ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
233 ret->checkfull = checkfull;
234 ret->checkdata = checkdata;
240 gst_data_queue_cleanup (GstDataQueue * queue)
242 while (!g_queue_is_empty (queue->queue)) {
243 GstDataQueueItem *item = g_queue_pop_head (queue->queue);
245 /* Just call the destroy notify on the item */
246 item->destroy (item);
248 queue->cur_level.visible = 0;
249 queue->cur_level.bytes = 0;
250 queue->cur_level.time = 0;
253 /* called only once, as opposed to dispose */
255 gst_data_queue_finalize (GObject * object)
257 GstDataQueue *queue = GST_DATA_QUEUE (object);
259 GST_DEBUG ("finalizing queue");
261 gst_data_queue_cleanup (queue);
262 g_queue_free (queue->queue);
264 GST_DEBUG ("free mutex");
265 g_mutex_free (queue->qlock);
266 GST_DEBUG ("done free mutex");
268 g_cond_free (queue->item_add);
269 g_cond_free (queue->item_del);
271 G_OBJECT_CLASS (parent_class)->finalize (object);
275 gst_data_queue_locked_flush (GstDataQueue * queue)
277 STATUS (queue, "before flushing");
278 gst_data_queue_cleanup (queue);
279 STATUS (queue, "after flushing");
280 /* we deleted something... */
281 g_cond_signal (queue->item_del);
285 gst_data_queue_locked_is_empty (GstDataQueue * queue)
287 return (queue->queue->length == 0);
291 gst_data_queue_locked_is_full (GstDataQueue * queue)
293 return queue->checkfull (queue, queue->cur_level.visible,
294 queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
298 * gst_data_queue_flush:
299 * @queue: a #GstDataQueue.
301 * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
302 * #gst_data_queue_pop will be released.
306 gst_data_queue_flush (GstDataQueue * queue)
308 GST_DEBUG ("queue:%p", queue);
309 GST_DATA_QUEUE_MUTEX_LOCK (queue);
310 gst_data_queue_locked_flush (queue);
311 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
315 * gst_data_queue_is_empty:
316 * @queue: a #GstDataQueue.
318 * Queries if there are any items in the @queue.
321 * Returns: #TRUE if @queue is empty.
324 gst_data_queue_is_empty (GstDataQueue * queue)
328 GST_DATA_QUEUE_MUTEX_LOCK (queue);
329 res = gst_data_queue_locked_is_empty (queue);
330 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
336 * gst_data_queue_is_full:
337 * @queue: a #GstDataQueue.
339 * Queries if @queue is full. This check will be done using the
340 * #GstDataQueueCheckFullCallback registered with @queue.
343 * Returns: #TRUE if @queue is full.
346 gst_data_queue_is_full (GstDataQueue * queue)
350 GST_DATA_QUEUE_MUTEX_LOCK (queue);
351 res = gst_data_queue_locked_is_full (queue);
352 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
358 * gst_data_queue_set_flushing:
359 * @queue: a #GstDataQueue.
360 * @flushing: a #gboolean stating if the queue will be flushing or not.
362 * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
363 * state, any incoming data on the @queue will be discarded. Any call currently
364 * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
365 * away with a return value of #FALSE. While the @queue is in flushing state,
366 * all calls to those two functions will return #FALSE.
371 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
373 GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
375 GST_DATA_QUEUE_MUTEX_LOCK (queue);
376 queue->flushing = flushing;
378 /* release push/pop functions */
379 g_cond_signal (queue->item_add);
380 g_cond_signal (queue->item_del);
382 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
386 * gst_data_queue_push:
387 * @queue: a #GstDataQueue.
388 * @item: a #GstDataQueueItem.
390 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
391 * on the @queue. If the @queue is full, the call will block until space is
392 * available, OR the @queue is set to flushing state.
395 * Note that this function has slightly different semantics than gst_pad_push()
396 * and gst_pad_push_event(): this function only takes ownership of @item and
397 * the #GstMiniObject contained in @item if the push was successful. If FALSE
398 * is returned, the caller is responsible for freeing @item and its contents.
400 * Returns: #TRUE if the @item was successfully pushed on the @queue.
403 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
405 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
406 g_return_val_if_fail (item != NULL, FALSE);
408 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
410 STATUS (queue, "before pushing");
412 /* We ALWAYS need to check for queue fillness */
413 if (gst_data_queue_locked_is_full (queue)) {
414 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
415 g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
416 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
418 /* signal might have removed some items */
419 while (gst_data_queue_locked_is_full (queue)) {
420 g_cond_wait (queue->item_del, queue->qlock);
426 g_queue_push_tail (queue->queue, item);
429 queue->cur_level.visible++;
430 queue->cur_level.bytes += item->size;
431 queue->cur_level.time += item->duration;
433 STATUS (queue, "after pushing");
434 g_cond_signal (queue->item_add);
436 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
443 GST_DEBUG ("queue:%p, we are flushing", queue);
444 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
450 * gst_data_queue_pop:
451 * @queue: a #GstDataQueue.
452 * @item: pointer to store the returned #GstDataQueueItem.
454 * Retrieves the first @item available on the @queue. If the queue is currently
455 * empty, the call will block until at least one item is available, OR the
456 * @queue is set to the flushing state.
459 * Returns: #TRUE if an @item was successfully retrieved from the @queue.
462 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
464 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
465 g_return_val_if_fail (item != NULL, FALSE);
467 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
469 STATUS (queue, "before popping");
471 if (gst_data_queue_locked_is_empty (queue)) {
472 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
473 g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
474 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
476 while (gst_data_queue_locked_is_empty (queue)) {
477 g_cond_wait (queue->item_add, queue->qlock);
483 /* Get the item from the GQueue */
484 *item = g_queue_pop_head (queue->queue);
486 /* update current level counter */
487 if ((*item)->visible)
488 queue->cur_level.visible--;
489 queue->cur_level.bytes -= (*item)->size;
490 queue->cur_level.time -= (*item)->duration;
492 STATUS (queue, "after popping");
493 g_cond_signal (queue->item_del);
495 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
502 GST_DEBUG ("queue:%p, we are flushing", queue);
503 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
509 * gst_data_queue_drop_head:
510 * @queue: The #GstDataQueue to drop an item from.
511 * @type: The #GType of the item to drop.
513 * Pop and unref the head-most #GstMiniObject with the given #GType.
515 * Returns: TRUE if an element was removed.
518 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
520 gboolean res = FALSE;
522 GstDataQueueItem *leak = NULL;
524 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
526 GST_DEBUG ("queue:%p", queue);
528 GST_DATA_QUEUE_MUTEX_LOCK (queue);
529 for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
530 GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
532 if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
541 g_queue_delete_link (queue->queue, item);
544 queue->cur_level.visible--;
545 queue->cur_level.bytes -= leak->size;
546 queue->cur_level.time -= leak->duration;
548 leak->destroy (leak);
553 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
555 GST_DEBUG ("queue:%p , res:%d", queue, res);
561 * gst_data_queue_limits_changed:
562 * @queue: The #GstDataQueue
564 * Inform the queue that the limits for the fullness check have changed and that
565 * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
568 gst_data_queue_limits_changed (GstDataQueue * queue)
570 g_return_if_fail (GST_IS_DATA_QUEUE (queue));
572 GST_DATA_QUEUE_MUTEX_LOCK (queue);
573 GST_DEBUG ("signal del");
574 g_cond_signal (queue->item_del);
575 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
579 * gst_data_queue_get_level:
580 * @queue: The #GstDataQueue
581 * @level: the location to store the result
583 * Get the current level of the queue.
586 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
588 level->visible = queue->cur_level.visible;
589 level->bytes = queue->cur_level.bytes;
590 level->time = queue->cur_level.time;
594 gst_data_queue_set_property (GObject * object,
595 guint prop_id, const GValue * value, GParamSpec * pspec)
599 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
605 gst_data_queue_get_property (GObject * object,
606 guint prop_id, GValue * value, GParamSpec * pspec)
608 GstDataQueue *queue = GST_DATA_QUEUE (object);
610 GST_DATA_QUEUE_MUTEX_LOCK (queue);
613 case ARG_CUR_LEVEL_BYTES:
614 g_value_set_uint (value, queue->cur_level.bytes);
616 case ARG_CUR_LEVEL_VISIBLE:
617 g_value_set_uint (value, queue->cur_level.visible);
619 case ARG_CUR_LEVEL_TIME:
620 g_value_set_uint64 (value, queue->cur_level.time);
623 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
627 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);