gst-gmyth/multiqueue/gstmultiqueue.c
author melunko
Thu Aug 23 20:24:11 2007 +0100 (2007-08-23)
branchtrunk
changeset 824 e711a64ba03d
permissions -rw-r--r--
[svn r830] fixed config file to expand dir
renatofilho@787
     1
/* GStreamer
renatofilho@787
     2
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
renatofilho@787
     3
 * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
renatofilho@787
     4
 * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
renatofilho@787
     5
 *
renatofilho@787
     6
 * gstmultiqueue.c:
renatofilho@787
     7
 *
renatofilho@787
     8
 * This library is free software; you can redistribute it and/or
renatofilho@787
     9
 * modify it under the terms of the GNU Library General Public
renatofilho@787
    10
 * License as published by the Free Software Foundation; either
renatofilho@787
    11
 * version 2 of the License, or (at your option) any later version.
renatofilho@787
    12
 *
renatofilho@787
    13
 * This library is distributed in the hope that it will be useful,
renatofilho@787
    14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
renatofilho@787
    15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
renatofilho@787
    16
 * Library General Public License for more details.
renatofilho@787
    17
 *
renatofilho@787
    18
 * You should have received a copy of the GNU Library General Public
renatofilho@787
    19
 * License along with this library; if not, write to the
renatofilho@787
    20
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
renatofilho@787
    21
 * Boston, MA 02111-1307, USA.
renatofilho@787
    22
 */
renatofilho@787
    23
renatofilho@787
    24
#ifdef HAVE_CONFIG_H
renatofilho@787
    25
#  include "config.h"
renatofilho@787
    26
#endif
renatofilho@787
    27
renatofilho@787
    28
#include <gst/gst.h>
renatofilho@787
    29
#include "gstmultiqueue.h"
renatofilho@787
    30
renatofilho@787
    31
/**
renatofilho@787
    32
 * GstSingleQueue:
renatofilho@787
    33
 * @sinkpad: associated sink #GstPad
renatofilho@787
    34
 * @srcpad: associated source #GstPad
renatofilho@787
    35
 *
renatofilho@787
    36
 * Structure containing all information and properties about
renatofilho@787
    37
 * a single queue.
renatofilho@787
    38
 */
renatofilho@787
    39
typedef struct _GstSingleQueue GstSingleQueue;
renatofilho@787
    40
renatofilho@787
    41
struct _GstSingleQueue
renatofilho@787
    42
{
renatofilho@787
    43
  /* unique identifier of the queue */
renatofilho@787
    44
  guint id;
renatofilho@787
    45
renatofilho@787
    46
  GstMultiQueue *mqueue;
renatofilho@787
    47
renatofilho@787
    48
  GstPad *sinkpad;
renatofilho@787
    49
  GstPad *srcpad;
renatofilho@787
    50
renatofilho@787
    51
  /* flowreturn of previous srcpad push */
renatofilho@787
    52
  GstFlowReturn srcresult;
renatofilho@787
    53
  GstSegment sink_segment;
renatofilho@787
    54
  GstSegment src_segment;
renatofilho@787
    55
renatofilho@787
    56
  /* queue of data */
renatofilho@787
    57
  GstDataQueue *queue;
renatofilho@787
    58
  GstDataQueueSize max_size, extra_size;
renatofilho@787
    59
  GstClockTime cur_time;
renatofilho@787
    60
  gboolean is_eos;
renatofilho@787
    61
  gboolean inextra;             /* TRUE if the queue is currently in extradata mode */
renatofilho@787
    62
renatofilho@787
    63
  /* Protected by global lock */
renatofilho@787
    64
  guint32 nextid;               /* ID of the next object waiting to be pushed */
renatofilho@787
    65
  guint32 oldid;                /* ID of the last object pushed (last in a series) */
renatofilho@787
    66
  GCond *turn;                  /* SingleQueue turn waiting conditional */
renatofilho@787
    67
};
renatofilho@787
    68
renatofilho@787
    69
renatofilho@787
    70
/* Extension of GstDataQueueItem structure for our usage */
renatofilho@787
    71
typedef struct _GstMultiQueueItem GstMultiQueueItem;
renatofilho@787
    72
renatofilho@787
    73
struct _GstMultiQueueItem
renatofilho@787
    74
{
renatofilho@787
    75
  GstMiniObject *object;
renatofilho@787
    76
  guint size;
renatofilho@787
    77
  guint64 duration;
renatofilho@787
    78
  gboolean visible;
renatofilho@787
    79
renatofilho@787
    80
  GDestroyNotify destroy;
renatofilho@787
    81
  guint32 posid;
renatofilho@787
    82
};
renatofilho@787
    83
renatofilho@787
    84
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
renatofilho@787
    85
static void gst_single_queue_free (GstSingleQueue * squeue);
renatofilho@787
    86
renatofilho@787
    87
static void wake_up_next_non_linked (GstMultiQueue * mq);
renatofilho@787
    88
static void compute_high_id (GstMultiQueue * mq);
renatofilho@787
    89
renatofilho@787
    90
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
renatofilho@787
    91
    GST_PAD_SINK,
renatofilho@787
    92
    GST_PAD_REQUEST,
renatofilho@787
    93
    GST_STATIC_CAPS_ANY);
renatofilho@787
    94
renatofilho@787
    95
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
renatofilho@787
    96
    GST_PAD_SRC,
renatofilho@787
    97
    GST_PAD_SOMETIMES,
renatofilho@787
    98
    GST_STATIC_CAPS_ANY);
renatofilho@787
    99
renatofilho@787
   100
GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
renatofilho@787
   101
#define GST_CAT_DEFAULT (multi_queue_debug)
renatofilho@787
   102
renatofilho@787
   103
/* default limits, we try to keep up to 2 seconds of data and if there is not
renatofilho@787
   104
 * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
renatofilho@787
   105
 * there is data in the queues. Normally, the byte and time limits are not hit
renatofilho@787
   106
 * in theses conditions. */
renatofilho@787
   107
#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
renatofilho@787
   108
#define DEFAULT_MAX_SIZE_BUFFERS 5
renatofilho@787
   109
#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
renatofilho@787
   110
renatofilho@787
   111
/* second limits. When we hit one of the above limits we are probably dealing
renatofilho@787
   112
 * with a badly muxed file and we scale the limits to these emergency values.
renatofilho@787
   113
 * This is currently not yet implemented. */
renatofilho@787
   114
#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
renatofilho@787
   115
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
renatofilho@787
   116
#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
renatofilho@787
   117
renatofilho@787
   118
/* Signals and args */
renatofilho@787
   119
enum
renatofilho@787
   120
{
renatofilho@787
   121
  SIGNAL_UNDERRUN,
renatofilho@787
   122
  SIGNAL_OVERRUN,
renatofilho@787
   123
  LAST_SIGNAL
renatofilho@787
   124
};
renatofilho@787
   125
renatofilho@787
   126
enum
renatofilho@787
   127
{
renatofilho@787
   128
  ARG_0,
renatofilho@787
   129
  ARG_EXTRA_SIZE_BYTES,
renatofilho@787
   130
  ARG_EXTRA_SIZE_BUFFERS,
renatofilho@787
   131
  ARG_EXTRA_SIZE_TIME,
renatofilho@787
   132
  ARG_MAX_SIZE_BYTES,
renatofilho@787
   133
  ARG_MAX_SIZE_BUFFERS,
renatofilho@787
   134
  ARG_MAX_SIZE_TIME,
renatofilho@787
   135
};
renatofilho@787
   136
renatofilho@787
   137
renatofilho@787
   138
static const GstElementDetails  gst_multiqueue_details = 
renatofilho@787
   139
GST_ELEMENT_DETAILS ("MultiQueue",
renatofilho@787
   140
      "Generic",
renatofilho@787
   141
      "Multiple data queue",
renatofilho@787
   142
      "Edward Hervey <edward@fluendo.com>");
renatofilho@787
   143
renatofilho@787
   144
renatofilho@787
   145
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
renatofilho@787
   146
  g_mutex_lock (q->qlock);                                              \
renatofilho@787
   147
} G_STMT_END
renatofilho@787
   148
renatofilho@787
   149
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
renatofilho@787
   150
  g_mutex_unlock (q->qlock);                                            \
renatofilho@787
   151
} G_STMT_END
renatofilho@787
   152
renatofilho@787
   153
static void gst_multi_queue_finalize (GObject * object);
renatofilho@787
   154
static void gst_multi_queue_set_property (GObject * object,
renatofilho@787
   155
    guint prop_id, const GValue * value, GParamSpec * pspec);
renatofilho@787
   156
static void gst_multi_queue_get_property (GObject * object,
renatofilho@787
   157
    guint prop_id, GValue * value, GParamSpec * pspec);
renatofilho@787
   158
renatofilho@787
   159
static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
renatofilho@787
   160
    GstPadTemplate * temp, const gchar * name);
renatofilho@787
   161
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
renatofilho@787
   162
