gst-gmyth/multiqueue/gstdataqueue.c
author renatofilho
Wed Aug 01 14:22:14 2007 +0100 (2007-08-01)
branchtrunk
changeset 789 f9cd59844f78
permissions -rw-r--r--
[svn r795] - fixed bug on transcode operation;
- some code clean;
renatofilho@787
     1
/* GStreamer
renatofilho@787
     2
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
renatofilho@787
     3
 *
renatofilho@787
     4
 * gstdataqueue.c:
renatofilho@787
     5
 *
renatofilho@787
     6
 * This library is free software; you can redistribute it and/or
renatofilho@787
     7
 * modify it under the terms of the GNU Library General Public
renatofilho@787
     8
 * License as published by the Free Software Foundation; either
renatofilho@787
     9
 * version 2 of the License, or (at your option) any later version.
renatofilho@787
    10
 *
renatofilho@787
    11
 * This library is distributed in the hope that it will be useful,
renatofilho@787
    12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
renatofilho@787
    13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
renatofilho@787
    14
 * Library General Public License for more details.
renatofilho@787
    15
 *
renatofilho@787
    16
 * You should have received a copy of the GNU Library General Public
renatofilho@787
    17
 * License along with this library; if not, write to the
renatofilho@787
    18
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
renatofilho@787
    19
 * Boston, MA 02111-1307, USA.
renatofilho@787
    20
 */
renatofilho@787
    21
renatofilho@787
    22
/**
renatofilho@787
    23
 * SECTION:gstdataqueue
renatofilho@787
    24
 * @short_description: Threadsafe queueing object
renatofilho@787
    25
 *
renatofilho@787
    26
 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
renatofilho@787
    27
 * also provides size-related functionality. This object should be used for
renatofilho@787
    28
 * any #GstElement that wishes to provide some sort of queueing functionality.
renatofilho@787
    29
 */
renatofilho@787
    30
renatofilho@787
    31
#include <gst/gst.h>
renatofilho@787
    32
#include "gstdataqueue.h"
renatofilho@787
    33
renatofilho@787
    34
GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
renatofilho@787
    35
#define GST_CAT_DEFAULT (data_queue_debug)
renatofilho@787
    36
GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
renatofilho@787
    37
renatofilho@787
    38
renatofilho@787
    39
/* Queue signals and args */
renatofilho@787
    40
enum
renatofilho@787
    41
{
renatofilho@787
    42
  SIGNAL_EMPTY,
renatofilho@787
    43
  SIGNAL_FULL,
renatofilho@787
    44
  LAST_SIGNAL
renatofilho@787
    45
};
renatofilho@787
    46
renatofilho@787
    47
enum
renatofilho@787
    48
{
renatofilho@787
    49
  ARG_0,
renatofilho@787
    50
  ARG_CUR_LEVEL_VISIBLE,
renatofilho@787
    51
  ARG_CUR_LEVEL_BYTES,
renatofilho@787
    52
  ARG_CUR_LEVEL_TIME
renatofilho@787
    53
      /* FILL ME */
renatofilho@787
    54
};
renatofilho@787
    55
renatofilho@787
    56
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
renatofilho@787
    57
    GST_CAT_LOG (data_queue_dataflow,                                   \
renatofilho@787
    58
      "locking qlock from thread %p",                                   \
renatofilho@787
    59
      g_thread_self ());                                                \
renatofilho@787
    60
  g_mutex_lock (q->qlock);                                              \
renatofilho@787
    61
  GST_CAT_LOG (data_queue_dataflow,                                     \
renatofilho@787
    62
      "locked qlock from thread %p",                                    \
renatofilho@787
    63
      g_thread_self ());                                                \
renatofilho@787
    64
} G_STMT_END
renatofilho@787
    65
renatofilho@787
    66
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
renatofilho@787
    67
    GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
renatofilho@787
    68
    if (q->flushing)                                                    \
renatofilho@787
    69
      goto label;                                                       \
renatofilho@787
    70
  } G_STMT_END
renatofilho@787
    71
