gst-gmyth/multiqueue/gstdataqueue.c
author melunko
Thu Aug 23 22:30:57 2007 +0100 (2007-08-23)
branch trunk
changeset 825 32ec09590cf2
permissions -rw-r--r--
[svn r831] fixed bug on transcode file
     1 /* GStreamer
     2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
     3  *
     4  * gstdataqueue.c:
     5  *
     6  * This library is free software; you can redistribute it and/or
     7  * modify it under the terms of the GNU Library General Public
     8  * License as published by the Free Software Foundation; either
     9  * version 2 of the License, or (at your option) any later version.
    10  *
    11  * This library is distributed in the hope that it will be useful,
    12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
    13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    14  * Library General Public License for more details.
    15  *
    16  * You should have received a copy of the GNU Library General Public
    17  * License along with this library; if not, write to the
    18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
    19  * Boston, MA 02111-1307, USA.
    20  */
    21 
    22 /**
    23  * SECTION:gstdataqueue
    24  * @short_description: Threadsafe queueing object
    25  *
    26  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
    27  * also provides size-related functionality. This object should be used for
    28  * any #GstElement that wishes to provide some sort of queueing functionality.
    29  */
    30 
    31 #include <gst/gst.h>
    32 #include "gstdataqueue.h"
    33 
    /* Category used for per-operation tracing (lock/unlock and queue level
     * dumps emitted by the locking and STATUS macros below). */
    GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);

    /* General category; it is the default for the plain GST_DEBUG() calls in
     * this file. */
    GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
    #define GST_CAT_DEFAULT (data_queue_debug)
    37 
    38 
    39 /* Queue signals and args */
    /* Indices into gst_data_queue_signals[] */
    enum
    {
      SIGNAL_EMPTY = 0,             /* "empty" signal */
      SIGNAL_FULL = 1,              /* "full" signal */
      LAST_SIGNAL = 2               /* number of signals, not a signal itself */
    };

    /* GObject property identifiers (0 is reserved by GObject) */
    enum
    {
      ARG_0 = 0,
      ARG_CUR_LEVEL_VISIBLE = 1,    /* "current-level-visible" */
      ARG_CUR_LEVEL_BYTES = 2,      /* "current-level-bytes" */
      ARG_CUR_LEVEL_TIME = 3        /* "current-level-time" */
          /* FILL ME */
    };
    55 
    /* Take the queue mutex of @q, logging before and after the acquisition on
     * the dataflow category. The macro argument is parenthesized so that any
     * pointer expression can be passed safely. */
    #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
        GST_CAT_LOG (data_queue_dataflow,                                   \
          "locking qlock from thread %p",                                   \
          g_thread_self ());                                                \
        g_mutex_lock ((q)->qlock);                                          \
        GST_CAT_LOG (data_queue_dataflow,                                   \
          "locked qlock from thread %p",                                    \
          g_thread_self ());                                                \
    } G_STMT_END

    /* Take the queue mutex of @q and jump to @label (with the mutex still
     * held) if the queue is in flushing state. */
    #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
        GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
        if ((q)->flushing)                                                  \
          goto label;                                                       \
      } G_STMT_END

    /* Release the queue mutex of @q, logging the release on the dataflow
     * category. */
    #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
        GST_CAT_LOG (data_queue_dataflow,                                   \
          "unlocking qlock from thread %p",                                 \
          g_thread_self ());                                                \
        g_mutex_unlock ((q)->qlock);                                        \
    } G_STMT_END
    78 
    /* Log the current fill level of @q (visible items, bytes, time and raw
     * element count) on the dataflow category, prefixed by the string literal
     * @msg. Only the macro parameter @q is referenced; the macro no longer
     * depends on a variable called "queue" existing at the call site. */
    #define STATUS(q, msg)                                                  \
      GST_CAT_LOG (data_queue_dataflow,                                     \
                   "queue:%p " msg ": %u visible items, %u "                \
                   "bytes, %"G_GUINT64_FORMAT                               \
                   " ns, %u elements",                                      \
                   (q),                                                     \
                   (q)->cur_level.visible,                                  \
                   (q)->cur_level.bytes,                                    \
                   (q)->cur_level.time,                                     \
                   (q)->queue->length)
    89 
    90 static void gst_data_queue_base_init (GstDataQueueClass * klass);
    91 static void gst_data_queue_class_init (GstDataQueueClass * klass);
    92 static void gst_data_queue_init (GstDataQueue * queue);
    93 static void gst_data_queue_finalize (GObject * object);
    94 
    95 static void gst_data_queue_set_property (GObject * object,
    96     guint prop_id, const GValue * value, GParamSpec * pspec);
    97 static void gst_data_queue_get_property (GObject * object,
    98     guint prop_id, GValue * value, GParamSpec * pspec);
    99 
   100 static GObjectClass *parent_class = NULL;
   101 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
   102 
   103 GType
   104 gst_data_queue_get_type (void)
   105 {
   106   static GType queue_type = 0;
   107 
   108   if (!queue_type) {
   109     static const GTypeInfo queue_info = {
   110       sizeof (GstDataQueueClass),
   111       (GBaseInitFunc) gst_data_queue_base_init,
   112       NULL,
   113       (GClassInitFunc) gst_data_queue_class_init,
   114       NULL,
   115       NULL,
   116       sizeof (GstDataQueue),
   117       0,
   118       (GInstanceInitFunc) gst_data_queue_init,
   119       NULL
   120     };
   121 
   122     queue_type = g_type_register_static (G_TYPE_OBJECT,
   123         "GstDataQueue", &queue_info, 0);
   124     GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
   125         "data queue object");
   126     GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
   127         "dataflow inside the data queue object");
   128   }
   129 
   130   return queue_type;
   131 }
   132 
   133 static void
   134 gst_data_queue_base_init (GstDataQueueClass * klass)
   135 {
   136   /* Do we need anything here ?? */
   137   return;
   138 }
   139 
   140 static void
   141 gst_data_queue_class_init (GstDataQueueClass * klass)
   142 {
   143   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   144 
   145   parent_class = g_type_class_peek_parent (klass);
   146 
   147   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
   148   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
   149 
   150   /* signals */
   151   /**
   152    * GstDataQueue::empty:
   153    * @queue: the queue instance
   154    *
   155    * Reports that the queue became empty (empty).
   156    * A queue is empty if the total amount of visible items inside it (num-visible, time,
   157    * size) is lower than the boundary values which can be set through the GObject
   158    * properties.
   159    */
   160   gst_data_queue_signals[SIGNAL_EMPTY] =
   161       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   162       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
   163       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   164 
   165   /**
   166    * GstDataQueue::full:
   167    * @queue: the queue instance
   168    *
   169    * Reports that the queue became full (full).
   170    * A queue is full if the total amount of data inside it (num-visible, time,
   171    * size) is higher than the boundary values which can be set through the GObject
   172    * properties.
   173    */
   174   gst_data_queue_signals[SIGNAL_FULL] =
   175       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   176       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
   177       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   178 
   179   /* properties */
   180   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
   181       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   182           "Current amount of data in the queue (bytes)",
   183           0, G_MAXUINT, 0, G_PARAM_READABLE));
   184   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
   185       g_param_spec_uint ("current-level-visible",
   186           "Current level (visible items)",
   187           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
   188           G_PARAM_READABLE));
   189   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
   190       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   191           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
   192           G_PARAM_READABLE));
   193 
   194   /* set several parent class virtual functions */
   195   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
   196 
   197 }
   198 
   199 static void
   200 gst_data_queue_init (GstDataQueue * queue)
   201 {
   202   queue->cur_level.visible = 0; /* no content */
   203   queue->cur_level.bytes = 0;   /* no content */
   204   queue->cur_level.time = 0;    /* no content */
   205 
   206   queue->checkfull = NULL;
   207 
   208   queue->qlock = g_mutex_new ();
   209   queue->item_add = g_cond_new ();
   210   queue->item_del = g_cond_new ();
   211   queue->queue = g_queue_new ();
   212 
   213   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
   214 }
   215 
   216 /**
   217  * gst_data_queue_new:
   218  * @checkfull: the callback used to tell if the element considers the queue full
   219  * or not.
   220  * @checkdata: a #gpointer that will be given in the @checkfull callback.
   221  *
   222  * Returns: a new #GstDataQueue.
   223  */
   224 
   225 GstDataQueue *
   226 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
   227 {
   228   GstDataQueue *ret;
   229 
   230   g_return_val_if_fail (checkfull != NULL, NULL);
   231 
   232   ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
   233   ret->checkfull = checkfull;
   234   ret->checkdata = checkdata;
   235 
   236   return ret;
   237 }
   238 
   239 static void
   240 gst_data_queue_cleanup (GstDataQueue * queue)
   241 {
   242   while (!g_queue_is_empty (queue->queue)) {
   243     GstDataQueueItem *item = g_queue_pop_head (queue->queue);
   244 
   245     /* Just call the destroy notify on the item */
   246     item->destroy (item);
   247   }
   248   queue->cur_level.visible = 0;
   249   queue->cur_level.bytes = 0;
   250   queue->cur_level.time = 0;
   251 }
   252 
   253 /* called only once, as opposed to dispose */
   254 static void
   255 gst_data_queue_finalize (GObject * object)
   256 {
   257   GstDataQueue *queue = GST_DATA_QUEUE (object);
   258 
   259   GST_DEBUG ("finalizing queue");
   260 
   261   gst_data_queue_cleanup (queue);
   262   g_queue_free (queue->queue);
   263 
   264   GST_DEBUG ("free mutex");
   265   g_mutex_free (queue->qlock);
   266   GST_DEBUG ("done free mutex");
   267 
   268   g_cond_free (queue->item_add);
   269   g_cond_free (queue->item_del);
   270 
   271   G_OBJECT_CLASS (parent_class)->finalize (object);
   272 }
   273 
   274 static void
   275 gst_data_queue_locked_flush (GstDataQueue * queue)
   276 {
   277   STATUS (queue, "before flushing");
   278   gst_data_queue_cleanup (queue);
   279   STATUS (queue, "after flushing");
   280   /* we deleted something... */
   281   g_cond_signal (queue->item_del);
   282 }
   283 
   284 static gboolean
   285 gst_data_queue_locked_is_empty (GstDataQueue * queue)
   286 {
   287   return (queue->queue->length == 0);
   288 }
   289 
   290 static gboolean
   291 gst_data_queue_locked_is_full (GstDataQueue * queue)
   292 {
   293   return queue->checkfull (queue, queue->cur_level.visible,
   294       queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
   295 }
   296 
   297 /**
   298  * gst_data_queue_flush:
   299  * @queue: a #GstDataQueue.
   300  *
   301  * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
   302  * #gst_data_queue_pop will be released.
   303  * MT safe.
   304  */
   305 void
   306 gst_data_queue_flush (GstDataQueue * queue)
   307 {
   308   GST_DEBUG ("queue:%p", queue);
   309   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   310   gst_data_queue_locked_flush (queue);
   311   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   312 }
   313 
   314 /**
   315  * gst_data_queue_is_empty:
   316  * @queue: a #GstDataQueue.
   317  *
   318  * Queries if there are any items in the @queue.
   319  * MT safe.
   320  *
   321  * Returns: #TRUE if @queue is empty.
   322  */
   323 gboolean
   324 gst_data_queue_is_empty (GstDataQueue * queue)
   325 {
   326   gboolean res;
   327 
   328   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   329   res = gst_data_queue_locked_is_empty (queue);
   330   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   331 
   332   return res;
   333 }
   334 
   335 /**
   336  * gst_data_queue_is_full:
   337  * @queue: a #GstDataQueue.
   338  *
   339  * Queries if @queue is full. This check will be done using the
   340  * #GstDataQueueCheckFullCallback registered with @queue.
   341  * MT safe.
   342  *
   343  * Returns: #TRUE if @queue is full.
   344  */
   345 gboolean
   346 gst_data_queue_is_full (GstDataQueue * queue)
   347 {
   348   gboolean res;
   349 
   350   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   351   res = gst_data_queue_locked_is_full (queue);
   352   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   353 
   354   return res;
   355 }
   356 
   357 /**
   358  * gst_data_queue_set_flushing:
   359  * @queue: a #GstDataQueue.
   360  * @flushing: a #gboolean stating if the queue will be flushing or not.
   361  *
   362  * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
   363  * state, any incoming data on the @queue will be discarded. Any call currently
   364  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
   365  * away with a return value of #FALSE. While the @queue is in flushing state, 
   366  * all calls to those two functions will return #FALSE.
   367  *
   368  * MT Safe.
   369  */
   370 void
   371 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
   372 {
   373   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
   374 
   375   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   376   queue->flushing = flushing;
   377   if (flushing) {
   378     /* release push/pop functions */
   379     g_cond_signal (queue->item_add);
   380     g_cond_signal (queue->item_del);
   381   }
   382   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   383 }
   384 
   385 /**
   386  * gst_data_queue_push:
   387  * @queue: a #GstDataQueue.
   388  * @item: a #GstDataQueueItem.
   389  *
   390  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
   391  * on the @queue. If the @queue is full, the call will block until space is
   392  * available, OR the @queue is set to flushing state.
   393  * MT safe.
   394  *
   395  * Note that this function has slightly different semantics than gst_pad_push()
   396  * and gst_pad_push_event(): this function only takes ownership of @item and
   397  * the #GstMiniObject contained in @item if the push was successful. If FALSE
   398  * is returned, the caller is responsible for freeing @item and its contents.
   399  *
   400  * Returns: #TRUE if the @item was successfully pushed on the @queue.
   401  */
   402 gboolean
   403 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
   404 {
   405   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   406   g_return_val_if_fail (item != NULL, FALSE);
   407 
   408   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   409 
   410   STATUS (queue, "before pushing");
   411 
   412   /* We ALWAYS need to check for queue fillness */
   413   if (gst_data_queue_locked_is_full (queue)) {
   414     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   415     g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
   416     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   417 
   418     /* signal might have removed some items */
   419     while (gst_data_queue_locked_is_full (queue)) {
   420       g_cond_wait (queue->item_del, queue->qlock);
   421       if (queue->flushing)
   422         goto flushing;
   423     }
   424   }
   425 
   426   g_queue_push_tail (queue->queue, item);
   427 
   428   if (item->visible)
   429     queue->cur_level.visible++;
   430   queue->cur_level.bytes += item->size;
   431   queue->cur_level.time += item->duration;
   432 
   433   STATUS (queue, "after pushing");
   434   g_cond_signal (queue->item_add);
   435 
   436   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   437 
   438   return TRUE;
   439 
   440   /* ERRORS */
   441 flushing:
   442   {
   443     GST_DEBUG ("queue:%p, we are flushing", queue);
   444     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   445     return FALSE;
   446   }
   447 }
   448 
   449 /**
   450  * gst_data_queue_pop:
   451  * @queue: a #GstDataQueue.
   452  * @item: pointer to store the returned #GstDataQueueItem.
   453  *
   454  * Retrieves the first @item available on the @queue. If the queue is currently
   455  * empty, the call will block until at least one item is available, OR the
   456  * @queue is set to the flushing state.
   457  * MT safe.
   458  *
   459  * Returns: #TRUE if an @item was successfully retrieved from the @queue.
   460  */
   461 gboolean
   462 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
   463 {
   464   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   465   g_return_val_if_fail (item != NULL, FALSE);
   466 
   467   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   468 
   469   STATUS (queue, "before popping");
   470 
   471   if (gst_data_queue_locked_is_empty (queue)) {
   472     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   473     g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
   474     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
   475 
   476     while (gst_data_queue_locked_is_empty (queue)) {
   477       g_cond_wait (queue->item_add, queue->qlock);
   478       if (queue->flushing)
   479         goto flushing;
   480     }
   481   }
   482 
   483   /* Get the item from the GQueue */
   484   *item = g_queue_pop_head (queue->queue);
   485 
   486   /* update current level counter */
   487   if ((*item)->visible)
   488     queue->cur_level.visible--;
   489   queue->cur_level.bytes -= (*item)->size;
   490   queue->cur_level.time -= (*item)->duration;
   491 
   492   STATUS (queue, "after popping");
   493   g_cond_signal (queue->item_del);
   494 
   495   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   496 
   497   return TRUE;
   498 
   499   /* ERRORS */
   500 flushing:
   501   {
   502     GST_DEBUG ("queue:%p, we are flushing", queue);
   503     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   504     return FALSE;
   505   }
   506 }
   507 
   508 /**
   509  * gst_data_queue_drop_head:
   510  * @queue: The #GstDataQueue to drop an item from.
   511  * @type: The #GType of the item to drop.
   512  *
   513  * Pop and unref the head-most #GstMiniObject with the given #GType.
   514  *
   515  * Returns: TRUE if an element was removed.
   516  */
   517 gboolean
   518 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
   519 {
   520   gboolean res = FALSE;
   521   GList *item;
   522   GstDataQueueItem *leak = NULL;
   523 
   524   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
   525 
   526   GST_DEBUG ("queue:%p", queue);
   527 
   528   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   529   for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
   530     GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
   531 
   532     if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
   533       leak = tmp;
   534       break;
   535     }
   536   }
   537 
   538   if (!leak)
   539     goto done;
   540 
   541   g_queue_delete_link (queue->queue, item);
   542 
   543   if (leak->visible)
   544     queue->cur_level.visible--;
   545   queue->cur_level.bytes -= leak->size;
   546   queue->cur_level.time -= leak->duration;
   547 
   548   leak->destroy (leak);
   549 
   550   res = TRUE;
   551 
   552 done:
   553   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   554 
   555   GST_DEBUG ("queue:%p , res:%d", queue, res);
   556 
   557   return res;
   558 }
   559 
   560 /**
   561  * gst_data_queue_limits_changed:
   562  * @queue: The #GstDataQueue 
   563  *
   564  * Inform the queue that the limits for the fullness check have changed and that
   565  * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
   566  */
   567 void
   568 gst_data_queue_limits_changed (GstDataQueue * queue)
   569 {
   570   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
   571 
   572   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   573   GST_DEBUG ("signal del");
   574   g_cond_signal (queue->item_del);
   575   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   576 }
   577 
   578 /**
   579  * gst_data_queue_get_level:
   580  * @queue: The #GstDataQueue
   581  * @level: the location to store the result
   582  *
   583  * Get the current level of the queue.
   584  */
   585 void
   586 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
   587 {
   588   level->visible = queue->cur_level.visible;
   589   level->bytes = queue->cur_level.bytes;
   590   level->time = queue->cur_level.time;
   591 }
   592 
   593 static void
   594 gst_data_queue_set_property (GObject * object,
   595     guint prop_id, const GValue * value, GParamSpec * pspec)
   596 {
   597   switch (prop_id) {
   598     default:
   599       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   600       break;
   601   }
   602 }
   603 
   604 static void
   605 gst_data_queue_get_property (GObject * object,
   606     guint prop_id, GValue * value, GParamSpec * pspec)
   607 {
   608   GstDataQueue *queue = GST_DATA_QUEUE (object);
   609 
   610   GST_DATA_QUEUE_MUTEX_LOCK (queue);
   611 
   612   switch (prop_id) {
   613     case ARG_CUR_LEVEL_BYTES:
   614       g_value_set_uint (value, queue->cur_level.bytes);
   615       break;
   616     case ARG_CUR_LEVEL_VISIBLE:
   617       g_value_set_uint (value, queue->cur_level.visible);
   618       break;
   619     case ARG_CUR_LEVEL_TIME:
   620       g_value_set_uint64 (value, queue->cur_level.time);
   621       break;
   622     default:
   623       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   624       break;
   625   }
   626 
   627   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
   628 }