renatofilho@787
   163
static void gst_multi_queue_loop (GstPad * pad);
renatofilho@787
   164
renatofilho@787
   165
#define _do_init(bla) \
renatofilho@787
   166
  GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
renatofilho@787
   167
renatofilho@787
   168
GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
renatofilho@787
   169
    GST_TYPE_ELEMENT, _do_init);
renatofilho@787
   170
renatofilho@787
   171
static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
renatofilho@787
   172
renatofilho@787
   173
renatofilho@787
   174
renatofilho@787
   175
static void
renatofilho@787
   176
gst_multi_queue_base_init (gpointer g_class)
renatofilho@787
   177
{
renatofilho@787
   178
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
renatofilho@787
   179
renatofilho@787
   180
  gst_element_class_set_details (gstelement_class, &gst_multiqueue_details);
renatofilho@787
   181
  gst_element_class_add_pad_template (gstelement_class,
renatofilho@787
   182
      gst_static_pad_template_get (&sinktemplate));
renatofilho@787
   183
  gst_element_class_add_pad_template (gstelement_class,
renatofilho@787
   184
      gst_static_pad_template_get (&srctemplate));
renatofilho@787
   185
}
renatofilho@787
   186
renatofilho@787
   187
static void
renatofilho@787
   188
gst_multi_queue_class_init (GstMultiQueueClass * klass)
renatofilho@787
   189
{
renatofilho@787
   190
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
renatofilho@787
   191
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
renatofilho@787
   192
renatofilho@787
   193
  gobject_class->set_property =
renatofilho@787
   194
      GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
renatofilho@787
   195
  gobject_class->get_property =
renatofilho@787
   196
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
renatofilho@787
   197
renatofilho@787
   198
  /* SIGNALS */
renatofilho@787
   199
  gst_multi_queue_signals[SIGNAL_UNDERRUN] =
renatofilho@787
   200
      g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
renatofilho@787
   201
      G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
renatofilho@787
   202
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
renatofilho@787
   203
renatofilho@787
   204
  gst_multi_queue_signals[SIGNAL_OVERRUN] =
renatofilho@787
   205
      g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
renatofilho@787
   206
      G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
renatofilho@787
   207
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
renatofilho@787
   208
renatofilho@787
   209
  /* PROPERTIES */
renatofilho@787
   210
renatofilho@787
   211
  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
renatofilho@787
   212
      g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
renatofilho@787
   213
          "Max. amount of data in the queue (bytes, 0=disable)",
renatofilho@787
   214
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
renatofilho@787
   215
  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
renatofilho@787
   216
      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
renatofilho@787
   217
          "Max. number of buffers in the queue (0=disable)",
renatofilho@787
   218
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
renatofilho@787
   219
  g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
renatofilho@787
   220
      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
renatofilho@787
   221
          "Max. amount of data in the queue (in ns, 0=disable)",
renatofilho@787
   222
          0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
renatofilho@787
   223
renatofilho@787
   224
  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
renatofilho@787
   225
      g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
renatofilho@787
   226
          "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
renatofilho@787
   227
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
renatofilho@787
   228
  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
renatofilho@787
   229
      g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
renatofilho@787
   230
          "Amount of buffers the queues can grow if one of them is empty (0=disable)",
renatofilho@787
   231
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
renatofilho@787
   232
  g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
renatofilho@787
   233
      g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
renatofilho@787
   234
          "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
renatofilho@787
   235
          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
renatofilho@787
   236
renatofilho@787
   237
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
renatofilho@787
   238
renatofilho@787
   239
  gstelement_class->request_new_pad =
renatofilho@787
   240
      GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
renatofilho@787
   241
  gstelement_class->release_pad =
renatofilho@787
   242
      GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
renatofilho@787
   243
}
renatofilho@787
   244
renatofilho@787
   245
static void
renatofilho@787
   246
gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
renatofilho@787
   247
{
renatofilho@787
   248
  mqueue->nbqueues = 0;
renatofilho@787
   249
  mqueue->queues = NULL;
renatofilho@787
   250
renatofilho@787
   251
  mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
renatofilho@787
   252
  mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
renatofilho@787
   253
  mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
renatofilho@787
   254
renatofilho@787
   255
  mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
renatofilho@787
   256
  mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
renatofilho@787
   257
  mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
renatofilho@787
   258
renatofilho@787
   259
  mqueue->counter = 1;
renatofilho@787
   260
  mqueue->highid = -1;
renatofilho@787
   261
  mqueue->nextnotlinked = -1;
renatofilho@787
   262
renatofilho@787
   263
  mqueue->qlock = g_mutex_new ();
renatofilho@787
   264
}
renatofilho@787
   265
renatofilho@787
   266
static void
renatofilho@787
   267
gst_multi_queue_finalize (GObject * object)
renatofilho@787
   268
{
renatofilho@787
   269
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
renatofilho@787
   270
renatofilho@787
   271
  g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
renatofilho@787
   272
  g_list_free (mqueue->queues);
renatofilho@787
   273
  mqueue->queues = NULL;
renatofilho@787
   274
renatofilho@787
   275
  /* free/unref instance data */
renatofilho@787
   276
  g_mutex_free (mqueue->qlock);
renatofilho@787
   277
renatofilho@787
   278
  G_OBJECT_CLASS (parent_class)->finalize (object);
renatofilho@787
   279
}
renatofilho@787
   280
renatofilho@787
   281
renatofilho@787
   282
renatofilho@787
   283
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {	        \
renatofilho@787
   284
    GList * tmp = mq->queues;					\
renatofilho@787
   285
    while (tmp) {						\
renatofilho@787
   286
      GstSingleQueue *q = (GstSingleQueue*)tmp->data;		\
renatofilho@787
   287
      q->max_size.format = mq->max_size.format;                 \
renatofilho@787
   288
      tmp = g_list_next(tmp);					\
renatofilho@787
   289
    };								\
renatofilho@787
   290
} G_STMT_END
renatofilho@787
   291
renatofilho@787
   292
static void
renatofilho@787
   293
gst_multi_queue_set_property (GObject * object, guint prop_id,
renatofilho@787
   294
    const GValue * value, GParamSpec * pspec)
renatofilho@787
   295
{
renatofilho@787
   296
  GstMultiQueue *mq = GST_MULTI_QUEUE (object);
renatofilho@787
   297
renatofilho@787
   298
  switch (prop_id) {
renatofilho@787
   299
    case ARG_MAX_SIZE_BYTES:
renatofilho@787
   300
      mq->max_size.bytes = g_value_get_uint (value);
renatofilho@787
   301
      SET_CHILD_PROPERTY (mq, bytes);
renatofilho@787
   302
      break;
renatofilho@787
   303
    case ARG_MAX_SIZE_BUFFERS:
renatofilho@787
   304
      mq->max_size.visible = g_value_get_uint (value);
renatofilho@787
   305
      SET_CHILD_PROPERTY (mq, visible);
renatofilho@787
   306
      break;
renatofilho@787
   307
    case ARG_MAX_SIZE_TIME:
renatofilho@787
   308
      mq->max_size.time = g_value_get_uint64 (value);
renatofilho@787
   309
      SET_CHILD_PROPERTY (mq, time);
renatofilho@787
   310
      break;
renatofilho@787
   311
    case ARG_EXTRA_SIZE_BYTES:
renatofilho@787
   312
      mq->extra_size.bytes = g_value_get_uint (value);
renatofilho@787
   313
      break;
renatofilho@787
   314
    case ARG_EXTRA_SIZE_BUFFERS:
renatofilho@787
   315
      mq->extra_size.visible = g_value_get_uint (value);
renatofilho@787
   316
      break;
renatofilho@787
   317
    case ARG_EXTRA_SIZE_TIME:
renatofilho@787
   318
      mq->extra_size.time = g_value_get_uint64 (value);
renatofilho@787
   319
      break;
renatofilho@787
   320
    default:
renatofilho@787
   321
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
renatofilho@787
   322
      break;
renatofilho@787
   323
  }
renatofilho@787
   324
}
renatofilho@787
   325
renatofilho@787
   326
static void
renatofilho@787
   327
gst_multi_queue_get_property (GObject * object, guint prop_id,
renatofilho@787
   328
    GValue * value, GParamSpec * pspec)