renatofilho@787
    72
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
renatofilho@787
    73
    GST_CAT_LOG (data_queue_dataflow,                                   \
renatofilho@787
    74
      "unlocking qlock from thread %p",                                 \
renatofilho@787
    75
      g_thread_self ());                                                \
renatofilho@787
    76
  g_mutex_unlock (q->qlock);                                            \
renatofilho@787
    77
} G_STMT_END
renatofilho@787
    78
renatofilho@787
    79
#define STATUS(q, msg)                                                  \
renatofilho@787
    80
  GST_CAT_LOG (data_queue_dataflow,                                     \
renatofilho@787
    81
               "queue:%p " msg ": %u visible items, %u "                \
renatofilho@787
    82
               "bytes, %"G_GUINT64_FORMAT                               \
renatofilho@787
    83
               " ns, %u elements",                                      \
renatofilho@787
    84
               queue,                                                   \
renatofilho@787
    85
               q->cur_level.visible,                                    \
renatofilho@787
    86
               q->cur_level.bytes,                                      \
renatofilho@787
    87
               q->cur_level.time,                                       \
renatofilho@787
    88
               q->queue->length)
renatofilho@787
    89
renatofilho@787
    90
static void gst_data_queue_base_init (GstDataQueueClass * klass);
renatofilho@787
    91
static void gst_data_queue_class_init (GstDataQueueClass * klass);
renatofilho@787
    92
static void gst_data_queue_init (GstDataQueue * queue);
renatofilho@787
    93
static void gst_data_queue_finalize (GObject * object);
renatofilho@787
    94
renatofilho@787
    95
static void gst_data_queue_set_property (GObject * object,
renatofilho@787
    96
    guint prop_id, const GValue * value, GParamSpec * pspec);
renatofilho@787
    97
static void gst_data_queue_get_property (GObject * object,
renatofilho@787
    98
    guint prop_id, GValue * value, GParamSpec * pspec);
renatofilho@787
    99
renatofilho@787
   100
static GObjectClass *parent_class = NULL;
renatofilho@787
   101
static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
renatofilho@787
   102
renatofilho@787
   103
GType
renatofilho@787
   104
gst_data_queue_get_type (void)
renatofilho@787
   105
{
renatofilho@787
   106
  static GType queue_type = 0;
renatofilho@787
   107
renatofilho@787
   108
  if (!queue_type) {
renatofilho@787
   109
    static const GTypeInfo queue_info = {
renatofilho@787
   110
      sizeof (GstDataQueueClass),
renatofilho@787
   111
      (GBaseInitFunc) gst_data_queue_base_init,
renatofilho@787
   112
      NULL,
renatofilho@787
   113
      (GClassInitFunc) gst_data_queue_class_init,
renatofilho@787
   114
      NULL,
renatofilho@787
   115
      NULL,
renatofilho@787
   116
      sizeof (GstDataQueue),
renatofilho@787
   117
      0,
renatofilho@787
   118
      (GInstanceInitFunc) gst_data_queue_init,
renatofilho@787
   119
      NULL
renatofilho@787
   120
    };
renatofilho@787
   121
renatofilho@787
   122
    queue_type = g_type_register_static (G_TYPE_OBJECT,
renatofilho@787
   123
        "GstDataQueue", &queue_info, 0);
renatofilho@787
   124
    GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
renatofilho@787
   125
        "data queue object");
renatofilho@787
   126
    GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
renatofilho@787
   127
        "dataflow inside the data queue object");
renatofilho@787
   128
  }
renatofilho@787
   129
renatofilho@787
   130
  return queue_type;
renatofilho@787
   131
}
renatofilho@787
   132
renatofilho@787
   133
static void
renatofilho@787
   134
gst_data_queue_base_init (GstDataQueueClass * klass)
renatofilho@787
   135
{
renatofilho@787
   136
  /* Do we need anything here ?? */
renatofilho@787
   137
  return;
renatofilho@787
   138
}
renatofilho@787
   139
renatofilho@787
   140
static void
renatofilho@787
   141
