gmyth-stream/gmemcoder/src/gmencoder.c
author renatofilho
Wed Jul 04 15:25:52 2007 +0100 (2007-07-04)
branchtrunk
changeset 780 7feaeeed26d5
parent 777 4127375c2a03
child 781 97fd2d4783bf
permissions -rw-r--r--
[svn r786] fixed some bugs on destructor
     1 #ifdef HAVE_CONFIG_H
     2 #include "config.h"
     3 #endif
     4 
     5 #include <sys/stat.h>
     6 #include <fcntl.h>
     7 #include <unistd.h>
     8 #include <glib.h>
     9 #include <gst/gst.h>
    10 #include <string.h>
    11 #include <sys/types.h>
    12 #include <sys/socket.h>
    13 
    14 #include "gmencoder.h"
    15 
    16 #define G_MENCODER_GET_PRIVATE(obj) \
    17     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), G_TYPE_MENCODER, GMencoderPrivate))
    18 
    19 // #define SUPPORT_MULT_INPUT 0
    20 #define USE_MANUAL_SINK
    21 #define BUFFER_SIZE (1024 * 64)
    22 
    23 typedef struct _GMencoderPrivate GMencoderPrivate;
    24 typedef struct _SetupInfo SetupInfo;
    25 
    26 struct _SetupInfo {
    27     gchar          *video_encode;
    28     gchar          *mux_name;
    29     gchar         **video_encode_prop;
    30     gdouble         video_fps;
    31     gdouble         video_rate;
    32     guint           video_width;
    33     guint           video_height;
    34     gchar          *audio_encode;
    35     gchar         **audio_encode_prop;
    36     guint           audio_rate;
    37 };
    38 
    39 
    40 struct _GMencoderPrivate {
    41     GstElement     *pipe;
    42     GstElement     *abin;
    43     GstElement     *vbin;
    44     GstElement     *sink;
    45     GstElement     *src;
    46     gboolean        ready;
    47     SetupInfo      *info;
    48     GstClockTime    videot;
    49     GstClockTime    audiot;
    50     gint            fd;
    51     gint            sources;
    52     gint            tick_id;
    53     gint64          duration;
    54     gboolean        send_chunked;
    55 #ifdef USE_MANUAL_SINK
    56     GByteArray	    *queue;
    57 #endif
    58 };
    59 
    60 enum {
    61     PAUSED,
    62     PLAYING,
    63     STOPED,
    64     EOS,
    65     ERROR,
    66     LAST_SIGNAL
    67 };
    68 
    69 static void     g_mencoder_class_init(GMencoderClass * klass);
    70 static void     g_mencoder_init(GMencoder * object);
    71 static void     g_mencoder_dispose(GObject * object);
    72 static void     g_mencoder_finalize(GObject * object);
    73 static GstElement *_create_audio_bin(const gchar * encode,
    74                                      gchar ** encode_prop, gint rate);
    75 static GstElement *_create_video_bin(const gchar * encode,
    76                                      gchar ** encode_prop,
    77                                      gdouble fps,
    78                                      gint rate, guint width, guint height,
    79                                      gboolean use_deinterlace);
    80 
    81 static          gboolean
    82 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data);
    83 
    84 static void     _decodebin_new_pad_cb(GstElement * object,
    85                                       GstPad * pad,
    86                                       gboolean flag, gpointer user_data);
    87 
    88 static void     _decodebin_unknown_type_cb(GstElement * object,
    89                                            GstPad * pad,
    90                                            GstCaps * caps,
    91                                            gpointer user_data);
    92 
    93 static void     _close_output(GMencoder * self);
    94 static gboolean _open_output(GMencoder * self, const gchar * uri);
    95 
    96 static GstElement *_create_source(const gchar * uri);
    97 static GstElement *_create_pipeline(GMencoder * self,
    98                                     const gchar * video_encode,
    99                                     const gchar * mux_name,
   100                                     gchar ** video_encode_prop,
   101                                     gdouble video_fps,
   102                                     gdouble video_rate,
   103                                     guint video_width,
   104                                     guint video_height,
   105                                     const gchar * audio_encode,
   106                                     gchar ** audio_encode_prop,
   107                                     guint audio_rate,
   108 				    gboolean deinterlace);
   109 #ifdef USE_MANUAL_SINK
   110 static void _flush_queue 	    (GMencoder *self);
   111 static void _buffer_arrive_cb       (GstElement* object,
   112                                      GstBuffer* buff,
   113                                      GstPad* pad,
   114                                      gpointer user_data);
   115 #endif
   116 
   117 
   118 static gboolean _tick_cb(gpointer data);
   119 
   120 static guint    g_mencoder_signals[LAST_SIGNAL] = { 0 };
   121 
   122 G_DEFINE_TYPE(GMencoder, g_mencoder, G_TYPE_OBJECT)
   123 
   124 static void     g_mencoder_class_init(GMencoderClass * klass)
   125 {
   126     GObjectClass   *object_class;
   127     object_class = (GObjectClass *) klass;
   128     g_type_class_add_private(klass, sizeof(GMencoderPrivate));
   129 
   130     object_class->dispose = g_mencoder_dispose;
   131     object_class->finalize = g_mencoder_finalize;
   132 
   133     g_mencoder_signals[PAUSED] =
   134         g_signal_new("paused",
   135                      G_OBJECT_CLASS_TYPE(object_class),
   136                      G_SIGNAL_RUN_FIRST,
   137                      0, NULL, NULL,
   138                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   139 
   140     g_mencoder_signals[PLAYING] =
   141         g_signal_new("playing",
   142                      G_OBJECT_CLASS_TYPE(object_class),
   143                      G_SIGNAL_RUN_FIRST,
   144                      0, NULL, NULL,
   145                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   146 
   147     g_mencoder_signals[STOPED] =
   148         g_signal_new("stoped",
   149                      G_OBJECT_CLASS_TYPE(object_class),
   150                      G_SIGNAL_RUN_FIRST,
   151                      0, NULL, NULL,
   152                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   153 
   154     g_mencoder_signals[EOS] =
   155         g_signal_new("eos",
   156                      G_OBJECT_CLASS_TYPE(object_class),
   157                      G_SIGNAL_RUN_FIRST,
   158                      0, NULL, NULL,
   159                      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   160 
   161     g_mencoder_signals[ERROR] =
   162         g_signal_new("error",
   163                      G_OBJECT_CLASS_TYPE(object_class),
   164                      G_SIGNAL_RUN_LAST,
   165                      0, NULL, NULL,
   166                      g_cclosure_marshal_VOID__STRING,
   167                      G_TYPE_NONE, 1, G_TYPE_STRING);
   168 }
   169 
   170 static void
   171 g_mencoder_init(GMencoder * self)
   172 {
   173     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   174     priv->info = g_new0(SetupInfo, 1);
   175 #ifdef USE_MANUAL_SINK
   176     priv->queue = g_byte_array_new ();
   177 #endif
   178 }
   179 
   180 static void
   181 g_mencoder_dispose(GObject * object)
   182 {
   183 }
   184 
   185 static void
   186 g_mencoder_finalize(GObject * object)
   187 {
   188     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
   189 
   190     // TODO: clear vars
   191     g_mencoder_close_stream(G_MENCODER(object));
   192     g_free (priv->info);
   193 #ifdef USE_MANUAL_SINK
   194     g_byte_array_free (priv->queue, TRUE);
   195 #endif
   196 }
   197 
   198 GMencoder      *
   199 g_mencoder_new(void)
   200 {
   201     return g_object_new(G_TYPE_MENCODER, NULL);
   202 }
   203 
   204 
   205 static void
   206 _obj_set_prop(GObject * obj, const gchar * prop_name,
   207               const gchar * prop_val)
   208 {
   209     GValue          p = { 0 };
   210     GValue          v = { 0 };
   211     GParamSpec     *s = NULL;
   212     GObjectClass   *k = G_OBJECT_GET_CLASS(obj);
   213 
   214 
   215     g_value_init(&v, G_TYPE_STRING);
   216     g_value_set_string(&v, prop_val);
   217 
   218     s = g_object_class_find_property(k, prop_name);
   219     if (s == NULL) {
   220         g_print("Invalid property name: %s\n", prop_name);
   221         return;
   222     }
   223 
   224     g_value_init(&p, s->value_type);
   225     switch (s->value_type) {
   226     case G_TYPE_INT:
   227         g_value_set_int(&p, atoi(prop_val));
   228         break;
   229     case G_TYPE_ULONG:
   230         g_value_set_ulong (&p, atol(prop_val));
   231         break;
   232     case G_TYPE_STRING:
   233         g_value_set_string(&p, prop_val);
   234         break;
   235     case G_TYPE_BOOLEAN:
   236         g_value_set_boolean(&p, (gboolean) atoi (prop_val));
   237         break;
   238     case G_TYPE_DOUBLE:
   239         g_value_set_double(&p, atof (prop_val));
   240         break;
   241     case G_TYPE_FLOAT:
   242         g_value_set_float(&p, (float) atof (prop_val));
   243         break;
   244     default:
   245         g_value_set_enum(&p, atoi(prop_val));
   246         g_warning ("Property %s of type %s. Not supported using default enum", 
   247                    prop_name, g_type_name (s->value_type));
   248         return;
   249     }
   250 
   251     g_object_set_property(obj, prop_name, &p);
   252     g_value_unset(&v);
   253     g_value_unset(&p);
   254 }
   255 
   256 static GstElement *
   257 _create_element_with_prop(const gchar * factory_name,
   258                           const gchar * element_name, gchar ** prop)
   259 {
   260     GstElement     *ret;
   261     int             i;
   262 
   263     ret = gst_element_factory_make(factory_name, element_name);
   264     if (ret == NULL)
   265         return NULL;
   266 
   267     if (prop != NULL) {
   268         for (i = 0; i < g_strv_length(prop); i++) {
   269             if (prop[i] != NULL) {
   270                 char          **v = g_strsplit(prop[i], "=", 2);
   271                 if (g_strv_length(v) == 2) {
   272                     _obj_set_prop(G_OBJECT(ret), v[0], v[1]);
   273                 }
   274                 g_strfreev(v);
   275             }
   276         }
   277     }
   278 
   279     return ret;
   280 
   281 }
   282 
   283 static GstElement *
   284 _create_audio_bin(const gchar * encode, gchar ** encode_prop, gint rate)
   285 {
   286     GstElement     *abin = NULL;
   287     GstElement     *aqueue = NULL;
   288     GstElement     *aconvert = NULL;
   289     GstElement     *aencode = NULL;
   290     GstElement     *aqueue_src = NULL;
   291     GstPad         *apad = NULL;
   292 
   293     // audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
   294     // udpsink name=upd_audio host=224.0.0.1 port=5002
   295     abin = gst_bin_new("abin");
   296     aqueue = gst_element_factory_make("queue", "aqueue");
   297     aconvert = gst_element_factory_make("audioconvert", "aconvert");
   298     aencode =
   299         _create_element_with_prop((encode ? encode : "lame"), "aencode",
   300                                   encode_prop);
   301     aqueue_src = gst_element_factory_make("queue", "aqueue_src");
   302 
   303     if ((abin == NULL) || (aqueue == NULL) || (aconvert == NULL)
   304         || (aencode == NULL) || (aqueue_src == NULL)) {
   305         g_warning("Audio elements not found");
   306         goto error;
   307     }
   308 
   309     g_object_set(G_OBJECT(aencode), "bitrate", 32, NULL);
   310     /*
   311      * if (rate > 0) { g_object_set (G_OBJECT (aencode), "bitrate", 32,
   312      * NULL); } 
   313      */
   314 
   315     gst_bin_add_many(GST_BIN(abin), aqueue, aconvert, aencode, aqueue_src,
   316                      NULL);
   317     if (gst_element_link_many(aqueue, aconvert, aencode, aqueue_src, NULL)
   318         == FALSE) {
   319         g_warning("Not Link audio elements");
   320     }
   321     // TODO: apply audio rate
   322 
   323     // ghost pad the audio bin
   324     apad = gst_element_get_pad(aqueue, "sink");
   325     gst_element_add_pad(abin, gst_ghost_pad_new("sink", apad));
   326     gst_object_unref(apad);
   327 
   328     apad = gst_element_get_pad(aqueue_src, "src");
   329     gst_element_add_pad(abin, gst_ghost_pad_new("src", apad));
   330     gst_object_unref(apad);
   331 
   332     return abin;
   333   error:
   334     if (abin != NULL)
   335         gst_object_unref(abin);
   336 
   337     if (aqueue != NULL)
   338         gst_object_unref(aqueue);
   339 
   340     if (aconvert != NULL)
   341         gst_object_unref(aconvert);
   342 
   343     if (aencode != NULL)
   344         gst_object_unref(aencode);
   345 
   346     if (aqueue_src != NULL)
   347         gst_object_unref(aqueue_src);
   348 
   349     if (apad != NULL)
   350         gst_object_unref(apad);
   351 
   352     return NULL;
   353 }
   354 
   355 
   356 
   357 
   358 // queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! colorspace
   359 // ! rate ! encode ! queue
   360 static GstElement *
   361 _create_video_bin(const gchar * encode,
   362                   gchar ** encode_prop,
   363                   gdouble fps, gint rate, guint width, guint height,
   364                   gboolean use_deinterlace)
   365 {
   366     GstElement     *vbin = NULL;
   367     GstElement     *vqueue = NULL;
   368     GstElement     *vqueue_src = NULL;
   369     GstElement     *vcolorspace = NULL;
   370     GstElement     *vencode = NULL;
   371     GstElement     *vrate = NULL;
   372     GstElement 	   *deinterlace = NULL;
   373     GstElement     *walk = NULL;
   374     GstPad         *vpad = NULL;
   375 
   376     vbin = gst_bin_new("vbin");
   377     vqueue = gst_element_factory_make("queue", "vqueue");
   378     vcolorspace =
   379         gst_element_factory_make("ffmpegcolorspace", "colorspace");
   380 
   381     if (use_deinterlace) {
   382 	deinterlace = gst_element_factory_make ("ffdeinterlace", "deinterlace");
   383 	if (deinterlace == NULL) {
   384 	    g_warning ("Fail to create deinterlace element: Continue without deinterlace.");
   385 	} 
   386     }
   387 
   388 
   389     vencode = _create_element_with_prop((encode !=
   390                                          NULL ? encode :
   391                                          "ffenc_mpeg1video"), "vencode",
   392                                         encode_prop);
   393     vqueue_src = gst_element_factory_make("queue", "queue_src");
   394 
   395     if ((vbin == NULL) || (vqueue == NULL) || (vcolorspace == NULL)
   396         || (vencode == NULL) || (vqueue_src == NULL)) {
   397         g_warning("Video elements not found");
   398         goto error;
   399     }
   400 
   401     gst_bin_add_many(GST_BIN(vbin), vqueue, vcolorspace, vencode,
   402                      vqueue_src, NULL);
   403 
   404     if (deinterlace != NULL) {
   405         gst_bin_add(GST_BIN(vbin), deinterlace);
   406         gst_element_link (vqueue, deinterlace);
   407         walk = deinterlace;
   408     } else {
   409         walk = vqueue;
   410     }
   411 
   412     if ((width > 0) && (height > 0)) {
   413         // Scalling video
   414         GstCaps        *vcaps;
   415         GstElement     *vscale =
   416             gst_element_factory_make("videoscale", "vscale");
   417 
   418 	g_object_set (G_OBJECT (vscale), "method", 1, NULL);
   419 
   420         gst_bin_add(GST_BIN(vbin), vscale);
   421 
   422         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   423                                     "width", G_TYPE_INT, width,
   424                                     "height", G_TYPE_INT, height, NULL);
   425 
   426         gst_element_link(walk, vscale);
   427 
   428         if (gst_element_link_filtered(vscale, vcolorspace, vcaps) == FALSE) {
   429             g_warning("Fail to resize video");
   430             gst_object_unref(vcaps);
   431             gst_object_unref(vscale);
   432             goto error;
   433         }
   434         gst_caps_unref(vcaps);
   435     } else {
   436         gst_element_link(walk, vcolorspace);
   437     }
   438 
   439     if (fps > 0) {
   440         // Changing the video fps
   441         GstCaps        *vcaps;
   442         vrate = gst_element_factory_make("videorate", "vrate");
   443 
   444         gst_bin_add(GST_BIN(vbin), vrate);
   445 
   446         if (gst_element_link(vcolorspace, vrate) == FALSE) {
   447             g_warning("Fail to link video elements");
   448             goto error;
   449         }
   450 
   451         vcaps = gst_caps_new_simple("video/x-raw-yuv",
   452                                     "framerate", GST_TYPE_FRACTION,
   453                                     (int) (fps * 1000), 1000, NULL);
   454 
   455         if (gst_element_link_filtered(vrate, vencode, vcaps) == FALSE) {
   456             g_warning("Fail to link vrate with vencode.");
   457             goto error;
   458         }
   459         gst_caps_unref(vcaps);
   460     } else {
   461         if (gst_element_link(vcolorspace, vencode) == FALSE) {
   462             g_warning("Fail to link colorspace and video encode element.");
   463             goto error;
   464         }
   465     }
   466 
   467     gst_element_link(vencode, vqueue_src);
   468 
   469     // ghost pad the video bin
   470     vpad = gst_element_get_pad(vqueue, "sink");
   471     gst_element_add_pad(vbin, gst_ghost_pad_new("sink", vpad));
   472     gst_object_unref(vpad);
   473 
   474     vpad = gst_element_get_pad(vqueue_src, "src");
   475     gst_element_add_pad(vbin, gst_ghost_pad_new("src", vpad));
   476     gst_object_unref(vpad);
   477 
   478     return vbin;
   479 
   480   error:
   481     if (vpad != NULL)
   482         gst_object_unref(vpad);
   483 
   484     if (vbin != NULL)
   485         gst_object_unref(vbin);
   486 
   487     if (vqueue != NULL)
   488         gst_object_unref(vqueue);
   489 
   490     if (vencode != NULL)
   491         gst_object_unref(vencode);
   492 
   493     if (vqueue_src != NULL)
   494         gst_object_unref(vqueue_src);
   495 
   496     if (vcolorspace != NULL)
   497         gst_object_unref(vcolorspace);
   498 
   499     return NULL;
   500 }
   501 
   502 
   503 
   504 gboolean 
   505 g_mencoder_setup_stream(GMencoder * self,
   506 			gboolean chunked,
   507 			gboolean deinterlace,
   508                         const gchar * mux_name,
   509                         const gchar * video_encode,
   510                         gchar ** video_encode_prop,
   511                         gdouble video_fps,
   512                         gdouble video_rate,
   513                         guint video_width,
   514                         guint video_height,
   515                         const gchar * audio_encode,
   516                         gchar ** audio_encode_prop,
   517                         guint audio_rate, const gchar * out_uri)
   518 {
   519     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   520     if (priv->ready == TRUE) {
   521         g_warning
   522             ("Stream already configured. You need close stream first.");
   523         return FALSE;
   524     }
   525 
   526     _close_output(self);
   527     if (_open_output(self, out_uri) == FALSE)
   528         return FALSE;
   529 
   530     priv->sources = 0;
   531     priv->send_chunked = chunked;
   532     priv->pipe = _create_pipeline(self,
   533                                   video_encode,
   534                                   mux_name,
   535                                   video_encode_prop,
   536                                   video_fps,
   537                                   video_rate,
   538                                   video_width,
   539                                   video_height,
   540                                   audio_encode, audio_encode_prop,
   541                                   audio_rate,
   542                                   deinterlace);
   543 
   544     return (priv->pipe != NULL);
   545 }
   546 
   547 
   548 gboolean
   549 g_mencoder_append_uri(GMencoder * self, const gchar * uri)
   550 {
   551     GstPad         *pad_src;
   552     GstPad         *pad_sink;
   553     GstElement     *src;
   554     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   555     gboolean        ret = FALSE;
   556     GstElement     *ap = NULL;
   557     GstElement     *vp = NULL;
   558 
   559 
   560     g_return_val_if_fail(priv->pipe != NULL, FALSE);
   561     g_return_val_if_fail(priv->ready == FALSE, FALSE);
   562 
   563 #ifndef SUPPORT_MULT_INPUT
   564     g_return_val_if_fail(priv->sources < 1, FALSE);
   565 #endif
   566 
   567     src = _create_source(uri);
   568     if (src == NULL)
   569         return FALSE;
   570 
   571     priv->src = gst_bin_get_by_name(GST_BIN(src), "src");
   572 
   573     gst_bin_add(GST_BIN(priv->pipe), src);
   574 
   575 #ifdef SUPPORT_MULT_INPUT
   576     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "ap");
   577     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vp");
   578 #else
   579     ap = gst_bin_get_by_name(GST_BIN(priv->pipe), "abin");
   580     vp = gst_bin_get_by_name(GST_BIN(priv->pipe), "vbin");
   581 #endif
   582 
   583     if ((vp == NULL) || (ap == NULL)) {
   584         g_warning("Fail to get output bin");
   585         goto error;
   586     }
   587 
   588     pad_src = gst_element_get_pad(src, "src_audio");
   589     pad_sink = gst_element_get_compatible_pad(ap,
   590                                               pad_src,
   591                                               gst_pad_get_caps(pad_src));
   592 
   593     if ((pad_sink == NULL) || (pad_src == NULL))
   594         goto error;
   595 
   596     GstPadLinkReturn lret = gst_pad_link(pad_src, pad_sink);
   597     if (lret != GST_PAD_LINK_OK)
   598         goto error;
   599 
   600     gst_object_unref(pad_src);
   601     gst_object_unref(pad_sink);
   602 
   603     pad_src = gst_element_get_pad(src, "src_video");
   604     pad_sink = gst_element_get_compatible_pad(vp,
   605                                               pad_src,
   606                                               gst_pad_get_caps(pad_src));
   607 
   608     if ((pad_src == NULL) || (pad_sink == NULL))
   609         goto error;
   610 
   611     if (gst_pad_link(pad_src, pad_sink) != GST_PAD_LINK_OK) {
   612         g_warning("invalid source. video");
   613         goto error;
   614     }
   615 
   616     priv->sources++;
   617     ret = TRUE;
   618   error:
   619 
   620     if ((src != NULL) && (ret == FALSE)) {
   621         gst_bin_remove(GST_BIN(priv->pipe), src);
   622         gst_object_unref(src);
   623     }
   624 
   625     if (ap != NULL)
   626         gst_object_unref(ap);
   627 
   628     if (vp != NULL)
   629         gst_object_unref(vp);
   630 
   631     if (pad_src != NULL)
   632         gst_object_unref(pad_src);
   633 
   634     if (pad_sink != NULL)
   635         gst_object_unref(pad_sink);
   636 
   637     return ret;
   638 }
   639 
   640 
   641 
   642 void
   643 g_mencoder_remove_uri(GMencoder * self, const gchar * uri)
   644 {
   645     // GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE (self);
   646     // TODO: remove src
   647 }
   648 
   649 void
   650 g_mencoder_play_stream(GMencoder * self)
   651 {
   652     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   653     g_return_if_fail(priv->ready == FALSE);
   654     priv->ready = TRUE;
   655     gst_element_set_state(priv->pipe, GST_STATE_PLAYING);
   656     priv->tick_id = g_timeout_add(500, _tick_cb, self);
   657 }
   658 
   659 void
   660 g_mencoder_pause_stream(GMencoder * self)
   661 {
   662     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   663     g_return_if_fail(priv->ready == TRUE);
   664     gst_element_set_state(priv->pipe, GST_STATE_PAUSED);
   665 }
   666 
   667 void
   668 g_mencoder_close_stream(GMencoder * self)
   669 {
   670 
   671     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   672     if (priv->tick_id != 0) {
   673         g_source_remove(priv->tick_id);
   674         priv->tick_id = 0;
   675     }
   676 
   677     if (priv->pipe != NULL) {
   678         // TODO: fixe pipeline dispose
   679         gst_element_set_state (priv->pipe, GST_STATE_NULL);
   680         // g_debug ("SETING STATE TO NULL: OK");
   681         // gst_element_set_state (priv->pipe, GST_STATE_NULL);
   682         gst_object_unref (priv->pipe);
   683         //gst_object_unref(priv->src);
   684         priv->src = NULL;
   685         priv->pipe = NULL;
   686         priv->abin = NULL;
   687         priv->vbin = NULL;
   688         priv->sink = NULL;
   689     }
   690     priv->ready = FALSE;
   691 }
   692 
   693 static GstElement *
   694 _create_pipeline(GMencoder * self,
   695                  const gchar * video_encode,
   696                  const gchar * mux_name,
   697                  gchar ** video_encode_prop,
   698                  gdouble video_fps,
   699                  gdouble video_rate,
   700                  guint video_width,
   701                  guint video_height,
   702                  const gchar * audio_encode,
   703                  gchar ** audio_encode_prop, guint audio_rate,
   704 	     	 gboolean deinterlace)
   705 {
   706     GstBus         *bus = NULL;
   707     GstElement     *pipe = NULL;
   708     GstElement     *sink = NULL;
   709     GstElement     *mux = NULL;
   710     GstElement     *abin = NULL;
   711     GstElement     *vbin = NULL;
   712     GstElement     *queue = NULL;
   713     GstPad         *aux_pad = NULL;
   714     GstPad         *mux_pad = NULL;
   715 #ifdef SUPPORT_MULT_INPUT
   716     GstElement     *ap = NULL;
   717     GstElement     *vp = NULL;
   718 #endif
   719 
   720     pipe = gst_pipeline_new("pipe");
   721 
   722 #ifdef SUPPORT_MULT_INPUT
   723     ap = gst_element_factory_make("concatmux", "ap");
   724     vp = gst_element_factory_make("concatmux", "vp");
   725     gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
   726 #endif
   727     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   728 
   729     mux =
   730         gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
   731                                  "mux");
   732     if (mux == NULL)
   733         goto error;
   734 
   735     queue = gst_element_factory_make("queue", "queueu_sink");
   736 
   737 
   738 #ifdef USE_MANUAL_SINK
   739     sink = gst_element_factory_make("fakesink", "sink");
   740     g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
   741     g_signal_connect (G_OBJECT (sink),
   742                       "handoff",
   743                       G_CALLBACK (_buffer_arrive_cb),
   744                       self);
   745 #else    
   746     sink = gst_element_factory_make("fdsink", "sink");
   747     if (sink == NULL)
   748         goto error;
   749 
   750     g_object_set(G_OBJECT(sink), "fd", priv->fd, "sync", FALSE, NULL);
   751 #endif
   752 
   753     abin = _create_audio_bin(audio_encode, audio_encode_prop, audio_rate);
   754     if (abin == NULL)
   755         goto error;
   756 
   757     vbin =
   758         _create_video_bin(video_encode, video_encode_prop, video_fps,
   759                           video_rate, video_width, video_height, deinterlace);
   760     if (vbin == NULL)
   761         goto error;
   762 
   763     // Finish Pipe
   764     gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, sink, NULL);
   765 
   766 
   767 #ifdef SUPPORT_MULT_INPUT
   768     if (gst_element_link(ap, abin) == FALSE) {
   769         g_warning("Fail to link concat and abin");
   770         goto error;
   771     }
   772 
   773     if (gst_element_link(vp, vbin) == FALSE) {
   774         g_warning("Fail to link concat and vbin");
   775     }
   776 #endif
   777 
   778     // Link bins with mux
   779     aux_pad = gst_element_get_pad(abin, "src");
   780     mux_pad =
   781         gst_element_get_compatible_pad(mux, aux_pad,
   782                                        GST_PAD_CAPS(aux_pad));
   783     if (mux_pad == NULL) {
   784         g_warning("Mux element no have audio PAD");
   785         goto error;
   786     }
   787     GstPadLinkReturn ret = gst_pad_link(aux_pad, mux_pad);
   788     if (ret != GST_PAD_LINK_OK) {
   789         g_warning("Fail link audio and mux: %d", ret);
   790         goto error;
   791 
   792     }
   793     gst_object_unref(aux_pad);
   794     gst_object_unref(mux_pad);
   795 
   796     aux_pad = gst_element_get_pad(vbin, "src");
   797     mux_pad =
   798         gst_element_get_compatible_pad(mux, aux_pad,
   799                                        GST_PAD_CAPS(aux_pad));
   800     if (mux_pad == NULL) {
   801         g_warning("Mux element no have video PAD");
   802         goto error;
   803     }
   804     ret = gst_pad_link(aux_pad, mux_pad);
   805     if (ret != GST_PAD_LINK_OK) {
   806         g_warning("Fail link video and mux: %d", ret);
   807         goto error;
   808     }
   809     gst_object_unref(aux_pad);
   810     gst_object_unref(mux_pad);
   811     aux_pad = NULL;
   812     mux_pad = NULL;
   813 
   814     // Link mux with sink
   815     gst_element_link_many(mux, queue, sink, NULL);
   816 
   817     bus = gst_pipeline_get_bus(GST_PIPELINE(pipe));
   818     gst_bus_add_watch(bus, _pipeline_bus_cb, self);
   819     gst_object_unref(bus);
   820     return pipe;
   821 
   822   error:
   823     g_warning("Invalid uri");
   824 
   825     if (pipe != NULL) {
   826         gst_object_unref(pipe);
   827     }
   828 
   829 
   830     if (mux != NULL) {
   831         gst_object_unref(mux);
   832     }
   833 
   834     if (mux_pad != NULL) {
   835         gst_object_unref(mux_pad);
   836     }
   837 
   838     if (aux_pad != NULL) {
   839         gst_object_unref(mux_pad);
   840     }
   841 
   842     if (sink != NULL) {
   843         gst_object_unref(sink);
   844     }
   845 
   846     if (abin != NULL) {
   847         gst_object_unref(abin);
   848     }
   849 
   850     if (vbin != NULL) {
   851         gst_object_unref(vbin);
   852     }
   853 
   854     return FALSE;
   855 }
   856 
   857 
   858 static void
   859 _close_output(GMencoder * self)
   860 {
   861 }
   862 
   863 static GstElement *
   864 _create_source(const gchar * uri)
   865 {
   866 
   867     GstElement     *bsrc = NULL;
   868     GstElement     *src = NULL;
   869     GstElement     *queue = NULL;
   870     GstElement     *aqueue = NULL;
   871     GstElement     *vqueue = NULL;
   872     GstElement     *decode = NULL;
   873     GstPad         *src_pad = NULL;
   874 
   875 
   876     bsrc = gst_bin_new(NULL);
   877 
   878     // src = gst_element_factory_make ("gnomevfssrc", "src");
   879     // g_object_set (G_OBJECT (src), "location", uri, NULL);
   880     src = gst_element_make_from_uri(GST_URI_SRC, uri, "src");
   881     if (src == NULL)
   882         goto error;
   883 
   884     decode = gst_element_factory_make("decodebin", "decode");
   885     if (decode == NULL)
   886         goto error;
   887 
   888     queue = gst_element_factory_make("queue", "queue_src");
   889     aqueue = gst_element_factory_make("queue", "aqueue");
   890     if (aqueue == NULL)
   891         goto error;
   892 
   893     vqueue = gst_element_factory_make("queue", "vqueue");
   894     if (vqueue == NULL)
   895         goto error;
   896 
   897     gst_bin_add_many(GST_BIN(bsrc), src, queue, decode, aqueue, vqueue,
   898                      NULL);
   899     gst_element_link_many(src, queue, decode, NULL);
   900 
   901     g_signal_connect(G_OBJECT(decode),
   902                      "new-decoded-pad",
   903                      G_CALLBACK(_decodebin_new_pad_cb), bsrc);
   904 
   905     g_signal_connect(G_OBJECT(decode),
   906                      "unknown-type",
   907                      G_CALLBACK(_decodebin_unknown_type_cb), pipe);
   908 
   909     src_pad = gst_element_get_pad(aqueue, "src");
   910     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_audio", src_pad));
   911     gst_object_unref(src_pad);
   912 
   913     src_pad = gst_element_get_pad(vqueue, "src");
   914     gst_element_add_pad(bsrc, gst_ghost_pad_new("src_video", src_pad));
   915     gst_object_unref(src_pad);
   916 
   917     return bsrc;
   918 
   919   error:
   920     if (src != NULL) {
   921         gst_object_unref(src);
   922     }
   923 
   924     if (decode != NULL) {
   925         gst_object_unref(decode);
   926     }
   927 
   928     if (aqueue != NULL) {
   929         gst_object_unref(aqueue);
   930     }
   931 
   932     if (vqueue != NULL) {
   933         gst_object_unref(vqueue);
   934     }
   935 
   936     return NULL;
   937 }
   938 
   939 static gboolean
   940 _open_output(GMencoder * self, const gchar * uri)
   941 {
   942     gboolean ret = TRUE;
   943     gchar         **i;
   944     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   945 
   946     i = g_strsplit(uri, "://", 0);
   947     if (strcmp(i[0], "fd") == 0) {
   948         priv->fd = atoi(i[1]);
   949 	if (priv->send_chunked)
   950 	    fcntl (priv->fd, F_SETFL, O_ASYNC);
   951     } else if (strcmp(i[0], "file") == 0) {
   952         if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
   953             if (unlink (i[1]) != 0) {
   954                 g_warning ("Fail to write in : %s", uri);
   955                 ret = FALSE;
   956                 goto done;
   957             }
   958         }
   959         priv->fd = open(i[1], O_WRONLY | O_CREAT | O_TRUNC,
   960                         S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
   961 
   962         if (priv->fd == -1) {
   963             g_warning ("Fail to open : %s", uri);
   964             ret = FALSE;
   965         }
   966     } else {
   967         g_warning("Output uri not supported");
   968         ret = FALSE;
   969     }
   970 
   971 done:
   972     g_strfreev(i);
   973     return ret;
   974 }
   975 
   976 static          gboolean
   977 _pipeline_bus_cb(GstBus * bus, GstMessage * msg, gpointer user_data)
   978 {
   979     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
   980 
   981     switch (GST_MESSAGE_TYPE(msg)) {
   982 
   983     case GST_MESSAGE_STATE_CHANGED:
   984         {
   985             GstState        oldstate;
   986             GstState        newstate;
   987             GstState        pendingstate;
   988 
   989 
   990             gst_message_parse_state_changed(msg, &oldstate,
   991                                             &newstate, &pendingstate);
   992 
   993             if (pendingstate != GST_STATE_VOID_PENDING)
   994                 break;
   995 
   996             if ((oldstate == GST_STATE_READY)
   997                 && (newstate == GST_STATE_PAUSED)) {
   998                 if (priv->ready)
   999                     g_signal_emit(user_data, g_mencoder_signals[PAUSED],
  1000                                   0);
  1001             } else if ((oldstate == GST_STATE_PAUSED)
  1002                        && (newstate == GST_STATE_PLAYING)) {
  1003                 g_signal_emit(user_data, g_mencoder_signals[PLAYING], 0);
  1004             } else if ((oldstate == GST_STATE_READY) &&
  1005                        (newstate == GST_STATE_NULL)) {
  1006                 g_signal_emit(user_data, g_mencoder_signals[STOPED], 0);
  1007             }
  1008             break;
  1009         }
  1010 
  1011     case GST_MESSAGE_ERROR:
  1012         {
  1013             GError         *error;
  1014             gchar          *debug;
  1015             gchar          *err_str;
  1016 
  1017             if (priv->tick_id != 0) {
  1018                 g_source_remove(priv->tick_id);
  1019                 priv->tick_id = 0;
  1020             }
  1021 
  1022             gst_message_parse_error(msg, &error, &debug);
  1023             err_str = g_strdup_printf("Error [%d] %s (%s)", error->code,
  1024                                       error->message, debug);
  1025             priv->ready = FALSE;
  1026             g_signal_emit(user_data, g_mencoder_signals[ERROR], 0,
  1027                           err_str);
  1028             g_free(err_str);
  1029             g_clear_error(&error);
  1030             g_free(debug);
  1031             break;
  1032         }
  1033 
  1034     case GST_MESSAGE_EOS:
  1035         priv->ready = FALSE;
  1036 #ifdef USE_MANUAL_SINK
  1037 	_flush_queue (G_MENCODER (user_data));
  1038 #endif
  1039         g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
  1040         break;
  1041 
  1042     case GST_MESSAGE_DURATION:
  1043         {
  1044             GstFormat       format;
  1045             gint64          duration;
  1046             gst_message_parse_duration(msg, &format, &duration);
  1047             if (format == GST_FORMAT_BYTES)
  1048                 priv->duration = duration;
  1049             break;
  1050         }
  1051     default:
  1052         {
  1053             break;
  1054         }
  1055     }
  1056     return TRUE;
  1057 }
  1058 
  1059 
  1060 
  1061 static void
  1062 _decodebin_new_pad_cb(GstElement * object,
  1063                       GstPad * pad, gboolean flag, gpointer user_data)
  1064 {
  1065     GstCaps        *caps;
  1066     gchar          *str_caps = NULL;
  1067     GstElement     *sink_element;
  1068     GstPad         *sink_pad;
  1069 
  1070     caps = gst_pad_get_caps(pad);
  1071     str_caps = gst_caps_to_string(caps);
  1072     if (strstr(str_caps, "audio") != NULL) {
  1073         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "aqueue");
  1074     } else if (strstr(str_caps, "video") != NULL) {
  1075         sink_element = gst_bin_get_by_name(GST_BIN(user_data), "vqueue");
  1076     } else {
  1077         g_warning("invalid caps %s", str_caps);
  1078     }
  1079 
  1080     sink_pad = gst_element_get_pad(sink_element, "sink");
  1081     gst_pad_link(pad, sink_pad);
  1082 
  1083     gst_object_unref(sink_element);
  1084     gst_object_unref(sink_pad);
  1085     g_free(str_caps);
  1086     gst_caps_unref(caps);
  1087 }
  1088 
  1089 static void
  1090 _decodebin_unknown_type_cb(GstElement * object,
  1091                            GstPad * pad, GstCaps * caps,
  1092                            gpointer user_data)
  1093 {
  1094     g_warning("Unknown Type");
  1095     // priv->ready = FALSE;
  1096 }
  1097 
  1098 static          gboolean
  1099 _tick_cb(gpointer user_data)
  1100 {
  1101     GstFormat       format = GST_FORMAT_BYTES;
  1102     gint64          cur = 0;
  1103 
  1104     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1105 
  1106     if (priv->duration == 0) {
  1107         gint64          d = 0;
  1108         if (gst_element_query_duration(priv->src, &format, &d))
  1109             priv->duration = d;
  1110     }
  1111 
  1112     if (priv->duration != 0) {
  1113         gst_element_query_position(priv->src, &format, &cur);
  1114         g_print("PROGRESS:%lli\n", (99 * cur) / priv->duration);
  1115     }
  1116 
  1117     return TRUE;
  1118 }
  1119 
  1120 
  1121 #ifdef USE_MANUAL_SINK
  1122 static gboolean
  1123 _send_buffer (gint fd, gpointer buff, gint size)
  1124 {
  1125     gboolean ret = TRUE;
  1126     gchar *msg;
  1127     GByteArray *b_send;
  1128 
  1129     b_send = g_byte_array_new ();
  1130     msg = g_strdup_printf ("%x\r\n", size);
  1131     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1132     g_free (msg);
  1133 
  1134     b_send = g_byte_array_append (b_send, buff, size);
  1135 
  1136     msg = g_strdup ("\r\n");
  1137     b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
  1138     g_free (msg);
  1139 
  1140     if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) 
  1141         ret = FALSE;
  1142     g_byte_array_free (b_send, TRUE);
  1143 
  1144     return ret;
  1145 }
  1146 
  1147 static void
  1148 _flush_queue (GMencoder *self)
  1149 {
  1150     gchar *end_msg;
  1151     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
  1152 
  1153     if (BUFFER_SIZE == 0)
  1154         return;
  1155 
  1156     if (priv->queue->len > 0) {
  1157         _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
  1158         priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1159     }
  1160 
  1161     end_msg = g_strdup ("0\r\n\r\n");
  1162     write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
  1163     g_free (end_msg);
  1164     return;
  1165 }
  1166 static void 
  1167 _buffer_arrive_cb (GstElement* object,
  1168                    GstBuffer* buff,
  1169                    GstPad* pad,
  1170                    gpointer user_data)
  1171 {
  1172     GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
  1173 
  1174     if (GST_BUFFER_SIZE (buff) == 0)
  1175         return;
  1176 
  1177     if (BUFFER_SIZE == 0) {
  1178 	    if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
  1179              goto error;
  1180         return;
  1181     }
  1182 
  1183     priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
  1184     while (priv->queue->len >= BUFFER_SIZE) {
  1185     	if (priv->send_chunked) {
  1186             if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
  1187                 goto error;
  1188         } else {
  1189 	        if (write (priv->fd, priv->queue->data, BUFFER_SIZE) == -1)
  1190                 goto error;
  1191 	    }
  1192         priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
  1193     }
  1194     return;
  1195 
  1196 error:
  1197     if (priv->tick_id != 0) {
  1198         g_source_remove(priv->tick_id);
  1199         priv->tick_id = 0;
  1200     }
  1201     priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
  1202     g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
  1203 }
  1204 #endif 
  1205