gmyth-stream/gmemcoder/src/gmencoder.c
author morphbr
Wed Jul 04 08:33:19 2007 +0100 (2007-07-04)
branch trunk
changeset 773 361cb0686ff7
parent 768 b405295259f3
child 777 4127375c2a03
permissions -rw-r--r--
[svn r779] - Added serve_file_info method
     1 #ifdef HAVE_CONFIG_H
     2 #include "config.h"
     3 #endif
     4 
     5 #include <sys/stat.h>
     6 #include <fcntl.h>
     7 #include <unistd.h>
     8 #include <glib.h>
     9 #include <gst/gst.h>
    10 #include <string.h>
    11 #include <sys/types.h>
    12 #include <sys/socket.h>
    13 
    14 #include "gmencoder.h"
    15 
    16 #define G_MENCODER_GET_PRIVATE(obj) \
    17     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), G_TYPE_MENCODER, GMencoderPrivate))
    18 
    19 // #define SUPPORT_MULT_INPUT 0
    20 #define USE_MANUAL_SINK
    21 #define BUFFER_SIZE (1024 * 64)
    22 
    23 typedef struct _GMencoderPrivate GMencoderPrivate;
    24 typedef struct _SetupInfo SetupInfo;
    25 
    26 struct _SetupInfo {
    27     gchar          *video_encode;
    28     gchar          *mux_name;
    29     gchar         **video_encode_prop;
    30     gdouble         video_fps;
    31     gdouble         video_rate;
    32     guint           video_width;
    33     guint           video_height;
    34     gchar          *audio_encode;
    35     gchar         **audio_encode_prop;
    36     guint           audio_rate;
    37 };
    38 
    39 
    40 struct _GMencoderPrivate {
    41     GstElement     *pipe;
    42     GstElement     *abin;
    43     GstElement     *vbin;
    44     GstElement     *sink;
    45     GstElement     *src;
    46     gboolean        ready;
    47     SetupInfo      *info;
    48     GstClockTime    videot;
    49     GstClockTime    audiot;
    50     gint            fd;
    51     gint            sources;
    52     gint            tick_id;
    53     gint64          duration;
    54     gboolean        send_chunked;
    55 #ifdef USE_MANUAL_SINK
    56     GByteArray	    *queue;
    57 #endif
    58 };
    59 
    60 enum {
    61     PAUSED,
    62     PLAYING,
    63     STOPED,
    64     EOS,
    65     ERROR,
    66     LAST_SIGNAL
    67 };
    68 
    69 static void     g_mencoder_class_init(GMencoderClass * klass);
    70 static void     g_mencoder_init(GMencoder * object);
    71 static void     g_mencoder_dispose(GObject * object);
    72 static void     g_mencoder_finalize(GObject * object);
    73 static GstElement *_create_audio_bin(const gchar * encode,
    74                                      gchar ** encode_prop, gint rate);
    75 static GstElement *_create_video_bin(const gchar * encode,
    76                                      gchar ** encode_prop,
    77                                      gdouble fps,
    78                                      gint rate, guint width, guint height);
    79 
    80 static          gboolean
    81 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data);
    82 
    83 static void     _decodebin_new_pad_cb(GstElement * object,
    84                                       GstPad * pad,
    85                                       gboolean flag, gpointer user_data);
    86 
    87 static void     _decodebin_unknown_type_cb(GstElement * object,
    88                                            GstPad * pad,
    89                                            GstCaps * caps,
    90                                            gpointer user_data);
    91 
    92 static void     _close_output(GMencoder * self);
    93 static gboolean _open_output(GMencoder * self, const gchar * uri);
    94 
    95 static GstElement *_create_source(const gchar * uri);
    96 static GstElement *_create_pipeline(GMencoder * self,
    97                                     const gchar * video_encode,
    98                                     const gchar * mux_name,
    99                                     gchar ** video_encode_prop,
   100                                     gdouble video_fps,
   101                                     gdouble video_rate,
   102                                     guint video_width,
   103                                     guint video_height,
   104                                     const gchar * audio_encode,
   105                                     gchar ** audio_encode_prop,
   106                                     guint audio_rate);
   107 #ifdef USE_MANUAL_SINK
   108 static void _flush_queue 	    (GMencoder *self);
   109 static void _buffer_arrive_cb       (GstElement* object,
   110                                      GstBuffer* buff,
   111                                      GstPad* pad,
   112                                      gpointer user_data);
   113 #endif
   114 
   115 
   116 static gboolean _tick_cb(gpointer data);
   117 
   118 static guint    g_mencoder_signals[LAST_SIGNAL] = { 0 };
   119 
   120 G_DEFINE_TYPE(GMencoder, g_mencoder, G_TYPE_OBJECT)
   121 
   122 static void     g_mencoder_class_init(GMencoderClass * klass)
   123 {
   124     GObjectClass   *object_class;
   125     object_class = (GObjectClass *) klass;
   126     g_type_class_add_private(klass, sizeof(GMencoderPrivate));
   127 
   128     object_class->dispose = g_mencoder_dispose;
   129     object_class->finalize = g_mencoder_finalize;
   130 
   131     g_mencoder_signals[PAUSED] =
   132         g_signal_new("paused",
   133                      G_OBJECT_CLASS_TYPE(object_class),
   134                      G_SIGNAL_RUN_FIRST,
   135                      0, NULL, NULL,
   136                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   137 
   138     g_mencoder_signals[PLAYING] =
   139         g_signal_new("playing",
   140                      G_OBJECT_CLASS_TYPE(object_class),
   141                      G_SIGNAL_RUN_FIRST,
   142                      0, NULL, NULL,
   143                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   144 
   145     g_mencoder_signals[STOPED] =
   146         g_signal_new("stoped",
   147                      G_OBJECT_CLASS_TYPE(object_class),
   148                      G_SIGNAL_RUN_FIRST,
   149                      0, NULL, NULL,
   150                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   151 
   152     g_mencoder_signals[EOS] =
   153         g_signal_new("eos",
   154                      G_OBJECT_CLASS_TYPE(object_class),
   155                      G_SIGNAL_RUN_FIRST,
   156                      0, NULL, NULL,
   157                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   158 
   159     g_mencoder_signals[ERROR] =
   160         g_signal_new("error",
   161                      G_OBJECT_CLASS_TYPE(object_class),
   162                      G_SIGNAL_RUN_LAST,
   163                      0, NULL, NULL,
   164                      g_cclosure_marshal_VOID__STRING,
   165                      G_TYPE_NONE, 1, G_TYPE_STRING);
   166 }
   167 
   168 static void
   169 g_mencoder_init(GMencoder * self)
   170 {
   171     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   172     priv->info = g_new0(SetupInfo, 1);
   173 #ifdef USE_MANUAL_SINK
   174     priv->queue = g_byte_array_new ();
   175 #endif
   176 }
   177 
   178 static void
   179 g_mencoder_dispose(GObject * object)
   180 {
   181 }
   182 
   183 static void
   184 g_mencoder_finalize(GObject * object)
   185 {
   186     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
   187 
   188     // TODO: clear vars
   189     g_mencoder_close_stream(G_MENCODER(object));
   190     g_free (priv->info);
   191 #ifdef USE_MANUAL_SINK
   192     g_byte_array_free (priv->queue, TRUE);
   193 #endif
   194 }
   195 
   196 GMencoder      *
   197 g_mencoder_new(void)
   198 {
   199     return g_object_new(G_TYPE_MENCODER, NULL);
   200 }
   201 
   202 
   203 static void
   204 _obj_set_prop(GObject * obj, const gchar * prop_name,
   205               const gchar * prop_val)
   206 {
   207     GValue          p = { 0 };
   208     GValue          v = { 0 };
   209     GParamSpec     *s = NULL;
   210     GObjectClass   *k = G_OBJECT_GET_CLASS(obj);
   211 
   212 
   213     g_value_init(&v, G_TYPE_STRING);
   214     g_value_set_string(&v, prop_val);
   215 
   216     s = g_object_class_find_property(k, prop_name);
   217     if (s == NULL) {
   218         g_print("Invalid property name: %s\n", prop_name);
   219         return;
   220     }
   221 
   222     g_value_init(&p, s->value_type);
   223     switch (s->value_type) {
   224     case G_TYPE_INT:
   225         g_value_set_int(&p, atoi(prop_val));
   226         break;
   227     case G_TYPE_ULONG:
   228         g_value_set_ulong (&p, atol(prop_val));
   229         break;
   230     case G_TYPE_STRING:
   231         g_value_set_string(&p, prop_val);
   232         break;
   233     case G_TYPE_BOOLEAN:
   234         g_value_set_boolean(&p, (gboolean) atoi (prop_val));
   235         break;
   236     case G_TYPE_DOUBLE:
   237         g_value_set_double(&p, atof (prop_val));
   238         break;
   239     case G_TYPE_FLOAT:
   240         g_value_set_float(&p, (float) atof (prop_val));
   241         break;
   242     default:
   243         g_warning ("Property %s of type %s. Not supported", 
   244                    prop_name, g_type_name (s->value_type));
   245         return;
   246     }
   247 
   248     g_object_set_property(obj, prop_name, &p);
   249     g_value_unset(&v);
   250     g_value_unset(&p);
   251 }
   252 
   253 static GstElement *
   254 _create_element_with_prop(const gchar * factory_name,
   255                           const gchar * element_name, gchar ** prop)
   256 {
   257     GstElement     *ret;
   258     int             i;
   259 
   260     ret = gst_element_factory_make(factory_name, element_name);
   261     if (ret == NULL)
   262         return NULL;
   263 
   264     if (prop != NULL) {
   265         for (i = 0; i < g_strv_length(prop); i++) {
   266             if (prop[i] != NULL) {
   267                 char          **v = g_strsplit(prop[i], "=", 2);
   268                 if (g_strv_length(v) == 2) {
   269                     _obj_set_prop(G_OBJECT(ret), v[0], v[1]);
   270                 }
   271                 g_strfreev(v);
   272             }
   273         }
   274     }
   275 
   276     return ret;
   277 
   278 }
   279 
   280 static GstElement *
   281 _create_audio_bin(const gchar * encode, gchar ** encode_prop, gint rate)
   282 {
   283     GstElement     *abin = NULL;
   284     GstElement     *aqueue = NULL;
   285     GstElement     *aconvert = NULL;
   286     GstElement     *aencode = NULL;
   287     GstElement     *aqueue_src = NULL;
   288     GstPad         *apad = NULL;
   289 
   290     // audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
   291     // udpsink name=upd_audio host=224.0.0.1 port=5002
   292     abin = gst_bin_new("abin");
   293     aqueue = gst_element_factory_make("queue", "aqueue");
   294     aconvert = gst_element_factory_make("audioconvert", "aconvert");
   295     aencode =
   296         _create_element_with_prop((encode ? encode : "lame"), "aencode",
   297                                   encode_prop);
   298     aqueue_src = gst_element_factory_make("queue", "aqueue_src");
   299 
   300     if ((abin == NULL) || (aqueue == NULL) || (aconvert == NULL)
   301         || (aencode == NULL) || (aqueue_src == NULL)) {
   302         g_warning("Audio elements not found");
   303         goto error;
   304     }
   305 
   306     g_object_set(G_OBJECT(aencode), "bitrate", 32, NULL);
   307     /*
   308      * if (rate > 0) { g_object_set (G_OBJECT (aencode), "bitrate", 32,
   309      * NULL); } 
   310      */
   311 
   312     gst_bin_add_many(GST_BIN(abin), aqueue, aconvert, aencode, aqueue_src,
   313                      NULL);
   314     if (gst_element_link_many(aqueue, aconvert, aencode, aqueue_src, NULL)
   315         == FALSE) {
   316         g_warning("Not Link audio elements");
   317     }
   318     // TODO: apply audio rate
   319 
   320     // ghost pad the audio bin
   321     apad = gst_element_get_pad(aqueue, "sink");
   322     gst_element_add_pad(abin, gst_ghost_pad_new("sink", apad));
   323     gst_object_unref(apad);
   324 
   325     apad = gst_element_get_pad(aqueue_src, "src");
   326     gst_element_add_pad(abin, gst_ghost_pad_new("src", apad));
   327     gst_object_unref(apad);
   328 
   329     return abin;
   330   error:
   331     if (abin != NULL)
   332         gst_object_unref(abin);
   333 
   334     if (aqueue != NULL)
   335         gst_object_unref(aqueue);
   336 
   337     if (aconvert != NULL)
   338         gst_object_unref(aconvert);
   339 
   340     if (aencode != NULL)
   341         gst_object_unref(aencode);
   342 
   343     if (aqueue_src != NULL)
   344         gst_object_unref(aqueue_src);
   345 
   346     if (apad != NULL)
   347         gst_object_unref(apad);
   348 
   349     return NULL;
   350 }
   351 
   352 
   353 
   354 
   355 // queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! colorspace
   356 // ! rate ! encode ! queue
   357 static GstElement *
   358 _create_video_bin(const gchar * encode,
   359                   gchar ** encode_prop,
   360                   gdouble fps, gint rate, guint width, guint height)
   361 {
   362     GstElement     *vbin = NULL;
   363     GstElement     *vqueue = NULL;
   364     GstElement     *vqueue_src = NULL;
   365     GstElement     *vcolorspace = NULL;
   366     GstElement     *vencode = NULL;
   367     GstElement     *vrate = NULL;
   368     GstPad         *vpad = NULL;
   369 
   370     vbin = gst_bin_new("vbin");
   371     vqueue = gst_element_factory_make("queue", "vqueue");
   372     vcolorspace =
   373         gst_element_factory_make("ffmpegcolorspace", "colorspace");
   374 
   375     vencode = _create_element_with_prop((encode !=
   376                                          NULL ? encode :
   377                                          "ffenc_mpeg1video"), "vencode",
   378                                         encode_prop);
   379     vqueue_src = gst_element_factory_make("queue", "queue_src");
   380 
   381     if ((vbin == NULL) || (vqueue == NULL) || (vcolorspace == NULL)
   382         || (vencode == NULL) || (vqueue_src == NULL)) {
   383         g_warning("Video elements not found");
   384         goto error;
   385     }
   386 
   387     gst_bin_add_many(GST_BIN(vbin), vqueue, vcolorspace, vencode,
   388                      vqueue_src, NULL);
   389 
   390     if ((width > 0) && (height > 0)) {
   391         // Scalling video
   392         GstCaps        *vcaps;
   393         GstElement     *vscale =
   394             gst_element_factory_make("videoscale", "vscale");
   395 
   396         gst_bin_add(GST_BIN(vbin), vscale);
   397 
   398         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   399                                     "width", G_TYPE_INT, width,
   400                                     "height", G_TYPE_INT, height, NULL);
   401 
   402         gst_element_link(vqueue, vscale);
   403 
   404         if (gst_element_link_filtered(vscale, vcolorspace, vcaps) == FALSE) {
   405             g_warning("Fail to resize video");
   406             gst_object_unref(vcaps);
   407             gst_object_unref(vscale);
   408             goto error;
   409         }
   410         gst_caps_unref(vcaps);
   411     } else {
   412         gst_element_link(vqueue, vcolorspace);
   413     }
   414 
   415     if (fps > 0) {
   416         // Changing the video fps
   417         GstCaps        *vcaps;
   418         vrate = gst_element_factory_make("videorate", "vrate");
   419 
   420         gst_bin_add(GST_BIN(vbin), vrate);
   421 
   422         if (gst_element_link(vcolorspace, vrate) == FALSE) {
   423             g_warning("Fail to link video elements");
   424             goto error;
   425         }
   426 
   427         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   428                                     "framerate", GST_TYPE_FRACTION,
   429                                     (int) (fps * 1000), 1000, NULL);
   430 
   431         if (gst_element_link_filtered(vrate, vencode, vcaps) == FALSE) {
   432             g_warning("Fail to link vrate with vencode.");
   433             goto error;
   434         }
   435         gst_caps_unref(vcaps);
   436     } else {
   437         if (gst_element_link(vcolorspace, vencode) == FALSE) {
   438             g_warning("Fail to link colorspace and video encode element.");
   439             goto error;
   440         }
   441     }
   442 
   443     gst_element_link(vencode, vqueue_src);
   444 
   445     // ghost pad the video bin
   446     vpad = gst_element_get_pad(vqueue, "sink");
   447     gst_element_add_pad(vbin, gst_ghost_pad_new("sink", vpad));
   448     gst_object_unref(vpad);
   449 
   450     vpad = gst_element_get_pad(vqueue_src, "src");
   451     gst_element_add_pad(vbin, gst_ghost_pad_new("src", vpad));
   452     gst_object_unref(vpad);
   453 
   454     return vbin;
   455 
   456   error:
   457     if (vpad != NULL)
   458         gst_object_unref(vpad);
   459 
   460     if (vbin != NULL)
   461         gst_object_unref(vbin);
   462 
   463     if (vqueue != NULL)
   464         gst_object_unref(vqueue);
   465 
   466     if (vencode != NULL)
   467         gst_object_unref(vencode);
   468 
   469     if (vqueue_src != NULL)
   470         gst_object_unref(vqueue_src);
   471 
   472     if (vcolorspace != NULL)
   473         gst_object_unref(vcolorspace);
   474 
   475     return NULL;
   476 }
   477 
   478 
   479 
   480 gboolean 
   481 g_mencoder_setup_stream(GMencoder * self,
   482 			gboolean chunked,
   483                         const gchar * mux_name,
   484                         const gchar * video_encode,
   485                         gchar ** video_encode_prop,
   486                         gdouble video_fps,
   487                         gdouble video_rate,
   488                         guint video_width,
   489                         guint video_height,
   490                         const gchar * audio_encode,
   491                         gchar ** audio_encode_prop,
   492                         guint audio_rate, const gchar * out_uri)
   493 {
   494     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   495     if (priv->ready == TRUE) {
   496         g_warning
   497             ("Stream already configured. You need close stream first.");
   498         return FALSE;
   499     }
   500 
   501     _close_output(self);
   502     if (_open_output(self, out_uri) == FALSE)
   503         return FALSE;
   504 
   505     priv->sources = 0;
   506     priv->send_chunked = chunked;
   507     priv->pipe = _create_pipeline(self,
   508                                   video_encode,
   509                                   mux_name,
   510                                   video_encode_prop,
   511                                   video_fps,
   512                                   video_rate,
   513                                   video_width,
   514                                   video_height,
   515                                   audio_encode, audio_encode_prop,
   516                                   audio_rate);
   517 
   518     return (priv->pipe != NULL);
   519 }
   520 
   521 
   522 gboolean
   523 g_mencoder_append_uri(GMencoder * self, const gchar * uri)
   524 {
   525     GstPad         *pad_src;
   526     GstPad         *pad_sink;
   527     GstElement     *src;
   528     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   529     gboolean        ret = FALSE;
   530     GstElement     *ap = NULL;
   531     GstElement     *vp = NULL;
   532 
   533 
   534     g_return_val_if_fail(priv->pipe != NULL, FALSE);
   535     g_return_val_if_fail(priv->ready == FALSE, FALSE);
   536 
   537 #ifndef SUPPORT_MULT_INPUT
   538     g_return_val_if_fail(priv->sources < 1, FALSE);
   539 #endif
   540 
   541     src = _create_source(uri);
   542     if (src == NULL)
   543         return FALSE;
   544 
   545     priv->src = gst_bin_get_by_name(GST_BIN(src), "src");
   546 
   547     gst_bin_add(GST_BIN(priv->pipe), src);
   548 
   549 #ifdef SUPPORT_MULT_INPUT
   550     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "ap");
   551     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vp");
   552 #else
   553     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "abin");
   554     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vbin");
   555 #endif
   556 
   557     if ((vp == NULL) || (ap == NULL)) {
   558         g_warning("Fail to get output bin");
   559         goto error;
   560     }
   561 
   562     pad_src = gst_element_get_pad(src, "src_audio");
   563     pad_sink = gst_element_get_compatible_pad(ap,
   564                                               pad_src,
   565                                               gst_pad_get_caps(pad_src));
   566 
   567     if ((pad_sink == NULL) || (pad_src == NULL))
   568         goto error;
   569 
   570     GstPadLinkReturn lret = gst_pad_link(pad_src, pad_sink);
   571     if (lret != GST_PAD_LINK_OK)
   572         goto error;
   573 
   574     gst_object_unref(pad_src);
   575     gst_object_unref(pad_sink);
   576 
   577     pad_src = gst_element_get_pad(src, "src_video");
   578     pad_sink = gst_element_get_compatible_pad(vp,
   579                                               pad_src,
   580                                               gst_pad_get_caps(pad_src));
   581 
   582     if ((pad_src == NULL) || (pad_sink == NULL))
   583         goto error;
   584 
   585     if (gst_pad_link(pad_src, pad_sink) != GST_PAD_LINK_OK) {
   586         g_warning("invalid source. video");
   587         goto error;
   588     }
   589 
   590     priv->sources++;
   591     ret = TRUE;
   592   error:
   593 
   594     if ((src != NULL) && (ret == FALSE)) {
   595         gst_bin_remove(GST_BIN(priv->pipe), src);
   596         gst_object_unref(src);
   597     }
   598 
   599     if (ap != NULL)
   600         gst_object_unref(ap);
   601 
   602     if (vp != NULL)
   603         gst_object_unref(vp);
   604 
   605     if (pad_src != NULL)
   606         gst_object_unref(pad_src);
   607 
   608     if (pad_sink != NULL)
   609         gst_object_unref(pad_sink);
   610 
   611     return ret;
   612 }
   613 
   614 
   615 
   616 void
   617 g_mencoder_remove_uri(GMencoder * self, const gchar * uri)
   618 {
   619     // GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE (self);
   620     // TODO: remove src
   621 }
   622 
   623 void
   624 g_mencoder_play_stream(GMencoder * self)
   625 {
   626     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   627     g_return_if_fail(priv->ready == FALSE);
   628     priv->ready = TRUE;
   629     gst_element_set_state(priv->pipe, GST_STATE_PLAYING);
   630     priv->tick_id = g_timeout_add(500, _tick_cb, self);
   631 }
   632 
   633 void
   634 g_mencoder_pause_stream(GMencoder * self)
   635 {
   636     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   637     g_return_if_fail(priv->ready == TRUE);
   638     gst_element_set_state(priv->pipe, GST_STATE_PAUSED);
   639 }
   640 
   641 void
   642 g_mencoder_close_stream(GMencoder * self)
   643 {
   644 
   645     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   646     if (priv->tick_id != 0) {
   647         g_source_remove(priv->tick_id);
   648         priv->tick_id = 0;
   649     }
   650 
   651     if (priv->pipe != NULL) {
   652         // TODO: fixe pipeline dispose
   653         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   654         // g_debug ("SETING STATE TO NULL: OK");
   655         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   656         // gst_object_unref (priv->pipe);
   657         gst_object_unref(priv->src);
   658         priv->src = NULL;
   659         priv->pipe = NULL;
   660         priv->abin = NULL;
   661         priv->vbin = NULL;
   662         priv->sink = NULL;
   663     }
   664     priv->ready = FALSE;
   665 }
   666 
   667 static GstElement *
   668 _create_pipeline(GMencoder * self,
   669                  const gchar * video_encode,
   670                  const gchar * mux_name,
   671                  gchar ** video_encode_prop,
   672                  gdouble video_fps,
   673                  gdouble video_rate,
   674                  guint video_width,
   675                  guint video_height,
   676                  const gchar * audio_encode,
   677                  gchar ** audio_encode_prop, guint audio_rate)
   678 {
   679     GstBus         *bus = NULL;
   680     GstElement     *pipe = NULL;
   681     GstElement     *sink = NULL;
   682     GstElement     *mux = NULL;
   683     GstElement     *abin = NULL;
   684     GstElement     *vbin = NULL;
   685     GstElement     *queue = NULL;
   686     GstPad         *aux_pad = NULL;
   687     GstPad         *mux_pad = NULL;
   688 #ifdef SUPPORT_MULT_INPUT
   689     GstElement     *ap = NULL;
   690     GstElement     *vp = NULL;
   691 #endif
   692 
   693     pipe = gst_pipeline_new("pipe");
   694 
   695 #ifdef SUPPORT_MULT_INPUT
   696     ap = gst_element_factory_make("concatmux", "ap");
   697     vp = gst_element_factory_make("concatmux", "vp");
   698     gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
   699 #endif
   700     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   701 
   702     mux =
   703         gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
   704                                  "mux");
   705     if (mux == NULL)
   706         goto error;
   707 
   708     queue = gst_element_factory_make("queue", "queueu_sink");
   709 
   710 
   711 #ifdef USE_MANUAL_SINK
   712     sink = gst_element_factory_make("fakesink", "sink");
   713     g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
   714     g_signal_connect (G_OBJECT (sink),
   715                       "handoff",
   716                       G_CALLBACK (_buffer_arrive_cb),
   717                       self);
   718 #else    
   719     sink = gst_element_factory_make("fdsink", "sink");
   720     if (sink == NULL)
   721         goto error;
   722 
   723     g_object_set(G_OBJECT(sink), "fd", priv->fd, "sync", FALSE, NULL);
   724 #endif
   725 
   726     abin = _create_audio_bin(audio_encode, audio_encode_prop, audio_rate);
   727     if (abin == NULL)
   728         goto error;
   729 
   730     vbin =
   731         _create_video_bin(video_encode, video_encode_prop, video_fps,
   732                           video_rate, video_width, video_height);
   733     if (vbin == NULL)
   734         goto error;
   735 
   736     // Finish Pipe
   737     gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, sink, NULL);
   738 
   739 
   740 #ifdef SUPPORT_MULT_INPUT
   741     if (gst_element_link(ap, abin) == FALSE) {
   742         g_warning("Fail to link concat and abin");
   743         goto error;
   744     }
   745 
   746     if (gst_element_link(vp, vbin) == FALSE) {
   747         g_warning("Fail to link concat and vbin");
   748     }
   749 #endif
   750 
   751     // Link bins with mux
   752     aux_pad = gst_element_get_pad(abin, "src");
   753     mux_pad =
   754         gst_element_get_compatible_pad(mux, aux_pad,
   755                                        GST_PAD_CAPS(aux_pad));
   756     if (mux_pad == NULL) {
   757         g_warning("Mux element no have audio PAD");
   758         goto error;
   759     }
   760     GstPadLinkReturn ret = gst_pad_link(aux_pad, mux_pad);
   761     if (ret != GST_PAD_LINK_OK) {
   762         g_warning("Fail link audio and mux: %d", ret);
   763         goto error;
   764 
   765     }
   766     gst_object_unref(aux_pad);
   767     gst_object_unref(mux_pad);
   768 
   769     aux_pad = gst_element_get_pad(vbin, "src");
   770     mux_pad =
   771         gst_element_get_compatible_pad(mux, aux_pad,
   772                                        GST_PAD_CAPS(aux_pad));
   773     if (mux_pad == NULL) {
   774         g_warning("Mux element no have video PAD");
   775         goto error;
   776     }
   777     ret = gst_pad_link(aux_pad, mux_pad);
   778     if (ret != GST_PAD_LINK_OK) {
   779         g_warning("Fail link video and mux: %d", ret);
   780         goto error;
   781     }
   782     gst_object_unref(aux_pad);
   783     gst_object_unref(mux_pad);
   784     aux_pad = NULL;
   785     mux_pad = NULL;
   786 
   787     // Link mux with sink
   788     gst_element_link_many(mux, queue, sink, NULL);
   789 
   790     bus = gst_pipeline_get_bus(GST_PIPELINE(pipe));
   791     gst_bus_add_watch(bus, _pipeline_bus_cb, self);
   792     gst_object_unref(bus);
   793     return pipe;
   794 
   795   error:
   796     g_warning("Invalid uri");
   797 
   798     if (pipe != NULL) {
   799         gst_object_unref(pipe);
   800     }
   801 
   802 
   803     if (mux != NULL) {
   804         gst_object_unref(mux);
   805     }
   806 
   807     if (mux_pad != NULL) {
   808         gst_object_unref(mux_pad);
   809     }
   810 
   811     if (aux_pad != NULL) {
   812         gst_object_unref(mux_pad);
   813     }
   814 
   815     if (sink != NULL) {
   816         gst_object_unref(sink);
   817     }
   818 
   819     if (abin != NULL) {
   820         gst_object_unref(abin);
   821     }
   822 
   823     if (vbin != NULL) {
   824         gst_object_unref(vbin);
   825     }
   826 
   827     return FALSE;
   828 }
   829 
   830 
   831 static void
   832 _close_output(GMencoder * self)
   833 {
   834 }
   835 
   836 static GstElement *
   837 _create_source(const gchar * uri)
   838 {
   839 
   840     GstElement     *bsrc = NULL;
   841     GstElement     *src = NULL;
   842     GstElement     *queue = NULL;
   843     GstElement     *aqueue = NULL;
   844     GstElement     *vqueue = NULL;
   845     GstElement     *decode = NULL;
   846     GstPad         *src_pad = NULL;
   847 
   848 
   849     bsrc = gst_bin_new(NULL);
   850 
   851     // src = gst_element_factory_make ("gnomevfssrc", "src");
   852     // g_object_set (G_OBJECT (src), "location", uri, NULL);
   853     src = gst_element_make_from_uri(GST_URI_SRC, uri, "src");
   854     if (src == NULL)
   855         goto error;
   856 
   857     decode = gst_element_factory_make("decodebin", "decode");
   858     if (decode == NULL)
   859         goto error;
   860 
   861     queue = gst_element_factory_make("queue", "queue_src");
   862     aqueue = gst_element_factory_make("queue", "aqueue");
   863     if (aqueue == NULL)
   864         goto error;
   865 
   866     vqueue = gst_element_factory_make("queue", "vqueue");
   867     if (vqueue == NULL)
   868         goto error;
   869 
   870     gst_bin_add_many(GST_BIN(bsrc), src, queue, decode, aqueue, vqueue,
   871                      NULL);
   872     gst_element_link_many(src, queue, decode, NULL);
   873 
   874     g_signal_connect(G_OBJECT(decode),
   875                      "new-decoded-pad",
   876                      G_CALLBACK(_decodebin_new_pad_cb), bsrc);
   877 
   878     g_signal_connect(G_OBJECT(decode),
   879                      "unknown-type",
   880                      G_CALLBACK(_decodebin_unknown_type_cb), pipe);
   881 
   882     src_pad = gst_element_get_pad(aqueue, "src");
   883     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_audio", src_pad));
   884     gst_object_unref(src_pad);
   885 
   886     src_pad = gst_element_get_pad(vqueue, "src");
   887     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_video", src_pad));
   888     gst_object_unref(src_pad);
   889 
   890     return bsrc;
   891 
   892   error:
   893     if (src != NULL) {
   894         gst_object_unref(src);
   895     }
   896 
   897     if (decode != NULL) {
   898         gst_object_unref(decode);
   899     }
   900 
   901     if (aqueue != NULL) {
   902         gst_object_unref(aqueue);
   903     }
   904 
   905     if (vqueue != NULL) {
   906         gst_object_unref(vqueue);
   907     }
   908 
   909     return NULL;
   910 }
   911 
   912 static gboolean
   913 _open_output(GMencoder * self, const gchar * uri)
   914 {
   915     gboolean ret = TRUE;
   916     gchar         **i;
   917     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   918 
   919     i = g_strsplit(uri, "://", 0);
   920     if (strcmp(i[0], "fd") == 0) {
   921         priv->fd = atoi(i[1]);
   922 	if (priv->send_chunked)
   923 	    fcntl (priv->fd, F_SETFL, O_ASYNC);
   924     } else if (strcmp(i[0], "file") == 0) {
   925         if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
   926             if (unlink (i[1]) != 0) {
   927                 g_warning ("Fail to write in : %s", uri);
   928                 ret = FALSE;
   929                 goto done;
   930             }
   931         }
   932         priv->fd = open(i[1], O_WRONLY | O_CREAT | O_TRUNC,
   933                         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
   934 
   935         if (priv->fd == -1) {
   936             g_warning ("Fail to open : %s", uri);
   937             ret = FALSE;
   938         }
   939     } else {
   940         g_warning("Output uri not supported");
   941         ret = FALSE;
   942     }
   943 
   944 done:
   945     g_strfreev(i);
   946     return ret;
   947 }
   948 
   949 static          gboolean
   950 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data)
   951 {
   952     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
   953 
   954     switch (GST_MESSAGE_TYPE(msg)) {
   955 
   956     case GST_MESSAGE_STATE_CHANGED:
   957         {
   958             GstState        oldstate;
   959             GstState        newstate;
   960             GstState        pendingstate;
   961 
   962 
   963             gst_message_parse_state_changed(msg, &oldstate,
   964                                             &newstate, &pendingstate);
   965 
   966             if (pendingstate != GST_STATE_VOID_PENDING)
   967                 break;
   968 
   969             if ((oldstate == GST_STATE_READY)
   970                 && (newstate == GST_STATE_PAUSED)) {
   971                 if (priv->ready)
   972                     g_signal_emit(user_data, g_mencoder_signals[PAUSED],
   973                                   0);
   974             } else if ((oldstate == GST_STATE_PAUSED)
   975                        && (newstate == GST_STATE_PLAYING)) {
   976                 g_signal_emit(user_data, g_mencoder_signals[PLAYING], 0);
   977             } else if ((oldstate == GST_STATE_READY) &&
   978                        (newstate == GST_STATE_NULL)) {
   979                 g_signal_emit(user_data, g_mencoder_signals[STOPED], 0);
   980             }
   981             break;
   982         }
   983 
   984     case GST_MESSAGE_ERROR:
   985         {
   986             GError         *error;
   987             gchar          *debug;
   988             gchar          *err_str;
   989 
   990             if (priv->tick_id != 0) {
   991                 g_source_remove(priv->tick_id);
   992                 priv->tick_id = 0;
   993             }
   994 
   995             gst_message_parse_error(msg, &error, &debug);
   996             err_str = g_strdup_printf("Error [%d] %s (%s)", error->code,
   997                                       error->message, debug);
   998             priv->ready = FALSE;
   999             g_signal_emit(user_data, g_mencoder_signals[ERROR], 0,
  1000                           err_str);
  1001             g_free(err_str);
  1002             g_clear_error(&error);
  1003             g_free(debug);
  1004             break;
  1005         }
  1006 
  1007     case GST_MESSAGE_EOS:
  1008         priv->ready = FALSE;
  1009 #ifdef USE_MANUAL_SINK
  1010 	_flush_queue (G_MENCODER (user_data));
  1011 #endif
  1012         g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
  1013         break;
  1014 
  1015     case GST_MESSAGE_DURATION:
  1016         {
  1017             GstFormat       format;
  1018             gint64          duration;
  1019             gst_message_parse_duration(msg, &format, &duration);
  1020             if (format == GST_FORMAT_BYTES)
  1021                 priv->duration = duration;
  1022             break;
  1023         }
  1024     default:
  1025         {
  1026             break;
  1027         }
  1028     }
  1029     return TRUE;
  1030 }
  1031 
  1032 
  1033 
  1034 static void
  1035 _decodebin_new_pad_cb(GstElement * object,
  1036                       GstPad * pad, gboolean flag, gpointer user_data)
  1037 {
  1038     GstCaps        *caps;
  1039     gchar          *str_caps = NULL;
  1040     GstElement     *sink_element;
  1041     GstPad         *sink_pad;
  1042 
  1043     caps = gst_pad_get_caps(pad);
  1044     str_caps = gst_caps_to_string(caps);
  1045     if (strstr(str_caps, "audio") != NULL) {
  1046         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "aqueue");
  1047     } else if (strstr(str_caps, "video") != NULL) {
  1048         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "vqueue");
  1049     } else {
  1050         g_warning("invalid caps %s", str_caps);
  1051     }
  1052 
  1053     sink_pad = gst_element_get_pad(sink_element, "sink");
  1054     gst_pad_link(pad, sink_pad);
  1055 
  1056     gst_object_unref(sink_element);
  1057     gst_object_unref(sink_pad);
  1058     g_free(str_caps);
  1059     gst_caps_unref(caps);
  1060 }
  1061 
  1062 static void
  1063 _decodebin_unknown_type_cb(GstElement * object,
  1064                            GstPad * pad, GstCaps * caps,
  1065                            gpointer user_data)
  1066 {
  1067     g_warning("Unknown Type");
  1068     // priv->ready = FALSE;
  1069 }
  1070 
  1071 static          gboolean
  1072 _tick_cb(gpointer user_data)
  1073 {
  1074     GstFormat       format = GST_FORMAT_BYTES;
  1075     gint64          cur = 0;
  1076 
  1077     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1078 
  1079     if (priv->duration == 0) {
  1080         gint64          d = 0;
  1081         if (gst_element_query_duration(priv->src, &format, &d))
  1082             priv->duration = d;
  1083     }
  1084 
  1085     if (priv->duration != 0) {
  1086         gst_element_query_position(priv->src, &format, &cur);
  1087         g_print("PROGRESS:%lli\n", (99 * cur) / priv->duration);
  1088     }
  1089 
  1090     return TRUE;
  1091 }
  1092 
  1093 
  1094 #ifdef USE_MANUAL_SINK
  1095 static gboolean
  1096 _send_buffer (gint fd, gpointer buff, gint size)
  1097 {
  1098     gboolean ret = TRUE;
  1099     gchar *msg;
  1100     GByteArray *b_send;
  1101 
  1102     b_send = g_byte_array_new ();
  1103     msg = g_strdup_printf ("%x\r\n", size);
  1104     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1105     g_free (msg);
  1106 
  1107     b_send = g_byte_array_append (b_send, buff, size);
  1108 
  1109     msg = g_strdup ("\r\n");
  1110     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1111     g_free (msg);
  1112 
  1113     if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) 
  1114         ret = FALSE;
  1115     g_byte_array_free (b_send, TRUE);
  1116 
  1117     return ret;
  1118 }
  1119 
  1120 static void
  1121 _flush_queue (GMencoder *self)
  1122 {
  1123     gchar *end_msg;
  1124     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
  1125 
  1126     if (BUFFER_SIZE == 0)
  1127         return;
  1128 
  1129     if (priv->queue->len > 0) {
  1130         _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
  1131         priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1132     }
  1133 
  1134     end_msg = g_strdup ("0\r\n\r\n");
  1135     write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
  1136     g_free (end_msg);
  1137     return;
  1138 }
  1139 static void 
  1140 _buffer_arrive_cb (GstElement* object,
  1141                    GstBuffer* buff,
  1142                    GstPad* pad,
  1143                    gpointer user_data)
  1144 {
  1145     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1146 
  1147     if (BUFFER_SIZE == 0) {
  1148 	if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
  1149              goto error;
  1150         return;
  1151     }
  1152 
  1153     priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
  1154     while (priv->queue->len >= BUFFER_SIZE) {
  1155     	//g_usleep (0.2 * G_USEC_PER_SEC);
  1156     	if (priv->send_chunked) {	
  1157 	    if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
  1158                 goto error;
  1159 	} else {
  1160 	    if (write (priv->fd, priv->queue->data, BUFFER_SIZE) == -1)
  1161                 goto error;
  1162 	}
  1163         priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
  1164     }
  1165     return;
  1166 
  1167 error:
  1168     if (priv->tick_id != 0) {
  1169         g_source_remove(priv->tick_id);
  1170         priv->tick_id = 0;
  1171     }
  1172     priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1173     g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
  1174 }
  1175 #endif 
  1176