gst_data_queue_class_init (GstDataQueueClass * klass)
renatofilho@787
   142
{
renatofilho@787
   143
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
renatofilho@787
   144
renatofilho@787
   145
  parent_class = g_type_class_peek_parent (klass);
renatofilho@787
   146
renatofilho@787
   147
  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
renatofilho@787
   148
  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
renatofilho@787
   149
renatofilho@787
   150
  /* signals */
renatofilho@787
   151
  /**
renatofilho@787
   152
   * GstDataQueue::empty:
renatofilho@787
   153
   * @queue: the queue instance
renatofilho@787
   154
   *
renatofilho@787
   155
   * Reports that the queue became empty (empty).
renatofilho@787
   156
   * A queue is empty if the total amount of visible items inside it (num-visible, time,
renatofilho@787
   157
   * size) is lower than the boundary values which can be set through the GObject
renatofilho@787
   158
   * properties.
renatofilho@787
   159
   */
renatofilho@787
   160
  gst_data_queue_signals[SIGNAL_EMPTY] =
renatofilho@787
   161
      g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
renatofilho@787
   162
      G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
renatofilho@787
   163
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
renatofilho@787
   164
renatofilho@787
   165
  /**
renatofilho@787
   166
   * GstDataQueue::full:
renatofilho@787
   167
   * @queue: the queue instance
renatofilho@787
   168
   *
renatofilho@787
   169
   * Reports that the queue became full (full).
renatofilho@787
   170
   * A queue is full if the total amount of data inside it (num-visible, time,
renatofilho@787
   171
   * size) is higher than the boundary values which can be set through the GObject
renatofilho@787
   172
   * properties.
renatofilho@787
   173
   */
renatofilho@787
   174
  gst_data_queue_signals[SIGNAL_FULL] =
renatofilho@787
   175
      g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
renatofilho@787
   176
      G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
renatofilho@787
   177
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
renatofilho@787
   178
renatofilho@787
   179
  /* properties */
renatofilho@787
   180
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
renatofilho@787
   181
      g_param_spec_uint ("current-level-bytes", "Current level (kB)",
renatofilho@787
   182
          "Current amount of data in the queue (bytes)",
renatofilho@787
   183
          0, G_MAXUINT, 0, G_PARAM_READABLE));
renatofilho@787
   184
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
renatofilho@787
   185
      g_param_spec_uint ("current-level-visible",
renatofilho@787
   186
          "Current level (visible items)",
renatofilho@787
   187
          "Current number of visible items in the queue", 0, G_MAXUINT, 0,
renatofilho@787
   188
          G_PARAM_READABLE));
renatofilho@787
   189
  g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
renatofilho@787
   190
      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
renatofilho@787
   191
          "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
renatofilho@787
   192
          G_PARAM_READABLE));
renatofilho@787
   193
renatofilho@787
   194
  /* set several parent class virtual functions */
renatofilho@787
   195
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
renatofilho@787
   196
renatofilho@787
   197
}
renatofilho@787
   198
renatofilho@787
   199
static void
renatofilho@787
   200
gst_data_queue_init (GstDataQueue * queue)
renatofilho@787
   201
{
renatofilho@787
   202
  queue->cur_level.visible = 0; /* no content */
renatofilho@787
   203
  queue->cur_level.bytes = 0;   /* no content */
renatofilho@787
   204
  queue->cur_level.time = 0;    /* no content */
renatofilho@787
   205
renatofilho@787
   206
  queue->checkfull = NULL;
renatofilho@787
   207
renatofilho@787
   208
  queue->qlock = g_mutex_new ();
renatofilho@787
   209
  queue->item_add = g_cond_new ();
renatofilho@787
   210
  queue->item_del = g_cond_new ();
renatofilho@787
   211
  queue->queue = g_queue_new ();
renatofilho@787
   212
renatofilho@787
   213
  GST_DEBUG ("initialized queue's not_empty & not_full conditions");
renatofilho@787
   214
}
renatofilho@787
   215
renatofilho@787
   216
