gmyth-stream/gmemcoder/src/gmencoder.c
branchtrunk
changeset 766 f1da4fbe667e
parent 761 949291aaba65
child 768 b405295259f3
     1.1 --- a/gmyth-stream/gmemcoder/src/gmencoder.c	Fri Jun 29 14:40:01 2007 +0100
     1.2 +++ b/gmyth-stream/gmemcoder/src/gmencoder.c	Mon Jul 02 08:33:58 2007 +0100
     1.3 @@ -8,6 +8,8 @@
     1.4  #include <glib.h>
     1.5  #include <gst/gst.h>
     1.6  #include <string.h>
     1.7 +#include <sys/types.h>
     1.8 +#include <sys/socket.h>
     1.9  
    1.10  #include "gmencoder.h"
    1.11  
    1.12 @@ -16,7 +18,7 @@
    1.13  
    1.14  // #define SUPPORT_MULT_INPUT 0
    1.15  #define USE_MANUAL_SINK
    1.16 -#define BUFFER_SIZE 512
    1.17 +#define BUFFER_SIZE (1024 * 64)
    1.18  
    1.19  typedef struct _GMencoderPrivate GMencoderPrivate;
    1.20  typedef struct _SetupInfo SetupInfo;
    1.21 @@ -49,6 +51,9 @@
    1.22      gint            sources;
    1.23      gint            tick_id;
    1.24      gint64          duration;
    1.25 +#ifdef USE_MANUAL_SINK
    1.26 +    GByteArray	    *queue;
    1.27 +#endif
    1.28  };
    1.29  
    1.30  enum {
    1.31 @@ -99,6 +104,7 @@
    1.32                                      gchar ** audio_encode_prop,
    1.33                                      guint audio_rate);
    1.34  #ifdef USE_MANUAL_SINK
    1.35 +static void _flush_queue 	    (GMencoder *self);
    1.36  static void _buffer_arrive_cb       (GstElement* object,
    1.37                                       GstBuffer* buff,
    1.38                                       GstPad* pad,
    1.39 @@ -163,6 +169,9 @@
    1.40  {
    1.41      GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
    1.42      priv->info = g_new0(SetupInfo, 1);
    1.43 +#ifdef USE_MANUAL_SINK
    1.44 +    priv->queue = g_byte_array_new ();
    1.45 +#endif
    1.46  }
    1.47  
    1.48  static void
    1.49 @@ -173,8 +182,14 @@
    1.50  static void
    1.51  g_mencoder_finalize(GObject * object)
    1.52  {
    1.53 +    GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
    1.54 +
    1.55      // TODO: clear vars
    1.56      g_mencoder_close_stream(G_MENCODER(object));
    1.57 +    g_free (priv->info);
    1.58 +#ifdef USE_MANUAL_SINK
    1.59 +    g_byte_array_free (priv->queue, TRUE);
    1.60 +#endif
    1.61  }
    1.62  
    1.63  GMencoder      *
    1.64 @@ -657,7 +672,6 @@
    1.65      GstElement     *ap = NULL;
    1.66      GstElement     *vp = NULL;
    1.67  #endif
    1.68 -    GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
    1.69  
    1.70      pipe = gst_pipeline_new("pipe");
    1.71  
    1.72 @@ -666,6 +680,7 @@
    1.73      vp = gst_element_factory_make("concatmux", "vp");
    1.74      gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
    1.75  #endif
    1.76 +    GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
    1.77  
    1.78      mux =
    1.79          gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
    1.80 @@ -678,10 +693,11 @@
    1.81  
    1.82  #ifdef USE_MANUAL_SINK
    1.83      sink = gst_element_factory_make("fakesink", "sink");
    1.84 +    g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
    1.85      g_signal_connect (G_OBJECT (sink),
    1.86                        "handoff",
    1.87                        G_CALLBACK (_buffer_arrive_cb),
    1.88 -                      GINT_TO_POINTER (priv->fd));
    1.89 +                      self);
    1.90  #else    
    1.91      sink = gst_element_factory_make("fdsink", "sink");
    1.92      if (sink == NULL)
    1.93 @@ -886,6 +902,7 @@
    1.94      i = g_strsplit(uri, "://", 0);
    1.95      if (strcmp(i[0], "fd") == 0) {
    1.96          priv->fd = atoi(i[1]);
    1.97 +	fcntl (priv->fd, F_SETFL, O_ASYNC);
    1.98      } else if (strcmp(i[0], "file") == 0) {
    1.99          if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
   1.100              if (unlink (i[1]) != 0) {
   1.101 @@ -971,6 +988,9 @@
   1.102  
   1.103      case GST_MESSAGE_EOS:
   1.104          priv->ready = FALSE;
   1.105 +#ifdef USE_MANUAL_SINK
   1.106 +	_flush_queue (G_MENCODER (user_data));
   1.107 +#endif
   1.108          g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
   1.109          break;
   1.110  
   1.111 @@ -1054,23 +1074,80 @@
   1.112  
   1.113  
   1.114  #ifdef USE_MANUAL_SINK
   1.115 +static gboolean
   1.116 +_send_buffer (gint fd, gpointer buff, gint size)
   1.117 +{
   1.118 +    gboolean ret = TRUE;
   1.119 +    gchar *msg;
   1.120 +    GByteArray *b_send;
   1.121 +
   1.122 +    b_send = g_byte_array_new ();
   1.123 +    msg = g_strdup_printf ("%x\r\n", size);
   1.124 +    b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
   1.125 +    g_free (msg);
   1.126 +
   1.127 +    b_send = g_byte_array_append (b_send, buff, size);
   1.128 +
   1.129 +    msg = g_strdup ("\r\n");
   1.130 +    b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
   1.131 +    g_free (msg);
   1.132 +
   1.133 +    if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) 
   1.134 +        ret = FALSE;
   1.135 +    g_byte_array_free (b_send, TRUE);
   1.136 +
   1.137 +    return ret;
   1.138 +}
   1.139 +
   1.140 +static void
   1.141 +_flush_queue (GMencoder *self)
   1.142 +{
   1.143 +    gchar *end_msg;
   1.144 +    GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
   1.145 +
   1.146 +    if (BUFFER_SIZE == 0)
   1.147 +        return;
   1.148 +
   1.149 +    if (priv->queue->len > 0) {
   1.150 +        _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
   1.151 +        priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
   1.152 +    }
   1.153 +
   1.154 +    end_msg = g_strdup ("0\r\n\r\n");
   1.155 +    write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
   1.156 +    g_free (end_msg);
   1.157 +    return;
   1.158 +}
   1.159  static void 
   1.160  _buffer_arrive_cb (GstElement* object,
   1.161                     GstBuffer* buff,
   1.162                     GstPad* pad,
   1.163                     gpointer user_data)
   1.164  {
   1.165 -    static GByteArray *queue = NULL;
   1.166 -    static guint offset = 0;
   1.167 +    GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
   1.168  
   1.169 -    gint fd = GPOINTER_TO_INT (user_data);
   1.170 +    if (BUFFER_SIZE == 0) {
   1.171 +	if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
   1.172 +             goto error;
   1.173 +        return;
   1.174 +    }
   1.175  
   1.176 -    queue = g_byte_array_append (queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
   1.177 +    priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
   1.178 +    while (priv->queue->len >= BUFFER_SIZE) {
   1.179 +    	//g_usleep (0.2 * G_USEC_PER_SEC);	
   1.180 +	if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
   1.181 +            goto error;
   1.182 +        priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
   1.183 +    }
   1.184 +    return;
   1.185  
   1.186 -    if (queue->len >= BUFFER_SIZE) {
   1.187 -        write (fd, queue->data, BUFFER_SIZE);
   1.188 -        queue = g_byte_array_remove_range (queue, 0, BUFFER_SIZE);
   1.189 +error:
   1.190 +    if (priv->tick_id != 0) {
   1.191 +        g_source_remove(priv->tick_id);
   1.192 +        priv->tick_id = 0;
   1.193      }
   1.194 +    priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
   1.195 +    g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
   1.196  }
   1.197  #endif 
   1.198