1.1 --- a/gmyth-stream/gmemcoder/src/gmencoder.c Fri Jun 29 14:40:01 2007 +0100
1.2 +++ b/gmyth-stream/gmemcoder/src/gmencoder.c Mon Jul 02 08:33:58 2007 +0100
1.3 @@ -8,6 +8,8 @@
1.4 #include <glib.h>
1.5 #include <gst/gst.h>
1.6 #include <string.h>
1.7 +#include <sys/types.h>
1.8 +#include <sys/socket.h>
1.9
1.10 #include "gmencoder.h"
1.11
1.12 @@ -16,7 +18,7 @@
1.13
1.14 // #define SUPPORT_MULT_INPUT 0
1.15 #define USE_MANUAL_SINK
1.16 -#define BUFFER_SIZE 512
1.17 +#define BUFFER_SIZE (1024 * 64)
1.18
1.19 typedef struct _GMencoderPrivate GMencoderPrivate;
1.20 typedef struct _SetupInfo SetupInfo;
1.21 @@ -49,6 +51,9 @@
1.22 gint sources;
1.23 gint tick_id;
1.24 gint64 duration;
1.25 +#ifdef USE_MANUAL_SINK
1.26 + GByteArray *queue;
1.27 +#endif
1.28 };
1.29
1.30 enum {
1.31 @@ -99,6 +104,7 @@
1.32 gchar ** audio_encode_prop,
1.33 guint audio_rate);
1.34 #ifdef USE_MANUAL_SINK
1.35 +static void _flush_queue (GMencoder *self);
1.36 static void _buffer_arrive_cb (GstElement* object,
1.37 GstBuffer* buff,
1.38 GstPad* pad,
1.39 @@ -163,6 +169,9 @@
1.40 {
1.41 GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
1.42 priv->info = g_new0(SetupInfo, 1);
1.43 +#ifdef USE_MANUAL_SINK
1.44 + priv->queue = g_byte_array_new ();
1.45 +#endif
1.46 }
1.47
1.48 static void
1.49 @@ -173,8 +182,14 @@
1.50 static void
1.51 g_mencoder_finalize(GObject * object)
1.52 {
1.53 + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object);
1.54 +
1.55 // TODO: clear vars
1.56 g_mencoder_close_stream(G_MENCODER(object));
1.57 + g_free (priv->info);
1.58 +#ifdef USE_MANUAL_SINK
1.59 + g_byte_array_free (priv->queue, TRUE);
1.60 +#endif
1.61 }
1.62
1.63 GMencoder *
1.64 @@ -657,7 +672,6 @@
1.65 GstElement *ap = NULL;
1.66 GstElement *vp = NULL;
1.67 #endif
1.68 - GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
1.69
1.70 pipe = gst_pipeline_new("pipe");
1.71
1.72 @@ -666,6 +680,7 @@
1.73 vp = gst_element_factory_make("concatmux", "vp");
1.74 gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL);
1.75 #endif
1.76 + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
1.77
1.78 mux =
1.79 gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"),
1.80 @@ -678,10 +693,11 @@
1.81
1.82 #ifdef USE_MANUAL_SINK
1.83 sink = gst_element_factory_make("fakesink", "sink");
1.84 + g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
1.85 g_signal_connect (G_OBJECT (sink),
1.86 "handoff",
1.87 G_CALLBACK (_buffer_arrive_cb),
1.88 - GINT_TO_POINTER (priv->fd));
1.89 + self);
1.90 #else
1.91 sink = gst_element_factory_make("fdsink", "sink");
1.92 if (sink == NULL)
1.93 @@ -886,6 +902,7 @@
1.94 i = g_strsplit(uri, "://", 0);
1.95 if (strcmp(i[0], "fd") == 0) {
1.96 priv->fd = atoi(i[1]);
1.97 + fcntl (priv->fd, F_SETFL, O_ASYNC);
1.98 } else if (strcmp(i[0], "file") == 0) {
1.99 if (g_file_test (i[1], G_FILE_TEST_EXISTS)) {
1.100 if (unlink (i[1]) != 0) {
1.101 @@ -971,6 +988,9 @@
1.102
1.103 case GST_MESSAGE_EOS:
1.104 priv->ready = FALSE;
1.105 +#ifdef USE_MANUAL_SINK
1.106 + _flush_queue (G_MENCODER (user_data));
1.107 +#endif
1.108 g_signal_emit(user_data, g_mencoder_signals[EOS], 0);
1.109 break;
1.110
1.111 @@ -1054,23 +1074,80 @@
1.112
1.113
1.114 #ifdef USE_MANUAL_SINK
1.115 +static gboolean
1.116 +_send_buffer (gint fd, gpointer buff, gint size)
1.117 +{
1.118 + gboolean ret = TRUE;
1.119 + gchar *msg;
1.120 + GByteArray *b_send;
1.121 +
1.122 + b_send = g_byte_array_new ();
1.123 + msg = g_strdup_printf ("%x\r\n", size);
1.124 + b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
1.125 + g_free (msg);
1.126 +
1.127 + b_send = g_byte_array_append (b_send, buff, size);
1.128 +
1.129 + msg = g_strdup ("\r\n");
1.130 + b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar));
1.131 + g_free (msg);
1.132 +
1.133 + if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0)
1.134 + ret = FALSE;
1.135 + g_byte_array_free (b_send, TRUE);
1.136 +
1.137 + return ret;
1.138 +}
1.139 +
1.140 +static void
1.141 +_flush_queue (GMencoder *self)
1.142 +{
1.143 + gchar *end_msg;
1.144 + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self);
1.145 +
1.146 + if (BUFFER_SIZE == 0)
1.147 + return;
1.148 +
1.149 + if (priv->queue->len > 0) {
1.150 + _send_buffer (priv->fd, priv->queue->data, priv->queue->len);
1.151 + priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
1.152 + }
1.153 +
1.154 + end_msg = g_strdup ("0\r\n\r\n");
1.155 + write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar));
1.156 + g_free (end_msg);
1.157 + return;
1.158 +}
1.159 static void
1.160 _buffer_arrive_cb (GstElement* object,
1.161 GstBuffer* buff,
1.162 GstPad* pad,
1.163 gpointer user_data)
1.164 {
1.165 - static GByteArray *queue = NULL;
1.166 - static guint offset = 0;
1.167 + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data);
1.168
1.169 - gint fd = GPOINTER_TO_INT (user_data);
1.170 + if (BUFFER_SIZE == 0) {
1.171 + if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE)
1.172 + goto error;
1.173 + return;
1.174 + }
1.175
1.176 - queue = g_byte_array_append (queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
1.177 + priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff));
1.178 + while (priv->queue->len >= BUFFER_SIZE) {
1.179 + //g_usleep (0.2 * G_USEC_PER_SEC);
1.180 + if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE)
1.181 + goto error;
1.182 + priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE);
1.183 + }
1.184 + return;
1.185
1.186 - if (queue->len >= BUFFER_SIZE) {
1.187 - write (fd, queue->data, BUFFER_SIZE);
1.188 - queue = g_byte_array_remove_range (queue, 0, BUFFER_SIZE);
1.189 +error:
1.190 + if (priv->tick_id != 0) {
1.191 + g_source_remove(priv->tick_id);
1.192 + priv->tick_id = 0;
1.193 }
1.194 + priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len);
1.195 + g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket");
1.196 }
1.197 #endif
1.198