/**
renatofilho@787
   217
 * gst_data_queue_new:
renatofilho@787
   218
 * @checkfull: the callback used to tell if the element considers the queue full
renatofilho@787
   219
 * or not.
renatofilho@787
   220
 * @checkdata: a #gpointer that will be given in the @checkfull callback.
renatofilho@787
   221
 *
renatofilho@787
   222
 * Returns: a new #GstDataQueue.
renatofilho@787
   223
 */
renatofilho@787
   224
renatofilho@787
   225
GstDataQueue *
renatofilho@787
   226
gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
renatofilho@787
   227
{
renatofilho@787
   228
  GstDataQueue *ret;
renatofilho@787
   229
renatofilho@787
   230
  g_return_val_if_fail (checkfull != NULL, NULL);
renatofilho@787
   231
renatofilho@787
   232
  ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
renatofilho@787
   233
  ret->checkfull = checkfull;
renatofilho@787
   234
  ret->checkdata = checkdata;
renatofilho@787
   235
renatofilho@787
   236
  return ret;
renatofilho@787
   237
}
renatofilho@787
   238
renatofilho@787
   239
static void
renatofilho@787
   240
gst_data_queue_cleanup (GstDataQueue * queue)
renatofilho@787
   241
{
renatofilho@787
   242
  while (!g_queue_is_empty (queue->queue)) {
renatofilho@787
   243
    GstDataQueueItem *item = g_queue_pop_head (queue->queue);
renatofilho@787
   244
renatofilho@787
   245
    /* Just call the destroy notify on the item */
renatofilho@787
   246
    item->destroy (item);
renatofilho@787
   247
  }
renatofilho@787
   248
  queue->cur_level.visible = 0;
renatofilho@787
   249
  queue->cur_level.bytes = 0;
renatofilho@787
   250
  queue->cur_level.time = 0;
renatofilho@787
   251
}
renatofilho@787
   252
renatofilho@787
   253
/* called only once, as opposed to dispose */
renatofilho@787
   254
static void
renatofilho@787
   255
gst_data_queue_finalize (GObject * object)
renatofilho@787
   256
{
renatofilho@787
   257
  GstDataQueue *queue = GST_DATA_QUEUE (object);
renatofilho@787
   258
renatofilho@787
   259
  GST_DEBUG ("finalizing queue");
renatofilho@787
   260
renatofilho@787
   261
  gst_data_queue_cleanup (queue);
renatofilho@787
   262
  g_queue_free (queue->queue);
renatofilho@787
   263
renatofilho@787
   264
  GST_DEBUG ("free mutex");
renatofilho@787
   265
  g_mutex_free (queue->qlock);
renatofilho@787
   266
  GST_DEBUG ("done free mutex");
renatofilho@787
   267
renatofilho@787
   268
  g_cond_free (queue->item_add);
renatofilho@787
   269
  g_cond_free (queue->item_del);
renatofilho@787
   270
renatofilho@787
   271
  G_OBJECT_CLASS (parent_class)->finalize (object);
renatofilho@787
   272
}
renatofilho@787
   273
renatofilho@787
   274
static void
renatofilho@787
   275
gst_data_queue_locked_flush (GstDataQueue * queue)
renatofilho@787
   276
{
renatofilho@787
   277
  STATUS (queue, "before flushing");
renatofilho@787
   278
  gst_data_queue_cleanup (queue);
renatofilho@787
   279
  STATUS (queue, "after flushing");
renatofilho@787
   280
  /* we deleted something... */
renatofilho@787
   281
  g_cond_signal (queue->item_del);
renatofilho@787
   282
}
renatofilho@787
   283
renatofilho@787
   284
static gboolean
renatofilho@787
   285
gst_data_queue_locked_is_empty (GstDataQueue * queue)
renatofilho@787
   286
{
renatofilho@787
   287
  return (queue->queue->length == 0);
renatofilho@787
   288
}
renatofilho@787
   289
renatofilho@787
   290
static gboolean
renatofilho@787
   291