renatofilho@787
   329
{
renatofilho@787
   330
  GstMultiQueue *mq = GST_MULTI_QUEUE (object);
renatofilho@787
   331
renatofilho@787
   332
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   333
renatofilho@787
   334
  switch (prop_id) {
renatofilho@787
   335
    case ARG_EXTRA_SIZE_BYTES:
renatofilho@787
   336
      g_value_set_uint (value, mq->extra_size.bytes);
renatofilho@787
   337
      break;
renatofilho@787
   338
    case ARG_EXTRA_SIZE_BUFFERS:
renatofilho@787
   339
      g_value_set_uint (value, mq->extra_size.visible);
renatofilho@787
   340
      break;
renatofilho@787
   341
    case ARG_EXTRA_SIZE_TIME:
renatofilho@787
   342
      g_value_set_uint64 (value, mq->extra_size.time);
renatofilho@787
   343
      break;
renatofilho@787
   344
    case ARG_MAX_SIZE_BYTES:
renatofilho@787
   345
      g_value_set_uint (value, mq->max_size.bytes);
renatofilho@787
   346
      break;
renatofilho@787
   347
    case ARG_MAX_SIZE_BUFFERS:
renatofilho@787
   348
      g_value_set_uint (value, mq->max_size.visible);
renatofilho@787
   349
      break;
renatofilho@787
   350
    case ARG_MAX_SIZE_TIME:
renatofilho@787
   351
      g_value_set_uint64 (value, mq->max_size.time);
renatofilho@787
   352
      break;
renatofilho@787
   353
    default:
renatofilho@787
   354
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
renatofilho@787
   355
      break;
renatofilho@787
   356
  }
renatofilho@787
   357
renatofilho@787
   358
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   359
}
renatofilho@787
   360
renatofilho@787
   361
GList *
renatofilho@787
   362
gst_multi_queue_get_internal_links (GstPad * pad)
renatofilho@787
   363
{
renatofilho@787
   364
  GList *res = NULL;
renatofilho@787
   365
  GstMultiQueue *mqueue;
renatofilho@787
   366
  GstSingleQueue *sq = NULL;
renatofilho@787
   367
  GList *tmp;
renatofilho@787
   368
renatofilho@787
   369
  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
renatofilho@787
   370
renatofilho@787
   371
  mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad));
renatofilho@787
   372
  if (!mqueue)
renatofilho@787
   373
    goto no_parent;
renatofilho@787
   374
renatofilho@787
   375
  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
renatofilho@787
   376
  /* Find which single queue it belongs to */
renatofilho@787
   377
  for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) {
renatofilho@787
   378
    sq = (GstSingleQueue *) tmp->data;
renatofilho@787
   379
renatofilho@787
   380
    if (sq->sinkpad == pad)
renatofilho@787
   381
      res = g_list_prepend (res, sq->srcpad);
renatofilho@787
   382
    if (sq->srcpad == pad)
renatofilho@787
   383
      res = g_list_prepend (res, sq->sinkpad);
renatofilho@787
   384
  }
renatofilho@787
   385
renatofilho@787
   386
  if (!res)
renatofilho@787
   387
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
renatofilho@787
   388
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
renatofilho@787
   389
renatofilho@787
   390
  return res;
renatofilho@787
   391
renatofilho@787
   392
no_parent:
renatofilho@787
   393
  {
renatofilho@787
   394
    GST_DEBUG_OBJECT (pad, "no parent");
renatofilho@787
   395
    return NULL;
renatofilho@787
   396
  }
renatofilho@787
   397
}
renatofilho@787
   398
renatofilho@787
   399
renatofilho@787
   400
/*
renatofilho@787
   401
 * GstElement methods
renatofilho@787
   402
 */
renatofilho@787
   403
renatofilho@787
   404
static GstPad *
renatofilho@787
   405
gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
renatofilho@787
   406
    const gchar * name)
renatofilho@787
   407
{
renatofilho@787
   408
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
renatofilho@787
   409
  GstSingleQueue *squeue;
renatofilho@787
   410
renatofilho@787
   411
  GST_LOG_OBJECT (element, "name : %s", name);
renatofilho@787
   412
renatofilho@787
   413
  /* Create a new single queue, add the sink and source pad and return the sink pad */
renatofilho@787
   414
  squeue = gst_single_queue_new (mqueue);
renatofilho@787
   415
renatofilho@787
   416
  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
renatofilho@787
   417
  mqueue->queues = g_list_append (mqueue->queues, squeue);
renatofilho@787
   418
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
renatofilho@787
   419
renatofilho@787
   420
  /*
renatofilho@787
   421
  GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
renatofilho@787
   422
      GST_DEBUG_PAD_NAME (squeue->sinkpad));
renatofilho@787
   423
    */
renatofilho@787
   424
  return squeue->sinkpad;
renatofilho@787
   425
}
renatofilho@787
   426
renatofilho@787
   427
static void
renatofilho@787
   428
gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
renatofilho@787
   429
{
renatofilho@787
   430
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
renatofilho@787
   431
  GstSingleQueue *sq = NULL;
renatofilho@787
   432
  GList *tmp;
renatofilho@787
   433
renatofilho@787
   434
//  GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
renatofilho@787
   435
renatofilho@787
   436
  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
renatofilho@787
   437
  /* Find which single queue it belongs to, knowing that it should be a sinkpad */
renatofilho@787
   438
  for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
renatofilho@787
   439
    sq = (GstSingleQueue *) tmp->data;
renatofilho@787
   440
renatofilho@787
   441
    if (sq->sinkpad == pad)
renatofilho@787
   442
      break;
renatofilho@787
   443
  }
renatofilho@787
   444
renatofilho@787
   445
  if (!tmp) {
renatofilho@787
   446
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
renatofilho@787
   447
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
renatofilho@787
   448
    return;
renatofilho@787
   449
  }
renatofilho@787
   450
renatofilho@787
   451
  /* FIXME: The removal of the singlequeue should probably not happen until it
renatofilho@787
   452
   * finishes draining */
renatofilho@787
   453
renatofilho@787
   454
  /* remove it from the list */
renatofilho@787
   455
  mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
renatofilho@787
   456
renatofilho@787
   457
  /* FIXME : recompute next-non-linked */
renatofilho@787
   458
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
renatofilho@787
   459
renatofilho@787
   460
  /* delete SingleQueue */
renatofilho@787
   461
  gst_data_queue_set_flushing (sq->queue, TRUE);
renatofilho@787
   462
renatofilho@787
   463
  gst_pad_set_active (sq->srcpad, FALSE);
renatofilho@787
   464
  gst_pad_set_active (sq->sinkpad, FALSE);
renatofilho@787
   465
  gst_element_remove_pad (element, sq->srcpad);
renatofilho@787
   466
  gst_element_remove_pad (element, sq->sinkpad);
renatofilho@787
   467
  gst_single_queue_free (sq);
renatofilho@787
   468
}
renatofilho@787
   469
renatofilho@787
   470
static gboolean
renatofilho@787
   471
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
renatofilho@787
   472
{
renatofilho@787
   473
  gboolean result;
renatofilho@787
   474
renatofilho@787
   475
  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
renatofilho@787
   476
      sq->id);
renatofilho@787
   477
renatofilho@787
   478
  if (flush) {
renatofilho@787
   479
    sq->srcresult = GST_FLOW_WRONG_STATE;
renatofilho@787
   480
    gst_data_queue_set_flushing (sq->queue, TRUE);
renatofilho@787
   481
renatofilho@787
   482
    /* wake up non-linked task */
renatofilho@787
   483
    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
renatofilho@787
   484
        sq->id);
renatofilho@787
   485
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   486
    g_cond_signal (sq->turn);
renatofilho@787
   487
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   488
renatofilho@787
   489
    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
renatofilho@787
   490
    result = gst_pad_pause_task (sq->srcpad);
renatofilho@787
   491
  } else {
renatofilho@787
   492
    gst_data_queue_flush (sq->queue);
renatofilho@787
   493
    gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
renatofilho@787
   494
    gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
renatofilho@787
   495
    /* All pads start off not-linked for a smooth kick-off */
renatofilho@787
   496
    sq->srcresult = GST_FLOW_NOT_LINKED;
renatofilho@787
   497
    sq->cur_time = 0;
renatofilho@787
   498
    sq->max_size.visible = mq->max_size.visible;
renatofilho@787
   499
    sq->is_eos = FALSE;
renatofilho@787
   500
    sq->inextra = FALSE;
renatofilho@787
   501
    sq->nextid = 0;
renatofilho@787
   502
    sq->oldid = 0;
renatofilho@787
   503
    gst_data_queue_set_flushing (sq->queue, FALSE);
renatofilho@787
   504
renatofilho@787
   505
    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
renatofilho@787
   506
    result =
renatofilho@787
   507
        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
renatofilho@787
   508
        sq->srcpad);
renatofilho@787
   509
  }
renatofilho@787
   510
  return result;
renatofilho@787
   511
}
renatofilho@787
   512
renatofilho@787
   513
/* calculate the diff between running time on the sink and src of the queue.
renatofilho@787
   514
 * This is the total amount of time in the queue. 
renatofilho@787
   515
 * WITH LOCK TAKEN */
renatofilho@787
   516
static void
renatofilho@787
   517
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
renatofilho@787
   518
{
renatofilho@787
   519
  gint64 sink_time, src_time;
renatofilho@787
   520
renatofilho@787
   521
  sink_time =
renatofilho@787
   522
      gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
renatofilho@787
   523
      sq->sink_segment.last_stop);
renatofilho@787
   524
renatofilho@787
   525
  src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
renatofilho@787
   526
      sq->src_segment.last_stop);
