gmyth-stream/gmemcoder/src/gmencoder.c
author renatofilho
Tue Jul 03 21:26:55 2007 +0100 (2007-07-03)
branchtrunk
changeset 770 800e6485ad15
parent 762 1be4b68bfb0e
child 772 47de5c5976bf
permissions -rw-r--r--
[svn r776] create function in backend info for otimize socket usage
     1 #ifdef HAVE_CONFIG_H
     2 #include "config.h"
     3 #endif
     4 
     5 #include <sys/stat.h>
     6 #include <fcntl.h>
     7 #include <unistd.h>
     8 #include <glib.h>
     9 #include <gst/gst.h>
    10 #include <string.h>
    11 #include <sys/types.h>
    12 #include <sys/socket.h>
    13 
    14 #include "gmencoder.h"
    15 
    16 #define G_MENCODER_GET_PRIVATE(obj) \
    17     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), G_TYPE_MENCODER, GMencoderPrivate))
    18 
    19 // #define SUPPORT_MULT_INPUT 0
    20 #define USE_MANUAL_SINK
    21 #define BUFFER_SIZE (1024 * 64)
    22 
    23 typedef struct _GMencoderPrivate GMencoderPrivate;
    24 typedef struct _SetupInfo SetupInfo;
    25 
    26 struct _SetupInfo {
    27     gchar          *video_encode;
    28     gchar          *mux_name;
    29     gchar         **video_encode_prop;
    30     gdouble         video_fps;
    31     gdouble         video_rate;
    32     guint           video_width;
    33     guint           video_height;
    34     gchar          *audio_encode;
    35     gchar         **audio_encode_prop;
    36     guint           audio_rate;
    37 };
    38 
    39 
    40 struct _GMencoderPrivate {
    41     GstElement     *pipe;
    42     GstElement     *abin;
    43     GstElement     *vbin;
    44     GstElement     *sink;
    45     GstElement     *src;
    46     gboolean        ready;
    47     SetupInfo      *info;
    48     GstClockTime    videot;
    49     GstClockTime    audiot;
    50     gint            fd;
    51     gint            sources;
    52     gint            tick_id;
    53     gint64          duration;
    54     gboolean        send_chunked;
    55 #ifdef USE_MANUAL_SINK
    56     GByteArray	    *queue;
    57 #endif
    58 };
    59 
    60 enum {
    61     PAUSED,
    62     PLAYING,
    63     STOPED,
    64     EOS,
    65     ERROR,
    66     LAST_SIGNAL
    67 };
    68 
    69 static void     g_mencoder_class_init(GMencoderClass * klass);
    70 static void     g_mencoder_init(GMencoder * object);
    71 static void     g_mencoder_dispose(GObject * object);
    72 static void     g_mencoder_finalize(GObject * object);
    73 static GstElement *_create_audio_bin(const gchar * encode,
    74                                      gchar ** encode_prop, gint rate);
    75 static GstElement *_create_video_bin(const gchar * encode,
    76                                      gchar ** encode_prop,
    77                                      gdouble fps,
    78                                      gint rate, guint width, guint height);
    79 
    80 static          gboolean
    81 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data);
    82 
    83 static void     _decodebin_new_pad_cb(GstElement * object,
    84                                       GstPad * pad,
    85                                       gboolean flag, gpointer user_data);
    86 
    87 static void     _decodebin_unknown_type_cb(GstElement * object,
    88                                            GstPad * pad,
    89                                            GstCaps * caps,
    90                                            gpointer user_data);
    91 
    92 static void     _close_output(GMencoder * self);
    93 static gboolean _open_output(GMencoder * self, const gchar * uri);
    94 
    95 static GstElement *_create_source(const gchar * uri);
    96 static GstElement *_create_pipeline(GMencoder * self,
    97                                     const gchar * video_encode,
    98                                     const gchar * mux_name,
    99                                     gchar ** video_encode_prop,
   100                                     gdouble video_fps,
   101                                     gdouble video_rate,
   102                                     guint video_width,
   103                                     guint video_height,
   104                                     const gchar * audio_encode,
   105                                     gchar ** audio_encode_prop,
   106                                     guint audio_rate);
   107 #ifdef USE_MANUAL_SINK
   108 static void _flush_queue 	    (GMencoder *self);
   109 static void _buffer_arrive_cb       (GstElement* object,
   110                                      GstBuffer* buff,
   111                                      GstPad* pad,
   112                                      gpointer user_data);
   113 #endif
   114 
   115 
   116 static gboolean _tick_cb(gpointer data);
   117 
   118 static guint    g_mencoder_signals[LAST_SIGNAL] = { 0 };
   119 
   120 G_DEFINE_TYPE(GMencoder, g_mencoder, G_TYPE_OBJECT)
   121 
   122 static void     g_mencoder_class_init(GMencoderClass * klass)
   123 {
   124     GObjectClass   *object_class;
   125     object_class = (GObjectClass *) klass;
   126     g_type_class_add_private(klass, sizeof(GMencoderPrivate));
   127 
   128     object_class->dispose = g_mencoder_dispose;
   129     object_class->finalize = g_mencoder_finalize;
   130 
   131     g_mencoder_signals[PAUSED] =
   132         g_signal_new("paused",
   133                      G_OBJECT_CLASS_TYPE(object_class),
   134                      G_SIGNAL_RUN_FIRST,
   135                      0, NULL, NULL,
   136                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   137 
   138     g_mencoder_signals[PLAYING] =
   139         g_signal_new("playing",
   140                      G_OBJECT_CLASS_TYPE(object_class),
   141                      G_SIGNAL_RUN_FIRST,
   142                      0, NULL, NULL,
   143                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   144 
   145     g_mencoder_signals[STOPED] =
   146         g_signal_new("stoped",
   147                      G_OBJECT_CLASS_TYPE(object_class),
   148                      G_SIGNAL_RUN_FIRST,
   149                      0, NULL, NULL,
   150                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   151 
   152     g_mencoder_signals[EOS] =
   153         g_signal_new("eos",
   154                      G_OBJECT_CLASS_TYPE(object_class),
   155                      G_SIGNAL_RUN_FIRST,
   156                      0, NULL, NULL,
   157                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   158 
   159     g_mencoder_signals[ERROR] =
   160         g_signal_new("error",
   161                      G_OBJECT_CLASS_TYPE(object_class),
   162                      G_SIGNAL_RUN_LAST,
   163                      0, NULL, NULL,
   164                      g_cclosure_marshal_VOID__STRING,
   165                      G_TYPE_NONE, 1, G_TYPE_STRING);
   166 }
   167 
   168 static void
   169 g_mencoder_init(GMencoder * self)
   170 {
   171     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   172     priv->info = g_new0(SetupInfo, 1);
   173 #ifdef USE_MANUAL_SINK
   174     priv->queue = g_byte_array_new ();
   175 #endif
   176 }
   177 
   178 static void
   179 g_mencoder_dispose(GObject * object)
   180 {
   181 }
   182 
   183 static void
   184 g_mencoder_finalize(GObject * object)
   185 {
   186     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
   187 
   188     // TODO: clear vars
   189     g_mencoder_close_stream(G_MENCODER(object));
   190     g_free (priv->info);
   191 #ifdef USE_MANUAL_SINK
   192     g_byte_array_free (priv->queue, TRUE);
   193 #endif
   194 }
   195 
   196 GMencoder      *
   197 g_mencoder_new(void)
   198 {
   199     return g_object_new(G_TYPE_MENCODER, NULL);
   200 }
   201 
   202 
   203 static void
   204 _obj_set_prop(GObject * obj, const gchar * prop_name,
   205               const gchar * prop_val)
   206 {
   207     GValue          p = { 0 };
   208     GValue          v = { 0 };
   209     GParamSpec     *s = NULL;
   210     GObjectClass   *k = G_OBJECT_GET_CLASS(obj);
   211 
   212 
   213     g_value_init(&v, G_TYPE_STRING);
   214     g_value_set_string(&v, prop_val);
   215 
   216     s = g_object_class_find_property(k, prop_name);
   217     if (s == NULL) {
   218         g_print("Invalid property name: %s\n", prop_name);
   219         return;
   220     }
   221 
   222     g_value_init(&p, s->value_type);
   223     switch (s->value_type) {
   224     case G_TYPE_INT:
   225         g_value_set_int(&p, atoi(prop_val));
   226         break;
   227     case G_TYPE_STRING:
   228         g_value_set_string(&p, prop_val);
   229         break;
   230     default:
   231         return;
   232     }
   233 
   234     g_object_set_property(obj, prop_name, &p);
   235     g_value_unset(&v);
   236     g_value_unset(&p);
   237 }
   238 
   239 static GstElement *
   240 _create_element_with_prop(const gchar * factory_name,
   241                           const gchar * element_name, gchar ** prop)
   242 {
   243     GstElement     *ret;
   244     int             i;
   245 
   246     ret = gst_element_factory_make(factory_name, element_name);
   247     if (ret == NULL)
   248         return NULL;
   249 
   250     if (prop != NULL) {
   251         for (i = 0; i < g_strv_length(prop); i++) {
   252             if (prop[i] != NULL) {
   253                 char          **v = g_strsplit(prop[i], "=", 2);
   254                 if (g_strv_length(v) == 2) {
   255                     _obj_set_prop(G_OBJECT(ret), v[0], v[1]);
   256                 }
   257                 g_strfreev(v);
   258             }
   259         }
   260     }
   261 
   262     return ret;
   263 
   264 }
   265 
   266 static GstElement *
   267 _create_audio_bin(const gchar * encode, gchar ** encode_prop, gint rate)
   268 {
   269     GstElement     *abin = NULL;
   270     GstElement     *aqueue = NULL;
   271     GstElement     *aconvert = NULL;
   272     GstElement     *aencode = NULL;
   273     GstElement     *aqueue_src = NULL;
   274     GstPad         *apad = NULL;
   275 
   276     // audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
   277     // udpsink name=upd_audio host=224.0.0.1 port=5002
   278     abin = gst_bin_new("abin");
   279     aqueue = gst_element_factory_make("queue", "aqueue");
   280     aconvert = gst_element_factory_make("audioconvert", "aconvert");
   281     aencode =
   282         _create_element_with_prop((encode ? encode : "lame"), "aencode",
   283                                   encode_prop);
   284     aqueue_src = gst_element_factory_make("queue", "aqueue_src");
   285 
   286     if ((abin == NULL) || (aqueue == NULL) || (aconvert == NULL)
   287         || (aencode == NULL) || (aqueue_src == NULL)) {
   288         g_warning("Audio elements not found");
   289         goto error;
   290     }
   291 
   292     g_object_set(G_OBJECT(aencode), "bitrate", 32, NULL);
   293     /*
   294      * if (rate > 0) { g_object_set (G_OBJECT (aencode), "bitrate", 32,
   295      * NULL); } 
   296      */
   297 
   298     gst_bin_add_many(GST_BIN(abin), aqueue, aconvert, aencode, aqueue_src,
   299                      NULL);
   300     if (gst_element_link_many(aqueue, aconvert, aencode, aqueue_src, NULL)
   301         == FALSE) {
   302         g_warning("Not Link audio elements");
   303     }
   304     // TODO: apply audio rate
   305 
   306     // ghost pad the audio bin
   307     apad = gst_element_get_pad(aqueue, "sink");
   308     gst_element_add_pad(abin, gst_ghost_pad_new("sink", apad));
   309     gst_object_unref(apad);
   310 
   311     apad = gst_element_get_pad(aqueue_src, "src");
   312     gst_element_add_pad(abin, gst_ghost_pad_new("src", apad));
   313     gst_object_unref(apad);
   314 
   315     return abin;
   316   error:
   317     if (abin != NULL)
   318         gst_object_unref(abin);
   319 
   320     if (aqueue != NULL)
   321         gst_object_unref(aqueue);
   322 
   323     if (aconvert != NULL)
   324         gst_object_unref(aconvert);
   325 
   326     if (aencode != NULL)
   327         gst_object_unref(aencode);
   328 
   329     if (aqueue_src != NULL)
   330         gst_object_unref(aqueue_src);
   331 
   332     if (apad != NULL)
   333         gst_object_unref(apad);
   334 
   335     return NULL;
   336 }
   337 
   338 
   339 
   340 
   341 // queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! colorspace
   342 // ! rate ! encode ! queue
   343 static GstElement *
   344 _create_video_bin(const gchar * encode,
   345                   gchar ** encode_prop,
   346                   gdouble fps, gint rate, guint width, guint height)
   347 {
   348     GstElement     *vbin = NULL;
   349     GstElement     *vqueue = NULL;
   350     GstElement     *vqueue_src = NULL;
   351     GstElement     *vcolorspace = NULL;
   352     GstElement     *vencode = NULL;
   353     GstElement     *vrate = NULL;
   354     GstPad         *vpad = NULL;
   355 
   356     vbin = gst_bin_new("vbin");
   357     vqueue = gst_element_factory_make("queue", "vqueue");
   358     vcolorspace =
   359         gst_element_factory_make("ffmpegcolorspace", "colorspace");
   360 
   361     vencode = _create_element_with_prop((encode !=
   362                                          NULL ? encode :
   363                                          "ffenc_mpeg1video"), "vencode",
   364                                         encode_prop);
   365     vqueue_src = gst_element_factory_make("queue", "queue_src");
   366 
   367     if ((vbin == NULL) || (vqueue == NULL) || (vcolorspace == NULL)
   368         || (vencode == NULL) || (vqueue_src == NULL)) {
   369         g_warning("Video elements not found");
   370         goto error;
   371     }
   372 
   373     gst_bin_add_many(GST_BIN(vbin), vqueue, vcolorspace, vencode,
   374                      vqueue_src, NULL);
   375 
   376     if ((width > 0) && (height > 0)) {
   377         // Scalling video
   378         GstCaps        *vcaps;
   379         GstElement     *vscale =
   380             gst_element_factory_make("videoscale", "vscale");
   381 
   382         gst_bin_add(GST_BIN(vbin), vscale);
   383 
   384         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   385                                     "width", G_TYPE_INT, width,
   386                                     "height", G_TYPE_INT, height, NULL);
   387 
   388         gst_element_link(vqueue, vscale);
   389 
   390         if (gst_element_link_filtered(vscale, vcolorspace, vcaps) == FALSE) {
   391             g_warning("Fail to resize video");
   392             gst_object_unref(vcaps);
   393             gst_object_unref(vscale);
   394             goto error;
   395         }
   396         gst_caps_unref(vcaps);
   397     } else {
   398         gst_element_link(vqueue, vcolorspace);
   399     }
   400 
   401     if (fps > 0) {
   402         // Changing the video fps
   403         GstCaps        *vcaps;
   404         vrate = gst_element_factory_make("videorate", "vrate");
   405 
   406         gst_bin_add(GST_BIN(vbin), vrate);
   407 
   408         if (gst_element_link(vcolorspace, vrate) == FALSE) {
   409             g_warning("Fail to link video elements");
   410             goto error;
   411         }
   412 
   413         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   414                                     "framerate", GST_TYPE_FRACTION,
   415                                     (int) (fps * 1000), 1000, NULL);
   416 
   417         if (gst_element_link_filtered(vrate, vencode, vcaps) == FALSE) {
   418             g_warning("Fail to link vrate with vencode.");
   419             goto error;
   420         }
   421         gst_caps_unref(vcaps);
   422     } else {
   423         if (gst_element_link(vcolorspace, vencode) == FALSE) {
   424             g_warning("Fail to link colorspace and video encode element.");
   425             goto error;
   426         }
   427     }
   428 
   429     gst_element_link(vencode, vqueue_src);
   430 
   431     // ghost pad the video bin
   432     vpad = gst_element_get_pad(vqueue, "sink");
   433     gst_element_add_pad(vbin, gst_ghost_pad_new("sink", vpad));
   434     gst_object_unref(vpad);
   435 
   436     vpad = gst_element_get_pad(vqueue_src, "src");
   437     gst_element_add_pad(vbin, gst_ghost_pad_new("src", vpad));
   438     gst_object_unref(vpad);
   439 
   440     return vbin;
   441 
   442   error:
   443     if (vpad != NULL)
   444         gst_object_unref(vpad);
   445 
   446     if (vbin != NULL)
   447         gst_object_unref(vbin);
   448 
   449     if (vqueue != NULL)
   450         gst_object_unref(vqueue);
   451 
   452     if (vencode != NULL)
   453         gst_object_unref(vencode);
   454 
   455     if (vqueue_src != NULL)
   456         gst_object_unref(vqueue_src);
   457 
   458     if (vcolorspace != NULL)
   459         gst_object_unref(vcolorspace);
   460 
   461     return NULL;
   462 }
   463 
   464 
   465 
   466 gboolean 
   467 g_mencoder_setup_stream(GMencoder * self,
   468 			gboolean chunked,
   469                         const gchar * mux_name,
   470                         const gchar * video_encode,
   471                         gchar ** video_encode_prop,
   472                         gdouble video_fps,
   473                         gdouble video_rate,
   474                         guint video_width,
   475                         guint video_height,
   476                         const gchar * audio_encode,
   477                         gchar ** audio_encode_prop,
   478                         guint audio_rate, const gchar * out_uri)
   479 {
   480     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   481     if (priv->ready == TRUE) {
   482         g_warning
   483             ("Stream already configured. You need close stream first.");
   484         return FALSE;
   485     }
   486 
   487     _close_output(self);
   488     if (_open_output(self, out_uri) == FALSE)
   489         return FALSE;
   490 
   491     priv->sources = 0;
   492     priv->send_chunked = chunked;
   493     priv->pipe = _create_pipeline(self,
   494                                   video_encode,
   495                                   mux_name,
   496                                   video_encode_prop,
   497                                   video_fps,
   498                                   video_rate,
   499                                   video_width,
   500                                   video_height,
   501                                   audio_encode, audio_encode_prop,
   502                                   audio_rate);
   503 
   504     return (priv->pipe != NULL);
   505 }
   506 
   507 
   508 gboolean
   509 g_mencoder_append_uri(GMencoder * self, const gchar * uri)
   510 {
   511     GstPad         *pad_src;
   512     GstPad         *pad_sink;
   513     GstElement     *src;
   514     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   515     gboolean        ret = FALSE;
   516     GstElement     *ap = NULL;
   517     GstElement     *vp = NULL;
   518 
   519 
   520     g_return_val_if_fail(priv->pipe != NULL, FALSE);
   521     g_return_val_if_fail(priv->ready == FALSE, FALSE);
   522 
   523 #ifndef SUPPORT_MULT_INPUT
   524     g_return_val_if_fail(priv->sources < 1, FALSE);
   525 #endif
   526 
   527     src = _create_source(uri);
   528     if (src == NULL)
   529         return FALSE;
   530 
   531     priv->src = gst_bin_get_by_name(GST_BIN(src), "src");
   532 
   533     gst_bin_add(GST_BIN(priv->pipe), src);
   534 
   535 #ifdef SUPPORT_MULT_INPUT
   536     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "ap");
   537     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vp");
   538 #else
   539     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "abin");
   540     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vbin");
   541 #endif
   542 
   543     if ((vp == NULL) || (ap == NULL)) {
   544         g_warning("Fail to get output bin");
   545         goto error;
   546     }
   547 
   548     pad_src = gst_element_get_pad(src, "src_audio");
   549     pad_sink = gst_element_get_compatible_pad(ap,
   550                                               pad_src,
   551                                               gst_pad_get_caps(pad_src));
   552 
   553     if ((pad_sink == NULL) || (pad_src == NULL))
   554         goto error;
   555 
   556     GstPadLinkReturn lret = gst_pad_link(pad_src, pad_sink);
   557     if (lret != GST_PAD_LINK_OK)
   558         goto error;
   559 
   560     gst_object_unref(pad_src);
   561     gst_object_unref(pad_sink);
   562 
   563     pad_src = gst_element_get_pad(src, "src_video");
   564     pad_sink = gst_element_get_compatible_pad(vp,
   565                                               pad_src,
   566                                               gst_pad_get_caps(pad_src));
   567 
   568     if ((pad_src == NULL) || (pad_sink == NULL))
   569         goto error;
   570 
   571     if (gst_pad_link(pad_src, pad_sink) != GST_PAD_LINK_OK) {
   572         g_warning("invalid source. video");
   573         goto error;
   574     }
   575 
   576     priv->sources++;
   577     ret = TRUE;
   578   error:
   579 
   580     if ((src != NULL) && (ret == FALSE)) {
   581         gst_bin_remove(GST_BIN(priv->pipe), src);
   582         gst_object_unref(src);
   583     }
   584 
   585     if (ap != NULL)
   586         gst_object_unref(ap);
   587 
   588     if (vp != NULL)
   589         gst_object_unref(vp);
   590 
   591     if (pad_src != NULL)
   592         gst_object_unref(pad_src);
   593 
   594     if (pad_sink != NULL)
   595         gst_object_unref(pad_sink);
   596 
   597     return ret;
   598 }
   599 
   600 
   601 
   602 void
   603 g_mencoder_remove_uri(GMencoder * self, const gchar * uri)
   604 {
   605     // GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE (self);
   606     // TODO: remove src
   607 }
   608 
   609 void
   610 g_mencoder_play_stream(GMencoder * self)
   611 {
   612     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   613     g_return_if_fail(priv->ready == FALSE);
   614     priv->ready = TRUE;
   615     gst_element_set_state(priv->pipe, GST_STATE_PLAYING);
   616     priv->tick_id = g_timeout_add(500, _tick_cb, self);
   617 }
   618 
   619 void
   620 g_mencoder_pause_stream(GMencoder * self)
   621 {
   622     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   623     g_return_if_fail(priv->ready == TRUE);
   624     gst_element_set_state(priv->pipe, GST_STATE_PAUSED);
   625 }
   626 
   627 void
   628 g_mencoder_close_stream(GMencoder * self)
   629 {
   630 
   631     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   632     if (priv->tick_id != 0) {
   633         g_source_remove(priv->tick_id);
   634         priv->tick_id = 0;
   635     }
   636 
   637     if (priv->pipe != NULL) {
   638         // TODO: fixe pipeline dispose
   639         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   640         // g_debug ("SETING STATE TO NULL: OK");
   641         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   642         // gst_object_unref (priv->pipe);
   643         gst_object_unref(priv->src);
   644         priv->src = NULL;
   645         priv->pipe = NULL;
   646         priv->abin = NULL;
   647         priv->vbin = NULL;
   648         priv->sink = NULL;
   649     }
   650     priv->ready = FALSE;
   651 }
   652 
   653 static GstElement *
   654 _create_pipeline(GMencoder * self,
   655                  const gchar * video_encode,
   656                  const gchar * mux_name,
   657                  gchar ** video_encode_prop,
   658                  gdouble video_fps,
   659                  gdouble video_rate,
   660                  guint video_width,
   661                  guint video_height,
   662                  const gchar * audio_encode,
   663                  gchar ** audio_encode_prop, guint audio_rate)
   664 {
   665     GstBus         *bus = NULL;
   666     GstElement     *pipe = NULL;
   667     GstElement     *sink = NULL;
   668     GstElement     *mux = NULL;
   669     GstElement     *abin = NULL;
   670     GstElement     *vbin = NULL;
   671     GstElement     *queue = NULL;
   672     GstPad         *aux_pad = NULL;
   673     GstPad         *mux_pad = NULL;
   674 #ifdef SUPPORT_MULT_INPUT
   675     GstElement     *ap = NULL;
   676     GstElement     *vp = NULL;
   677 #endif
   678 
   679     pipe = gst_pipeline_new("pipe");
   680 
   681 #ifdef SUPPORT_MULT_INPUT
   682     ap = gst_element_factory_make("concatmux", "ap");
   683     vp = gst_element_factory_make("concatmux", "vp");
   684     gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
   685 #endif
   686     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   687 
   688     mux =
   689         gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
   690                                  "mux");
   691     if (mux == NULL)
   692         goto error;
   693 
   694     queue = gst_element_factory_make("queue", "queueu_sink");
   695 
   696 
   697 #ifdef USE_MANUAL_SINK
   698     sink = gst_element_factory_make("fakesink", "sink");
   699     g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
   700     g_signal_connect (G_OBJECT (sink),
   701                       "handoff",
   702                       G_CALLBACK (_buffer_arrive_cb),
   703                       self);
   704 #else    
   705     sink = gst_element_factory_make("fdsink", "sink");
   706     if (sink == NULL)
   707         goto error;
   708 
   709     g_object_set(G_OBJECT(sink), "fd", priv->fd, "sync", FALSE, NULL);
   710 #endif
   711 
   712     abin = _create_audio_bin(audio_encode, audio_encode_prop, audio_rate);
   713     if (abin == NULL)
   714         goto error;
   715 
   716     vbin =
   717         _create_video_bin(video_encode, video_encode_prop, video_fps,
   718                           video_rate, video_width, video_height);
   719     if (vbin == NULL)
   720         goto error;
   721 
   722     // Finish Pipe
   723     gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, sink, NULL);
   724 
   725 
   726 #ifdef SUPPORT_MULT_INPUT
   727     if (gst_element_link(ap, abin) == FALSE) {
   728         g_warning("Fail to link concat and abin");
   729         goto error;
   730     }
   731 
   732     if (gst_element_link(vp, vbin) == FALSE) {
   733         g_warning("Fail to link concat and vbin");
   734     }
   735 #endif
   736 
   737     // Link bins with mux
   738     aux_pad = gst_element_get_pad(abin, "src");
   739     mux_pad =
   740         gst_element_get_compatible_pad(mux, aux_pad,
   741                                        GST_PAD_CAPS(aux_pad));
   742     if (mux_pad == NULL) {
   743         g_warning("Mux element no have audio PAD");
   744         goto error;
   745     }
   746     GstPadLinkReturn ret = gst_pad_link(aux_pad, mux_pad);
   747     if (ret != GST_PAD_LINK_OK) {
   748         g_warning("Fail link audio and mux: %d", ret);
   749         goto error;
   750 
   751     }
   752     gst_object_unref(aux_pad);
   753     gst_object_unref(mux_pad);
   754 
   755     aux_pad = gst_element_get_pad(vbin, "src");
   756     mux_pad =
   757         gst_element_get_compatible_pad(mux, aux_pad,
   758                                        GST_PAD_CAPS(aux_pad));
   759     if (mux_pad == NULL) {
   760         g_warning("Mux element no have video PAD");
   761         goto error;
   762     }
   763     ret = gst_pad_link(aux_pad, mux_pad);
   764     if (ret != GST_PAD_LINK_OK) {
   765         g_warning("Fail link video and mux: %d", ret);
   766         goto error;
   767     }
   768     gst_object_unref(aux_pad);
   769     gst_object_unref(mux_pad);
   770     aux_pad = NULL;
   771     mux_pad = NULL;
   772 
   773     // Link mux with sink
   774     gst_element_link_many(mux, queue, sink, NULL);
   775 
   776     bus = gst_pipeline_get_bus(GST_PIPELINE(pipe));
   777     gst_bus_add_watch(bus, _pipeline_bus_cb, self);
   778     gst_object_unref(bus);
   779     return pipe;
   780 
   781   error:
   782     g_warning("Invalid uri");
   783 
   784     if (pipe != NULL) {
   785         gst_object_unref(pipe);
   786     }
   787 
   788 
   789     if (mux != NULL) {
   790         gst_object_unref(mux);
   791     }
   792 
   793     if (mux_pad != NULL) {
   794         gst_object_unref(mux_pad);
   795     }
   796 
   797     if (aux_pad != NULL) {
   798         gst_object_unref(mux_pad);
   799     }
   800 
   801     if (sink != NULL) {
   802         gst_object_unref(sink);
   803     }
   804 
   805     if (abin != NULL) {
   806         gst_object_unref(abin);
   807     }
   808 
   809     if (vbin != NULL) {
   810         gst_object_unref(vbin);
   811     }
   812 
   813     return FALSE;
   814 }
   815 
   816 
   817 static void
   818 _close_output(GMencoder * self)
   819 {
   820 }
   821 
   822 static GstElement *
   823 _create_source(const gchar * uri)
   824 {
   825 
   826     GstElement     *bsrc = NULL;
   827     GstElement     *src = NULL;
   828     GstElement     *queue = NULL;
   829     GstElement     *aqueue = NULL;
   830     GstElement     *vqueue = NULL;
   831     GstElement     *decode = NULL;
   832     GstPad         *src_pad = NULL;
   833 
   834 
   835     bsrc = gst_bin_new(NULL);
   836 
   837     // src = gst_element_factory_make ("gnomevfssrc", "src");
   838     // g_object_set (G_OBJECT (src), "location", uri, NULL);
   839     src = gst_element_make_from_uri(GST_URI_SRC, uri, "src");
   840     if (src == NULL)
   841         goto error;
   842 
   843     decode = gst_element_factory_make("decodebin", "decode");
   844     if (decode == NULL)
   845         goto error;
   846 
   847     queue = gst_element_factory_make("queue", "queue_src");
   848     aqueue = gst_element_factory_make("queue", "aqueue");
   849     if (aqueue == NULL)
   850         goto error;
   851 
   852     vqueue = gst_element_factory_make("queue", "vqueue");
   853     if (vqueue == NULL)
   854         goto error;
   855 
   856     gst_bin_add_many(GST_BIN(bsrc), src, queue, decode, aqueue, vqueue,
   857                      NULL);
   858     gst_element_link_many(src, queue, decode, NULL);
   859 
   860     g_signal_connect(G_OBJECT(decode),
   861                      "new-decoded-pad",
   862                      G_CALLBACK(_decodebin_new_pad_cb), bsrc);
   863 
   864     g_signal_connect(G_OBJECT(decode),
   865                      "unknown-type",
   866                      G_CALLBACK(_decodebin_unknown_type_cb), pipe);
   867 
   868     src_pad = gst_element_get_pad(aqueue, "src");
   869     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_audio", src_pad));
   870     gst_object_unref(src_pad);
   871 
   872     src_pad = gst_element_get_pad(vqueue, "src");
   873     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_video", src_pad));
   874     gst_object_unref(src_pad);
   875 
   876     return bsrc;
   877 
   878   error:
   879     if (src != NULL) {
   880         gst_object_unref(src);
   881     }
   882 
   883     if (decode != NULL) {
   884         gst_object_unref(decode);
   885     }
   886 
   887     if (aqueue != NULL) {
   888         gst_object_unref(aqueue);
   889     }
   890 
   891     if (vqueue != NULL) {
   892         gst_object_unref(vqueue);
   893     }
   894 
   895     return NULL;
   896 }
   897 
   898 static gboolean
   899 _open_output(GMencoder * self, const gchar * uri)
   900 {
   901     gboolean ret = TRUE;
   902     gchar         **i;
   903     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   904 
   905     i = g_strsplit(uri, "://", 0);
   906     if (strcmp(i[0], "fd") == 0) {
   907         priv->fd = atoi(i[1]);
   908 	if (priv->send_chunked)
   909 	    fcntl (priv->fd, F_SETFL, O_ASYNC);
   910     } else if (strcmp(i[0], "file") == 0) {
   911         if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
   912             if (unlink (i[1]) != 0) {
   913                 g_warning ("Fail to write in : %s", uri);
   914                 ret = FALSE;
   915                 goto done;
   916             }
   917         }
   918         priv->fd = open(i[1], O_WRONLY | O_CREAT | O_TRUNC,
   919                         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
   920 
   921         if (priv->fd == -1) {
   922             g_warning ("Fail to open : %s", uri);
   923             ret = FALSE;
   924         }
   925     } else {
   926         g_warning("Output uri not supported");
   927         ret = FALSE;
   928     }
   929 
   930 done:
   931     g_strfreev(i);
   932     return ret;
   933 }
   934 
   935 static          gboolean
   936 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data)
   937 {
   938     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
   939 
   940     switch (GST_MESSAGE_TYPE(msg)) {
   941 
   942     case GST_MESSAGE_STATE_CHANGED:
   943         {
   944             GstState        oldstate;
   945             GstState        newstate;
   946             GstState        pendingstate;
   947 
   948 
   949             gst_message_parse_state_changed(msg, &oldstate,
   950                                             &newstate, &pendingstate);
   951 
   952             if (pendingstate != GST_STATE_VOID_PENDING)
   953                 break;
   954 
   955             if ((oldstate == GST_STATE_READY)
   956                 && (newstate == GST_STATE_PAUSED)) {
   957                 if (priv->ready)
   958                     g_signal_emit(user_data, g_mencoder_signals[PAUSED],
   959                                   0);
   960             } else if ((oldstate == GST_STATE_PAUSED)
   961                        && (newstate == GST_STATE_PLAYING)) {
   962                 g_signal_emit(user_data, g_mencoder_signals[PLAYING], 0);
   963             } else if ((oldstate == GST_STATE_READY) &&
   964                        (newstate == GST_STATE_NULL)) {
   965                 g_signal_emit(user_data, g_mencoder_signals[STOPED], 0);
   966             }
   967             break;
   968         }
   969 
   970     case GST_MESSAGE_ERROR:
   971         {
   972             GError         *error;
   973             gchar          *debug;
   974             gchar          *err_str;
   975 
   976             if (priv->tick_id != 0) {
   977                 g_source_remove(priv->tick_id);
   978                 priv->tick_id = 0;
   979             }
   980 
   981             gst_message_parse_error(msg, &error, &debug);
   982             err_str = g_strdup_printf("Error [%d] %s (%s)", error->code,
   983                                       error->message, debug);
   984             priv->ready = FALSE;
   985             g_signal_emit(user_data, g_mencoder_signals[ERROR], 0,
   986                           err_str);
   987             g_free(err_str);
   988             g_clear_error(&error);
   989             g_free(debug);
   990             break;
   991         }
   992 
   993     case GST_MESSAGE_EOS:
   994         priv->ready = FALSE;
   995 #ifdef USE_MANUAL_SINK
   996 	_flush_queue (G_MENCODER (user_data));
   997 #endif
   998         g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
   999         break;
  1000 
  1001     case GST_MESSAGE_DURATION:
  1002         {
  1003             GstFormat       format;
  1004             gint64          duration;
  1005             gst_message_parse_duration(msg, &format, &duration);
  1006             if (format == GST_FORMAT_BYTES)
  1007                 priv->duration = duration;
  1008             break;
  1009         }
  1010     default:
  1011         {
  1012             break;
  1013         }
  1014     }
  1015     return TRUE;
  1016 }
  1017 
  1018 
  1019 
  1020 static void
  1021 _decodebin_new_pad_cb(GstElement * object,
  1022                       GstPad * pad, gboolean flag, gpointer user_data)
  1023 {
  1024     GstCaps        *caps;
  1025     gchar          *str_caps = NULL;
  1026     GstElement     *sink_element;
  1027     GstPad         *sink_pad;
  1028 
  1029     caps = gst_pad_get_caps(pad);
  1030     str_caps = gst_caps_to_string(caps);
  1031     if (strstr(str_caps, "audio") != NULL) {
  1032         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "aqueue");
  1033     } else if (strstr(str_caps, "video") != NULL) {
  1034         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "vqueue");
  1035     } else {
  1036         g_warning("invalid caps %s", str_caps);
  1037     }
  1038 
  1039     sink_pad = gst_element_get_pad(sink_element, "sink");
  1040     gst_pad_link(pad, sink_pad);
  1041 
  1042     gst_object_unref(sink_element);
  1043     gst_object_unref(sink_pad);
  1044     g_free(str_caps);
  1045     gst_caps_unref(caps);
  1046 }
  1047 
  1048 static void
  1049 _decodebin_unknown_type_cb(GstElement * object,
  1050                            GstPad * pad, GstCaps * caps,
  1051                            gpointer user_data)
  1052 {
  1053     g_warning("Unknown Type");
  1054     // priv->ready = FALSE;
  1055 }
  1056 
  1057 static          gboolean
  1058 _tick_cb(gpointer user_data)
  1059 {
  1060     GstFormat       format = GST_FORMAT_BYTES;
  1061     gint64          cur = 0;
  1062 
  1063     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1064 
  1065     if (priv->duration == 0) {
  1066         gint64          d = 0;
  1067         if (gst_element_query_duration(priv->src, &format, &d))
  1068             priv->duration = d;
  1069     }
  1070 
  1071     if (priv->duration != 0) {
  1072         gst_element_query_position(priv->src, &format, &cur);
  1073         g_print("PROGRESS:%lli\n", (99 * cur) / priv->duration);
  1074     }
  1075 
  1076     return TRUE;
  1077 }
  1078 
  1079 
  1080 #ifdef USE_MANUAL_SINK
  1081 static gboolean
  1082 _send_buffer (gint fd, gpointer buff, gint size)
  1083 {
  1084     gboolean ret = TRUE;
  1085     gchar *msg;
  1086     GByteArray *b_send;
  1087 
  1088     b_send = g_byte_array_new ();
  1089     msg = g_strdup_printf ("%x\r\n", size);
  1090     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1091     g_free (msg);
  1092 
  1093     b_send = g_byte_array_append (b_send, buff, size);
  1094 
  1095     msg = g_strdup ("\r\n");
  1096     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1097     g_free (msg);
  1098 
  1099     if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) 
  1100         ret = FALSE;
  1101     g_byte_array_free (b_send, TRUE);
  1102 
  1103     return ret;
  1104 }
  1105 
  1106 static void
  1107 _flush_queue (GMencoder *self)
  1108 {
  1109     gchar *end_msg;
  1110     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
  1111 
  1112     if (BUFFER_SIZE == 0)
  1113         return;
  1114 
  1115     if (priv->queue->len > 0) {
  1116         _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
  1117         priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1118     }
  1119 
  1120     end_msg = g_strdup ("0\r\n\r\n");
  1121     write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
  1122     g_free (end_msg);
  1123     return;
  1124 }
  1125 static void 
  1126 _buffer_arrive_cb (GstElement* object,
  1127                    GstBuffer* buff,
  1128                    GstPad* pad,
  1129                    gpointer user_data)
  1130 {
  1131     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1132 
  1133     if (BUFFER_SIZE == 0) {
  1134 	if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
  1135              goto error;
  1136         return;
  1137     }
  1138 
  1139     priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
  1140     while (priv->queue->len >= BUFFER_SIZE) {
  1141     	//g_usleep (0.2 * G_USEC_PER_SEC);
  1142     	if (priv->send_chunked) {	
  1143 	    if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
  1144                 goto error;
  1145 	} else {
  1146 	    if (write (priv->fd, priv->queue->data, BUFFER_SIZE) == -1)
  1147                 goto error;
  1148 	}
  1149         priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
  1150     }
  1151     return;
  1152 
  1153 error:
  1154     if (priv->tick_id != 0) {
  1155         g_source_remove(priv->tick_id);
  1156         priv->tick_id = 0;
  1157     }
  1158     priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1159     g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
  1160 }
  1161 #endif 
  1162