# HG changeset patch # User renatofilho # Date 1184430054 -3600 # Node ID e42706ada2313b48da2321d548a75bbb05544f63 # Parent a4529d0f8ede3f6157fcdabd4dfadff4b2f8f6b1 [svn r793] -created playbinmaemo element; autoplug for nokia devices diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/Makefile.am --- a/gst-gmyth/Makefile.am Thu Jul 05 13:43:24 2007 +0100 +++ b/gst-gmyth/Makefile.am Sat Jul 14 17:20:54 2007 +0100 @@ -1,9 +1,15 @@ SUBDIRS = \ mythsrc \ nuvdemux \ - concatmux + concatmux \ + playbinmaemo \ + decodebin2 \ + multiqueue DIST_SUBDIRS = \ mythsrc \ nuvdemux \ - concatmux + concatmux \ + playbinmaemo \ + decodebin2 \ + multiqueue diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/configure.ac --- a/gst-gmyth/configure.ac Thu Jul 05 13:43:24 2007 +0100 +++ b/gst-gmyth/configure.ac Sat Jul 14 17:20:54 2007 +0100 @@ -120,6 +120,14 @@ AC_SUBST(GMYTH_CFLAGS) AC_SUBST(GMYTH_LIBS) +PKG_CHECK_MODULES(X11, x11, HAVE_X11=yes,HAVE_X11=no) +if test "x$HAVE_X11" = "xno"; then + AC_MSG_ERROR(you need x11-dev installed) +fi +AC_SUBST(X11_CFLAGS) +AC_SUBST(X11_LIBS) + + dnl set the plugindir where plugins should be installed plugindir="\$(libdir)/gstreamer-$GST_MAJORMINOR" AC_SUBST(plugindir) @@ -136,6 +144,9 @@ Makefile \ nuvdemux/Makefile \ mythsrc/Makefile \ - concatmux/Makefile + concatmux/Makefile \ + playbinmaemo/Makefile \ + decodebin2/Makefile \ + multiqueue/Makefile ) diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/decodebin2/Makefile.am --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/decodebin2/Makefile.am Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,23 @@ +plugin_LTLIBRARIES = libgstdecodebin2.la + +libgstdecodebin2_la_SOURCES = \ + gstdecodebin2.c \ + gstplay-marshal.c + +libgstdecodebin2_la_CFLAGS = \ + $(GST_CFLAGS) \ + $(GST_BASE_CFLAGS) \ + $(GST_PLUGINS_BASE_CFLAGS) + +libgstdecodebin2_la_LIBADD = \ + $(GST_LIBS_LIBS) + +libgstdecodebin2_la_LDFLAGS = \ + $(GST_LIBS) \ + $(GST_PLUGIN_LDFLAGS) \ + $(GST_BASE_LIBS) \ + $(GST_PLUGINS_BASE_LIBS) + +noinst_HEADERS = \ + gstplay-marshal.h + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/decodebin2/gstdecodebin2.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/decodebin2/gstdecodebin2.c Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,2111 @@ +/* GStreamer + * Copyright (C) <2006> Edward Hervey + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-decodebin2 + * @short_description: Next-generation automatic decoding bin + * + * #GstBin that auto-magically constructs a decoding pipeline using available + * decoders and demuxers via auto-plugging. + * + * At this stage, decodebin2 is considered UNSTABLE. The API provided in the + * signals is expected to change in the near future. + * + * To try out decodebin2, you can set the USE_DECODEBIN2 environment + * variable (USE_DECODEBIN2=1 for example). This will cause playbin to use + * decodebin2 instead of the older decodebin for its internal auto-plugging. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "gstplay-marshal.h" + +/* generic templates */ +static GstStaticPadTemplate decoder_bin_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate decoder_bin_src_template = +GST_STATIC_PAD_TEMPLATE ("src%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (gst_decode_bin_debug); +#define GST_CAT_DEFAULT gst_decode_bin_debug + +typedef struct _GstDecodeGroup GstDecodeGroup; +typedef struct _GstDecodePad GstDecodePad; +typedef struct _GstDecodeBin GstDecodeBin; +typedef struct _GstDecodeBin GstDecodeBin2; +typedef struct _GstDecodeBinClass GstDecodeBinClass; + +#define GST_TYPE_DECODE_BIN (gst_decode_bin_get_type()) +#define GST_DECODE_BIN_CAST(obj) ((GstDecodeBin*)(obj)) +#define GST_DECODE_BIN(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_DECODE_BIN,GstDecodeBin)) +#define GST_DECODE_BIN_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_DECODE_BIN,GstDecodeBinClass)) +#define GST_IS_DECODE_BIN(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_DECODE_BIN)) +#define GST_IS_DECODE_BIN_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_DECODE_BIN)) + +/** + * GstDecodeBin2: + * + * The opaque #DecodeBin2 data structure + */ +struct _GstDecodeBin +{ + GstBin bin; /* we extend GstBin */ + + GstElement *typefind; /* this holds the typefind object */ + GstElement *fakesink; + + GMutex *lock; /* Protects activegroup and groups */ + GstDecodeGroup *activegroup; /* group currently active */ + GList *groups; /* List of non-active GstDecodeGroups, sorted in + * order of creation. */ + GList *oldgroups; /* List of no-longer-used GstDecodeGroups. + * Should be freed in dispose */ + gint nbpads; /* unique identifier for source pads */ + GstCaps *caps; /* caps on which to stop decoding */ + + GList *factories; /* factories we can use for selecting elements */ +}; + +struct _GstDecodeBinClass +{ + GstBinClass parent_class; + + /* signal we fire when a new pad has been decoded into raw audio/video */ + void (*new_decoded_pad) (GstElement * element, GstPad * pad, gboolean last); + /* signal we fire when a pad has been removed */ + void (*removed_decoded_pad) (GstElement * element, GstPad * pad); + /* signal fired when we found a pad that we cannot decode */ + void (*unknown_type) (GstElement * element, GstPad * pad, GstCaps * caps); + /* signal fired to know if we continue trying to decode the given caps */ + gboolean (*autoplug_continue) (GstElement * element, GstCaps * caps); + /* signal fired to reorder the proposed list of factories */ + gboolean (*autoplug_sort) (GstElement * element, GstCaps * caps, + GList ** list); +}; + +/* signals */ +enum +{ + SIGNAL_NEW_DECODED_PAD, + SIGNAL_REMOVED_DECODED_PAD, + SIGNAL_UNKNOWN_TYPE, + SIGNAL_AUTOPLUG_CONTINUE, + SIGNAL_AUTOPLUG_SORT, + LAST_SIGNAL +}; + +/* Properties */ +enum +{ + PROP_0, + PROP_CAPS, +}; + +static GstBinClass *parent_class; +static guint gst_decode_bin_signals[LAST_SIGNAL] = { 0 }; + +static const GstElementDetails gst_decode_bin_details = +GST_ELEMENT_DETAILS ("Decoder Bin", + "Generic/Bin/Decoder", + "Autoplug and decode to raw media", + "Edward Hervey "); + + +static gboolean add_fakesink (GstDecodeBin * decode_bin); +static void remove_fakesink (GstDecodeBin * decode_bin); + +static void type_found (GstElement * typefind, guint probability, + GstCaps * caps, GstDecodeBin * decode_bin); + +static gboolean gst_decode_bin_autoplug_continue (GstElement * element, + GstCaps * caps); +static gboolean gst_decode_bin_autoplug_sort (GstElement * element, + GstCaps * caps, GList ** list); +static void gst_decode_bin_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_decode_bin_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_decode_bin_set_caps (GstDecodeBin * dbin, GstCaps * caps); +static GstCaps *gst_decode_bin_get_caps (GstDecodeBin * dbin); + +static GstPad *find_sink_pad (GstElement * element); +static GstStateChangeReturn gst_decode_bin_change_state (GstElement * element, + GstStateChange transition); + +#define DECODE_BIN_LOCK(dbin) G_STMT_START { \ + GST_LOG_OBJECT (dbin, \ + "locking from thread %p", \ + g_thread_self ()); \ + g_mutex_lock (GST_DECODE_BIN_CAST(dbin)->lock); \ + GST_LOG_OBJECT (dbin, \ + "locked from thread %p", \ + g_thread_self ()); \ +} G_STMT_END + +#define DECODE_BIN_UNLOCK(dbin) G_STMT_START { \ + GST_LOG_OBJECT (dbin, \ + "unlocking from thread %p", \ + g_thread_self ()); \ + g_mutex_unlock (GST_DECODE_BIN_CAST(dbin)->lock); \ +} G_STMT_END + +/* GstDecodeGroup + * + * Streams belonging to the same group/chain of a media file + * + */ + +struct _GstDecodeGroup +{ + GstDecodeBin *dbin; + GMutex *lock; + GstElement *multiqueue; + gboolean exposed; /* TRUE if this group is exposed */ + gboolean drained; /* TRUE if EOS went throug all endpads */ + gboolean blocked; /* TRUE if all endpads are blocked */ + gboolean complete; /* TRUE if we are not expecting anymore streams + * on this group */ + gulong overrunsig; + gulong underrunsig; + guint nbdynamic; /* number of dynamic pads in the group. */ + + GList *endpads; /* List of GstDecodePad of source pads to be exposed */ + GList *ghosts; /* List of GstGhostPad for the endpads */ +}; + +#define GROUP_MUTEX_LOCK(group) G_STMT_START { \ + GST_LOG_OBJECT (group->dbin, \ + "locking group %p from thread %p", \ + group, g_thread_self ()); \ + g_mutex_lock (group->lock); \ + GST_LOG_OBJECT (group->dbin, \ + "locked group %p from thread %p", \ + group, g_thread_self ()); \ +} G_STMT_END + +#define GROUP_MUTEX_UNLOCK(group) G_STMT_START { \ + GST_LOG_OBJECT (group->dbin, \ + "unlocking group %p from thread %p", \ + group, g_thread_self ()); \ + g_mutex_unlock (group->lock); \ +} G_STMT_END + + +static GstDecodeGroup *gst_decode_group_new (GstDecodeBin * decode_bin); +static GstPad *gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, + GstPad * pad); +static gboolean gst_decode_group_control_source_pad (GstDecodeGroup * group, + GstPad * pad); +static gboolean gst_decode_group_expose (GstDecodeGroup * group); +static void gst_decode_group_check_if_blocked (GstDecodeGroup * group); +static void gst_decode_group_set_complete (GstDecodeGroup * group); +static void gst_decode_group_hide (GstDecodeGroup * group); +static void gst_decode_group_free (GstDecodeGroup * group); + +/* GstDecodePad + * + * GstPad private used for source pads of groups + */ + +struct _GstDecodePad +{ + GstPad *pad; + GstDecodeGroup *group; + gboolean blocked; + gboolean drained; +}; + +static GstDecodePad *gst_decode_pad_new (GstDecodeGroup * group, GstPad * pad, + gboolean block); +static void source_pad_blocked_cb (GstPad * pad, gboolean blocked, + GstDecodePad * dpad); + +/* TempPadStruct + * Internal structure used for pads which have more than one structure. + */ +typedef struct _TempPadStruct +{ + GstDecodeBin *dbin; + GstDecodeGroup *group; +} TempPadStruct; + +/******************************** + * Standard GObject boilerplate * + ********************************/ + +static void gst_decode_bin_class_init (GstDecodeBinClass * klass); +static void gst_decode_bin_init (GstDecodeBin * decode_bin); +static void gst_decode_bin_dispose (GObject * object); +static void gst_decode_bin_finalize (GObject * object); + +static GType +gst_decode_bin_get_type (void) +{ + static GType gst_decode_bin_type = 0; + + if (!gst_decode_bin_type) { + static const GTypeInfo gst_decode_bin_info = { + sizeof (GstDecodeBinClass), + NULL, + NULL, + (GClassInitFunc) gst_decode_bin_class_init, + NULL, + NULL, + sizeof (GstDecodeBin), + 0, + (GInstanceInitFunc) gst_decode_bin_init, + NULL + }; + + gst_decode_bin_type = + g_type_register_static (GST_TYPE_BIN, "GstDecodeBin2", + &gst_decode_bin_info, 0); + } + + return gst_decode_bin_type; +} + +static gboolean +_gst_boolean_accumulator (GSignalInvocationHint * ihint, + GValue * return_accu, const GValue * handler_return, gpointer dummy) +{ + gboolean myboolean; + + myboolean = g_value_get_boolean (handler_return); + if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP)) + g_value_set_boolean (return_accu, myboolean); + + /* stop emission if FALSE */ + return myboolean; +} + +static void +gst_decode_bin_class_init (GstDecodeBinClass * klass) +{ + GObjectClass *gobject_klass; + GstElementClass *gstelement_klass; + GstBinClass *gstbin_klass; + + gobject_klass = (GObjectClass *) klass; + gstelement_klass = (GstElementClass *) klass; + gstbin_klass = (GstBinClass *) klass; + + parent_class = g_type_class_peek_parent (klass); + + gobject_klass->dispose = GST_DEBUG_FUNCPTR (gst_decode_bin_dispose); + gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_decode_bin_finalize); + gobject_klass->set_property = GST_DEBUG_FUNCPTR (gst_decode_bin_set_property); + gobject_klass->get_property = GST_DEBUG_FUNCPTR (gst_decode_bin_get_property); + + /** + * GstDecodeBin2::new-decoded-pad: + * @pad: the newly created pad + * @islast: #TRUE if this is the last pad to be added. Deprecated. + * + * This signal gets emitted as soon as a new pad of the same type as one of + * the valid 'raw' types is added. + */ + + gst_decode_bin_signals[SIGNAL_NEW_DECODED_PAD] = + g_signal_new ("new-decoded-pad", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstDecodeBinClass, new_decoded_pad), NULL, NULL, + gst_play_marshal_VOID__OBJECT_BOOLEAN, G_TYPE_NONE, 2, GST_TYPE_PAD, + G_TYPE_BOOLEAN); + + /** + * GstDecodeBin2::removed-decoded-pad: + * @pad: the pad that was removed + * + * This signal is emitted when a 'final' caps pad has been removed. + */ + + gst_decode_bin_signals[SIGNAL_REMOVED_DECODED_PAD] = + g_signal_new ("removed-decoded-pad", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstDecodeBinClass, removed_decoded_pad), NULL, NULL, + gst_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_PAD); + + /** + * GstDecodeBin2::unknown-type: + * @pad: the new pad containing caps that cannot be resolved to a 'final' stream type. + * @caps: the #GstCaps of the pad that cannot be resolved. + * + * This signal is emitted when a pad for which there is no further possible + * decoding is added to the decodebin. + */ + + gst_decode_bin_signals[SIGNAL_UNKNOWN_TYPE] = + g_signal_new ("unknown-type", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodeBinClass, unknown_type), + NULL, NULL, gst_marshal_VOID__OBJECT_OBJECT, G_TYPE_NONE, 2, + GST_TYPE_PAD, GST_TYPE_CAPS); + + /** + * GstDecodeBin2::autoplug-continue: + * @caps: The #GstCaps found. + * + * This signal is emitted whenever decodebin2 finds a new stream. It is + * emitted before looking for any elements that can handle that stream. + * + * Returns: #TRUE if you wish decodebin2 to look for elements that can + * handle the given @caps. If #FALSE, those caps will be considered as + * final and the pad will be exposed as such (see 'new-decoded-pad' + * signal). + */ + + gst_decode_bin_signals[SIGNAL_AUTOPLUG_CONTINUE] = + g_signal_new ("autoplug-continue", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodeBinClass, autoplug_continue), + _gst_boolean_accumulator, NULL, gst_play_marshal_BOOLEAN__OBJECT, + G_TYPE_BOOLEAN, 1, GST_TYPE_CAPS); + + /** + * GstDecodeBin2::autoplug-sort: + * @caps: The #GstCaps. + * @factories: A #GList of possible #GstElementFactory to use. + * + * This signal is emitted once decodebin2 has found all the possible + * #GstElementFactory that can be used to handle the given @caps. + * + * UNSTABLE API. Will change soon. + * + * Returns: #TRUE if you wish decodebin2 to start trying to decode + * the given @caps with the list of factories. #FALSE if you do not want + * these #GstCaps, if so the pad will be exposed as unknown (see + * 'unknown-type' signal). + */ + + gst_decode_bin_signals[SIGNAL_AUTOPLUG_SORT] = + g_signal_new ("autoplug-sort", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodeBinClass, autoplug_sort), + _gst_boolean_accumulator, NULL, gst_play_marshal_BOOLEAN__OBJECT_POINTER, + G_TYPE_BOOLEAN, 2, GST_TYPE_CAPS, G_TYPE_POINTER); + + g_object_class_install_property (gobject_klass, PROP_CAPS, + g_param_spec_boxed ("caps", "Caps", "The caps on which to stop decoding.", + GST_TYPE_CAPS, G_PARAM_READWRITE)); + + klass->autoplug_continue = + GST_DEBUG_FUNCPTR (gst_decode_bin_autoplug_continue); + klass->autoplug_sort = GST_DEBUG_FUNCPTR (gst_decode_bin_autoplug_sort); + + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&decoder_bin_sink_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&decoder_bin_src_template)); + + gst_element_class_set_details (gstelement_klass, &gst_decode_bin_details); + + gstelement_klass->change_state = + GST_DEBUG_FUNCPTR (gst_decode_bin_change_state); +} + +/* the filter function for selecting the elements we can use in + * autoplugging */ +static gboolean +gst_decode_bin_factory_filter (GstPluginFeature * feature, + GstDecodeBin * decode_bin) +{ + guint rank; + const gchar *klass; + + /* we only care about element factories */ + if (!GST_IS_ELEMENT_FACTORY (feature)) + return FALSE; + + klass = gst_element_factory_get_klass (GST_ELEMENT_FACTORY (feature)); + /* only demuxers, decoders and parsers can play */ + if (strstr (klass, "Demux") == NULL && + strstr (klass, "Decoder") == NULL && strstr (klass, "Parse") == NULL) { + return FALSE; + } + + /* only select elements with autoplugging rank */ + rank = gst_plugin_feature_get_rank (feature); + if (rank < GST_RANK_MARGINAL) + return FALSE; + + return TRUE; +} + +/* function used to sort element features */ +static gint +compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2) +{ + gint diff; + const gchar *rname1, *rname2; + + diff = gst_plugin_feature_get_rank (f2) - gst_plugin_feature_get_rank (f1); + if (diff != 0) + return diff; + + rname1 = gst_plugin_feature_get_name (f1); + rname2 = gst_plugin_feature_get_name (f2); + + diff = strcmp (rname2, rname1); + + return diff; +} + +static void +print_feature (GstPluginFeature * feature) +{ + const gchar *rname; + + rname = gst_plugin_feature_get_name (feature); + + GST_DEBUG ("%s", rname); +} + +static void +gst_decode_bin_init (GstDecodeBin * decode_bin) +{ + GList *factories; + + /* first filter out the interesting element factories */ + factories = gst_default_registry_feature_filter ( + (GstPluginFeatureFilter) gst_decode_bin_factory_filter, + FALSE, decode_bin); + + /* sort them according to their ranks */ + decode_bin->factories = g_list_sort (factories, (GCompareFunc) compare_ranks); + /* do some debugging */ + g_list_foreach (decode_bin->factories, (GFunc) print_feature, NULL); + + + /* we create the typefind element only once */ + decode_bin->typefind = gst_element_factory_make ("typefind", "typefind"); + if (!decode_bin->typefind) { + g_warning ("can't find typefind element, decodebin will not work"); + } else { + GstPad *pad; + GstPad *gpad; + + /* add the typefind element */ + if (!gst_bin_add (GST_BIN (decode_bin), decode_bin->typefind)) { + g_warning ("Could not add typefind element, decodebin will not work"); + gst_object_unref (decode_bin->typefind); + decode_bin->typefind = NULL; + } + + /* get the sinkpad */ + pad = gst_element_get_pad (decode_bin->typefind, "sink"); + + /* ghost the sink pad to ourself */ + gpad = gst_ghost_pad_new ("sink", pad); + gst_pad_set_active (gpad, TRUE); + gst_element_add_pad (GST_ELEMENT (decode_bin), gpad); + + gst_object_unref (pad); + + /* connect a signal to find out when the typefind element found + * a type */ + g_signal_connect (G_OBJECT (decode_bin->typefind), "have_type", + G_CALLBACK (type_found), decode_bin); + } + + decode_bin->lock = g_mutex_new (); + decode_bin->activegroup = NULL; + decode_bin->groups = NULL; + + decode_bin->caps = + gst_caps_from_string ("video/x-raw-yuv;video/x-raw-rgb;video/x-raw-gray;" + "audio/x-raw-int;audio/x-raw-float;" "text/plain;text/x-pango-markup"); + + add_fakesink (decode_bin); + + /* FILLME */ +} + +static void +gst_decode_bin_dispose (GObject * object) +{ + GstDecodeBin *decode_bin; + GList *tmp; + + decode_bin = GST_DECODE_BIN (object); + + if (decode_bin->factories) + gst_plugin_feature_list_free (decode_bin->factories); + decode_bin->factories = NULL; + + if (decode_bin->activegroup) { + gst_decode_group_free (decode_bin->activegroup); + decode_bin->activegroup = NULL; + } + + /* remove groups */ + for (tmp = decode_bin->groups; tmp; tmp = g_list_next (tmp)) { + GstDecodeGroup *group = (GstDecodeGroup *) tmp->data; + + gst_decode_group_free (group); + } + g_list_free (decode_bin->groups); + decode_bin->groups = NULL; + + for (tmp = decode_bin->oldgroups; tmp; tmp = g_list_next (tmp)) { + GstDecodeGroup *group = (GstDecodeGroup *) tmp->data; + + gst_decode_group_free (group); + } + g_list_free (decode_bin->oldgroups); + decode_bin->oldgroups = NULL; + + if (decode_bin->caps) + gst_caps_unref (decode_bin->caps); + decode_bin->caps = NULL; + remove_fakesink (decode_bin); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_decode_bin_finalize (GObject * object) +{ + GstDecodeBin *decode_bin; + + decode_bin = GST_DECODE_BIN (object); + + if (decode_bin->lock) { + g_mutex_free (decode_bin->lock); + decode_bin->lock = NULL; + } + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_decode_bin_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstDecodeBin *dbin; + + dbin = GST_DECODE_BIN (object); + + switch (prop_id) { + case PROP_CAPS: + gst_decode_bin_set_caps (dbin, (GstCaps *) g_value_dup_boxed (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_decode_bin_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstDecodeBin *dbin; + + dbin = GST_DECODE_BIN (object); + switch (prop_id) { + case PROP_CAPS:{ + g_value_take_boxed (value, gst_decode_bin_get_caps (dbin)); + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + +} + +/* _set_caps + * Changes the caps on which decodebin will stop decoding. + * Will unref the previously set one. The refcount of the given caps will be + * taken. + * @caps can be NULL. + * + * MT-safe + */ + +static void +gst_decode_bin_set_caps (GstDecodeBin * dbin, GstCaps * caps) +{ + GST_DEBUG_OBJECT (dbin, "Setting new caps: %" GST_PTR_FORMAT, caps); + + DECODE_BIN_LOCK (dbin); + if (dbin->caps) + gst_caps_unref (dbin->caps); + dbin->caps = caps; + DECODE_BIN_UNLOCK (dbin); +} + +/* _get_caps + * Returns the currently configured caps on which decodebin will stop decoding. + * The returned caps (if not NULL), will have its refcount incremented. + * + * MT-safe + */ + +static GstCaps * +gst_decode_bin_get_caps (GstDecodeBin * dbin) +{ + GstCaps *caps; + + GST_DEBUG_OBJECT (dbin, "Getting currently set caps"); + + DECODE_BIN_LOCK (dbin); + caps = dbin->caps; + if (caps) + gst_caps_ref (caps); + DECODE_BIN_UNLOCK (dbin); + + return caps; +} + +/***** + * Default autoplug signal handlers + *****/ + +static gboolean +gst_decode_bin_autoplug_continue (GstElement * element, GstCaps * caps) +{ + return TRUE; +} + +static gboolean +gst_decode_bin_autoplug_sort (GstElement * element, GstCaps * caps, + GList ** list) +{ + return TRUE; +} + + + +/******** + * Discovery methods + *****/ + +static gboolean are_raw_caps (GstDecodeBin * dbin, GstCaps * caps); +static gboolean is_demuxer_element (GstElement * srcelement); +static GList *find_compatibles (GstDecodeBin * decode_bin, + const GstCaps * caps); + +static gboolean connect_pad (GstDecodeBin * dbin, GstElement * src, + GstPad * pad, GList * factories, GstDecodeGroup * group); +static gboolean connect_element (GstDecodeBin * dbin, GstElement * element, + GstDecodeGroup * group); +static void expose_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, + GstDecodeGroup * group); + +static void pad_added_group_cb (GstElement * element, GstPad * pad, + GstDecodeGroup * group); +static void pad_removed_group_cb (GstElement * element, GstPad * pad, + GstDecodeGroup * group); +static void no_more_pads_group_cb (GstElement * element, + GstDecodeGroup * group); +static void pad_added_cb (GstElement * element, GstPad * pad, + GstDecodeBin * dbin); +static void pad_removed_cb (GstElement * element, GstPad * pad, + GstDecodeBin * dbin); +static void no_more_pads_cb (GstElement * element, GstDecodeBin * dbin); + +static GstDecodeGroup *get_current_group (GstDecodeBin * dbin); + +static void +analyze_new_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, + GstCaps * caps, GstDecodeGroup * group) +{ + gboolean apcontinue = TRUE; + GList *factories = NULL; + gboolean apsort = TRUE; + + GST_DEBUG_OBJECT (dbin, "Pad %s:%s caps:%" GST_PTR_FORMAT, + GST_DEBUG_PAD_NAME (pad), caps); + + if ((caps == NULL) || gst_caps_is_empty (caps)) + goto unknown_type; + + if (gst_caps_is_any (caps)) + goto any_caps; + + /* 1. Emit 'autoplug-continue' */ + g_signal_emit (G_OBJECT (dbin), + gst_decode_bin_signals[SIGNAL_AUTOPLUG_CONTINUE], 0, caps, &apcontinue); + + /* 1.a if autoplug-continue is FALSE or caps is a raw format, goto pad_is_final */ + if ((!apcontinue) || are_raw_caps (dbin, caps)) + goto expose_pad; + + /* 1.b else if there's no compatible factory or 'autoplug-sort' returned FALSE, goto pad_not_used */ + if ((factories = find_compatibles (dbin, caps))) { + /* emit autoplug-sort */ + g_signal_emit (G_OBJECT (dbin), + gst_decode_bin_signals[SIGNAL_AUTOPLUG_SORT], + 0, caps, &factories, &apsort); + if (!apsort) { + g_list_free (factories); + /* User doesn't want that pad */ + goto pad_not_wanted; + } + } else { + /* no compatible factories */ + goto unknown_type; + } + + /* 1.c else goto pad_is_valid */ + GST_LOG_OBJECT (pad, "Let's continue discovery on this pad"); + + connect_pad (dbin, src, pad, factories, group); + g_list_free (factories); + + return; + +expose_pad: + { + GST_LOG_OBJECT (dbin, "Pad is final. autoplug-continue:%d", apcontinue); + expose_pad (dbin, src, pad, group); + return; + } + +pad_not_wanted: + { + GST_LOG_OBJECT (pad, "User doesn't want this pad, stopping discovery"); + return; + } + +unknown_type: + { + GST_LOG_OBJECT (pad, "Unknown type, firing signal"); + g_signal_emit (G_OBJECT (dbin), + gst_decode_bin_signals[SIGNAL_UNKNOWN_TYPE], 0, pad, caps); + + /* Check if there are no pending groups, if so, remove fakesink */ + if (dbin->groups == NULL) + remove_fakesink (dbin); + + return; + } + +any_caps: + { + GST_WARNING_OBJECT (pad, + "pad has ANY caps, not able to autoplug to anything"); + /* FIXME : connect to caps notification */ + return; + } +} + + +/* connect_pad: + * + * Try to connect the given pad to an element created from one of the factories, + * and recursively. + * + * Returns TRUE if an element was properly created and linked + */ + +static gboolean +connect_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, + GList * factories, GstDecodeGroup * group) +{ + gboolean res = FALSE; + GList *tmp; + + g_return_val_if_fail (factories != NULL, FALSE); + GST_DEBUG_OBJECT (dbin, "pad %s:%s , group:%p", + GST_DEBUG_PAD_NAME (pad), group); + + /* 1. is element demuxer or parser */ + if (is_demuxer_element (src)) { + GstPad *mqpad; + + GST_LOG_OBJECT (src, "is a demuxer, connecting the pad through multiqueue"); + + if (!group) + if (!(group = get_current_group (dbin))) { + group = gst_decode_group_new (dbin); + DECODE_BIN_LOCK (dbin); + dbin->groups = g_list_append (dbin->groups, group); + DECODE_BIN_UNLOCK (dbin); + } + + if (!(mqpad = gst_decode_group_control_demuxer_pad (group, pad))) + goto beach; + pad = mqpad; + } + + /* 2. Try to create an element and link to it */ + for (tmp = factories; tmp; tmp = g_list_next (tmp)) { + GstElementFactory *factory = (GstElementFactory *) tmp->data; + GstElement *element; + GstPad *sinkpad; + + /* 2.1. Try to create an element */ + if ((element = gst_element_factory_create (factory, NULL)) == NULL) { + GST_WARNING_OBJECT (dbin, "Could not create an element from %s", + gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory))); + continue; + } + + /* 2.3. Find its sink pad */ + if (!(sinkpad = find_sink_pad (element))) { + GST_WARNING_OBJECT (dbin, "Element %s doesn't have a sink pad", + GST_ELEMENT_NAME (element)); + gst_object_unref (element); + continue; + } + + /* 2.4 add it ... */ + if (!(gst_bin_add (GST_BIN (dbin), element))) { + GST_WARNING_OBJECT (dbin, "Couldn't add %s to the bin", + GST_ELEMENT_NAME (element)); + gst_object_unref (sinkpad); + gst_object_unref (element); + continue; + } + + /* ... activate it ... */ + if ((gst_element_set_state (element, + GST_STATE_READY)) == GST_STATE_CHANGE_FAILURE) { + GST_WARNING_OBJECT (dbin, "Couldn't set %s to READY", + GST_ELEMENT_NAME (element)); + gst_object_unref (sinkpad); + gst_bin_remove (GST_BIN (dbin), element); + continue; + } + + /* 2.5 ...and try to link */ + if ((gst_pad_link (pad, sinkpad)) != GST_PAD_LINK_OK) { + GST_WARNING_OBJECT (dbin, "Link failed on pad %s:%s", + GST_DEBUG_PAD_NAME (sinkpad)); + gst_element_set_state (element, GST_STATE_NULL); + gst_object_unref (sinkpad); + gst_bin_remove (GST_BIN (dbin), element); + continue; + } + + GST_LOG_OBJECT (dbin, "linked on pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + /* link this element further */ + connect_element (dbin, element, group); + + /* Bring the element to the state of the parent */ + if ((gst_element_set_state (element, + GST_STATE_PAUSED)) == GST_STATE_CHANGE_FAILURE) { + GST_WARNING_OBJECT (dbin, "Couldn't set %s to PAUSED", + GST_ELEMENT_NAME (element)); + gst_element_set_state (element, GST_STATE_NULL); + gst_object_unref (sinkpad); + gst_bin_remove (GST_BIN (dbin), element); + continue; + } + + res = TRUE; + break; + } + +beach: + return res; +} + +static gboolean +connect_element (GstDecodeBin * dbin, GstElement * element, + GstDecodeGroup * group) +{ + GList *pads; + gboolean res = TRUE; + gboolean dynamic = FALSE; + GList *to_connect = NULL; + + GST_DEBUG_OBJECT (dbin, "Attempting to connect element %s [group:%p] further", + GST_ELEMENT_NAME (element), group); + + /* 1. Loop over pad templates, grabbing existing pads along the way */ + for (pads = GST_ELEMENT_GET_CLASS (element)->padtemplates; pads; + pads = g_list_next (pads)) { + GstPadTemplate *templ = GST_PAD_TEMPLATE (pads->data); + const gchar *templ_name; + + /* we are only interested in source pads */ + if (GST_PAD_TEMPLATE_DIRECTION (templ) != GST_PAD_SRC) + continue; + + templ_name = GST_PAD_TEMPLATE_NAME_TEMPLATE (templ); + GST_DEBUG_OBJECT (dbin, "got a source pad template %s", templ_name); + + /* figure out what kind of pad this is */ + switch (GST_PAD_TEMPLATE_PRESENCE (templ)) { + case GST_PAD_ALWAYS: + { + /* get the pad that we need to autoplug */ + GstPad *pad = gst_element_get_pad (element, templ_name); + + if (pad) { + GST_DEBUG_OBJECT (dbin, "got the pad for always template %s", + templ_name); + /* here is the pad, we need to autoplug it */ + to_connect = g_list_prepend (to_connect, pad); + } else { + /* strange, pad is marked as always but it's not + * there. Fix the element */ + GST_WARNING_OBJECT (dbin, + "could not get the pad for always template %s", templ_name); + } + break; + } + case GST_PAD_SOMETIMES: + { + /* try to get the pad to see if it is already created or + * not */ + GstPad *pad = gst_element_get_pad (element, templ_name); + + if (pad) { + GST_DEBUG_OBJECT (dbin, "got the pad for sometimes template %s", + templ_name); + /* the pad is created, we need to autoplug it */ + to_connect = g_list_prepend (to_connect, pad); + } else { + GST_DEBUG_OBJECT (dbin, + "did not get the sometimes pad of template %s", templ_name); + /* we have an element that will create dynamic pads */ + dynamic = TRUE; + } + break; + } + case GST_PAD_REQUEST: + /* ignore request pads */ + GST_DEBUG_OBJECT (dbin, "ignoring request padtemplate %s", templ_name); + break; + } + } + + /* 2. if there are more potential pads, connect to relevent signals */ + if (dynamic) { + if (group) { + GST_LOG ("Adding signals to element %s in group %p", + GST_ELEMENT_NAME (element), group); + GROUP_MUTEX_LOCK (group); + group->nbdynamic++; + GST_LOG ("Group %p has now %d dynamic elements", group, group->nbdynamic); + GROUP_MUTEX_UNLOCK (group); + g_signal_connect (G_OBJECT (element), "pad-added", + G_CALLBACK (pad_added_group_cb), group); + g_signal_connect (G_OBJECT (element), "pad-removed", + G_CALLBACK (pad_removed_group_cb), group); + g_signal_connect (G_OBJECT (element), "no-more-pads", + G_CALLBACK (no_more_pads_group_cb), group); + } else { + /* This is a non-grouped element, the handlers are different */ + g_signal_connect (G_OBJECT (element), "pad-added", + G_CALLBACK (pad_added_cb), dbin); + g_signal_connect (G_OBJECT (element), "pad-removed", + G_CALLBACK (pad_removed_cb), dbin); + g_signal_connect (G_OBJECT (element), "no-more-pads", + G_CALLBACK (no_more_pads_cb), dbin); + } + } + + /* 3. for every available pad, connect it */ + for (pads = to_connect; pads; pads = g_list_next (pads)) { + GstPad *pad = GST_PAD_CAST (pads->data); + GstCaps *caps; + + caps = gst_pad_get_caps (pad); + analyze_new_pad (dbin, element, pad, caps, group); + if (caps) + gst_caps_unref (caps); + + gst_object_unref (pad); + } + g_list_free (to_connect); + + return res; +} + +/* expose_pad: + * + * Expose the given pad on the group as a decoded pad. + * If group is NULL, a GstDecodeGroup will be created and setup properly. + */ +static void +expose_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, + GstDecodeGroup * group) +{ + gboolean newgroup = FALSE; + gboolean isdemux; + + GST_DEBUG_OBJECT (dbin, "pad %s:%s, group:%p", + GST_DEBUG_PAD_NAME (pad), group); + + if (!group) + if (!(group = get_current_group (dbin))) { + group = gst_decode_group_new (dbin); + DECODE_BIN_LOCK (dbin); + dbin->groups = g_list_append (dbin->groups, group); + DECODE_BIN_UNLOCK (dbin); + newgroup = TRUE; + } + + isdemux = is_demuxer_element (src); + + if (isdemux || newgroup) { + GstPad *mqpad; + + GST_LOG_OBJECT (src, "is a demuxer, connecting the pad through multiqueue"); + + if (!(mqpad = gst_decode_group_control_demuxer_pad (group, pad))) + goto beach; + pad = mqpad; + } + + gst_decode_group_control_source_pad (group, pad); + + if (newgroup && !isdemux) { + /* If we have discovered a raw pad and it doesn't belong to any group, + * that means there wasn't any demuxer. In that case, we consider the + * group as being complete. */ + gst_decode_group_set_complete (group); + } +beach: + return; +} + +static void +type_found (GstElement * typefind, guint probability, + GstCaps * caps, GstDecodeBin * decode_bin) +{ + GstPad *pad; + + GST_STATE_LOCK (decode_bin); + + GST_DEBUG_OBJECT (decode_bin, "typefind found caps %" GST_PTR_FORMAT, caps); + + pad = gst_element_get_pad (typefind, "src"); + + analyze_new_pad (decode_bin, typefind, pad, caps, NULL); + + gst_object_unref (pad); + + GST_STATE_UNLOCK (decode_bin); + return; +} + +static void +pad_added_group_cb (GstElement * element, GstPad * pad, GstDecodeGroup * group) +{ + GstCaps *caps; + gboolean expose = FALSE; + + GST_LOG_OBJECT (pad, "pad added, group:%p", group); + + caps = gst_pad_get_caps (pad); + analyze_new_pad (group->dbin, element, pad, caps, group); + if (caps) + gst_caps_unref (caps); + + GROUP_MUTEX_LOCK (group); + group->nbdynamic--; + GST_LOG ("Group %p has now %d dynamic objects", group, group->nbdynamic); + if (group->nbdynamic == 0) + expose = TRUE; + GROUP_MUTEX_UNLOCK (group); + if (expose) { + GST_LOG + ("That was the last dynamic object, now attempting to expose the group"); + DECODE_BIN_LOCK (group->dbin); + gst_decode_group_expose (group); + DECODE_BIN_UNLOCK (group->dbin); + } +} + +static void +pad_removed_group_cb (GstElement * element, GstPad * pad, + GstDecodeGroup * group) +{ + GST_LOG_OBJECT (pad, "pad removed, group:%p", group); + + /* In fact, we don't have to do anything here, the active group will be + * removed when the group's multiqueue is drained */ +} + +static void +no_more_pads_group_cb (GstElement * element, GstDecodeGroup * group) +{ + GST_LOG_OBJECT (element, "no more pads, setting group %p to complete", group); + + /* FIXME : FILLME */ + gst_decode_group_set_complete (group); +} + +static void +pad_added_cb (GstElement * element, GstPad * pad, GstDecodeBin * dbin) +{ + GstCaps *caps; + + GST_LOG_OBJECT (pad, "Pad added to non-grouped element"); + + caps = gst_pad_get_caps (pad); + analyze_new_pad (dbin, element, pad, caps, NULL); + if (caps) + gst_caps_unref (caps); +} + +static void +pad_removed_cb (GstElement * element, GstPad * pad, GstDecodeBin * dbin) +{ + GST_LOG_OBJECT (pad, "Pad removed from non-grouped element"); +} + +static void +no_more_pads_cb (GstElement * element, GstDecodeBin * dbin) +{ + GstDecodeGroup *group; + + GST_LOG_OBJECT (element, "No more pads, setting current group to complete"); + + /* Find the non-complete group, there should only be one */ + if (!(group = get_current_group (dbin))) + goto no_group; + + gst_decode_group_set_complete (group); + return; + +no_group: + { + GST_WARNING_OBJECT (dbin, "We couldn't find a non-completed group !!"); + return; + } +} + +/* this function runs through the element factories and returns a list + * of all elements that are able to sink the given caps + */ +static GList * +find_compatibles (GstDecodeBin * decode_bin, const GstCaps * caps) +{ + GList *factories; + GList *to_try = NULL; + + /* loop over all the factories */ + for (factories = decode_bin->factories; factories; + factories = g_list_next (factories)) { + GstElementFactory *factory = GST_ELEMENT_FACTORY (factories->data); + const GList *templates; + GList *walk; + + /* get the templates from the element factory */ + templates = gst_element_factory_get_static_pad_templates (factory); + for (walk = (GList *) templates; walk; walk = g_list_next (walk)) { + GstStaticPadTemplate *templ = walk->data; + + /* we only care about the sink templates */ + if (templ->direction == GST_PAD_SINK) { + GstCaps *intersect; + GstCaps *tmpl_caps; + + /* try to intersect the caps with the caps of the template */ + tmpl_caps = gst_static_caps_get (&templ->static_caps); + + intersect = gst_caps_intersect (caps, tmpl_caps); + gst_caps_unref (tmpl_caps); + + /* check if the intersection is empty */ + if (!gst_caps_is_empty (intersect)) { + /* non empty intersection, we can use this element */ + to_try = g_list_prepend (to_try, factory); + gst_caps_unref (intersect); + break; + } + gst_caps_unref (intersect); + } + } + } + to_try = g_list_reverse (to_try); + + return to_try; +} + +/* Decide whether an element is a demuxer based on the + * klass and number/type of src pad templates it has */ +static gboolean +is_demuxer_element (GstElement * srcelement) +{ + GstElementFactory *srcfactory; + GstElementClass *elemclass; + GList *templates, *walk; + const gchar *klass; + gint potential_src_pads = 0; + + srcfactory = gst_element_get_factory (srcelement); + klass = gst_element_factory_get_klass (srcfactory); + + /* Can't be a demuxer unless it has Demux in the klass name */ + if (!strstr (klass, "Demux")) + return FALSE; + + /* Walk the src pad templates and count how many the element + * might produce */ + elemclass = GST_ELEMENT_GET_CLASS (srcelement); + + walk = templates = gst_element_class_get_pad_template_list (elemclass); + while (walk != NULL) { + GstPadTemplate *templ; + + templ = (GstPadTemplate *) walk->data; + if (GST_PAD_TEMPLATE_DIRECTION (templ) == GST_PAD_SRC) { + switch (GST_PAD_TEMPLATE_PRESENCE (templ)) { + case GST_PAD_ALWAYS: + case GST_PAD_SOMETIMES: + if (strstr (GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), "%")) + potential_src_pads += 2; /* Might make multiple pads */ + else + potential_src_pads += 1; + break; + case GST_PAD_REQUEST: + potential_src_pads += 2; + break; + } + } + walk = g_list_next (walk); + } + + if (potential_src_pads < 2) + return FALSE; + + return TRUE; +} + +/* Returns TRUE if the caps are raw, or if they are compatible with the caps + * specified in the 'caps' property + * + * The decodebin_lock should be taken ! + */ +static gboolean +are_raw_caps (GstDecodeBin * dbin, GstCaps * caps) +{ + GstCaps *intersection; + gboolean res; + + GST_LOG_OBJECT (dbin, "Checking with caps %" GST_PTR_FORMAT, caps); + + intersection = gst_caps_intersect (dbin->caps, caps); + + res = (!(gst_caps_is_empty (intersection))); + + gst_caps_unref (intersection); + + GST_LOG_OBJECT (dbin, "Caps are %sfinal caps", res ? "" : "not "); + + return res; +} + + +/**** + * GstDecodeGroup functions + ****/ + +static void +multi_queue_overrun_cb (GstElement * queue, GstDecodeGroup * group) +{ + GST_LOG_OBJECT (group->dbin, "multiqueue is full"); + + /* if we haven't exposed the group, do it */ + DECODE_BIN_LOCK (group->dbin); + gst_decode_group_expose (group); + DECODE_BIN_UNLOCK (group->dbin); +} + +static void +multi_queue_underrun_cb (GstElement * queue, GstDecodeGroup * group) +{ + GstDecodeBin *dbin = group->dbin; + + GST_LOG_OBJECT (dbin, "multiqueue is empty for group %p", group); + + /* Check if we need to activate another group */ + DECODE_BIN_LOCK (dbin); + if ((group == dbin->activegroup) && dbin->groups) { + GST_DEBUG_OBJECT (dbin, "Switching to new group"); + /* unexpose current active */ + gst_decode_group_hide (group); + + /* expose first group of groups */ + gst_decode_group_expose ((GstDecodeGroup *) dbin->groups->data); + } + DECODE_BIN_UNLOCK (dbin); +} + +/* gst_decode_group_new + * + * Creates a new GstDecodeGroup. It is up to the caller to add it to the list + * of groups. + */ +static GstDecodeGroup * +gst_decode_group_new (GstDecodeBin * dbin) +{ + GstDecodeGroup *group; + GstElement *mq; + + GST_LOG_OBJECT (dbin, "Creating new group"); + + if (!(mq = gst_element_factory_make ("multiqueue", NULL))) { + GST_WARNING ("Couldn't create multiqueue element"); + return NULL; + } + + g_object_set (G_OBJECT (mq), + "max-size-bytes", 2 * 1024 * 1024, + "max-size-time", 5 * GST_SECOND, "max-size-buffers", 0, NULL); + + group = g_new0 (GstDecodeGroup, 1); + group->lock = g_mutex_new (); + group->dbin = dbin; + group->multiqueue = mq; + group->exposed = FALSE; + group->drained = FALSE; + group->blocked = FALSE; + group->complete = FALSE; + group->endpads = NULL; + + group->overrunsig = g_signal_connect (G_OBJECT (mq), "overrun", + G_CALLBACK (multi_queue_overrun_cb), group); + group->underrunsig = g_signal_connect (G_OBJECT (mq), "underrun", + G_CALLBACK (multi_queue_underrun_cb), group); + + gst_bin_add (GST_BIN (dbin), group->multiqueue); + gst_element_set_state (group->multiqueue, GST_STATE_PAUSED); + + GST_LOG_OBJECT (dbin, "Returning new group %p", group); + + return group; +} + +/** get_current_group: + * + * Returns the current non-completed group. + * + * Returns NULL if no groups are available, or all groups are completed. + */ +static GstDecodeGroup * +get_current_group (GstDecodeBin * dbin) +{ + GList *tmp; + GstDecodeGroup *group = NULL; + + DECODE_BIN_LOCK (dbin); + for (tmp = dbin->groups; tmp; tmp = g_list_next (tmp)) { + GstDecodeGroup *this = (GstDecodeGroup *) tmp->data; + + GST_LOG_OBJECT (dbin, "group %p, complete:%d", this, this->complete); + + if (!this->complete) { + group = this; + break; + } + } + DECODE_BIN_UNLOCK (dbin); + + GST_LOG_OBJECT (dbin, "Returning group %p", group); + + return group; +} + +static gboolean +group_demuxer_event_probe (GstPad * pad, GstEvent * event, + GstDecodeGroup * group) +{ + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + GST_DEBUG_OBJECT (group->dbin, + "Got EOS on group input pads, exposing group if it wasn't before"); + DECODE_BIN_LOCK (group->dbin); + gst_decode_group_expose (group); + DECODE_BIN_UNLOCK (group->dbin); + } + return TRUE; +} + +/* gst_decode_group_control_demuxer_pad + * + * Adds a new demuxer srcpad to the given group. + * + * Returns the srcpad of the multiqueue corresponding the given pad. + * Returns NULL if there was an error. + */ +static GstPad * +gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, GstPad * pad) +{ + GstPad *srcpad, *sinkpad; + gchar *nb, *sinkname, *srcname; + + GST_LOG ("group:%p pad %s:%s", group, GST_DEBUG_PAD_NAME (pad)); + + srcpad = NULL; + + if (!(sinkpad = gst_element_get_pad (group->multiqueue, "sink%d"))) { + GST_ERROR ("Couldn't get sinkpad from multiqueue"); + return NULL; + } + + if ((gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK)) { + GST_ERROR ("Couldn't link demuxer and multiqueue"); + goto beach; + } + + sinkname = gst_pad_get_name (sinkpad); + nb = sinkname + 4; + srcname = g_strdup_printf ("src%s", nb); + g_free (sinkname); + + GROUP_MUTEX_LOCK (group); + + if (!(srcpad = gst_element_get_pad (group->multiqueue, srcname))) { + GST_ERROR ("Couldn't get srcpad %s from multiqueue", srcname); + goto chiringuito; + } + + /* connect event handler on pad to intercept EOS events */ + gst_pad_add_event_probe (pad, G_CALLBACK (group_demuxer_event_probe), group); + +chiringuito: + g_free (srcname); + GROUP_MUTEX_UNLOCK (group); + +beach: + gst_object_unref (sinkpad); + return srcpad; +} + +static gboolean +gst_decode_group_control_source_pad (GstDecodeGroup * group, GstPad * pad) +{ + GstDecodePad *dpad; + + g_return_val_if_fail (group != NULL, FALSE); + + GST_LOG ("group:%p , pad %s:%s", group, GST_DEBUG_PAD_NAME (pad)); + + /* FIXME : check if pad is already controlled */ + + GROUP_MUTEX_LOCK (group); + + /* Create GstDecodePad for the pad */ + dpad = gst_decode_pad_new (group, pad, TRUE); + + group->endpads = g_list_append (group->endpads, dpad); + + GROUP_MUTEX_UNLOCK (group); + + return TRUE; +} + +/* gst_decode_group_check_if_blocked: + * + * Call this when one of the pads blocked status has changed. + * If the group is complete and blocked, the group will be marked as blocked + * and will ghost/expose all pads on decodebin if the group is the current one. + * + * Call with the group lock taken ! MT safe + */ +static void +gst_decode_group_check_if_blocked (GstDecodeGroup * group) +{ + GList *tmp; + gboolean blocked = TRUE; + + GST_LOG ("group : %p , ->complete:%d , ->nbdynamic:%d", + group, group->complete, group->nbdynamic); + + /* 1. don't do anything if group is not complete */ + if (!group->complete || group->nbdynamic) { + GST_DEBUG_OBJECT (group->dbin, "Group isn't complete yet"); + return; + } + + for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { + GstDecodePad *dpad = (GstDecodePad *) tmp->data; + + if (!dpad->blocked) { + blocked = FALSE; + break; + } + } + + /* 2. Update status of group */ + group->blocked = blocked; + GST_LOG ("group is blocked:%d", blocked); + + /* 3. don't do anything if not blocked completely */ + if (!blocked) + return; + + /* 4. if we're the current group, expose pads */ + DECODE_BIN_LOCK (group->dbin); + if (!gst_decode_group_expose (group)) + GST_WARNING_OBJECT (group->dbin, "Couldn't expose group"); + DECODE_BIN_UNLOCK (group->dbin); +} + +static void +gst_decode_group_check_if_drained (GstDecodeGroup * group) +{ + GList *tmp; + GstDecodeBin *dbin = group->dbin; + gboolean drained = TRUE; + + GST_LOG ("group : %p", group); + + for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { + GstDecodePad *dpad = (GstDecodePad *) tmp->data; + + GST_LOG ("testing dpad %p", dpad); + + if (!dpad->drained) { + drained = FALSE; + break; + } + } + + group->drained = drained; + GST_LOG ("group is drained"); + + if (!drained) + return; + + DECODE_BIN_LOCK (dbin); + if ((group == dbin->activegroup) && dbin->groups) { + GST_DEBUG_OBJECT (dbin, "Switching to new group"); + + gst_decode_group_hide (group); + + gst_decode_group_expose ((GstDecodeGroup *) dbin->groups->data); + } + DECODE_BIN_UNLOCK (dbin); +} + +/* sort_end_pads: + * GCompareFunc to use with lists of GstPad. + * Sorts pads by mime type. + * First video (raw, then non-raw), then audio (raw, then non-raw), + * then others. + * + * Return: negative if ab + */ + +static gint +sort_end_pads (GstDecodePad * da, GstDecodePad * db) +{ + GstPad *a, *b; + gint va, vb; + GstCaps *capsa, *capsb; + GstStructure *sa, *sb; + const gchar *namea, *nameb; + + a = da->pad; + b = db->pad; + + capsa = gst_pad_get_caps (a); + capsb = gst_pad_get_caps (b); + + sa = gst_caps_get_structure ((const GstCaps *) capsa, 0); + sb = gst_caps_get_structure ((const GstCaps *) capsb, 0); + + namea = gst_structure_get_name (sa); + nameb = gst_structure_get_name (sb); + + if (g_strrstr (namea, "video/x-raw-")) + va = 0; + else if (g_strrstr (namea, "video/")) + va = 1; + else if (g_strrstr (namea, "audio/x-raw")) + va = 2; + else if (g_strrstr (namea, "audio/")) + va = 3; + else + va = 4; + + if (g_strrstr (nameb, "video/x-raw-")) + vb = 0; + else if (g_strrstr (nameb, "video/")) + vb = 1; + else if (g_strrstr (nameb, "audio/x-raw")) + vb = 2; + else if (g_strrstr (nameb, "audio/")) + vb = 3; + else + vb = 4; + + gst_caps_unref (capsa); + gst_caps_unref (capsb); + + return va - vb; +} + +/* gst_decode_group_expose: + * + * Expose this group's pads. + * + * Not MT safe, please take the group lock + */ + +static gboolean +gst_decode_group_expose (GstDecodeGroup * group) +{ + GList *tmp; + GList *next = NULL; + + if (group->dbin->activegroup) { + GST_DEBUG_OBJECT (group->dbin, "A group is already active and exposed"); + return TRUE; + } + + if (group->dbin->activegroup == group) { + GST_WARNING ("Group %p is already exposed", group); + return TRUE; + } + + if (!group->dbin->groups + || (group != (GstDecodeGroup *) group->dbin->groups->data)) { + GST_WARNING ("Group %p is not the first group to expose", group); + return FALSE; + } + + if (group->nbdynamic) { + GST_WARNING ("Group %p still has %d dynamic objects, not exposing yet", + group, group->nbdynamic); + return FALSE; + } + + GST_LOG ("Exposing group %p", group); + + /* re-order pads : video, then audio, then others */ + group->endpads = g_list_sort (group->endpads, (GCompareFunc) sort_end_pads); + + /* Expose pads */ + + for (tmp = group->endpads; tmp; tmp = next) { + GstDecodePad *dpad = (GstDecodePad *) tmp->data; + gchar *padname; + GstPad *ghost; + + next = g_list_next (tmp); + + /* 1. ghost pad */ + padname = g_strdup_printf ("src%d", group->dbin->nbpads); + group->dbin->nbpads++; + + GST_LOG_OBJECT (group->dbin, "About to expose pad %s:%s", + GST_DEBUG_PAD_NAME (dpad->pad)); + + ghost = gst_ghost_pad_new (padname, dpad->pad); + gst_pad_set_active (ghost, TRUE); + gst_element_add_pad (GST_ELEMENT (group->dbin), ghost); + group->ghosts = g_list_append (group->ghosts, ghost); + + g_free (padname); + + /* 2. emit signal */ + GST_DEBUG_OBJECT (group->dbin, "emitting new-decoded-pad"); + g_signal_emit (G_OBJECT (group->dbin), + gst_decode_bin_signals[SIGNAL_NEW_DECODED_PAD], 0, ghost, + (next == NULL)); + GST_DEBUG_OBJECT (group->dbin, "emitted new-decoded-pad"); + + /* 3. Unblock internal pad */ + GST_DEBUG_OBJECT (dpad->pad, "unblocking"); + gst_pad_set_blocked_async (dpad->pad, FALSE, + (GstPadBlockCallback) source_pad_blocked_cb, dpad); + GST_DEBUG_OBJECT (dpad->pad, "unblocked"); + + } + + group->dbin->activegroup = group; + + /* pop off the first group */ + group->dbin->groups = + g_list_delete_link (group->dbin->groups, group->dbin->groups); + + remove_fakesink (group->dbin); + + group->exposed = TRUE; + + GST_LOG_OBJECT (group->dbin, "signalling no-more-pads"); + gst_element_no_more_pads (GST_ELEMENT (group->dbin)); + + GST_LOG_OBJECT (group->dbin, "Group %p exposed", group); + return TRUE; +} + +static void +gst_decode_group_hide (GstDecodeGroup * group) +{ + GList *tmp; + + GST_LOG ("Hiding group %p", group); + + if (group != group->dbin->activegroup) { + GST_WARNING ("This group is not the active one, aborting"); + return; + } + + GROUP_MUTEX_LOCK (group); + + /* Remove ghost pads */ + for (tmp = group->ghosts; tmp; tmp = g_list_next (tmp)) + gst_element_remove_pad (GST_ELEMENT (group->dbin), (GstPad *) tmp->data); + + g_list_free (group->ghosts); + group->ghosts = NULL; + + group->exposed = FALSE; + + GROUP_MUTEX_UNLOCK (group); + + group->dbin->activegroup = NULL; + group->dbin->oldgroups = g_list_append (group->dbin->oldgroups, group); +} + +static void +deactivate_free_recursive (GstDecodeGroup * group, GstElement * element) +{ + GstIterator *it; + GstIteratorResult res; + gpointer point; + + GST_LOG ("element:%s", GST_ELEMENT_NAME (element)); + + /* call on downstream elements */ + it = gst_element_iterate_src_pads (element); + +restart: + + while (1) { + res = gst_iterator_next (it, &point); + switch (res) { + case GST_ITERATOR_DONE: + goto done; + case GST_ITERATOR_RESYNC: + gst_iterator_resync (it); + goto restart; + case GST_ITERATOR_ERROR: + { + GST_WARNING ("Had an error while iterating source pads of element: %s", + GST_ELEMENT_NAME (element)); + goto beach; + } + case GST_ITERATOR_OK: + { + GstPad *pad = GST_PAD (point); + GstPad *peerpad = NULL; + + if ((peerpad = gst_pad_get_peer (pad))) { + GstObject *parent = gst_pad_get_parent (peerpad); + + if (parent && GST_IS_ELEMENT (parent)) + deactivate_free_recursive (group, GST_ELEMENT (parent)); + if (parent) + gst_object_unref (parent); + } + } + break; + default: + break; + } + } + +done: + gst_element_set_state (element, GST_STATE_NULL); + gst_bin_remove (GST_BIN (group->dbin), element); + +beach: + gst_iterator_free (it); + + return; +} + +static void +gst_decode_group_free (GstDecodeGroup * group) +{ + GList *tmp; + + GST_LOG ("group %p", group); + + GROUP_MUTEX_LOCK (group); + /* Clear all GstDecodePad */ + for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { + GstDecodePad *dpad = (GstDecodePad *) tmp->data; + + g_free (dpad); + } + g_list_free (group->endpads); + group->endpads = NULL; + + /* disconnect signal handlers on multiqueue */ + g_signal_handler_disconnect (group->multiqueue, group->underrunsig); + g_signal_handler_disconnect (group->multiqueue, group->overrunsig); + + /* remove all elements */ + deactivate_free_recursive (group, group->multiqueue); + + GROUP_MUTEX_UNLOCK (group); + + g_mutex_free (group->lock); + g_free (group); +} + +/* gst_decode_group_set_complete: + * + * Mark the group as complete. This means no more streams will be controlled + * through this group. + * + * MT safe + */ +static void +gst_decode_group_set_complete (GstDecodeGroup * group) +{ + GST_LOG_OBJECT (group->dbin, "Setting group %p to COMPLETE", group); + + GROUP_MUTEX_LOCK (group); + group->complete = TRUE; + gst_decode_group_check_if_blocked (group); + GROUP_MUTEX_UNLOCK (group); +} + + + +/************************* + * GstDecodePad functions + *************************/ + +static void +source_pad_blocked_cb (GstPad * pad, gboolean blocked, GstDecodePad * dpad) +{ + GST_LOG_OBJECT (pad, "blocked:%d , dpad:%p, dpad->group:%p", + blocked, dpad, dpad->group); + + /* Update this GstDecodePad status */ + dpad->blocked = blocked; + + if (blocked) { + GROUP_MUTEX_LOCK (dpad->group); + gst_decode_group_check_if_blocked (dpad->group); + GROUP_MUTEX_UNLOCK (dpad->group); + } +} + +static gboolean +source_pad_event_probe (GstPad * pad, GstEvent * event, GstDecodePad * dpad) +{ + GST_LOG_OBJECT (pad, "%s dpad:%p", GST_EVENT_TYPE_NAME (event), dpad); + + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + /* Set our pad as drained */ + dpad->drained = TRUE; + + /* Check if all pads are drained */ + gst_decode_group_check_if_drained (dpad->group); + } + + return TRUE; +} + +/*gst_decode_pad_new: + * + * Creates a new GstDecodePad for the given pad. + * If block is TRUE, Sets the pad blocking asynchronously + */ + +static GstDecodePad * +gst_decode_pad_new (GstDecodeGroup * group, GstPad * pad, gboolean block) +{ + GstDecodePad *dpad; + + dpad = g_new0 (GstDecodePad, 1); + dpad->pad = pad; + dpad->group = group; + dpad->blocked = FALSE; + dpad->drained = TRUE; + + if (block) + gst_pad_set_blocked_async (pad, TRUE, + (GstPadBlockCallback) source_pad_blocked_cb, dpad); + gst_pad_add_event_probe (pad, G_CALLBACK (source_pad_event_probe), dpad); + return dpad; +} + + +/***** + * Element add/remove + *****/ + +/* + * add_fakesink / remove_fakesink + * + * We use a sink so that the parent ::change_state returns GST_STATE_CHANGE_ASYNC + * when that sink is present (since it's not connected to anything it will + * always return GST_STATE_CHANGE_ASYNC). + * + * But this is an ugly way of achieving this goal. + * Ideally, we shouldn't use a sink and just return GST_STATE_CHANGE_ASYNC in + * our ::change_state if we have not exposed the active group. + * We also need to override ::get_state to fake the asynchronous behaviour. + * Once the active group is exposed, we would then post a + * GST_MESSAGE_STATE_DIRTY and return GST_STATE_CHANGE_SUCCESS (which will call + * ::get_state . + */ + +static gboolean +add_fakesink (GstDecodeBin * decode_bin) +{ + GST_DEBUG_OBJECT (decode_bin, "Adding the fakesink"); + + if (decode_bin->fakesink) + return TRUE; + + decode_bin->fakesink = + gst_element_factory_make ("fakesink", "async-fakesink"); + if (!decode_bin->fakesink) + goto no_fakesink; + + /* hacky, remove sink flag, we don't want our decodebin to become a sink + * just because we add a fakesink element to make us ASYNC */ + GST_OBJECT_FLAG_UNSET (decode_bin->fakesink, GST_ELEMENT_IS_SINK); + + if (!gst_bin_add (GST_BIN (decode_bin), decode_bin->fakesink)) + goto could_not_add; + + return TRUE; + + /* ERRORS */ +no_fakesink: + { + g_warning ("can't find fakesink element, decodebin will not work"); + return FALSE; + } +could_not_add: + { + g_warning ("Could not add fakesink to decodebin, decodebin will not work"); + gst_object_unref (decode_bin->fakesink); + decode_bin->fakesink = NULL; + return FALSE; + } +} + +static void +remove_fakesink (GstDecodeBin * decode_bin) +{ + if (decode_bin->fakesink == NULL) + return; + + GST_DEBUG_OBJECT (decode_bin, "Removing the fakesink"); + + gst_element_set_state (decode_bin->fakesink, GST_STATE_NULL); + gst_bin_remove (GST_BIN (decode_bin), decode_bin->fakesink); + decode_bin->fakesink = NULL; + + gst_element_post_message (GST_ELEMENT_CAST (decode_bin), + gst_message_new_state_dirty (GST_OBJECT_CAST (decode_bin))); +} + +/***** + * convenience functions + *****/ + +/* find_sink_pad + * + * Returns the first sink pad of the given element, or NULL if it doesn't have + * any. + */ + +static GstPad * +find_sink_pad (GstElement * element) +{ + GstIterator *it; + GstPad *pad = NULL; + gpointer point; + + it = gst_element_iterate_sink_pads (element); + + if ((gst_iterator_next (it, &point)) == GST_ITERATOR_OK) + pad = (GstPad *) point; + + gst_iterator_free (it); + + return pad; +} + +static GstStateChangeReturn +gst_decode_bin_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstDecodeBin *dbin = GST_DECODE_BIN (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (dbin->typefind == NULL) + goto missing_typefind; + break; + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + if (!add_fakesink (dbin)) + goto missing_fakesink; + break; + } + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + /* FIXME : put some cleanup functions here.. if needed */ + + return ret; + +/* ERRORS */ +missing_typefind: + { + GST_ELEMENT_ERROR (dbin, CORE, MISSING_PLUGIN, (NULL), ("no typefind!")); + return GST_STATE_CHANGE_FAILURE; + } +missing_fakesink: + { + GST_ELEMENT_ERROR (dbin, CORE, MISSING_PLUGIN, (NULL), ("no fakesink!")); + return GST_STATE_CHANGE_FAILURE; + } +} + +static gboolean +plugin_init (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (gst_decode_bin_debug, "decodebin2", 0, + "decoder bin"); + + return gst_element_register (plugin, "decodebin2", GST_RANK_NONE, + GST_TYPE_DECODE_BIN); +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "decodebin2", + "decoder bin newer version", plugin_init, VERSION, GST_LICENSE, + GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/decodebin2/gstplay-marshal.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/decodebin2/gstplay-marshal.c Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,167 @@ +#include "gstplay-marshal.h" + +#include + + +#ifdef G_ENABLE_DEBUG +#define g_marshal_value_peek_boolean(v) g_value_get_boolean (v) +#define g_marshal_value_peek_char(v) g_value_get_char (v) +#define g_marshal_value_peek_uchar(v) g_value_get_uchar (v) +#define g_marshal_value_peek_int(v) g_value_get_int (v) +#define g_marshal_value_peek_uint(v) g_value_get_uint (v) +#define g_marshal_value_peek_long(v) g_value_get_long (v) +#define g_marshal_value_peek_ulong(v) g_value_get_ulong (v) +#define g_marshal_value_peek_int64(v) g_value_get_int64 (v) +#define g_marshal_value_peek_uint64(v) g_value_get_uint64 (v) +#define g_marshal_value_peek_enum(v) g_value_get_enum (v) +#define g_marshal_value_peek_flags(v) g_value_get_flags (v) +#define g_marshal_value_peek_float(v) g_value_get_float (v) +#define g_marshal_value_peek_double(v) g_value_get_double (v) +#define g_marshal_value_peek_string(v) (char*) g_value_get_string (v) +#define g_marshal_value_peek_param(v) g_value_get_param (v) +#define g_marshal_value_peek_boxed(v) g_value_get_boxed (v) +#define g_marshal_value_peek_pointer(v) g_value_get_pointer (v) +#define g_marshal_value_peek_object(v) g_value_get_object (v) +#else /* !G_ENABLE_DEBUG */ +/* WARNING: This code accesses GValues directly, which is UNSUPPORTED API. + * Do not access GValues directly in your code. Instead, use the + * g_value_get_*() functions + */ +#define g_marshal_value_peek_boolean(v) (v)->data[0].v_int +#define g_marshal_value_peek_char(v) (v)->data[0].v_int +#define g_marshal_value_peek_uchar(v) (v)->data[0].v_uint +#define g_marshal_value_peek_int(v) (v)->data[0].v_int +#define g_marshal_value_peek_uint(v) (v)->data[0].v_uint +#define g_marshal_value_peek_long(v) (v)->data[0].v_long +#define g_marshal_value_peek_ulong(v) (v)->data[0].v_ulong +#define g_marshal_value_peek_int64(v) (v)->data[0].v_int64 +#define g_marshal_value_peek_uint64(v) (v)->data[0].v_uint64 +#define g_marshal_value_peek_enum(v) (v)->data[0].v_long +#define g_marshal_value_peek_flags(v) (v)->data[0].v_ulong +#define g_marshal_value_peek_float(v) (v)->data[0].v_float +#define g_marshal_value_peek_double(v) (v)->data[0].v_double +#define g_marshal_value_peek_string(v) (v)->data[0].v_pointer +#define g_marshal_value_peek_param(v) (v)->data[0].v_pointer +#define g_marshal_value_peek_boxed(v) (v)->data[0].v_pointer +#define g_marshal_value_peek_pointer(v) (v)->data[0].v_pointer +#define g_marshal_value_peek_object(v) (v)->data[0].v_pointer +#endif /* !G_ENABLE_DEBUG */ + + +/* BOOLEAN:OBJECT (gstplay-marshal.list:1) */ +void +gst_play_marshal_BOOLEAN__OBJECT (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data) +{ + typedef gboolean (*GMarshalFunc_BOOLEAN__OBJECT) (gpointer data1, + gpointer arg_1, + gpointer data2); + register GMarshalFunc_BOOLEAN__OBJECT callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + gboolean v_return; + + g_return_if_fail (return_value != NULL); + g_return_if_fail (n_param_values == 2); + + if (G_CCLOSURE_SWAP_DATA (closure)) + { + data1 = closure->data; + data2 = g_value_peek_pointer (param_values + 0); + } + else + { + data1 = g_value_peek_pointer (param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_BOOLEAN__OBJECT) (marshal_data ? marshal_data : cc->callback); + + v_return = callback (data1, + g_marshal_value_peek_object (param_values + 1), + data2); + + g_value_set_boolean (return_value, v_return); +} + +/* VOID:OBJECT,BOOLEAN (gstplay-marshal.list:2) */ +void +gst_play_marshal_VOID__OBJECT_BOOLEAN (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__OBJECT_BOOLEAN) (gpointer data1, + gpointer arg_1, + gboolean arg_2, + gpointer data2); + register GMarshalFunc_VOID__OBJECT_BOOLEAN callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 3); + + if (G_CCLOSURE_SWAP_DATA (closure)) + { + data1 = closure->data; + data2 = g_value_peek_pointer (param_values + 0); + } + else + { + data1 = g_value_peek_pointer (param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_VOID__OBJECT_BOOLEAN) (marshal_data ? marshal_data : cc->callback); + + callback (data1, + g_marshal_value_peek_object (param_values + 1), + g_marshal_value_peek_boolean (param_values + 2), + data2); +} + +/* BOOLEAN:OBJECT,POINTER (gstplay-marshal.list:3) */ +void +gst_play_marshal_BOOLEAN__OBJECT_POINTER (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data) +{ + typedef gboolean (*GMarshalFunc_BOOLEAN__OBJECT_POINTER) (gpointer data1, + gpointer arg_1, + gpointer arg_2, + gpointer data2); + register GMarshalFunc_BOOLEAN__OBJECT_POINTER callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + gboolean v_return; + + g_return_if_fail (return_value != NULL); + g_return_if_fail (n_param_values == 3); + + if (G_CCLOSURE_SWAP_DATA (closure)) + { + data1 = closure->data; + data2 = g_value_peek_pointer (param_values + 0); + } + else + { + data1 = g_value_peek_pointer (param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_BOOLEAN__OBJECT_POINTER) (marshal_data ? marshal_data : cc->callback); + + v_return = callback (data1, + g_marshal_value_peek_object (param_values + 1), + g_marshal_value_peek_pointer (param_values + 2), + data2); + + g_value_set_boolean (return_value, v_return); +} + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/decodebin2/gstplay-marshal.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/decodebin2/gstplay-marshal.h Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,36 @@ + +#ifndef __gst_play_marshal_MARSHAL_H__ +#define __gst_play_marshal_MARSHAL_H__ + +#include + +G_BEGIN_DECLS + +/* BOOLEAN:OBJECT (gstplay-marshal.list:1) */ +extern void gst_play_marshal_BOOLEAN__OBJECT (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data); + +/* VOID:OBJECT,BOOLEAN (gstplay-marshal.list:2) */ +extern void gst_play_marshal_VOID__OBJECT_BOOLEAN (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data); + +/* BOOLEAN:OBJECT,POINTER (gstplay-marshal.list:3) */ +extern void gst_play_marshal_BOOLEAN__OBJECT_POINTER (GClosure *closure, + GValue *return_value, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint, + gpointer marshal_data); + +G_END_DECLS + +#endif /* __gst_play_marshal_MARSHAL_H__ */ + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/multiqueue/Makefile.am --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/multiqueue/Makefile.am Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,24 @@ +plugin_LTLIBRARIES = libgstmultiqueue.la + +libgstmultiqueue_la_SOURCES = \ + gstmultiqueue.c \ + gstdataqueue.c + +libgstmultiqueue_la_CFLAGS = \ + $(GST_CFLAGS) \ + $(GST_BASE_CFLAGS) \ + $(GST_PLUGINS_BASE_CFLAGS) + +libgstmultiqueue_la_LIBADD = \ + $(GST_LIBS_LIBS) + +libgstmultiqueue_la_LDFLAGS = \ + $(GST_LIBS) \ + $(GST_PLUGIN_LDFLAGS) \ + $(GST_BASE_LIBS) \ + $(GST_PLUGINS_BASE_LIBS) + +noinst_HEADERS = \ + gstmultiqueue.h \ + gstdataqueue.h + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/multiqueue/gstdataqueue.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/multiqueue/gstdataqueue.c Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,628 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstdataqueue.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:gstdataqueue + * @short_description: Threadsafe queueing object + * + * #GstDataQueue is an object that handles threadsafe queueing of objects. It + * also provides size-related functionality. This object should be used for + * any #GstElement that wishes to provide some sort of queueing functionality. + */ + +#include +#include "gstdataqueue.h" + +GST_DEBUG_CATEGORY_STATIC (data_queue_debug); +#define GST_CAT_DEFAULT (data_queue_debug) +GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow); + + +/* Queue signals and args */ +enum +{ + SIGNAL_EMPTY, + SIGNAL_FULL, + LAST_SIGNAL +}; + +enum +{ + ARG_0, + ARG_CUR_LEVEL_VISIBLE, + ARG_CUR_LEVEL_BYTES, + ARG_CUR_LEVEL_TIME + /* FILL ME */ +}; + +#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + GST_CAT_LOG (data_queue_dataflow, \ + "locking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_lock (q->qlock); \ + GST_CAT_LOG (data_queue_dataflow, \ + "locked qlock from thread %p", \ + g_thread_self ()); \ +} G_STMT_END + +#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \ + GST_DATA_QUEUE_MUTEX_LOCK (q); \ + if (q->flushing) \ + goto label; \ + } G_STMT_END + +#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + GST_CAT_LOG (data_queue_dataflow, \ + "unlocking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_unlock (q->qlock); \ +} G_STMT_END + +#define STATUS(q, msg) \ + GST_CAT_LOG (data_queue_dataflow, \ + "queue:%p " msg ": %u visible items, %u " \ + "bytes, %"G_GUINT64_FORMAT \ + " ns, %u elements", \ + queue, \ + q->cur_level.visible, \ + q->cur_level.bytes, \ + q->cur_level.time, \ + q->queue->length) + +static void gst_data_queue_base_init (GstDataQueueClass * klass); +static void gst_data_queue_class_init (GstDataQueueClass * klass); +static void gst_data_queue_init (GstDataQueue * queue); +static void gst_data_queue_finalize (GObject * object); + +static void gst_data_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_data_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); + +static GObjectClass *parent_class = NULL; +static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 }; + +GType +gst_data_queue_get_type (void) +{ + static GType queue_type = 0; + + if (!queue_type) { + static const GTypeInfo queue_info = { + sizeof (GstDataQueueClass), + (GBaseInitFunc) gst_data_queue_base_init, + NULL, + (GClassInitFunc) gst_data_queue_class_init, + NULL, + NULL, + sizeof (GstDataQueue), + 0, + (GInstanceInitFunc) gst_data_queue_init, + NULL + }; + + queue_type = g_type_register_static (G_TYPE_OBJECT, + "GstDataQueue", &queue_info, 0); + GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, + "data queue object"); + GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, + "dataflow inside the data queue object"); + } + + return queue_type; +} + +static void +gst_data_queue_base_init (GstDataQueueClass * klass) +{ + /* Do we need anything here ?? */ + return; +} + +static void +gst_data_queue_class_init (GstDataQueueClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + parent_class = g_type_class_peek_parent (klass); + + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property); + + /* signals */ + /** + * GstDataQueue::empty: + * @queue: the queue instance + * + * Reports that the queue became empty (empty). + * A queue is empty if the total amount of visible items inside it (num-visible, time, + * size) is lower than the boundary values which can be set through the GObject + * properties. + */ + gst_data_queue_signals[SIGNAL_EMPTY] = + g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /** + * GstDataQueue::full: + * @queue: the queue instance + * + * Reports that the queue became full (full). + * A queue is full if the total amount of data inside it (num-visible, time, + * size) is higher than the boundary values which can be set through the GObject + * properties. + */ + gst_data_queue_signals[SIGNAL_FULL] = + g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /* properties */ + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_param_spec_uint ("current-level-bytes", "Current level (kB)", + "Current amount of data in the queue (bytes)", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE, + g_param_spec_uint ("current-level-visible", + "Current level (visible items)", + "Current number of visible items in the queue", 0, G_MAXUINT, 0, + G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current level (ns)", + "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0, + G_PARAM_READABLE)); + + /* set several parent class virtual functions */ + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize); + +} + +static void +gst_data_queue_init (GstDataQueue * queue) +{ + queue->cur_level.visible = 0; /* no content */ + queue->cur_level.bytes = 0; /* no content */ + queue->cur_level.time = 0; /* no content */ + + queue->checkfull = NULL; + + queue->qlock = g_mutex_new (); + queue->item_add = g_cond_new (); + queue->item_del = g_cond_new (); + queue->queue = g_queue_new (); + + GST_DEBUG ("initialized queue's not_empty & not_full conditions"); +} + +/** + * gst_data_queue_new: + * @checkfull: the callback used to tell if the element considers the queue full + * or not. + * @checkdata: a #gpointer that will be given in the @checkfull callback. + * + * Returns: a new #GstDataQueue. + */ + +GstDataQueue * +gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata) +{ + GstDataQueue *ret; + + g_return_val_if_fail (checkfull != NULL, NULL); + + ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL); + ret->checkfull = checkfull; + ret->checkdata = checkdata; + + return ret; +} + +static void +gst_data_queue_cleanup (GstDataQueue * queue) +{ + while (!g_queue_is_empty (queue->queue)) { + GstDataQueueItem *item = g_queue_pop_head (queue->queue); + + /* Just call the destroy notify on the item */ + item->destroy (item); + } + queue->cur_level.visible = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; +} + +/* called only once, as opposed to dispose */ +static void +gst_data_queue_finalize (GObject * object) +{ + GstDataQueue *queue = GST_DATA_QUEUE (object); + + GST_DEBUG ("finalizing queue"); + + gst_data_queue_cleanup (queue); + g_queue_free (queue->queue); + + GST_DEBUG ("free mutex"); + g_mutex_free (queue->qlock); + GST_DEBUG ("done free mutex"); + + g_cond_free (queue->item_add); + g_cond_free (queue->item_del); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_data_queue_locked_flush (GstDataQueue * queue) +{ + STATUS (queue, "before flushing"); + gst_data_queue_cleanup (queue); + STATUS (queue, "after flushing"); + /* we deleted something... */ + g_cond_signal (queue->item_del); +} + +static gboolean +gst_data_queue_locked_is_empty (GstDataQueue * queue) +{ + return (queue->queue->length == 0); +} + +static gboolean +gst_data_queue_locked_is_full (GstDataQueue * queue) +{ + return queue->checkfull (queue, queue->cur_level.visible, + queue->cur_level.bytes, queue->cur_level.time, queue->checkdata); +} + +/** + * gst_data_queue_flush: + * @queue: a #GstDataQueue. + * + * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and + * #gst_data_queue_pop will be released. + * MT safe. + */ +void +gst_data_queue_flush (GstDataQueue * queue) +{ + GST_DEBUG ("queue:%p", queue); + GST_DATA_QUEUE_MUTEX_LOCK (queue); + gst_data_queue_locked_flush (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_is_empty: + * @queue: a #GstDataQueue. + * + * Queries if there are any items in the @queue. + * MT safe. + * + * Returns: #TRUE if @queue is empty. + */ +gboolean +gst_data_queue_is_empty (GstDataQueue * queue) +{ + gboolean res; + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + res = gst_data_queue_locked_is_empty (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return res; +} + +/** + * gst_data_queue_is_full: + * @queue: a #GstDataQueue. + * + * Queries if @queue is full. This check will be done using the + * #GstDataQueueCheckFullCallback registered with @queue. + * MT safe. + * + * Returns: #TRUE if @queue is full. + */ +gboolean +gst_data_queue_is_full (GstDataQueue * queue) +{ + gboolean res; + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + res = gst_data_queue_locked_is_full (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return res; +} + +/** + * gst_data_queue_set_flushing: + * @queue: a #GstDataQueue. + * @flushing: a #gboolean stating if the queue will be flushing or not. + * + * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing + * state, any incoming data on the @queue will be discarded. Any call currently + * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight + * away with a return value of #FALSE. While the @queue is in flushing state, + * all calls to those two functions will return #FALSE. + * + * MT Safe. + */ +void +gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) +{ + GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + queue->flushing = flushing; + if (flushing) { + /* release push/pop functions */ + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); + } + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_push: + * @queue: a #GstDataQueue. + * @item: a #GstDataQueueItem. + * + * Pushes a #GstDataQueueItem (or a structure that begins with the same fields) + * on the @queue. If the @queue is full, the call will block until space is + * available, OR the @queue is set to flushing state. + * MT safe. + * + * Note that this function has slightly different semantics than gst_pad_push() + * and gst_pad_push_event(): this function only takes ownership of @item and + * the #GstMiniObject contained in @item if the push was successful. If FALSE + * is returned, the caller is responsible for freeing @item and its contents. + * + * Returns: #TRUE if the @item was successfully pushed on the @queue. + */ +gboolean +gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) +{ + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); + g_return_val_if_fail (item != NULL, FALSE); + + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); + + STATUS (queue, "before pushing"); + + /* We ALWAYS need to check for queue fillness */ + if (gst_data_queue_locked_is_full (queue)) { + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); + + /* signal might have removed some items */ + while (gst_data_queue_locked_is_full (queue)) { + g_cond_wait (queue->item_del, queue->qlock); + if (queue->flushing) + goto flushing; + } + } + + g_queue_push_tail (queue->queue, item); + + if (item->visible) + queue->cur_level.visible++; + queue->cur_level.bytes += item->size; + queue->cur_level.time += item->duration; + + STATUS (queue, "after pushing"); + g_cond_signal (queue->item_add); + + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return TRUE; + + /* ERRORS */ +flushing: + { + GST_DEBUG ("queue:%p, we are flushing", queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } +} + +/** + * gst_data_queue_pop: + * @queue: a #GstDataQueue. + * @item: pointer to store the returned #GstDataQueueItem. + * + * Retrieves the first @item available on the @queue. If the queue is currently + * empty, the call will block until at least one item is available, OR the + * @queue is set to the flushing state. + * MT safe. + * + * Returns: #TRUE if an @item was successfully retrieved from the @queue. + */ +gboolean +gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) +{ + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); + g_return_val_if_fail (item != NULL, FALSE); + + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); + + STATUS (queue, "before popping"); + + if (gst_data_queue_locked_is_empty (queue)) { + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); + + while (gst_data_queue_locked_is_empty (queue)) { + g_cond_wait (queue->item_add, queue->qlock); + if (queue->flushing) + goto flushing; + } + } + + /* Get the item from the GQueue */ + *item = g_queue_pop_head (queue->queue); + + /* update current level counter */ + if ((*item)->visible) + queue->cur_level.visible--; + queue->cur_level.bytes -= (*item)->size; + queue->cur_level.time -= (*item)->duration; + + STATUS (queue, "after popping"); + g_cond_signal (queue->item_del); + + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return TRUE; + + /* ERRORS */ +flushing: + { + GST_DEBUG ("queue:%p, we are flushing", queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } +} + +/** + * gst_data_queue_drop_head: + * @queue: The #GstDataQueue to drop an item from. + * @type: The #GType of the item to drop. + * + * Pop and unref the head-most #GstMiniObject with the given #GType. + * + * Returns: TRUE if an element was removed. + */ +gboolean +gst_data_queue_drop_head (GstDataQueue * queue, GType type) +{ + gboolean res = FALSE; + GList *item; + GstDataQueueItem *leak = NULL; + + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); + + GST_DEBUG ("queue:%p", queue); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) { + GstDataQueueItem *tmp = (GstDataQueueItem *) item->data; + + if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) { + leak = tmp; + break; + } + } + + if (!leak) + goto done; + + g_queue_delete_link (queue->queue, item); + + if (leak->visible) + queue->cur_level.visible--; + queue->cur_level.bytes -= leak->size; + queue->cur_level.time -= leak->duration; + + leak->destroy (leak); + + res = TRUE; + +done: + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + GST_DEBUG ("queue:%p , res:%d", queue, res); + + return res; +} + +/** + * gst_data_queue_limits_changed: + * @queue: The #GstDataQueue + * + * Inform the queue that the limits for the fullness check have changed and that + * any blocking gst_data_queue_push() should be unblocked to recheck the limts. + */ +void +gst_data_queue_limits_changed (GstDataQueue * queue) +{ + g_return_if_fail (GST_IS_DATA_QUEUE (queue)); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG ("signal del"); + g_cond_signal (queue->item_del); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_get_level: + * @queue: The #GstDataQueue + * @level: the location to store the result + * + * Get the current level of the queue. + */ +void +gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level) +{ + level->visible = queue->cur_level.visible; + level->bytes = queue->cur_level.bytes; + level->time = queue->cur_level.time; +} + +static void +gst_data_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_data_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstDataQueue *queue = GST_DATA_QUEUE (object); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + + switch (prop_id) { + case ARG_CUR_LEVEL_BYTES: + g_value_set_uint (value, queue->cur_level.bytes); + break; + case ARG_CUR_LEVEL_VISIBLE: + g_value_set_uint (value, queue->cur_level.visible); + break; + case ARG_CUR_LEVEL_TIME: + g_value_set_uint64 (value, queue->cur_level.time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/multiqueue/gstdataqueue.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/multiqueue/gstdataqueue.h Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,159 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstdataqueue.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_DATA_QUEUE_H__ +#define __GST_DATA_QUEUE_H__ + +#include + +G_BEGIN_DECLS +#define GST_TYPE_DATA_QUEUE \ + (gst_data_queue_get_type()) +#define GST_DATA_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_DATA_QUEUE,GstDataQueue)) +#define GST_DATA_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_DATA_QUEUE,GstDataQueueClass)) +#define GST_IS_DATA_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_DATA_QUEUE)) +#define GST_IS_DATA_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_DATA_QUEUE)) +typedef struct _GstDataQueue GstDataQueue; +typedef struct _GstDataQueueClass GstDataQueueClass; +typedef struct _GstDataQueueSize GstDataQueueSize; +typedef struct _GstDataQueueItem GstDataQueueItem; + +/** + * GstDataQueueItem: + * @object: the #GstMiniObject to queue. + * @size: the size in bytes of the miniobject. + * @duration: the duration in #GstClockTime of the miniobject. Can not be + * #GST_CLOCK_TIME_NONE. + * @visible: #TRUE if @object should be considered as a visible object. + * @destroy: The #GDestroyNotify function to use to free the #GstDataQueueItem. + * This function should also drop the reference to @object the owner of the + * #GstDataQueueItem is assumed to hold. + * + * Structure used by #GstDataQueue. You can supply a different structure, as + * long as the top of the structure is identical to this structure. + */ + +struct _GstDataQueueItem +{ + GstMiniObject *object; + guint size; + guint64 duration; + gboolean visible; + + /* user supplied destroy function */ + GDestroyNotify destroy; +}; + +/** + * GstDataQueueSize: + * @visible: number of buffers + * @bytes: number of bytes + * @time: amount of time + * + * Structure describing the size of a queue. + */ +struct _GstDataQueueSize +{ + guint visible; + guint bytes; + guint64 time; +}; + +/** + * GstDataQueueCheckFullFunction: + * @queue: a #GstDataQueue. + * @visible: The number of visible items currently in the queue. + * @bytes: The amount of bytes currently in the queue. + * @time: The accumulated duration of the items currently in the queue. + * @checkdata: The #gpointer registered when the #GstDataQueue was created. + * + * The prototype of the function used to inform the queue that it should be + * considered as full. + * + * Returns: #TRUE if the queue should be considered full. + */ +typedef gboolean (*GstDataQueueCheckFullFunction) (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); + +/** + * GstDataQueue: + * + * Opaque #GstDataQueue structure. + */ +struct _GstDataQueue +{ + GObject object; + + /*< private > */ + /* the queue of data we're keeping our grubby hands on */ + GQueue *queue; + + GstDataQueueSize cur_level; /* size of the queue */ + GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */ + gpointer *checkdata; + + GMutex *qlock; /* lock for queue (vs object lock) */ + GCond *item_add; /* signals buffers now available for reading */ + GCond *item_del; /* signals space now available for writing */ + gboolean flushing; /* indicates whether conditions where signalled because + * of external flushing */ + + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstDataQueueClass +{ + GObjectClass parent_class; + + /* signals */ + void (*empty) (GstDataQueue * queue); + void (*full) (GstDataQueue * queue); + + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_data_queue_get_type (void); + +GstDataQueue * gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, + gpointer checkdata); + +gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); +gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); + +void gst_data_queue_flush (GstDataQueue * queue); +void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); + +gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); + +gboolean gst_data_queue_is_full (GstDataQueue * queue); +gboolean gst_data_queue_is_empty (GstDataQueue * queue); + +void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize *level); +void gst_data_queue_limits_changed (GstDataQueue * queue); + +G_END_DECLS + +#endif /* __GST_DATA_QUEUE_H__ */ diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/multiqueue/gstmultiqueue.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/multiqueue/gstmultiqueue.c Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,1397 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * Copyright (C) 2007 Jan Schmidt + * Copyright (C) 2007 Wim Taymans + * + * gstmultiqueue.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include "gstmultiqueue.h" + +/** + * GstSingleQueue: + * @sinkpad: associated sink #GstPad + * @srcpad: associated source #GstPad + * + * Structure containing all information and properties about + * a single queue. + */ +typedef struct _GstSingleQueue GstSingleQueue; + +struct _GstSingleQueue +{ + /* unique identifier of the queue */ + guint id; + + GstMultiQueue *mqueue; + + GstPad *sinkpad; + GstPad *srcpad; + + /* flowreturn of previous srcpad push */ + GstFlowReturn srcresult; + GstSegment sink_segment; + GstSegment src_segment; + + /* queue of data */ + GstDataQueue *queue; + GstDataQueueSize max_size, extra_size; + GstClockTime cur_time; + gboolean is_eos; + gboolean inextra; /* TRUE if the queue is currently in extradata mode */ + + /* Protected by global lock */ + guint32 nextid; /* ID of the next object waiting to be pushed */ + guint32 oldid; /* ID of the last object pushed (last in a series) */ + GCond *turn; /* SingleQueue turn waiting conditional */ +}; + + +/* Extension of GstDataQueueItem structure for our usage */ +typedef struct _GstMultiQueueItem GstMultiQueueItem; + +struct _GstMultiQueueItem +{ + GstMiniObject *object; + guint size; + guint64 duration; + gboolean visible; + + GDestroyNotify destroy; + guint32 posid; +}; + +static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); +static void gst_single_queue_free (GstSingleQueue * squeue); + +static void wake_up_next_non_linked (GstMultiQueue * mq); +static void compute_high_id (GstMultiQueue * mq); + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (multi_queue_debug); +#define GST_CAT_DEFAULT (multi_queue_debug) + +/* default limits, we try to keep up to 2 seconds of data and if there is not + * time, up to 10 MB. The number of buffers is dynamically scaled to make sure + * there is data in the queues. Normally, the byte and time limits are not hit + * in theses conditions. */ +#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ +#define DEFAULT_MAX_SIZE_BUFFERS 5 +#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND + +/* second limits. When we hit one of the above limits we are probably dealing + * with a badly muxed file and we scale the limits to these emergency values. + * This is currently not yet implemented. */ +#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ +#define DEFAULT_EXTRA_SIZE_BUFFERS 5 +#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND + +/* Signals and args */ +enum +{ + SIGNAL_UNDERRUN, + SIGNAL_OVERRUN, + LAST_SIGNAL +}; + +enum +{ + ARG_0, + ARG_EXTRA_SIZE_BYTES, + ARG_EXTRA_SIZE_BUFFERS, + ARG_EXTRA_SIZE_TIME, + ARG_MAX_SIZE_BYTES, + ARG_MAX_SIZE_BUFFERS, + ARG_MAX_SIZE_TIME, +}; + + +static const GstElementDetails gst_multiqueue_details = +GST_ELEMENT_DETAILS ("MultiQueue", + "Generic", + "Multiple data queue", + "Edward Hervey "); + + +#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + g_mutex_lock (q->qlock); \ +} G_STMT_END + +#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + g_mutex_unlock (q->qlock); \ +} G_STMT_END + +static void gst_multi_queue_finalize (GObject * object); +static void gst_multi_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_multi_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); + +static GstPad *gst_multi_queue_request_new_pad (GstElement * element, + GstPadTemplate * temp, const gchar * name); +static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad); + +static void gst_multi_queue_loop (GstPad * pad); + +#define _do_init(bla) \ + GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element"); + +GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement, + GST_TYPE_ELEMENT, _do_init); + +static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 }; + + + +static void +gst_multi_queue_base_init (gpointer g_class) +{ + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_set_details (gstelement_class, &gst_multiqueue_details); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&sinktemplate)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&srctemplate)); +} + +static void +gst_multi_queue_class_init (GstMultiQueueClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = + GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); + gobject_class->get_property = + GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); + + /* SIGNALS */ + gst_multi_queue_signals[SIGNAL_UNDERRUN] = + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + gst_multi_queue_signals[SIGNAL_OVERRUN] = + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /* PROPERTIES */ + + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, + g_param_spec_uint ("max-size-bytes", "Max. size (kB)", + "Max. amount of data in the queue (bytes, 0=disable)", + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", + "Max. number of buffers in the queue (0=disable)", + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of data in the queue (in ns, 0=disable)", + 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, + g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", + "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, + g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", + "Amount of buffers the queues can grow if one of them is empty (0=disable)", + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, + g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", + "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", + 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE)); + + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); + + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad); +} + +static void +gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) +{ + mqueue->nbqueues = 0; + mqueue->queues = NULL; + + mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES; + mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS; + mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME; + + mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES; + mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS; + mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME; + + mqueue->counter = 1; + mqueue->highid = -1; + mqueue->nextnotlinked = -1; + + mqueue->qlock = g_mutex_new (); +} + +static void +gst_multi_queue_finalize (GObject * object) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (object); + + g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL); + g_list_free (mqueue->queues); + mqueue->queues = NULL; + + /* free/unref instance data */ + g_mutex_free (mqueue->qlock); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + + + +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ + GList * tmp = mq->queues; \ + while (tmp) { \ + GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ + q->max_size.format = mq->max_size.format; \ + tmp = g_list_next(tmp); \ + }; \ +} G_STMT_END + +static void +gst_multi_queue_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstMultiQueue *mq = GST_MULTI_QUEUE (object); + + switch (prop_id) { + case ARG_MAX_SIZE_BYTES: + mq->max_size.bytes = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + mq->max_size.visible = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, visible); + break; + case ARG_MAX_SIZE_TIME: + mq->max_size.time = g_value_get_uint64 (value); + SET_CHILD_PROPERTY (mq, time); + break; + case ARG_EXTRA_SIZE_BYTES: + mq->extra_size.bytes = g_value_get_uint (value); + break; + case ARG_EXTRA_SIZE_BUFFERS: + mq->extra_size.visible = g_value_get_uint (value); + break; + case ARG_EXTRA_SIZE_TIME: + mq->extra_size.time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_multi_queue_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstMultiQueue *mq = GST_MULTI_QUEUE (object); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + switch (prop_id) { + case ARG_EXTRA_SIZE_BYTES: + g_value_set_uint (value, mq->extra_size.bytes); + break; + case ARG_EXTRA_SIZE_BUFFERS: + g_value_set_uint (value, mq->extra_size.visible); + break; + case ARG_EXTRA_SIZE_TIME: + g_value_set_uint64 (value, mq->extra_size.time); + break; + case ARG_MAX_SIZE_BYTES: + g_value_set_uint (value, mq->max_size.bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + g_value_set_uint (value, mq->max_size.visible); + break; + case ARG_MAX_SIZE_TIME: + g_value_set_uint64 (value, mq->max_size.time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); +} + +GList * +gst_multi_queue_get_internal_links (GstPad * pad) +{ + GList *res = NULL; + GstMultiQueue *mqueue; + GstSingleQueue *sq = NULL; + GList *tmp; + + g_return_val_if_fail (GST_IS_PAD (pad), NULL); + + mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad)); + if (!mqueue) + goto no_parent; + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + /* Find which single queue it belongs to */ + for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) { + sq = (GstSingleQueue *) tmp->data; + + if (sq->sinkpad == pad) + res = g_list_prepend (res, sq->srcpad); + if (sq->srcpad == pad) + res = g_list_prepend (res, sq->sinkpad); + } + + if (!res) + GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + return res; + +no_parent: + { + GST_DEBUG_OBJECT (pad, "no parent"); + return NULL; + } +} + + +/* + * GstElement methods + */ + +static GstPad * +gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, + const gchar * name) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); + GstSingleQueue *squeue; + + GST_LOG_OBJECT (element, "name : %s", name); + + /* Create a new single queue, add the sink and source pad and return the sink pad */ + squeue = gst_single_queue_new (mqueue); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + mqueue->queues = g_list_append (mqueue->queues, squeue); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + /* + GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s", + GST_DEBUG_PAD_NAME (squeue->sinkpad)); + */ + return squeue->sinkpad; +} + +static void +gst_multi_queue_release_pad (GstElement * element, GstPad * pad) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); + GstSingleQueue *sq = NULL; + GList *tmp; + +// GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + /* Find which single queue it belongs to, knowing that it should be a sinkpad */ + for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) { + sq = (GstSingleQueue *) tmp->data; + + if (sq->sinkpad == pad) + break; + } + + if (!tmp) { + GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + return; + } + + /* FIXME: The removal of the singlequeue should probably not happen until it + * finishes draining */ + + /* remove it from the list */ + mqueue->queues = g_list_delete_link (mqueue->queues, tmp); + + /* FIXME : recompute next-non-linked */ + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + /* delete SingleQueue */ + gst_data_queue_set_flushing (sq->queue, TRUE); + + gst_pad_set_active (sq->srcpad, FALSE); + gst_pad_set_active (sq->sinkpad, FALSE); + gst_element_remove_pad (element, sq->srcpad); + gst_element_remove_pad (element, sq->sinkpad); + gst_single_queue_free (sq); +} + +static gboolean +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) +{ + gboolean result; + + GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"), + sq->id); + + if (flush) { + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_set_flushing (sq->queue, TRUE); + + /* wake up non-linked task */ + GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", + sq->id); + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + g_cond_signal (sq->turn); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); + result = gst_pad_pause_task (sq->srcpad); + } else { + gst_data_queue_flush (sq->queue); + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); + /* All pads start off not-linked for a smooth kick-off */ + sq->srcresult = GST_FLOW_NOT_LINKED; + sq->cur_time = 0; + sq->max_size.visible = mq->max_size.visible; + sq->is_eos = FALSE; + sq->inextra = FALSE; + sq->nextid = 0; + sq->oldid = 0; + gst_data_queue_set_flushing (sq->queue, FALSE); + + GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); + result = + gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, + sq->srcpad); + } + return result; +} + +/* calculate the diff between running time on the sink and src of the queue. + * This is the total amount of time in the queue. + * WITH LOCK TAKEN */ +static void +update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) +{ + gint64 sink_time, src_time; + + sink_time = + gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, + sq->sink_segment.last_stop); + + src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, + sq->src_segment.last_stop); + + GST_DEBUG_OBJECT (mq, + "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, + GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); + + /* This allows for streams with out of order timestamping - sometimes the + * emerging timestamp is later than the arriving one(s) */ + if (sink_time >= src_time) + sq->cur_time = sink_time - src_time; + else + sq->cur_time = 0; +} + +/* take a NEWSEGMENT event and apply the values to segment, updating the time + * level of queue. */ +static void +apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, + GstSegment * segment) +{ + gboolean update; + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, + &format, &start, &stop, &time); + + /* now configure the values, we use these to track timestamps on the + * sinkpad. */ + if (format != GST_FORMAT_TIME) { + /* non-time format, pretent the current time segment is closed with a + * 0 start and unknown stop time. */ + update = FALSE; + format = GST_FORMAT_TIME; + start = 0; + stop = -1; + time = 0; + } + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + gst_segment_set_newsegment_full (segment, update, + rate, arate, format, start, stop, time); + + GST_DEBUG_OBJECT (mq, + "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment); + + /* segment can update the time level of the queue */ + update_time_level (mq, sq); + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); +} + +/* take a buffer and update segment, updating the time level of the queue. */ +static void +apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, + GstClockTime duration, GstSegment * segment) +{ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + /* if no timestamp is set, assume it's continuous with the previous + * time */ + if (timestamp == GST_CLOCK_TIME_NONE) + timestamp = segment->last_stop; + + /* add duration */ + if (duration != GST_CLOCK_TIME_NONE) + timestamp += duration; + + GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT, + sq->id, GST_TIME_ARGS (timestamp)); + + gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp); + + /* calc diff with other end */ + update_time_level (mq, sq); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); +} + +static GstFlowReturn +gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, + GstMiniObject * object) +{ + GstFlowReturn result = GST_FLOW_OK; + + if (GST_IS_BUFFER (object)) { + GstBuffer *buffer; + GstClockTime timestamp, duration; + + buffer = GST_BUFFER_CAST (object); + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + apply_buffer (mq, sq, timestamp, duration, &sq->src_segment); + + /* Applying the buffer may have made the queue non-full again, unblock it if needed */ + gst_data_queue_limits_changed (sq->queue); + + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, + sq->id, buffer, GST_TIME_ARGS (timestamp)); + + result = gst_pad_push (sq->srcpad, buffer); + } else if (GST_IS_EVENT (object)) { + GstEvent *event; + + event = GST_EVENT_CAST (object); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + result = GST_FLOW_UNEXPECTED; + break; + case GST_EVENT_NEWSEGMENT: + apply_segment (mq, sq, event, &sq->src_segment); + /* Applying the segment may have made the queue non-full again, unblock it if needed */ + gst_data_queue_limits_changed (sq->queue); + break; + default: + break; + } + + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing event %p of type %s", + sq->id, event, GST_EVENT_TYPE_NAME (event)); + + gst_pad_push_event (sq->srcpad, event); + } else { + g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", + sq->id); + } + return result; + + /* ERRORS */ +} + +static GstMiniObject * +gst_multi_queue_item_steal_object (GstMultiQueueItem * item) +{ + GstMiniObject *res; + + res = item->object; + item->object = NULL; + + return res; +} + +static void +gst_multi_queue_item_destroy (GstMultiQueueItem * item) +{ + if (item->object) + gst_mini_object_unref (item->object); + g_free (item); +} + +/* takes ownership of passed mini object! */ +static GstMultiQueueItem * +gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) +{ + GstMultiQueueItem *item; + + item = g_new (GstMultiQueueItem, 1); + item->object = object; + item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; + item->posid = curid; + + if (GST_IS_BUFFER (object)) { + item->size = GST_BUFFER_SIZE (object); + item->duration = GST_BUFFER_DURATION (object); + if (item->duration == GST_CLOCK_TIME_NONE) + item->duration = 0; + item->visible = TRUE; + } else { + item->size = 0; + item->duration = 0; + item->visible = FALSE; + } + return item; +} + +/* Each main loop attempts to push buffers until the return value + * is not-linked. not-linked pads are not allowed to push data beyond + * any linked pads, so they don't 'rush ahead of the pack'. + */ +static void +gst_multi_queue_loop (GstPad * pad) +{ + GstSingleQueue *sq; + GstMultiQueueItem *item; + GstDataQueueItem *sitem; + GstMultiQueue *mq; + GstMiniObject *object; + guint32 newid; + guint32 oldid = -1; + GstFlowReturn result; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = sq->mqueue; + + do { + GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); + + /* Get something from the queue, blocking until that happens, or we get + * flushed */ + if (!(gst_data_queue_pop (sq->queue, &sitem))) + goto out_flushing; + + item = (GstMultiQueueItem *) sitem; + newid = item->posid; + + /* steal the object and destroy the item */ + object = gst_multi_queue_item_steal_object (item); + gst_multi_queue_item_destroy (item); + + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", + sq->id, newid, oldid); + + /* If we're not-linked, we do some extra work because we might need to + * wait before pushing. If we're linked but there's a gap in the IDs, + * or it's the first loop, or we just passed the previous highid, + * we might need to wake some sleeping pad up, so there's extra work + * there too */ + if (sq->srcresult == GST_FLOW_NOT_LINKED || + (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) { + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + /* Update the nextid so other threads know when to wake us up */ + sq->nextid = newid; + + /* Update the oldid (the last ID we output) for highid tracking */ + if (oldid != -1) + sq->oldid = oldid; + + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* Go to sleep until it's time to push this buffer */ + + /* Recompute the highid */ + compute_high_id (mq); + while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " + "newid %u and highid %u", sq->id, newid, mq->highid); + + + /* Wake up all non-linked pads before we sleep */ + wake_up_next_non_linked (mq); + + mq->numwaiting++; + g_cond_wait (sq->turn, mq->qlock); + mq->numwaiting--; + + GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " + "wakeup with newid %u and highid %u", sq->id, newid, mq->highid); + } + + /* Re-compute the high_id in case someone else pushed */ + compute_high_id (mq); + } else { + compute_high_id (mq); + /* Wake up all non-linked pads */ + wake_up_next_non_linked (mq); + } + /* We're done waiting, we can clear the nextid */ + sq->nextid = 0; + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + + GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + /* Try to push out the new object */ + result = gst_single_queue_push_one (mq, sq, object); + sq->srcresult = result; + + if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED) + goto out_flushing; + + GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + oldid = newid; + } + while (TRUE); + +out_flushing: + { + /* Need to make sure wake up any sleeping pads when we exit */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + compute_high_id (mq); + wake_up_next_non_linked (mq); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + gst_data_queue_set_flushing (sq->queue, TRUE); + gst_pad_pause_task (sq->srcpad); + GST_CAT_LOG_OBJECT (multi_queue_debug, mq, + "SingleQueue[%d] task paused, reason:%s", + sq->id, gst_flow_get_name (sq->srcresult)); + return; + } +} + +/** + * gst_multi_queue_chain: + * + * This is similar to GstQueue's chain function, except: + * _ we don't have leak behavioures, + * _ we push with a unique id (curid) + */ +static GstFlowReturn +gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) +{ + GstSingleQueue *sq; + GstMultiQueue *mq; + GstMultiQueueItem *item; + GstFlowReturn ret = GST_FLOW_OK; + guint32 curid; + GstClockTime timestamp, duration; + + sq = gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + /* Get a unique incrementing id */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + curid = mq->counter++; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d", + sq->id, buffer, curid); + + item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid); + + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) + goto flushing; + + /* update time level, we must do this after pushing the data in the queue so + * that we never end up filling the queue first. */ + apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment); + +done: + gst_object_unref (mq); + + return ret; + + /* ERRORS */ +flushing: + { + ret = sq->srcresult; + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (ret)); + gst_multi_queue_item_destroy (item); + goto done; + } +} + +static gboolean +gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) +{ + GstSingleQueue *sq; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + + if (active) { + /* All pads start off not-linked for a smooth kick-off */ + sq->srcresult = GST_FLOW_NOT_LINKED; + } else { + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_flush (sq->queue); + } + return TRUE; +} + +static gboolean +gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) +{ + GstSingleQueue *sq; + GstMultiQueue *mq; + guint32 curid; + GstMultiQueueItem *item; + gboolean res; + GstEventType type; + GstEvent *sref = NULL; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + type = GST_EVENT_TYPE (event); + + switch (type) { + case GST_EVENT_FLUSH_START: + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", + sq->id); + + res = gst_pad_push_event (sq->srcpad, event); + + gst_single_queue_flush (mq, sq, TRUE); + goto done; + + case GST_EVENT_FLUSH_STOP: + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", + sq->id); + + res = gst_pad_push_event (sq->srcpad, event); + + gst_single_queue_flush (mq, sq, FALSE); + goto done; + case GST_EVENT_NEWSEGMENT: + /* take ref because the queue will take ownership and we need the event + * afterwards to update the segment */ + sref = gst_event_ref (event); + break; + + default: + if (!(GST_EVENT_IS_SERIALIZED (event))) { + res = gst_pad_push_event (sq->srcpad, event); + goto done; + } + break; + } + + /* Get an unique incrementing id */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + curid = mq->counter++; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + item = gst_multi_queue_item_new ((GstMiniObject *) event, curid); + + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Enqueuing event %p of type %s with id %d", + sq->id, event, GST_EVENT_TYPE_NAME (event), curid); + + if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) + goto flushing; + + /* mark EOS when we received one, we must do that after putting the + * buffer in the queue because EOS marks the buffer as filled. No need to take + * a lock, the _check_full happens from this thread only, right before pushing + * into dataqueue. */ + switch (type) { + case GST_EVENT_EOS: + sq->is_eos = TRUE; + break; + case GST_EVENT_NEWSEGMENT: + apply_segment (mq, sq, sref, &sq->sink_segment); + gst_event_unref (sref); + break; + default: + break; + } +done: + gst_object_unref (mq); + return res; + +flushing: + { + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (sq->srcresult)); + if (sref) + gst_event_unref (sref); + gst_multi_queue_item_destroy (item); + goto done; + } +} + +static GstCaps * +gst_multi_queue_getcaps (GstPad * pad) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + GstPad *otherpad; + GstCaps *result; + + otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad; + + GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad"); + + result = gst_pad_peer_get_caps (otherpad); + if (result == NULL) + result = gst_caps_new_any (); + + return result; +} + +static GstFlowReturn +gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** buf) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + + return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf); +} + +static gboolean +gst_multi_queue_src_activate_push (GstPad * pad, gboolean active) +{ + GstMultiQueue *mq; + GstSingleQueue *sq; + gboolean result = FALSE; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = sq->mqueue; + + GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id); + + if (active) { + result = gst_single_queue_flush (mq, sq, FALSE); + } else { + result = gst_single_queue_flush (mq, sq, TRUE); + /* make sure streaming finishes */ + result |= gst_pad_stop_task (pad); + } + return result; +} + +static gboolean +gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps) +{ + return TRUE; +} + +static gboolean +gst_multi_queue_src_event (GstPad * pad, GstEvent * event) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + + return gst_pad_push_event (sq->sinkpad, event); +} + +static gboolean +gst_multi_queue_src_query (GstPad * pad, GstQuery * query) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + GstPad *peerpad; + gboolean res; + + /* FIXME, Handle position offset depending on queue size */ + + /* default handling */ + if (!(peerpad = gst_pad_get_peer (sq->sinkpad))) + goto no_peer; + + res = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + + return res; + + /* ERRORS */ +no_peer: + { + GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer"); + return FALSE; + } +} + +/* + * Next-non-linked functions + */ + +/* WITH LOCK TAKEN */ +static void +wake_up_next_non_linked (GstMultiQueue * mq) +{ + GList *tmp; + + /* maybe no-one is waiting */ + if (mq->numwaiting < 1) + return; + + /* Else figure out which singlequeue(s) need waking up */ + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + if (sq->nextid != 0 && sq->nextid <= mq->highid) { + GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); + g_cond_signal (sq->turn); + } + } + } +} + +/* WITH LOCK TAKEN */ +static void +compute_high_id (GstMultiQueue * mq) +{ + /* The high-id is either the highest id among the linked pads, or if all + * pads are not-linked, it's the lowest not-linked pad */ + GList *tmp; + guint32 lowest = G_MAXUINT32; + guint32 highid = G_MAXUINT32; + + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", + sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); + + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* No need to consider queues which are not waiting */ + if (sq->nextid == 0) { + GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); + continue; + } + + if (sq->nextid < lowest) + lowest = sq->nextid; + } else if (sq->srcresult != GST_FLOW_UNEXPECTED) { + /* If we don't have a global highid, or the global highid is lower than + * this single queue's last outputted id, store the queue's one, + * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */ + if ((highid == G_MAXUINT32) || (sq->oldid > highid)) + highid = sq->oldid; + } + } + + if (highid == G_MAXUINT32 || lowest < highid) + mq->highid = lowest; + else + mq->highid = highid; + + GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid, + lowest); +} + +#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ + (sq->max_size.format) <= (value)) + +/* + * GstSingleQueue functions + */ +static void +single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) +{ + GstMultiQueue *mq = sq->mqueue; + GList *tmp; + GstDataQueueSize size; + gboolean filled = FALSE; + + gst_data_queue_get_level (sq->queue, &size); + + GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + GstDataQueueSize ssize; + + GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); + + if (gst_data_queue_is_empty (ssq->queue)) { + GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); + if (IS_FILLED (visible, size.visible)) { + sq->max_size.visible++; + GST_DEBUG_OBJECT (mq, + "Another queue is empty, bumping single queue %d max visible to %d", + sq->id, sq->max_size.visible); + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + goto beach; + } + /* check if we reached the hard time/bytes limits */ + gst_data_queue_get_level (ssq->queue, &ssize); + + GST_DEBUG_OBJECT (mq, + "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" + G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, + ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); + + /* if this queue is filled completely we must signal overrun */ + if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { + GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); + filled = TRUE; + } + } + /* no queues were empty */ + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + /* Overrun is always forwarded, since this is blocking the upstream element */ + if (filled) { + GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); + } + +beach: + return; +} + +static void +single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) +{ + gboolean empty = TRUE; + GstMultiQueue *mq = sq->mqueue; + GList *tmp; + + GST_LOG_OBJECT (mq, + "Single Queue %d is empty, Checking other single queues", sq->id); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + if (gst_data_queue_is_full (sq->queue)) { + GstDataQueueSize size; + + gst_data_queue_get_level (sq->queue, &size); + if (IS_FILLED (visible, size.visible)) { + sq->max_size.visible++; + GST_DEBUG_OBJECT (mq, + "queue %d is filled, bumping its max visible to %d", sq->id, + sq->max_size.visible); + gst_data_queue_limits_changed (sq->queue); + } + } + if (!gst_data_queue_is_empty (sq->queue)) + empty = FALSE; + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + if (empty) { + GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0); + } +} + +static gboolean +single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, + guint64 time, GstSingleQueue * sq) +{ + gboolean res; + + GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT + "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes, + sq->max_size.bytes, sq->cur_time, sq->max_size.time); + + /* we are always filled on EOS */ + if (sq->is_eos) + return TRUE; + + /* we never go past the max visible items */ + if (IS_FILLED (visible, visible)) + return TRUE; + + if (sq->cur_time != 0) { + /* if we have valid time in the queue, check */ + res = IS_FILLED (time, sq->cur_time); + } else { + /* no valid time, check bytes */ + res = IS_FILLED (bytes, bytes); + } + return res; +} + +static void +gst_single_queue_free (GstSingleQueue * sq) +{ + /* DRAIN QUEUE */ + gst_data_queue_flush (sq->queue); + g_object_unref (sq->queue); + g_cond_free (sq->turn); + g_free (sq); +} + +static GstSingleQueue * +gst_single_queue_new (GstMultiQueue * mqueue) +{ + GstSingleQueue *sq; + gchar *tmp; + + sq = g_new0 (GstSingleQueue, 1); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + sq->id = mqueue->nbqueues++; + + /* copy over max_size and extra_size so we don't need to take the lock + * any longer when checking if the queue is full. */ + sq->max_size.visible = mqueue->max_size.visible; + sq->max_size.bytes = mqueue->max_size.bytes; + sq->max_size.time = mqueue->max_size.time; + + sq->extra_size.visible = mqueue->extra_size.visible; + sq->extra_size.bytes = mqueue->extra_size.bytes; + sq->extra_size.time = mqueue->extra_size.time; + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); + + sq->mqueue = mqueue; + sq->srcresult = GST_FLOW_WRONG_STATE; + sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) + single_queue_check_full, sq); + sq->is_eos = FALSE; + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); + + sq->nextid = 0; + sq->oldid = 0; + sq->turn = g_cond_new (); + + /* attach to underrun/overrun signals to handle non-starvation */ + g_signal_connect (G_OBJECT (sq->queue), "full", + G_CALLBACK (single_queue_overrun_cb), sq); + g_signal_connect (G_OBJECT (sq->queue), "empty", + G_CALLBACK (single_queue_underrun_cb), sq); + + tmp = g_strdup_printf ("sink%d", sq->id); + sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); + g_free (tmp); + + gst_pad_set_chain_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_chain)); + gst_pad_set_activatepush_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push)); + gst_pad_set_event_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event)); + gst_pad_set_getcaps_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); + gst_pad_set_bufferalloc_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc)); + gst_pad_set_internal_link_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); + + tmp = g_strdup_printf ("src%d", sq->id); + sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp); + g_free (tmp); + + gst_pad_set_activatepush_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push)); + gst_pad_set_acceptcaps_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps)); + gst_pad_set_getcaps_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); + gst_pad_set_event_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_event)); + gst_pad_set_query_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_query)); + gst_pad_set_internal_link_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); + + gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); + gst_pad_set_element_private (sq->srcpad, (gpointer) sq); + + gst_pad_set_active (sq->srcpad, TRUE); + gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); + + gst_pad_set_active (sq->sinkpad, TRUE); + gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); + + GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", + sq->id); + + return sq; +} + +static gboolean +plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "multiqueue", GST_RANK_NONE, + GST_TYPE_MULTI_QUEUE); +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "multiqueue", + "multiqueue", plugin_init, VERSION, GST_LICENSE, + GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/multiqueue/gstmultiqueue.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/multiqueue/gstmultiqueue.h Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,86 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstmultiqueue.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_MULTI_QUEUE_H__ +#define __GST_MULTI_QUEUE_H__ + +#include +#include "gstdataqueue.h" + +G_BEGIN_DECLS + +#define GST_TYPE_MULTI_QUEUE \ + (gst_multi_queue_get_type()) +#define GST_MULTI_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_QUEUE,GstMultiQueue)) +#define GST_MULTI_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_QUEUE,GstMultiQueueClass)) +#define GST_IS_MULTI_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_QUEUE)) +#define GST_IS_MULTI_QUEUE_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_QUEUE)) + +typedef struct _GstMultiQueue GstMultiQueue; +typedef struct _GstMultiQueueClass GstMultiQueueClass; + +/** + * GstMultiQueue: + * + * Opaque #GstMultiQueue structure. + */ +struct _GstMultiQueue { + GstElement element; + + /* number of queues */ + guint nbqueues; + + /* The list of individual queues */ + GList *queues; + + GstDataQueueSize max_size, extra_size; + + guint32 counter; /* incoming object counter */ + guint32 highid; /* contains highest id of last outputted object */ + + GMutex * qlock; /* Global queue lock (vs object lock or individual */ + /* queues lock). Protects nbqueues, queues, global */ + /* GstMultiQueueSize, counter and highid */ + + gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */ + + gint numwaiting; /* number of not-linked pads waiting */ +}; + +struct _GstMultiQueueClass { + GstElementClass parent_class; + + /* signals emitted when ALL queues are either full or empty */ + void (*underrun) (GstMultiQueue *queue); + void (*overrun) (GstMultiQueue *queue); +}; + +GType gst_multi_queue_get_type (void); + +G_END_DECLS + + +#endif /* __GST_MULTI_QUEUE_H__ */ diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/playbinmaemo/Makefile.am --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/playbinmaemo/Makefile.am Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,27 @@ +plugin_LTLIBRARIES = libgstplaybinmaemo.la + +libgstplaybinmaemo_la_SOURCES = \ + gstplaybinmaemo.c + +libgstplaybinmaemo_la_CFLAGS = \ + $(GST_CFLAGS) \ + $(GST_BASE_CFLAGS) \ + $(GST_PLUGINS_BASE_CFLAGS) \ + $(X11_CFLAGS) + +libgstplaybinmaemo_la_LIBADD = \ + $(GST_LIBS_LIBS) \ + $(X11_LIBS) + -lgstinterfaces-0.10 + +libgstplaybinmaemo_la_LDFLAGS = \ + $(GST_LIBS) \ + $(GST_PLUGIN_LDFLAGS) \ + $(GST_BASE_LIBS) \ + $(GST_PLUGINS_BASE_LIBS) \ + -lgstinterfaces-0.10 \ + $(X11_LIBS) + +noinst_HEADERS = \ + gstplaybinmaemo.h + diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/playbinmaemo/gstplaybinmaemo.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/playbinmaemo/gstplaybinmaemo.c Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,841 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include +#include +#include +//#include +#include "gstplaybinmaemo.h" + + +GST_DEBUG_CATEGORY_STATIC (gst_play_bin_maemo_debug); +#define GST_CAT_DEFAULT gst_play_bin_maemo_debug + +#define DEFAULT_VOLUME 10 +#define DEFAULT_XID -1 + +/* props */ +enum +{ + ARG_0, + ARG_URI, + ARG_QUEUE_SIZE, + ARG_QUEUE_MIN_THRESHOLD, + ARG_SOURCE, + ARG_VOLUME, + ARG_XID +}; + +static const GstElementDetails gst_play_bin_maemo_details = + GST_ELEMENT_DETAILS("Nuv demuxer", + "Generic/Bin/Player", + "Autoplug and play media from an uri used on maemo plataform", + "Renato Araujo Oliveira Filho "); + +static void gst_play_bin_maemo_dispose (GObject * object); +static void gst_play_bin_maemo_finalize (GObject * object); +static void gst_play_bin_maemo_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * spec); +static void gst_play_bin_maemo_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * spec); +static GstStateChangeReturn + gst_play_bin_maemo_change_state (GstElement *element, + GstStateChange transition); +static gboolean factory_filter_sinks (GstPluginFeature *feature, + GstPlayBinMaemo *pbm); +static gint compare_ranks (GstPluginFeature * f1, + GstPluginFeature * f2); +static GList *find_compatibles (GstPlayBinMaemo *pbm, + const GstCaps *caps); +static GstPad *find_sink_pad (GstElement * element); +static void update_volume (GstPlayBinMaemo *pbm); +static void update_xid (GstPlayBinMaemo *pbm); +static void new_decoded_pad_cb (GstElement *object, + GstPad* pad, + gboolean arg, + gpointer user_data); +static void unknown_type_cb (GstElement *object, + GstPad *pad, + GstCaps *casp, + gpointer user_data); +static gboolean autoplug_continue_cb (GstElement* object, + GstCaps* caps, + gpointer user_data); +static void decode_new_pad_cb (GstElement *element, + GObject *new_pad, + gpointer user_data); +static void queue_underrun_cb (GstElement* queue, + gpointer user_data); +static void queue_sink_underrun_cb (GstElement* queue, + gpointer user_data); +static void queue_sink_overrun_cb (GstElement* queue, + gpointer user_data); + + + + + +GST_BOILERPLATE(GstPlayBinMaemo, gst_play_bin_maemo, GstPipeline, GST_TYPE_PIPELINE) + + +static void +gst_play_bin_maemo_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS(klass); + + gst_element_class_set_details (element_class, &gst_play_bin_maemo_details); +} + +static void +gst_play_bin_maemo_class_init (GstPlayBinMaemoClass * klass) +{ + GObjectClass *gobject_klass; + GstElementClass *gstelement_klass; + GstBinClass *gstbin_klass; + + gobject_klass = (GObjectClass *) klass; + gstelement_klass = (GstElementClass *) klass; + gstbin_klass = (GstBinClass *) klass; + + parent_class = g_type_class_peek_parent (klass); + + gobject_klass->set_property = gst_play_bin_maemo_set_property; + gobject_klass->get_property = gst_play_bin_maemo_get_property; + + g_object_class_install_property (gobject_klass, ARG_URI, + g_param_spec_string ("uri", "URI", "URI of the media to play", + NULL, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_klass, ARG_VOLUME, + g_param_spec_uint ("volume", "Audio volume", "volume", + 0, 10, (guint) DEFAULT_VOLUME, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_klass, ARG_XID, + g_param_spec_long ("xid", "xid", "X windown ID", + -1, G_MAXLONG, DEFAULT_XID, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_klass, ARG_SOURCE, + g_param_spec_object ("source", "Source", "Source element", + GST_TYPE_ELEMENT, G_PARAM_READABLE)); + + GST_DEBUG_CATEGORY_INIT (gst_play_bin_maemo_debug, "playbinmaemo", 0, + "playbinmaemo"); + + gobject_klass->dispose = GST_DEBUG_FUNCPTR (gst_play_bin_maemo_dispose); + gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_play_bin_maemo_finalize); + + gstelement_klass->change_state = + GST_DEBUG_FUNCPTR (gst_play_bin_maemo_change_state); +} + +static void +gst_play_bin_maemo_init (GstPlayBinMaemo * play_bin_maemo, GstPlayBinMaemoClass *class) +{ + GList *factories; + + play_bin_maemo->uri = NULL; + play_bin_maemo->source = NULL; + + play_bin_maemo->volume = DEFAULT_VOLUME * 65535 / 10; + play_bin_maemo->xid = DEFAULT_XID; + + factories = gst_default_registry_feature_filter ((GstPluginFeatureFilter) factory_filter_sinks, + FALSE, play_bin_maemo); + + play_bin_maemo->factories = g_list_sort (factories, (GCompareFunc) compare_ranks); +} + +static void +gst_play_bin_maemo_dispose (GObject * object) +{ + GstPlayBinMaemo *play_bin_maemo; + + play_bin_maemo = GST_PLAY_BIN_MAEMO (object); + g_free (play_bin_maemo->uri); + play_bin_maemo->uri = NULL; + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_play_bin_maemo_finalize (GObject * object) +{ + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static gboolean +array_has_value (const gchar * values[], const gchar * value) +{ + gint i; + + for (i = 0; values[i]; i++) { + if (g_str_has_prefix (value, values[i])) + return TRUE; + } + return FALSE; +} + +/* list of URIs that we consider to be streams and that need buffering. + * We have no mechanism yet to figure this out with a query. */ +static const gchar *stream_uris[] = { "http://", "mms://", "mmsh://", + "mmsu://", "mmst://", NULL +}; + +/* blacklisted URIs, we know they will always fail. */ +static const gchar *blacklisted_uris[] = { NULL }; + +/* mime types that we don't consider to be media types */ +static const gchar *no_media_mimes[] = { + "application/x-executable", "application/x-bzip", "application/x-gzip", + "application/zip", "application/x-compress", NULL +}; + +/* mime types we consider raw media */ +static const gchar *raw_mimes[] = { + "audio/x-raw", "video/x-raw", NULL +}; + +#define IS_STREAM_URI(uri) (array_has_value (stream_uris, uri)) +#define IS_BLACKLISTED_URI(uri) (array_has_value (blacklisted_uris, uri)) +#define IS_NO_MEDIA_MIME(mime) (array_has_value (no_media_mimes, mime)) +#define IS_RAW_MIME(mime) (array_has_value (raw_mimes, mime)) + +/* + * Generate and configure a source element. + */ +static GstElement * +gen_source_element (GstPlayBinMaemo * play_bin_maemo) +{ + GstElement *source; + + if (!play_bin_maemo->uri) + goto no_uri; + + if (!gst_uri_is_valid (play_bin_maemo->uri)) + goto invalid_uri; + + if (IS_BLACKLISTED_URI (play_bin_maemo->uri)) + goto uri_blacklisted; + + source = gst_element_make_from_uri (GST_URI_SRC, play_bin_maemo->uri, + "source"); + if (!source) + goto no_source; + + play_bin_maemo->is_stream = IS_STREAM_URI (play_bin_maemo->uri); + + /* make HTTP sources send extra headers so we get icecast + * metadata in case the stream is an icecast stream */ + if (!strncmp (play_bin_maemo->uri, "http://", 7) && + g_object_class_find_property (G_OBJECT_GET_CLASS (source), + "iradio-mode")) { + g_object_set (source, "iradio-mode", TRUE, NULL); + } + return source; + + /* ERRORS */ +no_uri: + { + GST_ELEMENT_ERROR (play_bin_maemo, RESOURCE, NOT_FOUND, + (_("No URI specified to play from.")), (NULL)); + return NULL; + } +invalid_uri: + { + GST_ELEMENT_ERROR (play_bin_maemo, RESOURCE, NOT_FOUND, + (_("Invalid URI \"%s\"."), play_bin_maemo->uri), (NULL)); + return NULL; + } +uri_blacklisted: + { + GST_ELEMENT_ERROR (play_bin_maemo, RESOURCE, FAILED, + (_("RTSP streams cannot be played yet.")), (NULL)); + return NULL; + } +no_source: + { + gchar *prot = gst_uri_get_protocol (play_bin_maemo->uri); + + /* whoops, could not create the source element, dig a little deeper to + * figure out what might be wrong. */ + if (prot) { + gchar *desc; + + /* + gst_element_post_message (GST_ELEMENT (play_bin_maemo), + gst_missing_uri_source_message_new (GST_ELEMENT (play_bin_maemo), + prot)); + + desc = gst_pb_utils_get_source_description (prot); + GST_ELEMENT_ERROR (play_bin_maemo, CORE, MISSING_PLUGIN, + (_("A %s plugin is required to play this stream, but not installed."), + desc), ("No URI handler for %s", prot)); + */ + g_free (desc); + g_free (prot); + } else + goto invalid_uri; + + return NULL; + } +} + +static void +remove_source (GstPlayBinMaemo *pbm) +{ + GstElement *source = pbm->source; + + if (source) { + GST_DEBUG_OBJECT (pbm, "removing old src element"); + gst_element_set_state (source, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (pbm), source); + pbm->source = NULL; + } +} + +static void +remove_decoders (GstPlayBinMaemo *pbm) +{ + if (pbm->queue != NULL) { + gst_element_set_state (pbm->queue, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (pbm), pbm->queue); + pbm->queue = NULL; + } + + if (pbm->decoder != NULL) { + gst_element_set_state (pbm->decoder, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (pbm), pbm->decoder); + pbm->decoder = NULL; + } +} + +static void +remove_sinks (GstPlayBinMaemo *pbm) +{ + GSList *walk; + + for(walk=pbm->sinks; walk != NULL; walk = walk->next) { + GstElement *element = (GstElement *) walk->data; + + gst_element_set_state (element, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (pbm), element); + } + + g_slist_free (pbm->sinks); + pbm->sinks = NULL; +} + +static void +prepare_elements (GstPlayBinMaemo *pbm) +{ + if (pbm->decoder == NULL) { + pbm->decoder = gst_element_factory_make ("decodebin2", "decode"); + gst_bin_add (GST_BIN (pbm), pbm->decoder); + g_signal_connect (G_OBJECT (pbm->decoder), + "autoplug-continue", + G_CALLBACK (autoplug_continue_cb), + pbm); + g_signal_connect (G_OBJECT (pbm->decoder), + "unknown-type", + G_CALLBACK (unknown_type_cb), + pbm); + g_signal_connect (G_OBJECT (pbm->decoder), + "new-decoded-pad", + G_CALLBACK (new_decoded_pad_cb), + pbm); + } + + if (pbm->queue == NULL) { + pbm->queue = gst_element_factory_make ("queue", NULL); + gst_bin_add (GST_BIN (pbm), pbm->queue); + } + + if (gst_element_link_many (pbm->source, pbm->queue, pbm->decoder, NULL) == FALSE) { + g_warning ("FAIL TO LINK SRC WITH DECODEBIN2"); + } +} + +static gboolean +setup_source (GstPlayBinMaemo *pbm) +{ + if (!pbm->need_rebuild) + return TRUE; + + GST_DEBUG_OBJECT (pbm, "setup source"); + + /* delete old src */ + remove_source (pbm); + + /* create and configure an element that can handle the uri */ + if (!(pbm->source = gen_source_element (pbm))) + goto no_source; + + + gst_bin_add (GST_BIN_CAST (pbm), pbm->source); + + remove_decoders (pbm); + + remove_sinks (pbm); + +#if 0 + if (verify_src_have_sink (pbm)) { + /* source can be linked with sinks directly */ + return TRUE; + } +#endif + + prepare_elements (pbm); + + return TRUE; + +no_source: + return FALSE; +} + +static void +gst_play_bin_maemo_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GstPlayBinMaemo *play_bin_maemo; + + g_return_if_fail (GST_IS_PLAY_BIN_MAEMO (object)); + + play_bin_maemo = GST_PLAY_BIN_MAEMO (object); + + switch (prop_id) { + case ARG_URI: + { + const gchar *uri = g_value_get_string (value); + + if (uri == NULL) { + g_warning ("cannot set NULL uri"); + return; + } + /* if we have no previous uri, or the new uri is different from the + * old one, replug */ + if (play_bin_maemo->uri == NULL || strcmp (play_bin_maemo->uri, uri) != 0) { + g_free (play_bin_maemo->uri); + play_bin_maemo->uri = g_strdup (uri); + + GST_DEBUG ("setting new uri to %s", uri); + + play_bin_maemo->need_rebuild = TRUE; + } + break; + } + case ARG_VOLUME: + { + guint volume; + volume = g_value_get_uint (value); + if (volume != 0) { + volume = (guint) (65535 * volume / 10); + } + + if (play_bin_maemo->volume != volume) { + play_bin_maemo->volume = volume; + update_volume (play_bin_maemo); + } + break; + } + case ARG_XID: + { + long xid; + xid = g_value_get_long (value); + if (play_bin_maemo->xid != xid) { + play_bin_maemo->xid = xid; + update_xid (play_bin_maemo); + } + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_play_bin_maemo_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstPlayBinMaemo *play_bin_maemo; + + g_return_if_fail (GST_IS_PLAY_BIN_MAEMO (object)); + + play_bin_maemo = GST_PLAY_BIN_MAEMO (object); + + switch (prop_id) { + case ARG_URI: + g_value_set_string (value, play_bin_maemo->uri); + break; + case ARG_SOURCE: + g_value_set_object (value, play_bin_maemo->source); + break; + case ARG_VOLUME: + g_value_set_uint (value, play_bin_maemo->volume); + break; + case ARG_XID: + g_value_set_long (value, play_bin_maemo->xid); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_play_bin_maemo_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstPlayBinMaemo *play_bin_maemo; + + play_bin_maemo = GST_PLAY_BIN_MAEMO (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (!setup_source (play_bin_maemo)) + goto source_failed; + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (ret == GST_STATE_CHANGE_FAILURE) { + play_bin_maemo->need_rebuild = TRUE; + return GST_STATE_CHANGE_FAILURE; + } + break; + /* clean-up in both cases, READY=>NULL clean-up is if there was an error */ + case GST_STATE_CHANGE_PAUSED_TO_READY: + case GST_STATE_CHANGE_READY_TO_NULL: + play_bin_maemo->need_rebuild = TRUE; + remove_decoders (play_bin_maemo); + remove_source (play_bin_maemo); + break; + default: + break; + } + return ret; + + /* ERRORS */ +source_failed: + { + play_bin_maemo->need_rebuild = TRUE; + + return GST_STATE_CHANGE_FAILURE; + } +} + +static gboolean +factory_filter_sinks (GstPluginFeature *feature, + GstPlayBinMaemo *pbm) +{ + guint rank; + const gchar *klass; + + if (!GST_IS_ELEMENT_FACTORY (feature)) + return FALSE; + + klass = gst_element_factory_get_klass (GST_ELEMENT_FACTORY (feature)); + + if ((strstr (klass, "Sink/Video") == NULL) && (strstr (klass, "Sink/Audio") == NULL)) + return FALSE; + + g_debug ("Fitered: %s", gst_element_factory_get_longname ((GST_ELEMENT_FACTORY (feature)))); + rank = gst_plugin_feature_get_rank (feature); + if (rank < GST_RANK_MARGINAL) + return FALSE; + + return TRUE; +} + +static gint +compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2) +{ + gint diff; + const gchar *rname1, *rname2; + + diff = gst_plugin_feature_get_rank (f2) - gst_plugin_feature_get_rank (f1); + if (diff != 0) + return diff; + + rname1 = gst_plugin_feature_get_name (f1); + rname2 = gst_plugin_feature_get_name (f2); + + diff = strcmp (rname2, rname1); + + return diff; +} + + +static GList * +find_compatibles (GstPlayBinMaemo *pbm, const GstCaps *caps) +{ + GList *factories; + GList *to_try = NULL; + + /* loop over all the factories */ + for (factories = pbm->factories; factories; factories = g_list_next (factories)) { + GstElementFactory *factory = GST_ELEMENT_FACTORY (factories->data); + const GList *templates; + GList *walk; + + /* get the templates from the element factory */ + templates = gst_element_factory_get_static_pad_templates (factory); + for (walk = (GList *) templates; walk; walk = g_list_next (walk)) { + GstStaticPadTemplate *templ = walk->data; + + /* we only care about the sink templates */ + if (templ->direction == GST_PAD_SINK) { + GstCaps *intersect; + GstCaps *tmpl_caps; + + /* try to intersect the caps with the caps of the template */ + tmpl_caps = gst_static_caps_get (&templ->static_caps); + + intersect = gst_caps_intersect (caps, tmpl_caps); + gst_caps_unref (tmpl_caps); + + /* check if the intersection is empty */ + if (!gst_caps_is_empty (intersect)) { + /* non empty intersection, we can use this element */ + to_try = g_list_prepend (to_try, factory); + gst_caps_unref (intersect); + break; + } + gst_caps_unref (intersect); + } + } + } + to_try = g_list_reverse (to_try); + + return to_try; +} + + +static gboolean +autoplug_continue_cb (GstElement* object, + GstCaps* caps, + gpointer user_data) +{ + GList *comp = NULL; + gboolean ret = TRUE; + + comp = find_compatibles (GST_PLAY_BIN_MAEMO (user_data), caps); + if (comp != NULL) { + g_list_free (comp); + ret = FALSE; + } + + return ret; +} + +static void +unknown_type_cb (GstElement *object, + GstPad *pad, + GstCaps *caps, + gpointer user_data) +{ + g_debug ("unknown_type_cb: %s", gst_caps_to_string (caps)); +} + +static GstPad * +find_sink_pad (GstElement * element) +{ + GstIterator *it; + GstPad *pad = NULL; + gpointer point; + + it = gst_element_iterate_sink_pads (element); + + if ((gst_iterator_next (it, &point)) == GST_ITERATOR_OK) + pad = (GstPad *) point; + + gst_iterator_free (it); + + return pad; +} + +static void +new_decoded_pad_cb (GstElement *object, + GstPad* pad, + gboolean arg, + gpointer user_data) +{ + GList *comp = NULL; + GList *walk; + GstCaps *caps; + gboolean linked; + GstPlayBinMaemo *pbm; + + pbm = GST_PLAY_BIN_MAEMO (user_data); + caps = gst_pad_get_caps (pad); + + g_debug ("new_decoded_pad_cb: %s", gst_caps_to_string (caps)); + + comp = find_compatibles (GST_PLAY_BIN_MAEMO (user_data), caps); + + + if (comp == NULL) { + g_warning ("flow error: dont find comaptible"); + return; + } + + GST_PAD_STREAM_LOCK (pad); + + linked = FALSE; + for (walk=comp; walk != NULL; walk = walk->next) { + GstElementFactory *factory = (GstElementFactory *) walk->data; + GstElement *element; + GstPad *sinkpad; + + if ((element = gst_element_factory_create (factory, NULL)) == NULL) { + GST_WARNING_OBJECT (pbm, "Could not create an element from %s", + gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory))); + continue; + } + + if (strstr (gst_element_factory_get_klass (factory), "Sink/Video") != NULL) { + pbm->sink_video = element; + update_xid (pbm); + } else if (strstr (gst_element_factory_get_klass (factory), "Sink/Audio") != NULL) { + pbm->volume_element = element; + update_volume (pbm); + } + + if (!(gst_bin_add (GST_BIN (user_data), element))) { + GST_WARNING_OBJECT (pbm, "Couldn't set %s to READY", + GST_ELEMENT_NAME (element)); + gst_object_unref (element); + continue; + } + + if ((gst_element_set_state (element, GST_STATE_READY)) + == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (element, GST_STATE_NULL); + gst_object_unref (sinkpad); + gst_bin_remove (GST_BIN (user_data), element); + continue; + } + + if (!(sinkpad = find_sink_pad (element))) { + GST_WARNING_OBJECT (pbm, "Element %s doesn't have a sink pad", GST_ELEMENT_NAME (element)); + gst_object_unref (element); + continue; + } + + + if ((gst_pad_link (pad, sinkpad)) != GST_PAD_LINK_OK) { + GST_WARNING_OBJECT (pbm, "Link failed on pad %s:%s", + GST_DEBUG_PAD_NAME (sinkpad)); + gst_element_set_state (element, GST_STATE_NULL); + gst_object_unref (sinkpad); + gst_bin_remove (GST_BIN (user_data), element); + continue; + } + + gst_object_unref (sinkpad); + + if ((gst_element_set_state (element, GST_STATE_PAUSED)) == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (element, GST_STATE_NULL); + gst_bin_remove (GST_BIN (user_data), element); + continue; + } + + + pbm->sinks = g_slist_append (pbm->sinks, element); + linked = TRUE; + break; + } + + g_list_free (comp); + if (linked == FALSE) { + g_warning ("GstFlow ERROR"); + } + GST_PAD_STREAM_UNLOCK (pad); +} + +static void +update_volume (GstPlayBinMaemo *pbm) +{ + if (pbm->volume_element != NULL) { + if (pbm->volume > 0) { + g_object_set (G_OBJECT (pbm->volume_element), + "volume", pbm->volume, + NULL); + } else { + g_object_set (G_OBJECT (pbm->volume_element), + "mute", TRUE, + NULL); + } + } +} + +static void +update_xid (GstPlayBinMaemo *pbm) +{ + if ((pbm->sink_video != NULL) && + (pbm->xid != -1) && + (GST_IS_X_OVERLAY (pbm->sink_video))) { + XGCValues values; + Display *disp; + g_object_set (G_OBJECT (pbm->sink_video), + "force-aspect-ratio", TRUE, NULL); + g_debug ("Update XID to %ld", pbm->xid); + + gst_x_overlay_set_xwindow_id (GST_X_OVERLAY (pbm->sink_video), + pbm->xid); + } +} + +static gboolean +plugin_init(GstPlugin * plugin) +{ +#ifdef ENABLE_NLS + setlocale(LC_ALL, ""); + bindtextdomain(GETTEXT_PACKAGE, LOCALEDIR); +#endif /* ENABLE_NLS */ + + if (!gst_element_register(plugin, "playbinmaemo", GST_RANK_SECONDARY, + GST_TYPE_PLAY_BIN_MAEMO)) { + return FALSE; + } + + return TRUE; +} + +GST_PLUGIN_DEFINE(GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "playbinmaemo", + "Demuxes and muxes audio and video", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, + GST_PACKAGE_ORIGIN) diff -r a4529d0f8ede -r e42706ada231 gst-gmyth/playbinmaemo/gstplaybinmaemo.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-gmyth/playbinmaemo/gstplaybinmaemo.h Sat Jul 14 17:20:54 2007 +0100 @@ -0,0 +1,73 @@ +/* GStreamer + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_PLAYBINMAEMO_H__ +#define __GST_PLAYBINMAEMO_H__ + +#include + +G_BEGIN_DECLS + +#define GST_TYPE_PLAY_BIN_MAEMO \ + (gst_play_bin_maemo_get_type()) +#define GST_PLAY_BIN_MAEMO(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PLAY_BIN_MAEMO,GstPlayBinMaemo)) +#define GST_PLAY_BIN_MAEMO_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PLAY_BIN_MAEMO,GstPlayBinMaemoClass)) +#define GST_IS_PLAY_BIN_MAEMO(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PLAY_BIN_MAEMO)) +#define GST_IS_PLAY_BIN_MAEMO_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PLAY_BIN_MAEMO)) +#define GST_PLAY_BIN_MAEMO_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PLAY_BIN_MAEMO,GstPlayBinMaemoClass)) + +typedef struct _GstPlayBinMaemo GstPlayBinMaemo; +typedef struct _GstPlayBinMaemoClass GstPlayBinMaemoClass; + +struct _GstPlayBinMaemo { + GstPipeline pipeline; + + /* properties */ + guint64 queue_size; + guint64 queue_min_threshold; + gboolean is_stream; + glong xid; + guint volume; + + /* currently loaded media */ + gboolean need_rebuild; + gchar *uri; + GstElement *source; + GstElement *decoder; + GstElement *queue; + GstElement *volume_element; + GstElement *sink_video; + GSList *sinks; + GList *factories; +}; + +struct _GstPlayBinMaemoClass { + GstPipelineClass parent_class; +}; + +GType gst_play_bin_maemo_get_type (void); + +G_END_DECLS + +#endif /* __GST_PLAYBINMAEMO_H__ */ +