gst_data_queue_locked_is_full (GstDataQueue * queue)
renatofilho@787
   292
{
renatofilho@787
   293
  return queue->checkfull (queue, queue->cur_level.visible,
renatofilho@787
   294
      queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
renatofilho@787
   295
}
renatofilho@787
   296
renatofilho@787
   297
/**
renatofilho@787
   298
 * gst_data_queue_flush:
renatofilho@787
   299
 * @queue: a #GstDataQueue.
renatofilho@787
   300
 *
renatofilho@787
   301
 * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
renatofilho@787
   302
 * #gst_data_queue_pop will be released.
renatofilho@787
   303
 * MT safe.
renatofilho@787
   304
 */
renatofilho@787
   305
void
renatofilho@787
   306
gst_data_queue_flush (GstDataQueue * queue)
renatofilho@787
   307
{
renatofilho@787
   308
  GST_DEBUG ("queue:%p", queue);
renatofilho@787
   309
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   310
  gst_data_queue_locked_flush (queue);
renatofilho@787
   311
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   312
}
renatofilho@787
   313
renatofilho@787
   314
/**
renatofilho@787
   315
 * gst_data_queue_is_empty:
renatofilho@787
   316
 * @queue: a #GstDataQueue.
renatofilho@787
   317
 *
renatofilho@787
   318
 * Queries if there are any items in the @queue.
renatofilho@787
   319
 * MT safe.
renatofilho@787
   320
 *
renatofilho@787
   321
 * Returns: #TRUE if @queue is empty.
renatofilho@787
   322
 */
renatofilho@787
   323
gboolean
renatofilho@787
   324
gst_data_queue_is_empty (GstDataQueue * queue)
renatofilho@787
   325
{
renatofilho@787
   326
  gboolean res;
renatofilho@787
   327
renatofilho@787
   328
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   329
  res = gst_data_queue_locked_is_empty (queue);
renatofilho@787
   330
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   331
renatofilho@787
   332
  return res;
renatofilho@787
   333
}
renatofilho@787
   334
renatofilho@787
   335
/**
renatofilho@787
   336
 * gst_data_queue_is_full:
renatofilho@787
   337
 * @queue: a #GstDataQueue.
renatofilho@787
   338
 *
renatofilho@787
   339
 * Queries if @queue is full. This check will be done using the
renatofilho@787
   340
 * #GstDataQueueCheckFullCallback registered with @queue.
renatofilho@787
   341
 * MT safe.
renatofilho@787
   342
 *
renatofilho@787
   343
 * Returns: #TRUE if @queue is full.
renatofilho@787
   344
 */
renatofilho@787
   345
gboolean
renatofilho@787
   346
gst_data_queue_is_full (GstDataQueue * queue)
renatofilho@787
   347
{
renatofilho@787
   348
  gboolean res;
renatofilho@787
   349
renatofilho@787
   350
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   351
  res = gst_data_queue_locked_is_full (queue);
renatofilho@787
   352
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   353
renatofilho@787
   354
  return res;
renatofilho@787
   355
}
renatofilho@787
   356
renatofilho@787
   357
/**
renatofilho@787
   358
 * gst_data_queue_set_flushing:
renatofilho@787
   359
 * @queue: a #GstDataQueue.
renatofilho@787
   360
 * @flushing: a #gboolean stating if the queue will be flushing or not.
renatofilho@787
   361
 *
renatofilho@787
   362
 * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
renatofilho@787
   363
 * state, any incoming data on the @queue will be discarded. Any call currently
renatofilho@787
   364
 * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
renatofilho@787
   365
 * away with a return value of #FALSE. While the @queue is in flushing state, 
renatofilho@787
   366
 * all calls to those two functions will return #FALSE.
renatofilho@787
   367
 *
renatofilho@787
   368
 * MT Safe.
renatofilho@787
   369
 */
renatofilho@787
   370
void
renatofilho@787
   371
gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
renatofilho@787
   372
{
renatofilho@787
   373
  GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
renatofilho@787
   374
renatofilho@787
   375
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   376
  queue->flushing = flushing;
renatofilho@787
   377
  if (flushing) {
renatofilho@787
   378
    /* release push/pop functions */
renatofilho@787
   379
    g_cond_signal (queue->item_add);
renatofilho@787
   380
    g_cond_signal (queue->item_del);
renatofilho@787
   381
  }
renatofilho@787
   382
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   383
}
renatofilho@787
   384
renatofilho@787
   385
/**
renatofilho@787
   386
 * gst_data_queue_push:
renatofilho@787
   387
 * @queue: a #GstDataQueue.
renatofilho@787
   388
 * @item: a #GstDataQueueItem.
renatofilho@787
   389
 *
renatofilho@787
   390
 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
renatofilho@787
   391
 * on the @queue. If the @queue is full, the call will block until space is
renatofilho@787
   392
 * available, OR the @queue is set to flushing state.
renatofilho@787
   393
 * MT safe.
renatofilho@787
   394
 *
renatofilho@787
   395
 * Note that this function has slightly different semantics than gst_pad_push()
renatofilho@787
   396
 * and gst_pad_push_event(): this function only takes ownership of @item and
renatofilho@787
   397
 * the #GstMiniObject contained in @item if the push was successful. If FALSE
renatofilho@787
   398
 * is returned, the caller is responsible for freeing @item and its contents.
renatofilho@787
   399
 *
renatofilho@787
   400
 * Returns: #TRUE if the @item was successfully pushed on the @queue.
renatofilho@787
   401
 */
renatofilho@787
   402
gboolean
renatofilho@787
   403
gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
renatofilho@787
   404
{
renatofilho@787
   405
  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
renatofilho@787
   406
  g_return_val_if_fail (item != NULL, FALSE);
renatofilho@787
   407
renatofilho@787
   408
  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
renatofilho@787
   409
renatofilho@787
   410
  STATUS (queue, "before pushing");
renatofilho@787
   411
renatofilho@787
   412
  /* We ALWAYS need to check for queue fillness */
renatofilho@787
   413
  if (gst_data_queue_locked_is_full (queue)) {
renatofilho@787
   414
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   415
    g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
renatofilho@787
   416
    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
renatofilho@787
   417
renatofilho@787
   418
    /* signal might have removed some items */
renatofilho@787
   419
    while (gst_data_queue_locked_is_full (queue)) {
renatofilho@787
   420
      g_cond_wait (queue->item_del, queue->qlock);
renatofilho@787
   421
      if (queue->flushing)
renatofilho@787
   422
        goto flushing;
renatofilho@787
   423
    }
renatofilho@787
   424
  }
renatofilho@787
   425
renatofilho@787
   426
  g_queue_push_tail (queue->queue, item);
renatofilho@787
   427
renatofilho@787
   428
  if (item->visible)
renatofilho@787
   429
    queue->cur_level.visible++;
renatofilho@787
   430
  queue->cur_level.bytes += item->size;
renatofilho@787
   431
  queue->cur_level.time += item->duration;
renatofilho@787
   432
renatofilho@787
   433
  STATUS (queue, "after pushing");
renatofilho@787
   434
  g_cond_signal (queue->item_add);
renatofilho@787
   435
renatofilho@787
   436
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   437
renatofilho@787
   438
  return TRUE;
renatofilho@787
   439
renatofilho@787
   440
  /* ERRORS */
renatofilho@787
   441
flushing:
renatofilho@787
   442
  {
renatofilho@787
   443
    GST_DEBUG ("queue:%p, we are flushing", queue);
renatofilho@787
   444
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   445
    return FALSE;
renatofilho@787
   446
  }
renatofilho@787
   447
}
renatofilho@787
   448
renatofilho@787
   449
/**
renatofilho@787
   450
 * gst_data_queue_pop:
renatofilho@787
   451
 * @queue: a #GstDataQueue.
renatofilho@787
   452
 * @item: pointer to store the returned #GstDataQueueItem.
renatofilho@787
   453
 *
renatofilho@787
   454
 * Retrieves the first @item available on the @queue. If the queue is currently
renatofilho@787
   455
 * empty, the call will block until at least one item is available, OR the
renatofilho@787
   456
 * @queue is set to the flushing state.
renatofilho@787
   457
 * MT safe.
renatofilho@787
   458
 *
renatofilho@787
   459
 * Returns: #TRUE if an @item was successfully retrieved from the @queue.
renatofilho@787
   460
 */
renatofilho@787
   461
gboolean
renatofilho@787
   462
gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
renatofilho@787
   463
{
renatofilho@787
   464
  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
renatofilho@787
   465
  g_return_val_if_fail (item != NULL, FALSE);
renatofilho@787
   466
renatofilho@787
   467
  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
renatofilho@787
   468
renatofilho@787
   469
  STATUS (queue, "before popping");
renatofilho@787
   470
renatofilho@787
   471
  if (gst_data_queue_locked_is_empty (queue)) {
renatofilho@787
   472
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   473
    g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
renatofilho@787
   474
    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
renatofilho@787
   475
renatofilho@787
   476
    while (gst_data_queue_locked_is_empty (queue)) {
renatofilho@787
   477
      g_cond_wait (queue->item_add, queue->qlock);
renatofilho@787
   478
      if (queue->flushing)
renatofilho@787
   479
        goto flushing;
renatofilho@787
   480
    }
renatofilho@787
   481
  }
renatofilho@787
   482
renatofilho@787
   483
  /* Get the item from the GQueue */
renatofilho@787
   484
  *item = g_queue_pop_head (queue->queue);
renatofilho@787
   485
renatofilho@787
   486
  /* update current level counter */
renatofilho@787
   487
  if ((*item)->visible)
renatofilho@787
   488
    queue->cur_level.visible--;
renatofilho@787
   489
  queue->cur_level.bytes -= (*item)->size;
renatofilho@787
   490
  queue->cur_level.time -= (*item)->duration;
renatofilho@787
   491
renatofilho@787
   492
  STATUS (queue, "after popping");
renatofilho@787
   493
  g_cond_signal (queue->item_del);
renatofilho@787
   494
renatofilho@787
   495
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   496
renatofilho@787
   497
  return TRUE;
renatofilho@787
   498
renatofilho@787
   499
  /* ERRORS */
renatofilho@787
   500
flushing:
renatofilho@787
   501
  {
renatofilho@787
   502
    GST_DEBUG ("queue:%p, we are flushing", queue);
renatofilho@787
   503
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   504
    return FALSE;
renatofilho@787
   505
  }
renatofilho@787
   506
}
renatofilho@787
   507
renatofilho@787
   508
/**
renatofilho@787
   509
 * gst_data_queue_drop_head:
renatofilho@787
   510
 * @queue: The #GstDataQueue to drop an item from.
renatofilho@787
   511
 * @type: The #GType of the item to drop.
renatofilho@787
   512
 *
renatofilho@787
   513
 * Pop and unref the head-most #GstMiniObject with the given #GType.
renatofilho@787
   514
 *
renatofilho@787
   515
 * Returns: TRUE if an element was removed.
renatofilho@787
   516
 */
renatofilho@787
   517
gboolean
renatofilho@787
   518
gst_data_queue_drop_head (GstDataQueue * queue, GType type)
renatofilho@787
   519
{
renatofilho@787
   520
  gboolean res = FALSE;
renatofilho@787
   521
  GList *item;
renatofilho@787
   522
  GstDataQueueItem *leak = NULL;
renatofilho@787
   523
renatofilho@787
   524
  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
renatofilho@787
   525
renatofilho@787
   526
  GST_DEBUG ("queue:%p", queue);
renatofilho@787
   527
renatofilho@787
   528
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   529
  for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
renatofilho@787
   530
    GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
renatofilho@787
   531
renatofilho@787
   532
    if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
renatofilho@787
   533
      leak = tmp;
renatofilho@787
   534
      break;
renatofilho@787
   535
    }
renatofilho@787
   536
  }