renatofilho@787
   527
renatofilho@787
   528
  GST_DEBUG_OBJECT (mq,
renatofilho@787
   529
      "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
renatofilho@787
   530
      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
renatofilho@787
   531
renatofilho@787
   532
  /* This allows for streams with out of order timestamping - sometimes the 
renatofilho@787
   533
   * emerging timestamp is later than the arriving one(s) */
renatofilho@787
   534
  if (sink_time >= src_time)
renatofilho@787
   535
    sq->cur_time = sink_time - src_time;
renatofilho@787
   536
  else
renatofilho@787
   537
    sq->cur_time = 0;
renatofilho@787
   538
}
renatofilho@787
   539
renatofilho@787
   540
/* take a NEWSEGMENT event and apply the values to segment, updating the time
renatofilho@787
   541
 * level of queue. */
renatofilho@787
   542
static void
renatofilho@787
   543
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
renatofilho@787
   544
    GstSegment * segment)
renatofilho@787
   545
{
renatofilho@787
   546
  gboolean update;
renatofilho@787
   547
  GstFormat format;
renatofilho@787
   548
  gdouble rate, arate;
renatofilho@787
   549
  gint64 start, stop, time;
renatofilho@787
   550
renatofilho@787
   551
  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
renatofilho@787
   552
      &format, &start, &stop, &time);
renatofilho@787
   553
renatofilho@787
   554
  /* now configure the values, we use these to track timestamps on the
renatofilho@787
   555
   * sinkpad. */
renatofilho@787
   556
  if (format != GST_FORMAT_TIME) {
renatofilho@787
   557
    /* non-time format, pretent the current time segment is closed with a
renatofilho@787
   558
     * 0 start and unknown stop time. */
renatofilho@787
   559
    update = FALSE;
renatofilho@787
   560
    format = GST_FORMAT_TIME;
renatofilho@787
   561
    start = 0;
renatofilho@787
   562
    stop = -1;
renatofilho@787
   563
    time = 0;
renatofilho@787
   564
  }
renatofilho@787
   565
renatofilho@787
   566
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   567
renatofilho@787
   568
  gst_segment_set_newsegment_full (segment, update,
renatofilho@787
   569
      rate, arate, format, start, stop, time);
renatofilho@787
   570
renatofilho@787
   571
  GST_DEBUG_OBJECT (mq,
renatofilho@787
   572
      "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
renatofilho@787
   573
renatofilho@787
   574
  /* segment can update the time level of the queue */
renatofilho@787
   575
  update_time_level (mq, sq);
renatofilho@787
   576
renatofilho@787
   577
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   578
}
renatofilho@787
   579
renatofilho@787
   580
/* take a buffer and update segment, updating the time level of the queue. */
renatofilho@787
   581
static void
renatofilho@787
   582
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
renatofilho@787
   583
    GstClockTime duration, GstSegment * segment)
renatofilho@787
   584
{
renatofilho@787
   585
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   586
renatofilho@787
   587
  /* if no timestamp is set, assume it's continuous with the previous 
renatofilho@787
   588
   * time */
renatofilho@787
   589
  if (timestamp == GST_CLOCK_TIME_NONE)
renatofilho@787
   590
    timestamp = segment->last_stop;
renatofilho@787
   591
renatofilho@787
   592
  /* add duration */
renatofilho@787
   593
  if (duration != GST_CLOCK_TIME_NONE)
renatofilho@787
   594
    timestamp += duration;
renatofilho@787
   595
renatofilho@787
   596
  GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
renatofilho@787
   597
      sq->id, GST_TIME_ARGS (timestamp));
renatofilho@787
   598
renatofilho@787
   599
  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
renatofilho@787
   600
renatofilho@787
   601
  /* calc diff with other end */
renatofilho@787
   602
  update_time_level (mq, sq);
renatofilho@787
   603
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   604
}
renatofilho@787
   605
renatofilho@787
   606
static GstFlowReturn
renatofilho@787
   607
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
renatofilho@787
   608
    GstMiniObject * object)
renatofilho@787
   609
{
renatofilho@787
   610
  GstFlowReturn result = GST_FLOW_OK;
renatofilho@787
   611
renatofilho@787
   612
  if (GST_IS_BUFFER (object)) {
renatofilho@787
   613
    GstBuffer *buffer;
renatofilho@787
   614
    GstClockTime timestamp, duration;
renatofilho@787
   615
renatofilho@787
   616
    buffer = GST_BUFFER_CAST (object);
renatofilho@787
   617
    timestamp = GST_BUFFER_TIMESTAMP (buffer);
renatofilho@787
   618
    duration = GST_BUFFER_DURATION (buffer);
renatofilho@787
   619
renatofilho@787
   620
    apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
renatofilho@787
   621
renatofilho@787
   622
    /* Applying the buffer may have made the queue non-full again, unblock it if needed */
renatofilho@787
   623
    gst_data_queue_limits_changed (sq->queue);
renatofilho@787
   624
renatofilho@787
   625
    GST_DEBUG_OBJECT (mq,
renatofilho@787
   626
        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
renatofilho@787
   627
        sq->id, buffer, GST_TIME_ARGS (timestamp));
renatofilho@787
   628
renatofilho@787
   629
    result = gst_pad_push (sq->srcpad, buffer);
renatofilho@787
   630
  } else if (GST_IS_EVENT (object)) {
renatofilho@787
   631
    GstEvent *event;
renatofilho@787
   632
renatofilho@787
   633
    event = GST_EVENT_CAST (object);
renatofilho@787
   634
renatofilho@787
   635
    switch (GST_EVENT_TYPE (event)) {
renatofilho@787
   636
      case GST_EVENT_EOS:
renatofilho@787
   637
        result = GST_FLOW_UNEXPECTED;
renatofilho@787
   638
        break;
renatofilho@787
   639
      case GST_EVENT_NEWSEGMENT:
renatofilho@787
   640
        apply_segment (mq, sq, event, &sq->src_segment);
renatofilho@787
   641
        /* Applying the segment may have made the queue non-full again, unblock it if needed */
renatofilho@787
   642
        gst_data_queue_limits_changed (sq->queue);
renatofilho@787
   643
        break;
renatofilho@787
   644
      default:
renatofilho@787
   645
        break;
renatofilho@787
   646
    }
renatofilho@787
   647
renatofilho@787
   648
    GST_DEBUG_OBJECT (mq,
renatofilho@787
   649
        "SingleQueue %d : Pushing event %p of type %s",
renatofilho@787
   650
        sq->id, event, GST_EVENT_TYPE_NAME (event));
renatofilho@787
   651
renatofilho@787
   652
    gst_pad_push_event (sq->srcpad, event);
renatofilho@787
   653
  } else {
renatofilho@787
   654
    g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
renatofilho@787
   655
        sq->id);
renatofilho@787
   656
  }
renatofilho@787
   657
  return result;
renatofilho@787
   658
renatofilho@787
   659
  /* ERRORS */
renatofilho@787
   660
}
renatofilho@787
   661
renatofilho@787
   662
static GstMiniObject *
renatofilho@787
   663
gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
renatofilho@787
   664
{
renatofilho@787
   665
  GstMiniObject *res;
renatofilho@787
   666
renatofilho@787
   667
  res = item->object;
renatofilho@787
   668
  item->object = NULL;
renatofilho@787
   669
renatofilho@787
   670
  return res;
renatofilho@787
   671
}
renatofilho@787
   672
renatofilho@787
   673
static void
renatofilho@787
   674
gst_multi_queue_item_destroy (GstMultiQueueItem * item)
renatofilho@787
   675
{
renatofilho@787
   676
  if (item->object)
renatofilho@787
   677
    gst_mini_object_unref (item->object);
renatofilho@787
   678
  g_free (item);
renatofilho@787
   679
}
renatofilho@787
   680
renatofilho@787
   681
/* takes ownership of passed mini object! */
renatofilho@787
   682
static GstMultiQueueItem *
renatofilho@787
   683
gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
renatofilho@787
   684
{
renatofilho@787
   685
  GstMultiQueueItem *item;
renatofilho@787
   686
renatofilho@787
   687
  item = g_new (GstMultiQueueItem, 1);
renatofilho@787
   688
  item->object = object;
renatofilho@787
   689
  item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
renatofilho@787
   690
  item->posid = curid;
renatofilho@787
   691
renatofilho@787
   692
  if (GST_IS_BUFFER (object)) {
renatofilho@787
   693
    item->size = GST_BUFFER_SIZE (object);
renatofilho@787
   694
    item->duration = GST_BUFFER_DURATION (object);
renatofilho@787
   695
    if (item->duration == GST_CLOCK_TIME_NONE)
renatofilho@787
   696
      item->duration = 0;
renatofilho@787
   697
    item->visible = TRUE;
renatofilho@787
   698
  } else {
renatofilho@787
   699
    item->size = 0;
renatofilho@787
   700
    item->duration = 0;
renatofilho@787
   701
    item->visible = FALSE;
renatofilho@787
   702
  }
renatofilho@787
   703
  return item;
renatofilho@787
   704
}
renatofilho@787
   705
