gmyth-stream/gmemcoder/src/gmencoder.c
author morphbr
Mon Jul 02 08:33:58 2007 +0100 (2007-07-02)
branchtrunk
changeset 766 f1da4fbe667e
parent 761 949291aaba65
child 768 b405295259f3
permissions -rw-r--r--
[svn r772] * Corrected identation;
* Corrected connect with timeout while retrieving channel_list
     1 #ifdef HAVE_CONFIG_H
     2 #include "config.h"
     3 #endif
     4 
     5 #include <sys/stat.h>
     6 #include <fcntl.h>
     7 #include <unistd.h>
     8 #include <glib.h>
     9 #include <gst/gst.h>
    10 #include <string.h>
    11 #include <sys/types.h>
    12 #include <sys/socket.h>
    13 
    14 #include "gmencoder.h"
    15 
    16 #define G_MENCODER_GET_PRIVATE(obj) \
    17     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), G_TYPE_MENCODER, GMencoderPrivate))
    18 
    19 // #define SUPPORT_MULT_INPUT 0
    20 #define USE_MANUAL_SINK
    21 #define BUFFER_SIZE (1024 * 64)
    22 
    23 typedef struct _GMencoderPrivate GMencoderPrivate;
    24 typedef struct _SetupInfo SetupInfo;
    25 
    26 struct _SetupInfo {
    27     gchar          *video_encode;
    28     gchar          *mux_name;
    29     gchar         **video_encode_prop;
    30     gdouble         video_fps;
    31     gdouble         video_rate;
    32     guint           video_width;
    33     guint           video_height;
    34     gchar          *audio_encode;
    35     gchar         **audio_encode_prop;
    36     guint           audio_rate;
    37 };
    38 
    39 
    40 struct _GMencoderPrivate {
    41     GstElement     *pipe;
    42     GstElement     *abin;
    43     GstElement     *vbin;
    44     GstElement     *sink;
    45     GstElement     *src;
    46     gboolean        ready;
    47     SetupInfo      *info;
    48     GstClockTime    videot;
    49     GstClockTime    audiot;
    50     gint            fd;
    51     gint            sources;
    52     gint            tick_id;
    53     gint64          duration;
    54 #ifdef USE_MANUAL_SINK
    55     GByteArray	    *queue;
    56 #endif
    57 };
    58 
    59 enum {
    60     PAUSED,
    61     PLAYING,
    62     STOPED,
    63     EOS,
    64     ERROR,
    65     LAST_SIGNAL
    66 };
    67 
    68 static void     g_mencoder_class_init(GMencoderClass * klass);
    69 static void     g_mencoder_init(GMencoder * object);
    70 static void     g_mencoder_dispose(GObject * object);
    71 static void     g_mencoder_finalize(GObject * object);
    72 static GstElement *_create_audio_bin(const gchar * encode,
    73                                      gchar ** encode_prop, gint rate);
    74 static GstElement *_create_video_bin(const gchar * encode,
    75                                      gchar ** encode_prop,
    76                                      gdouble fps,
    77                                      gint rate, guint width, guint height);
    78 
    79 static          gboolean
    80 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data);
    81 
    82 static void     _decodebin_new_pad_cb(GstElement * object,
    83                                       GstPad * pad,
    84                                       gboolean flag, gpointer user_data);
    85 
    86 static void     _decodebin_unknown_type_cb(GstElement * object,
    87                                            GstPad * pad,
    88                                            GstCaps * caps,
    89                                            gpointer user_data);
    90 
    91 static void     _close_output(GMencoder * self);
    92 static gboolean _open_output(GMencoder * self, const gchar * uri);
    93 
    94 static GstElement *_create_source(const gchar * uri);
    95 static GstElement *_create_pipeline(GMencoder * self,
    96                                     const gchar * video_encode,
    97                                     const gchar * mux_name,
    98                                     gchar ** video_encode_prop,
    99                                     gdouble video_fps,
   100                                     gdouble video_rate,
   101                                     guint video_width,
   102                                     guint video_height,
   103                                     const gchar * audio_encode,
   104                                     gchar ** audio_encode_prop,
   105                                     guint audio_rate);
   106 #ifdef USE_MANUAL_SINK
   107 static void _flush_queue 	    (GMencoder *self);
   108 static void _buffer_arrive_cb       (GstElement* object,
   109                                      GstBuffer* buff,
   110                                      GstPad* pad,
   111                                      gpointer user_data);
   112 #endif
   113 
   114 
   115 static gboolean _tick_cb(gpointer data);
   116 
   117 static guint    g_mencoder_signals[LAST_SIGNAL] = { 0 };
   118 
   119 G_DEFINE_TYPE(GMencoder, g_mencoder, G_TYPE_OBJECT)
   120 
   121 static void     g_mencoder_class_init(GMencoderClass * klass)
   122 {
   123     GObjectClass   *object_class;
   124     object_class = (GObjectClass *) klass;
   125     g_type_class_add_private(klass, sizeof(GMencoderPrivate));
   126 
   127     object_class->dispose = g_mencoder_dispose;
   128     object_class->finalize = g_mencoder_finalize;
   129 
   130     g_mencoder_signals[PAUSED] =
   131         g_signal_new("paused",
   132                      G_OBJECT_CLASS_TYPE(object_class),
   133                      G_SIGNAL_RUN_FIRST,
   134                      0, NULL, NULL,
   135                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   136 
   137     g_mencoder_signals[PLAYING] =
   138         g_signal_new("playing",
   139                      G_OBJECT_CLASS_TYPE(object_class),
   140                      G_SIGNAL_RUN_FIRST,
   141                      0, NULL, NULL,
   142                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   143 
   144     g_mencoder_signals[STOPED] =
   145         g_signal_new("stoped",
   146                      G_OBJECT_CLASS_TYPE(object_class),
   147                      G_SIGNAL_RUN_FIRST,
   148                      0, NULL, NULL,
   149                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   150 
   151     g_mencoder_signals[EOS] =
   152         g_signal_new("eos",
   153                      G_OBJECT_CLASS_TYPE(object_class),
   154                      G_SIGNAL_RUN_FIRST,
   155                      0, NULL, NULL,
   156                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   157 
   158     g_mencoder_signals[ERROR] =
   159         g_signal_new("error",
   160                      G_OBJECT_CLASS_TYPE(object_class),
   161                      G_SIGNAL_RUN_LAST,
   162                      0, NULL, NULL,
   163                      g_cclosure_marshal_VOID__STRING,
   164                      G_TYPE_NONE, 1, G_TYPE_STRING);
   165 }
   166 
   167 static void
   168 g_mencoder_init(GMencoder * self)
   169 {
   170     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   171     priv->info = g_new0(SetupInfo, 1);
   172 #ifdef USE_MANUAL_SINK
   173     priv->queue = g_byte_array_new ();
   174 #endif
   175 }
   176 
   177 static void
   178 g_mencoder_dispose(GObject * object)
   179 {
   180 }
   181 
   182 static void
   183 g_mencoder_finalize(GObject * object)
   184 {
   185     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
   186 
   187     // TODO: clear vars
   188     g_mencoder_close_stream(G_MENCODER(object));
   189     g_free (priv->info);
   190 #ifdef USE_MANUAL_SINK
   191     g_byte_array_free (priv->queue, TRUE);
   192 #endif
   193 }
   194 
   195 GMencoder      *
   196 g_mencoder_new(void)
   197 {
   198     return g_object_new(G_TYPE_MENCODER, NULL);
   199 }
   200 
   201 
   202 static void
   203 _obj_set_prop(GObject * obj, const gchar * prop_name,
   204               const gchar * prop_val)
   205 {
   206     GValue          p = { 0 };
   207     GValue          v = { 0 };
   208     GParamSpec     *s = NULL;
   209     GObjectClass   *k = G_OBJECT_GET_CLASS(obj);
   210 
   211 
   212     g_value_init(&v, G_TYPE_STRING);
   213     g_value_set_string(&v, prop_val);
   214 
   215     s = g_object_class_find_property(k, prop_name);
   216     if (s == NULL) {
   217         g_print("Invalid property name: %s\n", prop_name);
   218         return;
   219     }
   220 
   221     g_value_init(&p, s->value_type);
   222     switch (s->value_type) {
   223     case G_TYPE_INT:
   224         g_value_set_int(&p, atoi(prop_val));
   225         break;
   226     case G_TYPE_STRING:
   227         g_value_set_string(&p, prop_val);
   228         break;
   229     default:
   230         return;
   231     }
   232 
   233     g_object_set_property(obj, prop_name, &p);
   234     g_value_unset(&v);
   235     g_value_unset(&p);
   236 }
   237 
   238 static GstElement *
   239 _create_element_with_prop(const gchar * factory_name,
   240                           const gchar * element_name, gchar ** prop)
   241 {
   242     GstElement     *ret;
   243     int             i;
   244 
   245     ret = gst_element_factory_make(factory_name, element_name);
   246     if (ret == NULL)
   247         return NULL;
   248 
   249     if (prop != NULL) {
   250         for (i = 0; i < g_strv_length(prop); i++) {
   251             if (prop[i] != NULL) {
   252                 char          **v = g_strsplit(prop[i], "=", 2);
   253                 if (g_strv_length(v) == 2) {
   254                     _obj_set_prop(G_OBJECT(ret), v[0], v[1]);
   255                 }
   256                 g_strfreev(v);
   257             }
   258         }
   259     }
   260 
   261     return ret;
   262 
   263 }
   264 
   265 static GstElement *
   266 _create_audio_bin(const gchar * encode, gchar ** encode_prop, gint rate)
   267 {
   268     GstElement     *abin = NULL;
   269     GstElement     *aqueue = NULL;
   270     GstElement     *aconvert = NULL;
   271     GstElement     *aencode = NULL;
   272     GstElement     *aqueue_src = NULL;
   273     GstPad         *apad = NULL;
   274 
   275     // audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
   276     // udpsink name=upd_audio host=224.0.0.1 port=5002
   277     abin = gst_bin_new("abin");
   278     aqueue = gst_element_factory_make("queue", "aqueue");
   279     aconvert = gst_element_factory_make("audioconvert", "aconvert");
   280     aencode =
   281         _create_element_with_prop((encode ? encode : "lame"), "aencode",
   282                                   encode_prop);
   283     aqueue_src = gst_element_factory_make("queue", "aqueue_src");
   284 
   285     if ((abin == NULL) || (aqueue == NULL) || (aconvert == NULL)
   286         || (aencode == NULL) || (aqueue_src == NULL)) {
   287         g_warning("Audio elements not found");
   288         goto error;
   289     }
   290 
   291     g_object_set(G_OBJECT(aencode), "bitrate", 32, NULL);
   292     /*
   293      * if (rate > 0) { g_object_set (G_OBJECT (aencode), "bitrate", 32,
   294      * NULL); } 
   295      */
   296 
   297     gst_bin_add_many(GST_BIN(abin), aqueue, aconvert, aencode, aqueue_src,
   298                      NULL);
   299     if (gst_element_link_many(aqueue, aconvert, aencode, aqueue_src, NULL)
   300         == FALSE) {
   301         g_warning("Not Link audio elements");
   302     }
   303     // TODO: apply audio rate
   304 
   305     // ghost pad the audio bin
   306     apad = gst_element_get_pad(aqueue, "sink");
   307     gst_element_add_pad(abin, gst_ghost_pad_new("sink", apad));
   308     gst_object_unref(apad);
   309 
   310     apad = gst_element_get_pad(aqueue_src, "src");
   311     gst_element_add_pad(abin, gst_ghost_pad_new("src", apad));
   312     gst_object_unref(apad);
   313 
   314     return abin;
   315   error:
   316     if (abin != NULL)
   317         gst_object_unref(abin);
   318 
   319     if (aqueue != NULL)
   320         gst_object_unref(aqueue);
   321 
   322     if (aconvert != NULL)
   323         gst_object_unref(aconvert);
   324 
   325     if (aencode != NULL)
   326         gst_object_unref(aencode);
   327 
   328     if (aqueue_src != NULL)
   329         gst_object_unref(aqueue_src);
   330 
   331     if (apad != NULL)
   332         gst_object_unref(apad);
   333 
   334     return NULL;
   335 }
   336 
   337 
   338 
   339 
   340 // queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! colorspace
   341 // ! rate ! encode ! queue
   342 static GstElement *
   343 _create_video_bin(const gchar * encode,
   344                   gchar ** encode_prop,
   345                   gdouble fps, gint rate, guint width, guint height)
   346 {
   347     GstElement     *vbin = NULL;
   348     GstElement     *vqueue = NULL;
   349     GstElement     *vqueue_src = NULL;
   350     GstElement     *vcolorspace = NULL;
   351     GstElement     *vencode = NULL;
   352     GstElement     *vrate = NULL;
   353     GstPad         *vpad = NULL;
   354 
   355     vbin = gst_bin_new("vbin");
   356     vqueue = gst_element_factory_make("queue", "vqueue");
   357     vcolorspace =
   358         gst_element_factory_make("ffmpegcolorspace", "colorspace");
   359 
   360     vencode = _create_element_with_prop((encode !=
   361                                          NULL ? encode :
   362                                          "ffenc_mpeg1video"), "vencode",
   363                                         encode_prop);
   364     vqueue_src = gst_element_factory_make("queue", "queue_src");
   365 
   366     if ((vbin == NULL) || (vqueue == NULL) || (vcolorspace == NULL)
   367         || (vencode == NULL) || (vqueue_src == NULL)) {
   368         g_warning("Video elements not found");
   369         goto error;
   370     }
   371 
   372     gst_bin_add_many(GST_BIN(vbin), vqueue, vcolorspace, vencode,
   373                      vqueue_src, NULL);
   374 
   375     if ((width > 0) && (height > 0)) {
   376         // Scalling video
   377         GstCaps        *vcaps;
   378         GstElement     *vscale =
   379             gst_element_factory_make("videoscale", "vscale");
   380 
   381         gst_bin_add(GST_BIN(vbin), vscale);
   382 
   383         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   384                                     "width", G_TYPE_INT, width,
   385                                     "height", G_TYPE_INT, height, NULL);
   386 
   387         gst_element_link(vqueue, vscale);
   388 
   389         if (gst_element_link_filtered(vscale, vcolorspace, vcaps) == FALSE) {
   390             g_warning("Fail to resize video");
   391             gst_object_unref(vcaps);
   392             gst_object_unref(vscale);
   393             goto error;
   394         }
   395         gst_caps_unref(vcaps);
   396     } else {
   397         gst_element_link(vqueue, vcolorspace);
   398     }
   399 
   400     if (fps > 0) {
   401         // Changing the video fps
   402         GstCaps        *vcaps;
   403         vrate = gst_element_factory_make("videorate", "vrate");
   404 
   405         gst_bin_add(GST_BIN(vbin), vrate);
   406 
   407         if (gst_element_link(vcolorspace, vrate) == FALSE) {
   408             g_warning("Fail to link video elements");
   409             goto error;
   410         }
   411 
   412         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   413                                     "framerate", GST_TYPE_FRACTION,
   414                                     (int) (fps * 1000), 1000, NULL);
   415 
   416         if (gst_element_link_filtered(vrate, vencode, vcaps) == FALSE) {
   417             g_warning("Fail to link vrate with vencode.");
   418             goto error;
   419         }
   420         gst_caps_unref(vcaps);
   421     } else {
   422         if (gst_element_link(vcolorspace, vencode) == FALSE) {
   423             g_warning("Fail to link colorspace and video encode element.");
   424             goto error;
   425         }
   426     }
   427 
   428     gst_element_link(vencode, vqueue_src);
   429 
   430     // ghost pad the video bin
   431     vpad = gst_element_get_pad(vqueue, "sink");
   432     gst_element_add_pad(vbin, gst_ghost_pad_new("sink", vpad));
   433     gst_object_unref(vpad);
   434 
   435     vpad = gst_element_get_pad(vqueue_src, "src");
   436     gst_element_add_pad(vbin, gst_ghost_pad_new("src", vpad));
   437     gst_object_unref(vpad);
   438 
   439     return vbin;
   440 
   441   error:
   442     if (vpad != NULL)
   443         gst_object_unref(vpad);
   444 
   445     if (vbin != NULL)
   446         gst_object_unref(vbin);
   447 
   448     if (vqueue != NULL)
   449         gst_object_unref(vqueue);
   450 
   451     if (vencode != NULL)
   452         gst_object_unref(vencode);
   453 
   454     if (vqueue_src != NULL)
   455         gst_object_unref(vqueue_src);
   456 
   457     if (vcolorspace != NULL)
   458         gst_object_unref(vcolorspace);
   459 
   460     return NULL;
   461 }
   462 
   463 
   464 
   465 gboolean 
   466 g_mencoder_setup_stream(GMencoder * self,
   467                         const gchar * mux_name,
   468                         const gchar * video_encode,
   469                         gchar ** video_encode_prop,
   470                         gdouble video_fps,
   471                         gdouble video_rate,
   472                         guint video_width,
   473                         guint video_height,
   474                         const gchar * audio_encode,
   475                         gchar ** audio_encode_prop,
   476                         guint audio_rate, const gchar * out_uri)
   477 {
   478     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   479     if (priv->ready == TRUE) {
   480         g_warning
   481             ("Stream already configured. You need close stream first.");
   482         return FALSE;
   483     }
   484 
   485     _close_output(self);
   486     if (_open_output(self, out_uri) == FALSE)
   487         return FALSE;
   488 
   489     priv->sources = 0;
   490     priv->pipe = _create_pipeline(self,
   491                                   video_encode,
   492                                   mux_name,
   493                                   video_encode_prop,
   494                                   video_fps,
   495                                   video_rate,
   496                                   video_width,
   497                                   video_height,
   498                                   audio_encode, audio_encode_prop,
   499                                   audio_rate);
   500 
   501     return (priv->pipe != NULL);
   502 }
   503 
   504 
   505 gboolean
   506 g_mencoder_append_uri(GMencoder * self, const gchar * uri)
   507 {
   508     GstPad         *pad_src;
   509     GstPad         *pad_sink;
   510     GstElement     *src;
   511     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   512     gboolean        ret = FALSE;
   513     GstElement     *ap = NULL;
   514     GstElement     *vp = NULL;
   515 
   516 
   517     g_return_val_if_fail(priv->pipe != NULL, FALSE);
   518     g_return_val_if_fail(priv->ready == FALSE, FALSE);
   519 
   520 #ifndef SUPPORT_MULT_INPUT
   521     g_return_val_if_fail(priv->sources < 1, FALSE);
   522 #endif
   523 
   524     src = _create_source(uri);
   525     if (src == NULL)
   526         return FALSE;
   527 
   528     priv->src = gst_bin_get_by_name(GST_BIN(src), "src");
   529 
   530     gst_bin_add(GST_BIN(priv->pipe), src);
   531 
   532 #ifdef SUPPORT_MULT_INPUT
   533     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "ap");
   534     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vp");
   535 #else
   536     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "abin");
   537     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vbin");
   538 #endif
   539 
   540     if ((vp == NULL) || (ap == NULL)) {
   541         g_warning("Fail to get output bin");
   542         goto error;
   543     }
   544 
   545     pad_src = gst_element_get_pad(src, "src_audio");
   546     pad_sink = gst_element_get_compatible_pad(ap,
   547                                               pad_src,
   548                                               gst_pad_get_caps(pad_src));
   549 
   550     if ((pad_sink == NULL) || (pad_src == NULL))
   551         goto error;
   552 
   553     GstPadLinkReturn lret = gst_pad_link(pad_src, pad_sink);
   554     if (lret != GST_PAD_LINK_OK)
   555         goto error;
   556 
   557     gst_object_unref(pad_src);
   558     gst_object_unref(pad_sink);
   559 
   560     pad_src = gst_element_get_pad(src, "src_video");
   561     pad_sink = gst_element_get_compatible_pad(vp,
   562                                               pad_src,
   563                                               gst_pad_get_caps(pad_src));
   564 
   565     if ((pad_src == NULL) || (pad_sink == NULL))
   566         goto error;
   567 
   568     if (gst_pad_link(pad_src, pad_sink) != GST_PAD_LINK_OK) {
   569         g_warning("invalid source. video");
   570         goto error;
   571     }
   572 
   573     priv->sources++;
   574     ret = TRUE;
   575   error:
   576 
   577     if ((src != NULL) && (ret == FALSE)) {
   578         gst_bin_remove(GST_BIN(priv->pipe), src);
   579         gst_object_unref(src);
   580     }
   581 
   582     if (ap != NULL)
   583         gst_object_unref(ap);
   584 
   585     if (vp != NULL)
   586         gst_object_unref(vp);
   587 
   588     if (pad_src != NULL)
   589         gst_object_unref(pad_src);
   590 
   591     if (pad_sink != NULL)
   592         gst_object_unref(pad_sink);
   593 
   594     return ret;
   595 }
   596 
   597 
   598 
   599 void
   600 g_mencoder_remove_uri(GMencoder * self, const gchar * uri)
   601 {
   602     // GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE (self);
   603     // TODO: remove src
   604 }
   605 
   606 void
   607 g_mencoder_play_stream(GMencoder * self)
   608 {
   609     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   610     g_return_if_fail(priv->ready == FALSE);
   611     priv->ready = TRUE;
   612     gst_element_set_state(priv->pipe, GST_STATE_PLAYING);
   613     priv->tick_id = g_timeout_add(500, _tick_cb, self);
   614 }
   615 
   616 void
   617 g_mencoder_pause_stream(GMencoder * self)
   618 {
   619     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   620     g_return_if_fail(priv->ready == TRUE);
   621     gst_element_set_state(priv->pipe, GST_STATE_PAUSED);
   622 }
   623 
   624 void
   625 g_mencoder_close_stream(GMencoder * self)
   626 {
   627 
   628     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   629     if (priv->tick_id != 0) {
   630         g_source_remove(priv->tick_id);
   631         priv->tick_id = 0;
   632     }
   633 
   634     if (priv->pipe != NULL) {
   635         // TODO: fixe pipeline dispose
   636         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   637         // g_debug ("SETING STATE TO NULL: OK");
   638         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   639         // gst_object_unref (priv->pipe);
   640         gst_object_unref(priv->src);
   641         priv->src = NULL;
   642         priv->pipe = NULL;
   643         priv->abin = NULL;
   644         priv->vbin = NULL;
   645         priv->sink = NULL;
   646     }
   647     priv->ready = FALSE;
   648 }
   649 
   650 static GstElement *
   651 _create_pipeline(GMencoder * self,
   652                  const gchar * video_encode,
   653                  const gchar * mux_name,
   654                  gchar ** video_encode_prop,
   655                  gdouble video_fps,
   656                  gdouble video_rate,
   657                  guint video_width,
   658                  guint video_height,
   659                  const gchar * audio_encode,
   660                  gchar ** audio_encode_prop, guint audio_rate)
   661 {
   662     GstBus         *bus = NULL;
   663     GstElement     *pipe = NULL;
   664     GstElement     *sink = NULL;
   665     GstElement     *mux = NULL;
   666     GstElement     *abin = NULL;
   667     GstElement     *vbin = NULL;
   668     GstElement     *queue = NULL;
   669     GstPad         *aux_pad = NULL;
   670     GstPad         *mux_pad = NULL;
   671 #ifdef SUPPORT_MULT_INPUT
   672     GstElement     *ap = NULL;
   673     GstElement     *vp = NULL;
   674 #endif
   675 
   676     pipe = gst_pipeline_new("pipe");
   677 
   678 #ifdef SUPPORT_MULT_INPUT
   679     ap = gst_element_factory_make("concatmux", "ap");
   680     vp = gst_element_factory_make("concatmux", "vp");
   681     gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
   682 #endif
   683     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   684 
   685     mux =
   686         gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
   687                                  "mux");
   688     if (mux == NULL)
   689         goto error;
   690 
   691     queue = gst_element_factory_make("queue", "queueu_sink");
   692 
   693 
   694 #ifdef USE_MANUAL_SINK
   695     sink = gst_element_factory_make("fakesink", "sink");
   696     g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
   697     g_signal_connect (G_OBJECT (sink),
   698                       "handoff",
   699                       G_CALLBACK (_buffer_arrive_cb),
   700                       self);
   701 #else    
   702     sink = gst_element_factory_make("fdsink", "sink");
   703     if (sink == NULL)
   704         goto error;
   705 
   706     g_object_set(G_OBJECT(sink), "fd", priv->fd, "sync", FALSE, NULL);
   707 #endif
   708 
   709     abin = _create_audio_bin(audio_encode, audio_encode_prop, audio_rate);
   710     if (abin == NULL)
   711         goto error;
   712 
   713     vbin =
   714         _create_video_bin(video_encode, video_encode_prop, video_fps,
   715                           video_rate, video_width, video_height);
   716     if (vbin == NULL)
   717         goto error;
   718 
   719     // Finish Pipe
   720     gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, sink, NULL);
   721 
   722 
   723 #ifdef SUPPORT_MULT_INPUT
   724     if (gst_element_link(ap, abin) == FALSE) {
   725         g_warning("Fail to link concat and abin");
   726         goto error;
   727     }
   728 
   729     if (gst_element_link(vp, vbin) == FALSE) {
   730         g_warning("Fail to link concat and vbin");
   731     }
   732 #endif
   733 
   734     // Link bins with mux
   735     aux_pad = gst_element_get_pad(abin, "src");
   736     mux_pad =
   737         gst_element_get_compatible_pad(mux, aux_pad,
   738                                        GST_PAD_CAPS(aux_pad));
   739     if (mux_pad == NULL) {
   740         g_warning("Mux element no have audio PAD");
   741         goto error;
   742     }
   743     GstPadLinkReturn ret = gst_pad_link(aux_pad, mux_pad);
   744     if (ret != GST_PAD_LINK_OK) {
   745         g_warning("Fail link audio and mux: %d", ret);
   746         goto error;
   747 
   748     }
   749     gst_object_unref(aux_pad);
   750     gst_object_unref(mux_pad);
   751 
   752     aux_pad = gst_element_get_pad(vbin, "src");
   753     mux_pad =
   754         gst_element_get_compatible_pad(mux, aux_pad,
   755                                        GST_PAD_CAPS(aux_pad));
   756     if (mux_pad == NULL) {
   757         g_warning("Mux element no have video PAD");
   758         goto error;
   759     }
   760     ret = gst_pad_link(aux_pad, mux_pad);
   761     if (ret != GST_PAD_LINK_OK) {
   762         g_warning("Fail link video and mux: %d", ret);
   763         goto error;
   764     }
   765     gst_object_unref(aux_pad);
   766     gst_object_unref(mux_pad);
   767     aux_pad = NULL;
   768     mux_pad = NULL;
   769 
   770     // Link mux with sink
   771     gst_element_link_many(mux, queue, sink, NULL);
   772 
   773     bus = gst_pipeline_get_bus(GST_PIPELINE(pipe));
   774     gst_bus_add_watch(bus, _pipeline_bus_cb, self);
   775     gst_object_unref(bus);
   776     return pipe;
   777 
   778   error:
   779     g_warning("Invalid uri");
   780 
   781     if (pipe != NULL) {
   782         gst_object_unref(pipe);
   783     }
   784 
   785 
   786     if (mux != NULL) {
   787         gst_object_unref(mux);
   788     }
   789 
   790     if (mux_pad != NULL) {
   791         gst_object_unref(mux_pad);
   792     }
   793 
   794     if (aux_pad != NULL) {
   795         gst_object_unref(mux_pad);
   796     }
   797 
   798     if (sink != NULL) {
   799         gst_object_unref(sink);
   800     }
   801 
   802     if (abin != NULL) {
   803         gst_object_unref(abin);
   804     }
   805 
   806     if (vbin != NULL) {
   807         gst_object_unref(vbin);
   808     }
   809 
   810     return FALSE;
   811 }
   812 
   813 
   814 static void
   815 _close_output(GMencoder * self)
   816 {
   817 }
   818 
   819 static GstElement *
   820 _create_source(const gchar * uri)
   821 {
   822 
   823     GstElement     *bsrc = NULL;
   824     GstElement     *src = NULL;
   825     GstElement     *queue = NULL;
   826     GstElement     *aqueue = NULL;
   827     GstElement     *vqueue = NULL;
   828     GstElement     *decode = NULL;
   829     GstPad         *src_pad = NULL;
   830 
   831 
   832     bsrc = gst_bin_new(NULL);
   833 
   834     // src = gst_element_factory_make ("gnomevfssrc", "src");
   835     // g_object_set (G_OBJECT (src), "location", uri, NULL);
   836     src = gst_element_make_from_uri(GST_URI_SRC, uri, "src");
   837     if (src == NULL)
   838         goto error;
   839 
   840     decode = gst_element_factory_make("decodebin", "decode");
   841     if (decode == NULL)
   842         goto error;
   843 
   844     queue = gst_element_factory_make("queue", "queue_src");
   845     aqueue = gst_element_factory_make("queue", "aqueue");
   846     if (aqueue == NULL)
   847         goto error;
   848 
   849     vqueue = gst_element_factory_make("queue", "vqueue");
   850     if (vqueue == NULL)
   851         goto error;
   852 
   853     gst_bin_add_many(GST_BIN(bsrc), src, queue, decode, aqueue, vqueue,
   854                      NULL);
   855     gst_element_link_many(src, queue, decode, NULL);
   856 
   857     g_signal_connect(G_OBJECT(decode),
   858                      "new-decoded-pad",
   859                      G_CALLBACK(_decodebin_new_pad_cb), bsrc);
   860 
   861     g_signal_connect(G_OBJECT(decode),
   862                      "unknown-type",
   863                      G_CALLBACK(_decodebin_unknown_type_cb), pipe);
   864 
   865     src_pad = gst_element_get_pad(aqueue, "src");
   866     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_audio", src_pad));
   867     gst_object_unref(src_pad);
   868 
   869     src_pad = gst_element_get_pad(vqueue, "src");
   870     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_video", src_pad));
   871     gst_object_unref(src_pad);
   872 
   873     return bsrc;
   874 
   875   error:
   876     if (src != NULL) {
   877         gst_object_unref(src);
   878     }
   879 
   880     if (decode != NULL) {
   881         gst_object_unref(decode);
   882     }
   883 
   884     if (aqueue != NULL) {
   885         gst_object_unref(aqueue);
   886     }
   887 
   888     if (vqueue != NULL) {
   889         gst_object_unref(vqueue);
   890     }
   891 
   892     return NULL;
   893 }
   894 
   895 static gboolean
   896 _open_output(GMencoder * self, const gchar * uri)
   897 {
   898     gboolean ret = TRUE;
   899     gchar         **i;
   900     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   901 
   902     i = g_strsplit(uri, "://", 0);
   903     if (strcmp(i[0], "fd") == 0) {
   904         priv->fd = atoi(i[1]);
   905 	fcntl (priv->fd, F_SETFL, O_ASYNC);
   906     } else if (strcmp(i[0], "file") == 0) {
   907         if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
   908             if (unlink (i[1]) != 0) {
   909                 g_warning ("Fail to write in : %s", uri);
   910                 ret = FALSE;
   911                 goto done;
   912             }
   913         }
   914         priv->fd = open(i[1], O_WRONLY | O_CREAT | O_TRUNC,
   915                         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
   916 
   917         if (priv->fd == -1) {
   918             g_warning ("Fail to open : %s", uri);
   919             ret = FALSE;
   920         }
   921     } else {
   922         g_warning("Output uri not supported");
   923         ret = FALSE;
   924     }
   925 
   926 done:
   927     g_strfreev(i);
   928     return ret;
   929 }
   930 
   931 static          gboolean
   932 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data)
   933 {
   934     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
   935 
   936     switch (GST_MESSAGE_TYPE(msg)) {
   937 
   938     case GST_MESSAGE_STATE_CHANGED:
   939         {
   940             GstState        oldstate;
   941             GstState        newstate;
   942             GstState        pendingstate;
   943 
   944 
   945             gst_message_parse_state_changed(msg, &oldstate,
   946                                             &newstate, &pendingstate);
   947 
   948             if (pendingstate != GST_STATE_VOID_PENDING)
   949                 break;
   950 
   951             if ((oldstate == GST_STATE_READY)
   952                 && (newstate == GST_STATE_PAUSED)) {
   953                 if (priv->ready)
   954                     g_signal_emit(user_data, g_mencoder_signals[PAUSED],
   955                                   0);
   956             } else if ((oldstate == GST_STATE_PAUSED)
   957                        && (newstate == GST_STATE_PLAYING)) {
   958                 g_signal_emit(user_data, g_mencoder_signals[PLAYING], 0);
   959             } else if ((oldstate == GST_STATE_READY) &&
   960                        (newstate == GST_STATE_NULL)) {
   961                 g_signal_emit(user_data, g_mencoder_signals[STOPED], 0);
   962             }
   963             break;
   964         }
   965 
   966     case GST_MESSAGE_ERROR:
   967         {
   968             GError         *error;
   969             gchar          *debug;
   970             gchar          *err_str;
   971 
   972             if (priv->tick_id != 0) {
   973                 g_source_remove(priv->tick_id);
   974                 priv->tick_id = 0;
   975             }
   976 
   977             gst_message_parse_error(msg, &error, &debug);
   978             err_str = g_strdup_printf("Error [%d] %s (%s)", error->code,
   979                                       error->message, debug);
   980             priv->ready = FALSE;
   981             g_signal_emit(user_data, g_mencoder_signals[ERROR], 0,
   982                           err_str);
   983             g_free(err_str);
   984             g_clear_error(&error);
   985             g_free(debug);
   986             break;
   987         }
   988 
   989     case GST_MESSAGE_EOS:
   990         priv->ready = FALSE;
   991 #ifdef USE_MANUAL_SINK
   992 	_flush_queue (G_MENCODER (user_data));
   993 #endif
   994         g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
   995         break;
   996 
   997     case GST_MESSAGE_DURATION:
   998         {
   999             GstFormat       format;
  1000             gint64          duration;
  1001             gst_message_parse_duration(msg, &format, &duration);
  1002             if (format == GST_FORMAT_BYTES)
  1003                 priv->duration = duration;
  1004             break;
  1005         }
  1006     default:
  1007         {
  1008             break;
  1009         }
  1010     }
  1011     return TRUE;
  1012 }
  1013 
  1014 
  1015 
  1016 static void
  1017 _decodebin_new_pad_cb(GstElement * object,
  1018                       GstPad * pad, gboolean flag, gpointer user_data)
  1019 {
  1020     GstCaps        *caps;
  1021     gchar          *str_caps = NULL;
  1022     GstElement     *sink_element;
  1023     GstPad         *sink_pad;
  1024 
  1025     caps = gst_pad_get_caps(pad);
  1026     str_caps = gst_caps_to_string(caps);
  1027     if (strstr(str_caps, "audio") != NULL) {
  1028         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "aqueue");
  1029     } else if (strstr(str_caps, "video") != NULL) {
  1030         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "vqueue");
  1031     } else {
  1032         g_warning("invalid caps %s", str_caps);
  1033     }
  1034 
  1035     sink_pad = gst_element_get_pad(sink_element, "sink");
  1036     gst_pad_link(pad, sink_pad);
  1037 
  1038     gst_object_unref(sink_element);
  1039     gst_object_unref(sink_pad);
  1040     g_free(str_caps);
  1041     gst_caps_unref(caps);
  1042 }
  1043 
  1044 static void
  1045 _decodebin_unknown_type_cb(GstElement * object,
  1046                            GstPad * pad, GstCaps * caps,
  1047                            gpointer user_data)
  1048 {
  1049     g_warning("Unknown Type");
  1050     // priv->ready = FALSE;
  1051 }
  1052 
  1053 static          gboolean
  1054 _tick_cb(gpointer user_data)
  1055 {
  1056     GstFormat       format = GST_FORMAT_BYTES;
  1057     gint64          cur = 0;
  1058 
  1059     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1060 
  1061     if (priv->duration == 0) {
  1062         gint64          d = 0;
  1063         if (gst_element_query_duration(priv->src, &format, &d))
  1064             priv->duration = d;
  1065     }
  1066 
  1067     if (priv->duration != 0) {
  1068         gst_element_query_position(priv->src, &format, &cur);
  1069         g_print("PROGRESS:%lli\n", (99 * cur) / priv->duration);
  1070     }
  1071 
  1072     return TRUE;
  1073 }
  1074 
  1075 
  1076 #ifdef USE_MANUAL_SINK
  1077 static gboolean
  1078 _send_buffer (gint fd, gpointer buff, gint size)
  1079 {
  1080     gboolean ret = TRUE;
  1081     gchar *msg;
  1082     GByteArray *b_send;
  1083 
  1084     b_send = g_byte_array_new ();
  1085     msg = g_strdup_printf ("%x\r\n", size);
  1086     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1087     g_free (msg);
  1088 
  1089     b_send = g_byte_array_append (b_send, buff, size);
  1090 
  1091     msg = g_strdup ("\r\n");
  1092     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1093     g_free (msg);
  1094 
  1095     if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) 
  1096         ret = FALSE;
  1097     g_byte_array_free (b_send, TRUE);
  1098 
  1099     return ret;
  1100 }
  1101 
  1102 static void
  1103 _flush_queue (GMencoder *self)
  1104 {
  1105     gchar *end_msg;
  1106     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
  1107 
  1108     if (BUFFER_SIZE == 0)
  1109         return;
  1110 
  1111     if (priv->queue->len > 0) {
  1112         _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
  1113         priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1114     }
  1115 
  1116     end_msg = g_strdup ("0\r\n\r\n");
  1117     write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
  1118     g_free (end_msg);
  1119     return;
  1120 }
  1121 static void 
  1122 _buffer_arrive_cb (GstElement* object,
  1123                    GstBuffer* buff,
  1124                    GstPad* pad,
  1125                    gpointer user_data)
  1126 {
  1127     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1128 
  1129     if (BUFFER_SIZE == 0) {
  1130 	if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
  1131              goto error;
  1132         return;
  1133     }
  1134 
  1135     priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
  1136     while (priv->queue->len >= BUFFER_SIZE) {
  1137     	//g_usleep (0.2 * G_USEC_PER_SEC);	
  1138 	if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
  1139             goto error;
  1140         priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
  1141     }
  1142     return;
  1143 
  1144 error:
  1145     if (priv->tick_id != 0) {
  1146         g_source_remove(priv->tick_id);
  1147         priv->tick_id = 0;
  1148     }
  1149     priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1150     g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
  1151 }
  1152 #endif 
  1153