renatofilho@787
   537
renatofilho@787
   538
  if (!leak)
renatofilho@787
   539
    goto done;
renatofilho@787
   540
renatofilho@787
   541
  g_queue_delete_link (queue->queue, item);
renatofilho@787
   542
renatofilho@787
   543
  if (leak->visible)
renatofilho@787
   544
    queue->cur_level.visible--;
renatofilho@787
   545
  queue->cur_level.bytes -= leak->size;
renatofilho@787
   546
  queue->cur_level.time -= leak->duration;
renatofilho@787
   547
renatofilho@787
   548
  leak->destroy (leak);
renatofilho@787
   549
renatofilho@787
   550
  res = TRUE;
renatofilho@787
   551
renatofilho@787
   552
done:
renatofilho@787
   553
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   554
renatofilho@787
   555
  GST_DEBUG ("queue:%p , res:%d", queue, res);
renatofilho@787
   556
renatofilho@787
   557
  return res;
renatofilho@787
   558
}
renatofilho@787
   559
renatofilho@787
   560
/**
renatofilho@787
   561
 * gst_data_queue_limits_changed:
renatofilho@787
   562
 * @queue: The #GstDataQueue 
renatofilho@787
   563
 *
renatofilho@787
   564
 * Inform the queue that the limits for the fullness check have changed and that
renatofilho@787
   565
 * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
renatofilho@787
   566
 */