renatofilho@787
   706
/* Each main loop attempts to push buffers until the return value
renatofilho@787
   707
 * is not-linked. not-linked pads are not allowed to push data beyond
renatofilho@787
   708
 * any linked pads, so they don't 'rush ahead of the pack'.
renatofilho@787
   709
 */
renatofilho@787
   710
static void
renatofilho@787
   711
gst_multi_queue_loop (GstPad * pad)
renatofilho@787
   712
{
renatofilho@787
   713
  GstSingleQueue *sq;
renatofilho@787
   714
  GstMultiQueueItem *item;
renatofilho@787
   715
  GstDataQueueItem *sitem;
renatofilho@787
   716
  GstMultiQueue *mq;
renatofilho@787
   717
  GstMiniObject *object;
renatofilho@787
   718
  guint32 newid;
renatofilho@787
   719
  guint32 oldid = -1;
renatofilho@787
   720
  GstFlowReturn result;
renatofilho@787
   721
renatofilho@787
   722
  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
renatofilho@787
   723
  mq = sq->mqueue;
renatofilho@787
   724
renatofilho@787
   725
  do {
renatofilho@787
   726
    GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
renatofilho@787
   727
renatofilho@787
   728
    /* Get something from the queue, blocking until that happens, or we get
renatofilho@787
   729
     * flushed */
renatofilho@787
   730
    if (!(gst_data_queue_pop (sq->queue, &sitem)))
renatofilho@787
   731
      goto out_flushing;
renatofilho@787
   732
renatofilho@787
   733
    item = (GstMultiQueueItem *) sitem;
renatofilho@787
   734
    newid = item->posid;
renatofilho@787
   735
renatofilho@787
   736
    /* steal the object and destroy the item */
renatofilho@787
   737
    object = gst_multi_queue_item_steal_object (item);
renatofilho@787
   738
    gst_multi_queue_item_destroy (item);
renatofilho@787
   739
renatofilho@787
   740
    GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
renatofilho@787
   741
        sq->id, newid, oldid);
renatofilho@787
   742
renatofilho@787
   743
    /* If we're not-linked, we do some extra work because we might need to
renatofilho@787
   744
     * wait before pushing. If we're linked but there's a gap in the IDs,
renatofilho@787
   745
     * or it's the first loop, or we just passed the previous highid, 
renatofilho@787
   746
     * we might need to wake some sleeping pad up, so there's extra work 
renatofilho@787
   747
     * there too */
renatofilho@787
   748
    if (sq->srcresult == GST_FLOW_NOT_LINKED ||
renatofilho@787
   749
        (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) {
renatofilho@787
   750
      GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
renatofilho@787
   751
          gst_flow_get_name (sq->srcresult));
renatofilho@787
   752
renatofilho@787
   753
      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   754
renatofilho@787
   755
      /* Update the nextid so other threads know when to wake us up */
renatofilho@787
   756
      sq->nextid = newid;
renatofilho@787
   757
renatofilho@787
   758
      /* Update the oldid (the last ID we output) for highid tracking */
renatofilho@787
   759
      if (oldid != -1)
renatofilho@787
   760
        sq->oldid = oldid;
renatofilho@787
   761
renatofilho@787
   762
      if (sq->srcresult == GST_FLOW_NOT_LINKED) {
renatofilho@787
   763
        /* Go to sleep until it's time to push this buffer */
renatofilho@787
   764
renatofilho@787
   765
        /* Recompute the highid */
renatofilho@787
   766
        compute_high_id (mq);
renatofilho@787
   767
        while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
renatofilho@787
   768
          GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
renatofilho@787
   769
              "newid %u and highid %u", sq->id, newid, mq->highid);
renatofilho@787
   770
renatofilho@787
   771
renatofilho@787
   772
          /* Wake up all non-linked pads before we sleep */
renatofilho@787
   773
          wake_up_next_non_linked (mq);
renatofilho@787
   774
renatofilho@787
   775
          mq->numwaiting++;
renatofilho@787
   776
          g_cond_wait (sq->turn, mq->qlock);
renatofilho@787
   777
          mq->numwaiting--;
renatofilho@787
   778
renatofilho@787
   779
          GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
renatofilho@787
   780
              "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
renatofilho@787
   781
        }
renatofilho@787
   782
renatofilho@787
   783
        /* Re-compute the high_id in case someone else pushed */
renatofilho@787
   784
        compute_high_id (mq);
renatofilho@787
   785
      } else {
renatofilho@787
   786
        compute_high_id (mq);
renatofilho@787
   787
        /* Wake up all non-linked pads */
renatofilho@787
   788
        wake_up_next_non_linked (mq);
renatofilho@787
   789
      }
renatofilho@787
   790
      /* We're done waiting, we can clear the nextid */
renatofilho@787
   791
      sq->nextid = 0;
renatofilho@787
   792
renatofilho@787
   793
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   794
    }
renatofilho@787
   795
renatofilho@787
   796
    GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
renatofilho@787
   797
        gst_flow_get_name (sq->srcresult));
renatofilho@787
   798
renatofilho@787
   799
    /* Try to push out the new object */
renatofilho@787
   800
    result = gst_single_queue_push_one (mq, sq, object);
renatofilho@787
   801
    sq->srcresult = result;
renatofilho@787
   802
renatofilho@787
   803
    if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
renatofilho@787
   804
      goto out_flushing;
renatofilho@787
   805
renatofilho@787
   806
    GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
renatofilho@787
   807
        gst_flow_get_name (sq->srcresult));
renatofilho@787
   808
renatofilho@787
   809
    oldid = newid;
renatofilho@787
   810
  }
renatofilho@787
   811
  while (TRUE);
renatofilho@787
   812
renatofilho@787
   813
out_flushing:
renatofilho@787
   814
  {
renatofilho@787
   815
    /* Need to make sure wake up any sleeping pads when we exit */
renatofilho@787
   816
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   817
    compute_high_id (mq);
renatofilho@787
   818
    wake_up_next_non_linked (mq);
renatofilho@787
   819
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   820
renatofilho@787
   821
    gst_data_queue_set_flushing (sq->queue, TRUE);
renatofilho@787
   822
    gst_pad_pause_task (sq->srcpad);
renatofilho@787
   823
    GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
renatofilho@787
   824
        "SingleQueue[%d] task paused, reason:%s",
renatofilho@787
   825
        sq->id, gst_flow_get_name (sq->srcresult));
renatofilho@787
   826
    return;
renatofilho@787
   827
  }
renatofilho@787
   828
}
renatofilho@787
   829
renatofilho@787
   830
/**
renatofilho@787
   831
 * gst_multi_queue_chain:
renatofilho@787
   832
 *
renatofilho@787
   833
 * This is similar to GstQueue's chain function, except:
renatofilho@787
   834
 * _ we don't have leak behavioures,
renatofilho@787
   835
 * _ we push with a unique id (curid)
renatofilho@787
   836
 */
renatofilho@787
   837
static GstFlowReturn
renatofilho@787
   838
gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
renatofilho@787
   839
{
renatofilho@787
   840
  GstSingleQueue *sq;
renatofilho@787
   841
  GstMultiQueue *mq;
renatofilho@787
   842
  GstMultiQueueItem *item;
renatofilho@787
   843
  GstFlowReturn ret = GST_FLOW_OK;
renatofilho@787
   844
  guint32 curid;
renatofilho@787
   845
  GstClockTime timestamp, duration;
renatofilho@787
   846
renatofilho@787
   847
  sq = gst_pad_get_element_private (pad);
renatofilho@787
   848
  mq = (GstMultiQueue *) gst_pad_get_parent (pad);
renatofilho@787
   849
renatofilho@787
   850
  /* Get a unique incrementing id */
renatofilho@787
   851
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   852
  curid = mq->counter++;
renatofilho@787
   853
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   854
renatofilho@787
   855
  GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
renatofilho@787
   856
      sq->id, buffer, curid);
renatofilho@787
   857
renatofilho@787
   858
  item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
renatofilho@787
   859
renatofilho@787
   860
  timestamp = GST_BUFFER_TIMESTAMP (buffer);
renatofilho@787
   861
  duration = GST_BUFFER_DURATION (buffer);
renatofilho@787
   862
renatofilho@787
   863
  if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
renatofilho@787
   864
    goto flushing;
renatofilho@787
   865
renatofilho@787
   866
  /* update time level, we must do this after pushing the data in the queue so
renatofilho@787
   867
   * that we never end up filling the queue first. */
renatofilho@787
   868
  apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
renatofilho@787
   869
renatofilho@787
   870
done:
renatofilho@787
   871
  gst_object_unref (mq);
renatofilho@787
   872
renatofilho@787
   873
  return ret;
renatofilho@787
   874
renatofilho@787
   875
  /* ERRORS */
renatofilho@787
   876
