renatofilho@787
|
1 |
/* GStreamer
|
renatofilho@787
|
2 |
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
|
renatofilho@787
|
3 |
*
|
renatofilho@787
|
4 |
* gstdataqueue.c:
|
renatofilho@787
|
5 |
*
|
renatofilho@787
|
6 |
* This library is free software; you can redistribute it and/or
|
renatofilho@787
|
7 |
* modify it under the terms of the GNU Library General Public
|
renatofilho@787
|
8 |
* License as published by the Free Software Foundation; either
|
renatofilho@787
|
9 |
* version 2 of the License, or (at your option) any later version.
|
renatofilho@787
|
10 |
*
|
renatofilho@787
|
11 |
* This library is distributed in the hope that it will be useful,
|
renatofilho@787
|
12 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
renatofilho@787
|
13 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
renatofilho@787
|
14 |
* Library General Public License for more details.
|
renatofilho@787
|
15 |
*
|
renatofilho@787
|
16 |
* You should have received a copy of the GNU Library General Public
|
renatofilho@787
|
17 |
* License along with this library; if not, write to the
|
renatofilho@787
|
18 |
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
renatofilho@787
|
19 |
* Boston, MA 02111-1307, USA.
|
renatofilho@787
|
20 |
*/
|
renatofilho@787
|
21 |
|
renatofilho@787
|
22 |
/**
|
renatofilho@787
|
23 |
* SECTION:gstdataqueue
|
renatofilho@787
|
24 |
* @short_description: Threadsafe queueing object
|
renatofilho@787
|
25 |
*
|
renatofilho@787
|
26 |
* #GstDataQueue is an object that handles threadsafe queueing of objects. It
|
renatofilho@787
|
27 |
* also provides size-related functionality. This object should be used for
|
renatofilho@787
|
28 |
* any #GstElement that wishes to provide some sort of queueing functionality.
|
renatofilho@787
|
29 |
*/
|
renatofilho@787
|
30 |
|
renatofilho@787
|
31 |
#include <gst/gst.h>
|
renatofilho@787
|
32 |
#include "gstdataqueue.h"
|
renatofilho@787
|
33 |
|
renatofilho@787
|
34 |
GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
|
renatofilho@787
|
35 |
#define GST_CAT_DEFAULT (data_queue_debug)
|
renatofilho@787
|
36 |
GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
|
renatofilho@787
|
37 |
|
renatofilho@787
|
38 |
|
renatofilho@787
|
39 |
/* Queue signals and args */
|
renatofilho@787
|
40 |
enum
|
renatofilho@787
|
41 |
{
|
renatofilho@787
|
42 |
SIGNAL_EMPTY,
|
renatofilho@787
|
43 |
SIGNAL_FULL,
|
renatofilho@787
|
44 |
LAST_SIGNAL
|
renatofilho@787
|
45 |
};
|
renatofilho@787
|
46 |
|
renatofilho@787
|
47 |
enum
|
renatofilho@787
|
48 |
{
|
renatofilho@787
|
49 |
ARG_0,
|
renatofilho@787
|
50 |
ARG_CUR_LEVEL_VISIBLE,
|
renatofilho@787
|
51 |
ARG_CUR_LEVEL_BYTES,
|
renatofilho@787
|
52 |
ARG_CUR_LEVEL_TIME
|
renatofilho@787
|
53 |
/* FILL ME */
|
renatofilho@787
|
54 |
};
|
renatofilho@787
|
55 |
|
renatofilho@787
|
56 |
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
renatofilho@787
|
57 |
GST_CAT_LOG (data_queue_dataflow, \
|
renatofilho@787
|
58 |
"locking qlock from thread %p", \
|
renatofilho@787
|
59 |
g_thread_self ()); \
|
renatofilho@787
|
60 |
g_mutex_lock (q->qlock); \
|
renatofilho@787
|
61 |
GST_CAT_LOG (data_queue_dataflow, \
|
renatofilho@787
|
62 |
"locked qlock from thread %p", \
|
renatofilho@787
|
63 |
g_thread_self ()); \
|
renatofilho@787
|
64 |
} G_STMT_END
|
renatofilho@787
|
65 |
|
renatofilho@787
|
66 |
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \
|
renatofilho@787
|
67 |
GST_DATA_QUEUE_MUTEX_LOCK (q); \
|
renatofilho@787
|
68 |
if (q->flushing) \
|
renatofilho@787
|
69 |
goto label; \
|
renatofilho@787
|
70 |
} G_STMT_END
|
renatofilho@787
|
71 |
|
renatofilho@787
|
72 |
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
|
renatofilho@787
|
73 |
GST_CAT_LOG (data_queue_dataflow, \
|
renatofilho@787
|
74 |
"unlocking qlock from thread %p", \
|
renatofilho@787
|
75 |
g_thread_self ()); \
|
renatofilho@787
|
76 |
g_mutex_unlock (q->qlock); \
|
renatofilho@787
|
77 |
} G_STMT_END
|
renatofilho@787
|
78 |
|
renatofilho@787
|
79 |
#define STATUS(q, msg) \
|
renatofilho@787
|
80 |
GST_CAT_LOG (data_queue_dataflow, \
|
renatofilho@787
|
81 |
"queue:%p " msg ": %u visible items, %u " \
|
renatofilho@787
|
82 |
"bytes, %"G_GUINT64_FORMAT \
|
renatofilho@787
|
83 |
" ns, %u elements", \
|
renatofilho@787
|
84 |
queue, \
|
renatofilho@787
|
85 |
q->cur_level.visible, \
|
renatofilho@787
|
86 |
q->cur_level.bytes, \
|
renatofilho@787
|
87 |
q->cur_level.time, \
|
renatofilho@787
|
88 |
q->queue->length)
|
renatofilho@787
|
89 |
|
renatofilho@787
|
90 |
static void gst_data_queue_base_init (GstDataQueueClass * klass);
|
renatofilho@787
|
91 |
static void gst_data_queue_class_init (GstDataQueueClass * klass);
|
renatofilho@787
|
92 |
static void gst_data_queue_init (GstDataQueue * queue);
|
renatofilho@787
|
93 |
static void gst_data_queue_finalize (GObject * object);
|
renatofilho@787
|
94 |
|
renatofilho@787
|
95 |
static void gst_data_queue_set_property (GObject * object,
|
renatofilho@787
|
96 |
guint prop_id, const GValue * value, GParamSpec * pspec);
|
renatofilho@787
|
97 |
static void gst_data_queue_get_property (GObject * object,
|
renatofilho@787
|
98 |
guint prop_id, GValue * value, GParamSpec * pspec);
|
renatofilho@787
|
99 |
|
renatofilho@787
|
100 |
static GObjectClass *parent_class = NULL;
|
renatofilho@787
|
101 |
static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
|
renatofilho@787
|
102 |
|
renatofilho@787
|
103 |
GType
|
renatofilho@787
|
104 |
gst_data_queue_get_type (void)
|
renatofilho@787
|
105 |
{
|
renatofilho@787
|
106 |
static GType queue_type = 0;
|
renatofilho@787
|
107 |
|
renatofilho@787
|
108 |
if (!queue_type) {
|
renatofilho@787
|
109 |
static const GTypeInfo queue_info = {
|
renatofilho@787
|
110 |
sizeof (GstDataQueueClass),
|
renatofilho@787
|
111 |
(GBaseInitFunc) gst_data_queue_base_init,
|
renatofilho@787
|
112 |
NULL,
|
renatofilho@787
|
113 |
(GClassInitFunc) gst_data_queue_class_init,
|
renatofilho@787
|
114 |
NULL,
|
renatofilho@787
|
115 |
NULL,
|
renatofilho@787
|
116 |
sizeof (GstDataQueue),
|
renatofilho@787
|
117 |
0,
|
renatofilho@787
|
118 |
(GInstanceInitFunc) gst_data_queue_init,
|
renatofilho@787
|
119 |
NULL
|
renatofilho@787
|
120 |
};
|
renatofilho@787
|
121 |
|
renatofilho@787
|
122 |
queue_type = g_type_register_static (G_TYPE_OBJECT,
|
renatofilho@787
|
123 |
"GstDataQueue", &queue_info, 0);
|
renatofilho@787
|
124 |
GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0,
|
renatofilho@787
|
125 |
"data queue object");
|
renatofilho@787
|
126 |
GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0,
|
renatofilho@787
|
127 |
"dataflow inside the data queue object");
|
renatofilho@787
|
128 |
}
|
renatofilho@787
|
129 |
|
renatofilho@787
|
130 |
return queue_type;
|
renatofilho@787
|
131 |
}
|
renatofilho@787
|
132 |
|
renatofilho@787
|
133 |
static void
|
renatofilho@787
|
134 |
gst_data_queue_base_init (GstDataQueueClass * klass)
|
renatofilho@787
|
135 |
{
|
renatofilho@787
|
136 |
/* Do we need anything here ?? */
|
renatofilho@787
|
137 |
return;
|
renatofilho@787
|
138 |
}
|
renatofilho@787
|
139 |
|
renatofilho@787
|
140 |
static void
|
renatofilho@787
|
141 |
gst_data_queue_class_init (GstDataQueueClass * klass)
|
renatofilho@787
|
142 |
{
|
renatofilho@787
|
143 |
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
|
renatofilho@787
|
144 |
|
renatofilho@787
|
145 |
parent_class = g_type_class_peek_parent (klass);
|
renatofilho@787
|
146 |
|
renatofilho@787
|
147 |
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property);
|
renatofilho@787
|
148 |
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property);
|
renatofilho@787
|
149 |
|
renatofilho@787
|
150 |
/* signals */
|
renatofilho@787
|
151 |
/**
|
renatofilho@787
|
152 |
* GstDataQueue::empty:
|
renatofilho@787
|
153 |
* @queue: the queue instance
|
renatofilho@787
|
154 |
*
|
renatofilho@787
|
155 |
* Reports that the queue became empty (empty).
|
renatofilho@787
|
156 |
* A queue is empty if the total amount of visible items inside it (num-visible, time,
|
renatofilho@787
|
157 |
* size) is lower than the boundary values which can be set through the GObject
|
renatofilho@787
|
158 |
* properties.
|
renatofilho@787
|
159 |
*/
|
renatofilho@787
|
160 |
gst_data_queue_signals[SIGNAL_EMPTY] =
|
renatofilho@787
|
161 |
g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
|
renatofilho@787
|
162 |
G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
|
renatofilho@787
|
163 |
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
|
renatofilho@787
|
164 |
|
renatofilho@787
|
165 |
/**
|
renatofilho@787
|
166 |
* GstDataQueue::full:
|
renatofilho@787
|
167 |
* @queue: the queue instance
|
renatofilho@787
|
168 |
*
|
renatofilho@787
|
169 |
* Reports that the queue became full (full).
|
renatofilho@787
|
170 |
* A queue is full if the total amount of data inside it (num-visible, time,
|
renatofilho@787
|
171 |
* size) is higher than the boundary values which can be set through the GObject
|
renatofilho@787
|
172 |
* properties.
|
renatofilho@787
|
173 |
*/
|
renatofilho@787
|
174 |
gst_data_queue_signals[SIGNAL_FULL] =
|
renatofilho@787
|
175 |
g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
|
renatofilho@787
|
176 |
G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
|
renatofilho@787
|
177 |
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
|
renatofilho@787
|
178 |
|
renatofilho@787
|
179 |
/* properties */
|
renatofilho@787
|
180 |
g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
|
renatofilho@787
|
181 |
g_param_spec_uint ("current-level-bytes", "Current level (kB)",
|
renatofilho@787
|
182 |
"Current amount of data in the queue (bytes)",
|
renatofilho@787
|
183 |
0, G_MAXUINT, 0, G_PARAM_READABLE));
|
renatofilho@787
|
184 |
g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
|
renatofilho@787
|
185 |
g_param_spec_uint ("current-level-visible",
|
renatofilho@787
|
186 |
"Current level (visible items)",
|
renatofilho@787
|
187 |
"Current number of visible items in the queue", 0, G_MAXUINT, 0,
|
renatofilho@787
|
188 |
G_PARAM_READABLE));
|
renatofilho@787
|
189 |
g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
|
renatofilho@787
|
190 |
g_param_spec_uint64 ("current-level-time", "Current level (ns)",
|
renatofilho@787
|
191 |
"Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
|
renatofilho@787
|
192 |
G_PARAM_READABLE));
|
renatofilho@787
|
193 |
|
renatofilho@787
|
194 |
/* set several parent class virtual functions */
|
renatofilho@787
|
195 |
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize);
|
renatofilho@787
|
196 |
|
renatofilho@787
|
197 |
}
|
renatofilho@787
|
198 |
|
renatofilho@787
|
199 |
static void
|
renatofilho@787
|
200 |
gst_data_queue_init (GstDataQueue * queue)
|
renatofilho@787
|
201 |
{
|
renatofilho@787
|
202 |
queue->cur_level.visible = 0; /* no content */
|
renatofilho@787
|
203 |
queue->cur_level.bytes = 0; /* no content */
|
renatofilho@787
|
204 |
queue->cur_level.time = 0; /* no content */
|
renatofilho@787
|
205 |
|
renatofilho@787
|
206 |
queue->checkfull = NULL;
|
renatofilho@787
|
207 |
|
renatofilho@787
|
208 |
queue->qlock = g_mutex_new ();
|
renatofilho@787
|
209 |
queue->item_add = g_cond_new ();
|
renatofilho@787
|
210 |
queue->item_del = g_cond_new ();
|
renatofilho@787
|
211 |
queue->queue = g_queue_new ();
|
renatofilho@787
|
212 |
|
renatofilho@787
|
213 |
GST_DEBUG ("initialized queue's not_empty & not_full conditions");
|
renatofilho@787
|
214 |
}
|
renatofilho@787
|
215 |
|
renatofilho@787
|
216 |
/**
|
renatofilho@787
|
217 |
* gst_data_queue_new:
|
renatofilho@787
|
218 |
* @checkfull: the callback used to tell if the element considers the queue full
|
renatofilho@787
|
219 |
* or not.
|
renatofilho@787
|
220 |
* @checkdata: a #gpointer that will be given in the @checkfull callback.
|
renatofilho@787
|
221 |
*
|
renatofilho@787
|
222 |
* Returns: a new #GstDataQueue.
|
renatofilho@787
|
223 |
*/
|
renatofilho@787
|
224 |
|
renatofilho@787
|
225 |
GstDataQueue *
|
renatofilho@787
|
226 |
gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
|
renatofilho@787
|
227 |
{
|
renatofilho@787
|
228 |
GstDataQueue *ret;
|
renatofilho@787
|
229 |
|
renatofilho@787
|
230 |
g_return_val_if_fail (checkfull != NULL, NULL);
|
renatofilho@787
|
231 |
|
renatofilho@787
|
232 |
ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
|
renatofilho@787
|
233 |
ret->checkfull = checkfull;
|
renatofilho@787
|
234 |
ret->checkdata = checkdata;
|
renatofilho@787
|
235 |
|
renatofilho@787
|
236 |
return ret;
|
renatofilho@787
|
237 |
}
|
renatofilho@787
|
238 |
|
renatofilho@787
|
239 |
static void
|
renatofilho@787
|
240 |
gst_data_queue_cleanup (GstDataQueue * queue)
|
renatofilho@787
|
241 |
{
|
renatofilho@787
|
242 |
while (!g_queue_is_empty (queue->queue)) {
|
renatofilho@787
|
243 |
GstDataQueueItem *item = g_queue_pop_head (queue->queue);
|
renatofilho@787
|
244 |
|
renatofilho@787
|
245 |
/* Just call the destroy notify on the item */
|
renatofilho@787
|
246 |
item->destroy (item);
|
renatofilho@787
|
247 |
}
|
renatofilho@787
|
248 |
queue->cur_level.visible = 0;
|
renatofilho@787
|
249 |
queue->cur_level.bytes = 0;
|
renatofilho@787
|
250 |
queue->cur_level.time = 0;
|
renatofilho@787
|
251 |
}
|
renatofilho@787
|
252 |
|
renatofilho@787
|
253 |
/* called only once, as opposed to dispose */
|
renatofilho@787
|
254 |
static void
|
renatofilho@787
|
255 |
gst_data_queue_finalize (GObject * object)
|
renatofilho@787
|
256 |
{
|
renatofilho@787
|
257 |
GstDataQueue *queue = GST_DATA_QUEUE (object);
|
renatofilho@787
|
258 |
|
renatofilho@787
|
259 |
GST_DEBUG ("finalizing queue");
|
renatofilho@787
|
260 |
|
renatofilho@787
|
261 |
gst_data_queue_cleanup (queue);
|
renatofilho@787
|
262 |
g_queue_free (queue->queue);
|
renatofilho@787
|
263 |
|
renatofilho@787
|
264 |
GST_DEBUG ("free mutex");
|
renatofilho@787
|
265 |
g_mutex_free (queue->qlock);
|
renatofilho@787
|
266 |
GST_DEBUG ("done free mutex");
|
renatofilho@787
|
267 |
|
renatofilho@787
|
268 |
g_cond_free (queue->item_add);
|
renatofilho@787
|
269 |
g_cond_free (queue->item_del);
|
renatofilho@787
|
270 |
|
renatofilho@787
|
271 |
G_OBJECT_CLASS (parent_class)->finalize (object);
|
renatofilho@787
|
272 |
}
|
renatofilho@787
|
273 |
|
renatofilho@787
|
274 |
static void
|
renatofilho@787
|
275 |
gst_data_queue_locked_flush (GstDataQueue * queue)
|
renatofilho@787
|
276 |
{
|
renatofilho@787
|
277 |
STATUS (queue, "before flushing");
|
renatofilho@787
|
278 |
gst_data_queue_cleanup (queue);
|
renatofilho@787
|
279 |
STATUS (queue, "after flushing");
|
renatofilho@787
|
280 |
/* we deleted something... */
|
renatofilho@787
|
281 |
g_cond_signal (queue->item_del);
|
renatofilho@787
|
282 |
}
|
renatofilho@787
|
283 |
|
renatofilho@787
|
284 |
static gboolean
|
renatofilho@787
|
285 |
gst_data_queue_locked_is_empty (GstDataQueue * queue)
|
renatofilho@787
|
286 |
{
|
renatofilho@787
|
287 |
return (queue->queue->length == 0);
|
renatofilho@787
|
288 |
}
|
renatofilho@787
|
289 |
|
renatofilho@787
|
290 |
static gboolean
|
renatofilho@787
|
291 |
gst_data_queue_locked_is_full (GstDataQueue * queue)
|
renatofilho@787
|
292 |
{
|
renatofilho@787
|
293 |
return queue->checkfull (queue, queue->cur_level.visible,
|
renatofilho@787
|
294 |
queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
|
renatofilho@787
|
295 |
}
|
renatofilho@787
|
296 |
|
renatofilho@787
|
297 |
/**
|
renatofilho@787
|
298 |
* gst_data_queue_flush:
|
renatofilho@787
|
299 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
300 |
*
|
renatofilho@787
|
301 |
* Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and
|
renatofilho@787
|
302 |
* #gst_data_queue_pop will be released.
|
renatofilho@787
|
303 |
* MT safe.
|
renatofilho@787
|
304 |
*/
|
renatofilho@787
|
305 |
void
|
renatofilho@787
|
306 |
gst_data_queue_flush (GstDataQueue * queue)
|
renatofilho@787
|
307 |
{
|
renatofilho@787
|
308 |
GST_DEBUG ("queue:%p", queue);
|
renatofilho@787
|
309 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
310 |
gst_data_queue_locked_flush (queue);
|
renatofilho@787
|
311 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
312 |
}
|
renatofilho@787
|
313 |
|
renatofilho@787
|
314 |
/**
|
renatofilho@787
|
315 |
* gst_data_queue_is_empty:
|
renatofilho@787
|
316 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
317 |
*
|
renatofilho@787
|
318 |
* Queries if there are any items in the @queue.
|
renatofilho@787
|
319 |
* MT safe.
|
renatofilho@787
|
320 |
*
|
renatofilho@787
|
321 |
* Returns: #TRUE if @queue is empty.
|
renatofilho@787
|
322 |
*/
|
renatofilho@787
|
323 |
gboolean
|
renatofilho@787
|
324 |
gst_data_queue_is_empty (GstDataQueue * queue)
|
renatofilho@787
|
325 |
{
|
renatofilho@787
|
326 |
gboolean res;
|
renatofilho@787
|
327 |
|
renatofilho@787
|
328 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
329 |
res = gst_data_queue_locked_is_empty (queue);
|
renatofilho@787
|
330 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
331 |
|
renatofilho@787
|
332 |
return res;
|
renatofilho@787
|
333 |
}
|
renatofilho@787
|
334 |
|
renatofilho@787
|
335 |
/**
|
renatofilho@787
|
336 |
* gst_data_queue_is_full:
|
renatofilho@787
|
337 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
338 |
*
|
renatofilho@787
|
339 |
* Queries if @queue is full. This check will be done using the
|
renatofilho@787
|
340 |
* #GstDataQueueCheckFullCallback registered with @queue.
|
renatofilho@787
|
341 |
* MT safe.
|
renatofilho@787
|
342 |
*
|
renatofilho@787
|
343 |
* Returns: #TRUE if @queue is full.
|
renatofilho@787
|
344 |
*/
|
renatofilho@787
|
345 |
gboolean
|
renatofilho@787
|
346 |
gst_data_queue_is_full (GstDataQueue * queue)
|
renatofilho@787
|
347 |
{
|
renatofilho@787
|
348 |
gboolean res;
|
renatofilho@787
|
349 |
|
renatofilho@787
|
350 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
351 |
res = gst_data_queue_locked_is_full (queue);
|
renatofilho@787
|
352 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
353 |
|
renatofilho@787
|
354 |
return res;
|
renatofilho@787
|
355 |
}
|
renatofilho@787
|
356 |
|
renatofilho@787
|
357 |
/**
|
renatofilho@787
|
358 |
* gst_data_queue_set_flushing:
|
renatofilho@787
|
359 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
360 |
* @flushing: a #gboolean stating if the queue will be flushing or not.
|
renatofilho@787
|
361 |
*
|
renatofilho@787
|
362 |
* Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
|
renatofilho@787
|
363 |
* state, any incoming data on the @queue will be discarded. Any call currently
|
renatofilho@787
|
364 |
* blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
|
renatofilho@787
|
365 |
* away with a return value of #FALSE. While the @queue is in flushing state,
|
renatofilho@787
|
366 |
* all calls to those two functions will return #FALSE.
|
renatofilho@787
|
367 |
*
|
renatofilho@787
|
368 |
* MT Safe.
|
renatofilho@787
|
369 |
*/
|
renatofilho@787
|
370 |
void
|
renatofilho@787
|
371 |
gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
|
renatofilho@787
|
372 |
{
|
renatofilho@787
|
373 |
GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
|
renatofilho@787
|
374 |
|
renatofilho@787
|
375 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
376 |
queue->flushing = flushing;
|
renatofilho@787
|
377 |
if (flushing) {
|
renatofilho@787
|
378 |
/* release push/pop functions */
|
renatofilho@787
|
379 |
g_cond_signal (queue->item_add);
|
renatofilho@787
|
380 |
g_cond_signal (queue->item_del);
|
renatofilho@787
|
381 |
}
|
renatofilho@787
|
382 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
383 |
}
|
renatofilho@787
|
384 |
|
renatofilho@787
|
385 |
/**
|
renatofilho@787
|
386 |
* gst_data_queue_push:
|
renatofilho@787
|
387 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
388 |
* @item: a #GstDataQueueItem.
|
renatofilho@787
|
389 |
*
|
renatofilho@787
|
390 |
* Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
|
renatofilho@787
|
391 |
* on the @queue. If the @queue is full, the call will block until space is
|
renatofilho@787
|
392 |
* available, OR the @queue is set to flushing state.
|
renatofilho@787
|
393 |
* MT safe.
|
renatofilho@787
|
394 |
*
|
renatofilho@787
|
395 |
* Note that this function has slightly different semantics than gst_pad_push()
|
renatofilho@787
|
396 |
* and gst_pad_push_event(): this function only takes ownership of @item and
|
renatofilho@787
|
397 |
* the #GstMiniObject contained in @item if the push was successful. If FALSE
|
renatofilho@787
|
398 |
* is returned, the caller is responsible for freeing @item and its contents.
|
renatofilho@787
|
399 |
*
|
renatofilho@787
|
400 |
* Returns: #TRUE if the @item was successfully pushed on the @queue.
|
renatofilho@787
|
401 |
*/
|
renatofilho@787
|
402 |
gboolean
|
renatofilho@787
|
403 |
gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
|
renatofilho@787
|
404 |
{
|
renatofilho@787
|
405 |
g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
|
renatofilho@787
|
406 |
g_return_val_if_fail (item != NULL, FALSE);
|
renatofilho@787
|
407 |
|
renatofilho@787
|
408 |
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
|
renatofilho@787
|
409 |
|
renatofilho@787
|
410 |
STATUS (queue, "before pushing");
|
renatofilho@787
|
411 |
|
renatofilho@787
|
412 |
/* We ALWAYS need to check for queue fillness */
|
renatofilho@787
|
413 |
if (gst_data_queue_locked_is_full (queue)) {
|
renatofilho@787
|
414 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
415 |
g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
|
renatofilho@787
|
416 |
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
|
renatofilho@787
|
417 |
|
renatofilho@787
|
418 |
/* signal might have removed some items */
|
renatofilho@787
|
419 |
while (gst_data_queue_locked_is_full (queue)) {
|
renatofilho@787
|
420 |
g_cond_wait (queue->item_del, queue->qlock);
|
renatofilho@787
|
421 |
if (queue->flushing)
|
renatofilho@787
|
422 |
goto flushing;
|
renatofilho@787
|
423 |
}
|
renatofilho@787
|
424 |
}
|
renatofilho@787
|
425 |
|
renatofilho@787
|
426 |
g_queue_push_tail (queue->queue, item);
|
renatofilho@787
|
427 |
|
renatofilho@787
|
428 |
if (item->visible)
|
renatofilho@787
|
429 |
queue->cur_level.visible++;
|
renatofilho@787
|
430 |
queue->cur_level.bytes += item->size;
|
renatofilho@787
|
431 |
queue->cur_level.time += item->duration;
|
renatofilho@787
|
432 |
|
renatofilho@787
|
433 |
STATUS (queue, "after pushing");
|
renatofilho@787
|
434 |
g_cond_signal (queue->item_add);
|
renatofilho@787
|
435 |
|
renatofilho@787
|
436 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
437 |
|
renatofilho@787
|
438 |
return TRUE;
|
renatofilho@787
|
439 |
|
renatofilho@787
|
440 |
/* ERRORS */
|
renatofilho@787
|
441 |
flushing:
|
renatofilho@787
|
442 |
{
|
renatofilho@787
|
443 |
GST_DEBUG ("queue:%p, we are flushing", queue);
|
renatofilho@787
|
444 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
445 |
return FALSE;
|
renatofilho@787
|
446 |
}
|
renatofilho@787
|
447 |
}
|
renatofilho@787
|
448 |
|
renatofilho@787
|
449 |
/**
|
renatofilho@787
|
450 |
* gst_data_queue_pop:
|
renatofilho@787
|
451 |
* @queue: a #GstDataQueue.
|
renatofilho@787
|
452 |
* @item: pointer to store the returned #GstDataQueueItem.
|
renatofilho@787
|
453 |
*
|
renatofilho@787
|
454 |
* Retrieves the first @item available on the @queue. If the queue is currently
|
renatofilho@787
|
455 |
* empty, the call will block until at least one item is available, OR the
|
renatofilho@787
|
456 |
* @queue is set to the flushing state.
|
renatofilho@787
|
457 |
* MT safe.
|
renatofilho@787
|
458 |
*
|
renatofilho@787
|
459 |
* Returns: #TRUE if an @item was successfully retrieved from the @queue.
|
renatofilho@787
|
460 |
*/
|
renatofilho@787
|
461 |
gboolean
|
renatofilho@787
|
462 |
gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
|
renatofilho@787
|
463 |
{
|
renatofilho@787
|
464 |
g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
|
renatofilho@787
|
465 |
g_return_val_if_fail (item != NULL, FALSE);
|
renatofilho@787
|
466 |
|
renatofilho@787
|
467 |
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
|
renatofilho@787
|
468 |
|
renatofilho@787
|
469 |
STATUS (queue, "before popping");
|
renatofilho@787
|
470 |
|
renatofilho@787
|
471 |
if (gst_data_queue_locked_is_empty (queue)) {
|
renatofilho@787
|
472 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
473 |
g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
|
renatofilho@787
|
474 |
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
|
renatofilho@787
|
475 |
|
renatofilho@787
|
476 |
while (gst_data_queue_locked_is_empty (queue)) {
|
renatofilho@787
|
477 |
g_cond_wait (queue->item_add, queue->qlock);
|
renatofilho@787
|
478 |
if (queue->flushing)
|
renatofilho@787
|
479 |
goto flushing;
|
renatofilho@787
|
480 |
}
|
renatofilho@787
|
481 |
}
|
renatofilho@787
|
482 |
|
renatofilho@787
|
483 |
/* Get the item from the GQueue */
|
renatofilho@787
|
484 |
*item = g_queue_pop_head (queue->queue);
|
renatofilho@787
|
485 |
|
renatofilho@787
|
486 |
/* update current level counter */
|
renatofilho@787
|
487 |
if ((*item)->visible)
|
renatofilho@787
|
488 |
queue->cur_level.visible--;
|
renatofilho@787
|
489 |
queue->cur_level.bytes -= (*item)->size;
|
renatofilho@787
|
490 |
queue->cur_level.time -= (*item)->duration;
|
renatofilho@787
|
491 |
|
renatofilho@787
|
492 |
STATUS (queue, "after popping");
|
renatofilho@787
|
493 |
g_cond_signal (queue->item_del);
|
renatofilho@787
|
494 |
|
renatofilho@787
|
495 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
496 |
|
renatofilho@787
|
497 |
return TRUE;
|
renatofilho@787
|
498 |
|
renatofilho@787
|
499 |
/* ERRORS */
|
renatofilho@787
|
500 |
flushing:
|
renatofilho@787
|
501 |
{
|
renatofilho@787
|
502 |
GST_DEBUG ("queue:%p, we are flushing", queue);
|
renatofilho@787
|
503 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
504 |
return FALSE;
|
renatofilho@787
|
505 |
}
|
renatofilho@787
|
506 |
}
|
renatofilho@787
|
507 |
|
renatofilho@787
|
508 |
/**
|
renatofilho@787
|
509 |
* gst_data_queue_drop_head:
|
renatofilho@787
|
510 |
* @queue: The #GstDataQueue to drop an item from.
|
renatofilho@787
|
511 |
* @type: The #GType of the item to drop.
|
renatofilho@787
|
512 |
*
|
renatofilho@787
|
513 |
* Pop and unref the head-most #GstMiniObject with the given #GType.
|
renatofilho@787
|
514 |
*
|
renatofilho@787
|
515 |
* Returns: TRUE if an element was removed.
|
renatofilho@787
|
516 |
*/
|
renatofilho@787
|
517 |
gboolean
|
renatofilho@787
|
518 |
gst_data_queue_drop_head (GstDataQueue * queue, GType type)
|
renatofilho@787
|
519 |
{
|
renatofilho@787
|
520 |
gboolean res = FALSE;
|
renatofilho@787
|
521 |
GList *item;
|
renatofilho@787
|
522 |
GstDataQueueItem *leak = NULL;
|
renatofilho@787
|
523 |
|
renatofilho@787
|
524 |
g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
|
renatofilho@787
|
525 |
|
renatofilho@787
|
526 |
GST_DEBUG ("queue:%p", queue);
|
renatofilho@787
|
527 |
|
renatofilho@787
|
528 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
529 |
for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
|
renatofilho@787
|
530 |
GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
|
renatofilho@787
|
531 |
|
renatofilho@787
|
532 |
if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
|
renatofilho@787
|
533 |
leak = tmp;
|
renatofilho@787
|
534 |
break;
|
renatofilho@787
|
535 |
}
|
renatofilho@787
|
536 |
}
|
renatofilho@787
|
537 |
|
renatofilho@787
|
538 |
if (!leak)
|
renatofilho@787
|
539 |
goto done;
|
renatofilho@787
|
540 |
|
renatofilho@787
|
541 |
g_queue_delete_link (queue->queue, item);
|
renatofilho@787
|
542 |
|
renatofilho@787
|
543 |
if (leak->visible)
|
renatofilho@787
|
544 |
queue->cur_level.visible--;
|
renatofilho@787
|
545 |
queue->cur_level.bytes -= leak->size;
|
renatofilho@787
|
546 |
queue->cur_level.time -= leak->duration;
|
renatofilho@787
|
547 |
|
renatofilho@787
|
548 |
leak->destroy (leak);
|
renatofilho@787
|
549 |
|
renatofilho@787
|
550 |
res = TRUE;
|
renatofilho@787
|
551 |
|
renatofilho@787
|
552 |
done:
|
renatofilho@787
|
553 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
554 |
|
renatofilho@787
|
555 |
GST_DEBUG ("queue:%p , res:%d", queue, res);
|
renatofilho@787
|
556 |
|
renatofilho@787
|
557 |
return res;
|
renatofilho@787
|
558 |
}
|
renatofilho@787
|
559 |
|
renatofilho@787
|
560 |
/**
|
renatofilho@787
|
561 |
* gst_data_queue_limits_changed:
|
renatofilho@787
|
562 |
* @queue: The #GstDataQueue
|
renatofilho@787
|
563 |
*
|
renatofilho@787
|
564 |
* Inform the queue that the limits for the fullness check have changed and that
|
renatofilho@787
|
565 |
* any blocking gst_data_queue_push() should be unblocked to recheck the limts.
|
renatofilho@787
|
566 |
*/
|
renatofilho@787
|
567 |
void
|
renatofilho@787
|
568 |
gst_data_queue_limits_changed (GstDataQueue * queue)
|
renatofilho@787
|
569 |
{
|
renatofilho@787
|
570 |
g_return_if_fail (GST_IS_DATA_QUEUE (queue));
|
renatofilho@787
|
571 |
|
renatofilho@787
|
572 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
573 |
GST_DEBUG ("signal del");
|
renatofilho@787
|
574 |
g_cond_signal (queue->item_del);
|
renatofilho@787
|
575 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
576 |
}
|
renatofilho@787
|
577 |
|
renatofilho@787
|
578 |
/**
|
renatofilho@787
|
579 |
* gst_data_queue_get_level:
|
renatofilho@787
|
580 |
* @queue: The #GstDataQueue
|
renatofilho@787
|
581 |
* @level: the location to store the result
|
renatofilho@787
|
582 |
*
|
renatofilho@787
|
583 |
* Get the current level of the queue.
|
renatofilho@787
|
584 |
*/
|
renatofilho@787
|
585 |
void
|
renatofilho@787
|
586 |
gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
|
renatofilho@787
|
587 |
{
|
renatofilho@787
|
588 |
level->visible = queue->cur_level.visible;
|
renatofilho@787
|
589 |
level->bytes = queue->cur_level.bytes;
|
renatofilho@787
|
590 |
level->time = queue->cur_level.time;
|
renatofilho@787
|
591 |
}
|
renatofilho@787
|
592 |
|
renatofilho@787
|
593 |
static void
|
renatofilho@787
|
594 |
gst_data_queue_set_property (GObject * object,
|
renatofilho@787
|
595 |
guint prop_id, const GValue * value, GParamSpec * pspec)
|
renatofilho@787
|
596 |
{
|
renatofilho@787
|
597 |
switch (prop_id) {
|
renatofilho@787
|
598 |
default:
|
renatofilho@787
|
599 |
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
renatofilho@787
|
600 |
break;
|
renatofilho@787
|
601 |
}
|
renatofilho@787
|
602 |
}
|
renatofilho@787
|
603 |
|
renatofilho@787
|
604 |
static void
|
renatofilho@787
|
605 |
gst_data_queue_get_property (GObject * object,
|
renatofilho@787
|
606 |
guint prop_id, GValue * value, GParamSpec * pspec)
|
renatofilho@787
|
607 |
{
|
renatofilho@787
|
608 |
GstDataQueue *queue = GST_DATA_QUEUE (object);
|
renatofilho@787
|
609 |
|
renatofilho@787
|
610 |
GST_DATA_QUEUE_MUTEX_LOCK (queue);
|
renatofilho@787
|
611 |
|
renatofilho@787
|
612 |
switch (prop_id) {
|
renatofilho@787
|
613 |
case ARG_CUR_LEVEL_BYTES:
|
renatofilho@787
|
614 |
g_value_set_uint (value, queue->cur_level.bytes);
|
renatofilho@787
|
615 |
break;
|
renatofilho@787
|
616 |
case ARG_CUR_LEVEL_VISIBLE:
|
renatofilho@787
|
617 |
g_value_set_uint (value, queue->cur_level.visible);
|
renatofilho@787
|
618 |
break;
|
renatofilho@787
|
619 |
case ARG_CUR_LEVEL_TIME:
|
renatofilho@787
|
620 |
g_value_set_uint64 (value, queue->cur_level.time);
|
renatofilho@787
|
621 |
break;
|
renatofilho@787
|
622 |
default:
|
renatofilho@787
|
623 |
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
renatofilho@787
|
624 |
break;
|
renatofilho@787
|
625 |
}
|
renatofilho@787
|
626 |
|
renatofilho@787
|
627 |
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
|
renatofilho@787
|
628 |
}
|