renatofilho@787
   567
void
renatofilho@787
   568
gst_data_queue_limits_changed (GstDataQueue * queue)
renatofilho@787
   569
{
renatofilho@787
   570
  g_return_if_fail (GST_IS_DATA_QUEUE (queue));
renatofilho@787
   571
renatofilho@787
   572
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   573
  GST_DEBUG ("signal del");
renatofilho@787
   574
  g_cond_signal (queue->item_del);
renatofilho@787
   575
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   576
}
renatofilho@787
   577
renatofilho@787
   578
/**
renatofilho@787
   579
 * gst_data_queue_get_level:
renatofilho@787
   580
 * @queue: The #GstDataQueue
renatofilho@787
   581
 * @level: the location to store the result
renatofilho@787
   582
 *
renatofilho@787
   583
 * Get the current level of the queue.
renatofilho@787
   584
 */
renatofilho@787
   585
void
renatofilho@787
   586
gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
renatofilho@787
   587
{
renatofilho@787
   588
  level->visible = queue->cur_level.visible;
renatofilho@787
   589
  level->bytes = queue->cur_level.bytes;
renatofilho@787
   590
  level->time = queue->cur_level.time;
renatofilho@787
   591
}
renatofilho@787
   592
renatofilho@787
   593
static void
renatofilho@787
   594