flushing:
renatofilho@787
   877
  {
renatofilho@787
   878
    ret = sq->srcresult;
renatofilho@787
   879
    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
renatofilho@787
   880
        sq->id, gst_flow_get_name (ret));
renatofilho@787
   881
    gst_multi_queue_item_destroy (item);
renatofilho@787
   882
    goto done;
renatofilho@787
   883
  }
renatofilho@787
   884
}
renatofilho@787
   885
renatofilho@787
   886
static gboolean
renatofilho@787
   887
gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
renatofilho@787
   888
{
renatofilho@787
   889
  GstSingleQueue *sq;
renatofilho@787
   890
renatofilho@787
   891
  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
renatofilho@787
   892
renatofilho@787
   893
  if (active) {
renatofilho@787
   894
    /* All pads start off not-linked for a smooth kick-off */
renatofilho@787
   895
    sq->srcresult = GST_FLOW_NOT_LINKED;
renatofilho@787
   896
  } else {
renatofilho@787
   897
    sq->srcresult = GST_FLOW_WRONG_STATE;
renatofilho@787
   898
    gst_data_queue_flush (sq->queue);
renatofilho@787
   899
  }
renatofilho@787
   900
  return TRUE;
renatofilho@787
   901
}
renatofilho@787
   902
renatofilho@787
   903
static gboolean
renatofilho@787
   904
gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
renatofilho@787
   905
{
renatofilho@787
   906
  GstSingleQueue *sq;
renatofilho@787
   907
  GstMultiQueue *mq;
renatofilho@787
   908
  guint32 curid;
renatofilho@787
   909
  GstMultiQueueItem *item;
renatofilho@787
   910
  gboolean res;
renatofilho@787
   911
  GstEventType type;
renatofilho@787
   912
  GstEvent *sref = NULL;
renatofilho@787
   913
renatofilho@787
   914
  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
renatofilho@787
   915
  mq = (GstMultiQueue *) gst_pad_get_parent (pad);
renatofilho@787
   916
renatofilho@787
   917
  type = GST_EVENT_TYPE (event);
renatofilho@787
   918
renatofilho@787
   919
  switch (type) {
renatofilho@787
   920
    case GST_EVENT_FLUSH_START:
renatofilho@787
   921
      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
renatofilho@787
   922
          sq->id);
renatofilho@787
   923
renatofilho@787
   924
      res = gst_pad_push_event (sq->srcpad, event);
renatofilho@787
   925
renatofilho@787
   926
      gst_single_queue_flush (mq, sq, TRUE);
renatofilho@787
   927
      goto done;
renatofilho@787
   928
renatofilho@787
   929
    case GST_EVENT_FLUSH_STOP:
renatofilho@787
   930
      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
renatofilho@787
   931
          sq->id);
renatofilho@787
   932
renatofilho@787
   933
      res = gst_pad_push_event (sq->srcpad, event);
renatofilho@787
   934
renatofilho@787
   935
      gst_single_queue_flush (mq, sq, FALSE);
renatofilho@787
   936
      goto done;
renatofilho@787
   937
    case GST_EVENT_NEWSEGMENT:
renatofilho@787
   938
      /* take ref because the queue will take ownership and we need the event
renatofilho@787
   939
       * afterwards to update the segment */
renatofilho@787
   940
      sref = gst_event_ref (event);
renatofilho@787
   941
      break;
renatofilho@787
   942
renatofilho@787
   943
    default:
renatofilho@787
   944
      if (!(GST_EVENT_IS_SERIALIZED (event))) {
renatofilho@787
   945
        res = gst_pad_push_event (sq->srcpad, event);
renatofilho@787
   946
        goto done;
renatofilho@787
   947
      }
renatofilho@787
   948
      break;
renatofilho@787
   949
  }
renatofilho@787
   950
renatofilho@787
   951
  /* Get an unique incrementing id */
renatofilho@787
   952
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
   953
  curid = mq->counter++;
renatofilho@787
   954
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
   955
renatofilho@787
   956
  item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
renatofilho@787
   957
renatofilho@787
   958
  GST_DEBUG_OBJECT (mq,
renatofilho@787
   959
      "SingleQueue %d : Enqueuing event %p of type %s with id %d",
renatofilho@787
   960
      sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
renatofilho@787
   961
renatofilho@787
   962
  if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
renatofilho@787
   963
    goto flushing;
renatofilho@787
   964
renatofilho@787
   965
  /* mark EOS when we received one, we must do that after putting the
renatofilho@787
   966
   * buffer in the queue because EOS marks the buffer as filled. No need to take
renatofilho@787
   967
   * a lock, the _check_full happens from this thread only, right before pushing
renatofilho@787
   968
   * into dataqueue. */
renatofilho@787
   969
  switch (type) {
renatofilho@787
   970
    case GST_EVENT_EOS:
renatofilho@787
   971
      sq->is_eos = TRUE;
renatofilho@787
   972
      break;
renatofilho@787
   973
    case GST_EVENT_NEWSEGMENT:
renatofilho@787
   974
      apply_segment (mq, sq, sref, &sq->sink_segment);
renatofilho@787
   975
      gst_event_unref (sref);
renatofilho@787
   976
      break;
renatofilho@787
   977
    default:
renatofilho@787
   978
      break;
renatofilho@787
   979
  }
renatofilho@787
   980
done:
renatofilho@787
   981
  gst_object_unref (mq);
renatofilho@787
   982
  return res;
renatofilho@787
   983
renatofilho@787
   984
flushing:
renatofilho@787
   985
  {
renatofilho@787
   986
    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
renatofilho@787
   987
        sq->id, gst_flow_get_name (sq->srcresult));
renatofilho@787
   988
    if (sref)
renatofilho@787
   989
      gst_event_unref (sref);
renatofilho@787
   990
    gst_multi_queue_item_destroy (item);
renatofilho@787
   991
    goto done;
renatofilho@787
   992
  }
renatofilho@787
   993
}
renatofilho@787
   994
renatofilho@787
   995
static GstCaps *
renatofilho@787
   996
gst_multi_queue_getcaps (GstPad * pad)
renatofilho@787
   997
{
renatofilho@787
   998
  GstSingleQueue *sq = gst_pad_get_element_private (pad);
renatofilho@787
   999
  GstPad *otherpad;
renatofilho@787
  1000
  GstCaps *result;
renatofilho@787
  1001
renatofilho@787
  1002
  otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
renatofilho@787
  1003
renatofilho@787
  1004
  GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
renatofilho@787
  1005
renatofilho@787
  1006
  result = gst_pad_peer_get_caps (otherpad);
renatofilho@787
  1007
  if (result == NULL)
renatofilho@787
  1008
    result = gst_caps_new_any ();
renatofilho@787
  1009
renatofilho@787
  1010
  return result;
renatofilho@787
  1011
}
renatofilho@787
  1012
renatofilho@787
  1013
static GstFlowReturn
renatofilho@787
  1014
gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
renatofilho@787
  1015
    GstCaps * caps, GstBuffer ** buf)
renatofilho@787
  1016
{
renatofilho@787
  1017
  GstSingleQueue *sq = gst_pad_get_element_private (pad);
renatofilho@787
  1018
renatofilho@787
  1019
  return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
renatofilho@787
  1020
}
renatofilho@787
  1021
renatofilho@787
  1022
static gboolean
renatofilho@787
  1023
gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
renatofilho@787
  1024
{
renatofilho@787
  1025
  GstMultiQueue *mq;
renatofilho@787
  1026
  GstSingleQueue *sq;
renatofilho@787
  1027
  gboolean result = FALSE;
renatofilho@787
  1028
renatofilho@787
  1029
  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
renatofilho@787
  1030
  mq = sq->mqueue;
renatofilho@787
  1031
renatofilho@787
  1032
  GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
renatofilho@787
  1033
renatofilho@787
  1034
  if (active) {
renatofilho@787
  1035
    result = gst_single_queue_flush (mq, sq, FALSE);
renatofilho@787
  1036
  } else {
renatofilho@787
  1037
    result = gst_single_queue_flush (mq, sq, TRUE);
renatofilho@787
  1038
    /* make sure streaming finishes */
renatofilho@787
  1039
    result |= gst_pad_stop_task (pad);
renatofilho@787
  1040
  }
renatofilho@787
  1041
  return result;
renatofilho@787
  1042
}
renatofilho@787
  1043
renatofilho@787
  1044
static gboolean
renatofilho@787
  1045
gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
renatofilho@787
  1046
{
renatofilho@787
  1047
  return TRUE;
renatofilho@787
  1048
}
renatofilho@787
  1049
renatofilho@787
  1050
static gboolean
renatofilho@787
  1051
gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
renatofilho@787
  1052
{
renatofilho@787
  1053
  GstSingleQueue *sq = gst_pad_get_element_private (pad);
renatofilho@787
  1054
renatofilho@787
  1055
  return gst_pad_push_event (sq->sinkpad, event);
renatofilho@787
  1056
}
renatofilho@787
  1057