gst_data_queue_set_property (GObject * object,
renatofilho@787
   595
    guint prop_id, const GValue * value, GParamSpec * pspec)
renatofilho@787
   596
{
renatofilho@787
   597
  switch (prop_id) {
renatofilho@787
   598
    default:
renatofilho@787
   599
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
renatofilho@787
   600
      break;
renatofilho@787
   601
  }
renatofilho@787
   602
}
renatofilho@787
   603
renatofilho@787
   604
static void
renatofilho@787
   605
gst_data_queue_get_property (GObject * object,
renatofilho@787
   606
    guint prop_id, GValue * value, GParamSpec * pspec)
renatofilho@787
   607
{
renatofilho@787
   608
  GstDataQueue *queue = GST_DATA_QUEUE (object);
renatofilho@787
   609
renatofilho@787
   610
  GST_DATA_QUEUE_MUTEX_LOCK (queue);
renatofilho@787
   611
renatofilho@787
   612
  switch (prop_id) {
renatofilho@787
   613
    case ARG_CUR_LEVEL_BYTES:
renatofilho@787
   614
      g_value_set_uint (value, queue->cur_level.bytes);
renatofilho@787
   615
      break;
renatofilho@787
   616
    case ARG_CUR_LEVEL_VISIBLE:
renatofilho@787
   617
      g_value_set_uint (value, queue->cur_level.visible);
renatofilho@787
   618
      break;
renatofilho@787
   619
    case ARG_CUR_LEVEL_TIME:
renatofilho@787
   620
      g_value_set_uint64 (value, queue->cur_level.time);
renatofilho@787
   621
      break;
renatofilho@787
   622
    default:
renatofilho@787
   623
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
renatofilho@787
   624
      break;
renatofilho@787
   625
  }
renatofilho@787
   626
renatofilho@787
   627
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
renatofilho@787
   628
}