renatofilho@787
  1058
static gboolean
renatofilho@787
  1059
gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
renatofilho@787
  1060
{
renatofilho@787
  1061
  GstSingleQueue *sq = gst_pad_get_element_private (pad);
renatofilho@787
  1062
  GstPad *peerpad;
renatofilho@787
  1063
  gboolean res;
renatofilho@787
  1064
renatofilho@787
  1065
  /* FIXME, Handle position offset depending on queue size */
renatofilho@787
  1066
renatofilho@787
  1067
  /* default handling */
renatofilho@787
  1068
  if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
renatofilho@787
  1069
    goto no_peer;
renatofilho@787
  1070
renatofilho@787
  1071
  res = gst_pad_query (peerpad, query);
renatofilho@787
  1072
renatofilho@787
  1073
  gst_object_unref (peerpad);
renatofilho@787
  1074
renatofilho@787
  1075
  return res;
renatofilho@787
  1076
renatofilho@787
  1077
  /* ERRORS */
renatofilho@787
  1078
no_peer:
renatofilho@787
  1079
  {
renatofilho@787
  1080
    GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
renatofilho@787
  1081
    return FALSE;
renatofilho@787
  1082
  }
renatofilho@787
  1083
}
renatofilho@787
  1084
renatofilho@787
  1085
/*
renatofilho@787
  1086
 * Next-non-linked functions
renatofilho@787
  1087
 */
renatofilho@787
  1088
renatofilho@787
  1089
/* WITH LOCK TAKEN */
renatofilho@787
  1090
static void
renatofilho@787
  1091
wake_up_next_non_linked (GstMultiQueue * mq)
renatofilho@787
  1092
{
renatofilho@787
  1093
  GList *tmp;
renatofilho@787
  1094
renatofilho@787
  1095
  /* maybe no-one is waiting */
renatofilho@787
  1096
  if (mq->numwaiting < 1)
renatofilho@787
  1097
    return;
renatofilho@787
  1098
renatofilho@787
  1099
  /* Else figure out which singlequeue(s) need waking up */
renatofilho@787
  1100
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
renatofilho@787
  1101
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
renatofilho@787
  1102
renatofilho@787
  1103
    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
renatofilho@787
  1104
      if (sq->nextid != 0 && sq->nextid <= mq->highid) {
renatofilho@787
  1105
        GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
renatofilho@787
  1106
        g_cond_signal (sq->turn);
renatofilho@787
  1107
      }
renatofilho@787
  1108
    }
renatofilho@787
  1109
  }
renatofilho@787
  1110
}
renatofilho@787
  1111
renatofilho@787
  1112
/* WITH LOCK TAKEN */
renatofilho@787
  1113
static void
renatofilho@787
  1114
compute_high_id (GstMultiQueue * mq)
renatofilho@787
  1115
{
renatofilho@787
  1116
  /* The high-id is either the highest id among the linked pads, or if all
renatofilho@787
  1117
   * pads are not-linked, it's the lowest not-linked pad */
renatofilho@787
  1118
  GList *tmp;
renatofilho@787
  1119
  guint32 lowest = G_MAXUINT32;
renatofilho@787
  1120
  guint32 highid = G_MAXUINT32;
renatofilho@787
  1121
renatofilho@787
  1122
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
renatofilho@787
  1123
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
renatofilho@787
  1124
renatofilho@787
  1125
    GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
renatofilho@787
  1126
        sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
renatofilho@787
  1127
renatofilho@787
  1128
    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
renatofilho@787
  1129
      /* No need to consider queues which are not waiting */
renatofilho@787
  1130
      if (sq->nextid == 0) {
renatofilho@787
  1131
        GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
renatofilho@787
  1132
        continue;
renatofilho@787
  1133
      }
renatofilho@787
  1134
renatofilho@787
  1135
      if (sq->nextid < lowest)
renatofilho@787
  1136
        lowest = sq->nextid;
renatofilho@787
  1137
    } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
renatofilho@787
  1138
      /* If we don't have a global highid, or the global highid is lower than
renatofilho@787
  1139
       * this single queue's last outputted id, store the queue's one, 
renatofilho@787
  1140
       * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
renatofilho@787
  1141
      if ((highid == G_MAXUINT32) || (sq->oldid > highid))
renatofilho@787
  1142
        highid = sq->oldid;
renatofilho@787
  1143
    }
renatofilho@787
  1144
  }
renatofilho@787
  1145
renatofilho@787
  1146
  if (highid == G_MAXUINT32 || lowest < highid)
renatofilho@787
  1147
    mq->highid = lowest;
renatofilho@787
  1148
  else
renatofilho@787
  1149
    mq->highid = highid;
renatofilho@787
  1150
renatofilho@787
  1151
  GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
renatofilho@787
  1152
      lowest);
renatofilho@787
  1153
}
renatofilho@787
  1154
renatofilho@787
  1155
#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
renatofilho@787
  1156
     (sq->max_size.format) <= (value))
renatofilho@787
  1157
renatofilho@787
  1158
/*
renatofilho@787
  1159
 * GstSingleQueue functions
renatofilho@787
  1160
 */
renatofilho@787
  1161
static void
renatofilho@787
  1162
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
renatofilho@787
  1163
{
renatofilho@787
  1164
  GstMultiQueue *mq = sq->mqueue;
renatofilho@787
  1165
  GList *tmp;
renatofilho@787
  1166
  GstDataQueueSize size;
renatofilho@787
  1167
  gboolean filled = FALSE;
renatofilho@787
  1168
renatofilho@787
  1169
  gst_data_queue_get_level (sq->queue, &size);
renatofilho@787
  1170
renatofilho@787
  1171
  GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
renatofilho@787
  1172
renatofilho@787
  1173
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
  1174
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
renatofilho@787
  1175
    GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
renatofilho@787
  1176
    GstDataQueueSize ssize;
renatofilho@787
  1177
renatofilho@787
  1178
    GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
renatofilho@787
  1179
renatofilho@787
  1180
    if (gst_data_queue_is_empty (ssq->queue)) {
renatofilho@787
  1181
      GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
renatofilho@787
  1182
      if (IS_FILLED (visible, size.visible)) {
renatofilho@787
  1183
        sq->max_size.visible++;
renatofilho@787
  1184
        GST_DEBUG_OBJECT (mq,
renatofilho@787
  1185
            "Another queue is empty, bumping single queue %d max visible to %d",
renatofilho@787
  1186
            sq->id, sq->max_size.visible);
renatofilho@787
  1187
      }
renatofilho@787
  1188
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
  1189
      goto beach;
renatofilho@787
  1190
    }
renatofilho@787
  1191
    /* check if we reached the hard time/bytes limits */
renatofilho@787
  1192
    gst_data_queue_get_level (ssq->queue, &ssize);
renatofilho@787
  1193
renatofilho@787
  1194
    GST_DEBUG_OBJECT (mq,
renatofilho@787
  1195
        "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
renatofilho@787
  1196
        G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
renatofilho@787
  1197
        ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
renatofilho@787
  1198
renatofilho@787
  1199
    /* if this queue is filled completely we must signal overrun */
renatofilho@787
  1200
    if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
renatofilho@787
  1201
      GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
renatofilho@787
  1202
      filled = TRUE;
renatofilho@787
  1203
    }
renatofilho@787
  1204
  }
renatofilho@787
  1205
  /* no queues were empty */
renatofilho@787
  1206
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
  1207
renatofilho@787
  1208
  /* Overrun is always forwarded, since this is blocking the upstream element */
renatofilho@787
  1209
  if (filled) {
renatofilho@787
  1210
    GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
renatofilho@787
  1211
    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
renatofilho@787
  1212
  }
renatofilho@787
  1213
renatofilho@787
  1214
beach:
renatofilho@787
  1215
  return;
renatofilho@787
  1216
}
renatofilho@787
  1217
renatofilho@787
  1218
static void
renatofilho@787
  1219
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
renatofilho@787
  1220
{
renatofilho@787
  1221
  gboolean empty = TRUE;
renatofilho@787
  1222
  GstMultiQueue *mq = sq->mqueue;
renatofilho@787
  1223
  GList *tmp;
renatofilho@787
  1224
renatofilho@787
  1225
  GST_LOG_OBJECT (mq,
renatofilho@787
  1226
      "Single Queue %d is empty, Checking other single queues", sq->id);
renatofilho@787
  1227
renatofilho@787
  1228
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
renatofilho@787
  1229
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
renatofilho@787
  1230
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
renatofilho@787
  1231
renatofilho@787
  1232
    if (gst_data_queue_is_full (sq->queue)) {
renatofilho@787
  1233
      GstDataQueueSize size;
renatofilho@787
  1234
renatofilho@787
  1235
      gst_data_queue_get_level (sq->queue, &size);
renatofilho@787
  1236
      if (IS_FILLED (visible, size.visible)) {
renatofilho@787
  1237
        sq->max_size.visible++;
renatofilho@787
  1238
        GST_DEBUG_OBJECT (mq,
renatofilho@787
  1239
            "queue %d is filled, bumping its max visible to %d", sq->id,
renatofilho@787
  1240
            sq->max_size.visible);
renatofilho@787
  1241
        gst_data_queue_limits_changed (sq->queue);
renatofilho@787
  1242
      }
renatofilho@787
  1243
    }
renatofilho@787
  1244
    if (!gst_data_queue_is_empty (sq->queue))
renatofilho@787
  1245
      empty = FALSE;
renatofilho@787
  1246
  }
renatofilho@787
  1247
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
renatofilho@787
  1248
renatofilho@787
  1249
  if (empty) {
renatofilho@787
  1250
    GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
renatofilho@787
  1251
    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
renatofilho@787
  1252
  }
renatofilho@787
  1253
}
renatofilho@787
  1254
renatofilho@787
  1255
static gboolean
renatofilho@787
  1256
single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
renatofilho@787
  1257
    guint64 time, GstSingleQueue * sq)
renatofilho@787
  1258
{
renatofilho@787
  1259
  gboolean res;
renatofilho@787
  1260
renatofilho@787
  1261
  GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
renatofilho@787
  1262
      "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
renatofilho@787
  1263
      sq->max_size.bytes, sq->cur_time, sq->max_size.time);
renatofilho@787
  1264
renatofilho@787
  1265
  /* we are always filled on EOS */
renatofilho@787
  1266
  if (sq->is_eos)
renatofilho@787
  1267
    return TRUE;
renatofilho@787
  1268
renatofilho@787
  1269
  /* we never go past the max visible items */
renatofilho@787
  1270
  if (IS_FILLED (visible, visible))
renatofilho@787
  1271
    return TRUE;
renatofilho@787
  1272
renatofilho@787
  1273
  if (sq->cur_time != 0) {
renatofilho@787
  1274
    /* if we have valid time in the queue, check */
renatofilho@787
  1275
    res = IS_FILLED (time, sq->cur_time);
renatofilho@787
  1276
  } else {
renatofilho@787
  1277
    /* no valid time, check bytes */
renatofilho@787
  1278
    res = IS_FILLED (bytes, bytes);
renatofilho@787
  1279
  }
renatofilho@787
  1280
  return res;
renatofilho@787
  1281
}
renatofilho@787
  1282
renatofilho@787
  1283
static void
renatofilho@787
  1284
gst_single_queue_free (GstSingleQueue * sq)
renatofilho@787
  1285
{
renatofilho@787
  1286
  /* DRAIN QUEUE */
renatofilho@787
  1287
  gst_data_queue_flush (sq->queue);
renatofilho@787
  1288
  g_object_unref (sq->queue);
renatofilho@787
  1289
  g_cond_free (sq->turn);
renatofilho@787
  1290
  g_free (sq);
renatofilho@787
  1291
}
renatofilho@787
  1292
renatofilho@787
  1293
static GstSingleQueue *
renatofilho@787
  1294
gst_single_queue_new (GstMultiQueue * mqueue)
renatofilho@787
  1295
{
renatofilho@787
  1296
  GstSingleQueue *sq;
renatofilho@787
  1297
  gchar *tmp;
renatofilho@787
  1298
renatofilho@787
  1299
  sq = g_new0 (GstSingleQueue, 1);
renatofilho@787
  1300
renatofilho@787
  1301
  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
renatofilho@787
  1302
  sq->id = mqueue->nbqueues++;
renatofilho@787
  1303
renatofilho@787
  1304
  /* copy over max_size and extra_size so we don't need to take the lock
renatofilho@787
  1305
   * any longer when checking if the queue is full. */
renatofilho@787
  1306
  sq->max_size.visible = mqueue->max_size.visible;
renatofilho@787
  1307
  sq->max_size.bytes = mqueue->max_size.bytes;
renatofilho@787
  1308
  sq->max_size.time = mqueue->max_size.time;
renatofilho@787
  1309
renatofilho@787
  1310
  sq->extra_size.visible = mqueue->extra_size.visible;
renatofilho@787
  1311
  sq->extra_size.bytes = mqueue->extra_size.bytes;
renatofilho@787
  1312
  sq->extra_size.time = mqueue->extra_size.time;
renatofilho@787
  1313
renatofilho@787
  1314
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
renatofilho@787
  1315
renatofilho@787
  1316
  GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
renatofilho@787
  1317
renatofilho@787
  1318
  sq->mqueue = mqueue;
renatofilho@787
  1319
  sq->srcresult = GST_FLOW_WRONG_STATE;
renatofilho@787
  1320
  sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
renatofilho@787
  1321
      single_queue_check_full, sq);
renatofilho@787
  1322
  sq->is_eos = FALSE;
renatofilho@787
  1323
  gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
renatofilho@787
  1324
  gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
renatofilho@787
  1325
renatofilho@787
  1326
  sq->nextid = 0;
renatofilho@787
  1327
  sq->oldid = 0;
renatofilho@787
  1328
  sq->turn = g_cond_new ();
renatofilho@787
  1329
renatofilho@787
  1330
  /* attach to underrun/overrun signals to handle non-starvation  */
renatofilho@787
  1331
  g_signal_connect (G_OBJECT (sq->queue), "full",
renatofilho@787
  1332
      G_CALLBACK (single_queue_overrun_cb), sq);
renatofilho@787
  1333
  g_signal_connect (G_OBJECT (sq->queue), "empty",
renatofilho@787
  1334
      G_CALLBACK (single_queue_underrun_cb), sq);
renatofilho@787
  1335
renatofilho@787
  1336
  tmp = g_strdup_printf ("sink%d", sq->id);
renatofilho@787
  1337
  sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
renatofilho@787
  1338
  g_free (tmp);
renatofilho@787
  1339
renatofilho@787
  1340
  gst_pad_set_chain_function (sq->sinkpad,
renatofilho@787
  1341
      GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
renatofilho@787
  1342
  gst_pad_set_activatepush_function (sq->sinkpad,
renatofilho@787
  1343
      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
renatofilho@787
  1344
  gst_pad_set_event_function (sq->sinkpad,
renatofilho@787
  1345
      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
renatofilho@787
  1346
  gst_pad_set_getcaps_function (sq->sinkpad,
renatofilho@787
  1347
      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
renatofilho@787
  1348
  gst_pad_set_bufferalloc_function (sq->sinkpad,
renatofilho@787
  1349
      GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
renatofilho@787
  1350
  gst_pad_set_internal_link_function (sq->sinkpad,
renatofilho@787
  1351
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
renatofilho@787
  1352
renatofilho@787
  1353
  tmp = g_strdup_printf ("src%d", sq->id);
renatofilho@787
  1354
  sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
renatofilho@787
  1355
  g_free (tmp);
renatofilho@787
  1356
renatofilho@787
  1357
  gst_pad_set_activatepush_function (sq->srcpad,
renatofilho@787
  1358
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
renatofilho@787
  1359
  gst_pad_set_acceptcaps_function (sq->srcpad,
renatofilho@787
  1360
      GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
renatofilho@787
  1361
  gst_pad_set_getcaps_function (sq->srcpad,
renatofilho@787
  1362
      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
renatofilho@787
  1363
  gst_pad_set_event_function (sq->srcpad,
renatofilho@787
  1364
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
renatofilho@787
  1365
  gst_pad_set_query_function (sq->srcpad,
renatofilho@787
  1366
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
renatofilho@787
  1367
  gst_pad_set_internal_link_function (sq->srcpad,
renatofilho@787
  1368
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
renatofilho@787
  1369
renatofilho@787
  1370
  gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
renatofilho@787
  1371
  gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
renatofilho@787
  1372
renatofilho@787
  1373
  gst_pad_set_active (sq->srcpad, TRUE);
renatofilho@787
  1374
  gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
renatofilho@787
  1375
renatofilho@787
  1376
  gst_pad_set_active (sq->sinkpad, TRUE);
renatofilho@787
  1377
  gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
renatofilho@787
  1378
renatofilho@787
  1379
  GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
renatofilho@787
  1380
      sq->id);
renatofilho@787
  1381
renatofilho@787
  1382
  return sq;
renatofilho@787
  1383
}
renatofilho@787
  1384
renatofilho@787
  1385
static gboolean
renatofilho@787
  1386
plugin_init (GstPlugin * plugin)
renatofilho@787
  1387
{
renatofilho@787
  1388
  return gst_element_register (plugin, "multiqueue", GST_RANK_NONE,
renatofilho@787
  1389
      GST_TYPE_MULTI_QUEUE);
renatofilho@787
  1390
}
renatofilho@787
  1391
renatofilho@787
  1392
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
renatofilho@787
  1393
    GST_VERSION_MINOR,
renatofilho@787
  1394
    "multiqueue",
renatofilho@787
  1395
    "multiqueue", plugin_init, VERSION, GST_LICENSE,
renatofilho@787
  1396
    GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
renatofilho@787
  1397