diff -r 949291aaba65 -r 57f3de326cd8 gmyth-stream/gmemcoder/src/gmencoder.c --- a/gmyth-stream/gmemcoder/src/gmencoder.c Fri Jun 29 14:40:01 2007 +0100 +++ b/gmyth-stream/gmemcoder/src/gmencoder.c Sat Jun 30 15:05:50 2007 +0100 @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include "gmencoder.h" @@ -16,7 +18,7 @@ // #define SUPPORT_MULT_INPUT 0 #define USE_MANUAL_SINK -#define BUFFER_SIZE 512 +#define BUFFER_SIZE (1024 * 64) typedef struct _GMencoderPrivate GMencoderPrivate; typedef struct _SetupInfo SetupInfo; @@ -49,6 +51,9 @@ gint sources; gint tick_id; gint64 duration; +#ifdef USE_MANUAL_SINK + GByteArray *queue; +#endif }; enum { @@ -99,6 +104,7 @@ gchar ** audio_encode_prop, guint audio_rate); #ifdef USE_MANUAL_SINK +static void _flush_queue (GMencoder *self); static void _buffer_arrive_cb (GstElement* object, GstBuffer* buff, GstPad* pad, @@ -163,6 +169,9 @@ { GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); priv->info = g_new0(SetupInfo, 1); +#ifdef USE_MANUAL_SINK + priv->queue = g_byte_array_new (); +#endif } static void @@ -173,8 +182,14 @@ static void g_mencoder_finalize(GObject * object) { + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(object); + // TODO: clear vars g_mencoder_close_stream(G_MENCODER(object)); + g_free (priv->info); +#ifdef USE_MANUAL_SINK + g_byte_array_free (priv->queue, TRUE); +#endif } GMencoder * @@ -657,7 +672,6 @@ GstElement *ap = NULL; GstElement *vp = NULL; #endif - GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); pipe = gst_pipeline_new("pipe"); @@ -666,6 +680,7 @@ vp = gst_element_factory_make("concatmux", "vp"); gst_bin_add_many(GST_BIN(pipe), ap, vp, NULL); #endif + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); mux = gst_element_factory_make((mux_name ? mux_name : "ffmux_mpeg"), @@ -678,10 +693,11 @@ #ifdef USE_MANUAL_SINK sink = gst_element_factory_make("fakesink", "sink"); + g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL); g_signal_connect (G_OBJECT (sink), "handoff", G_CALLBACK (_buffer_arrive_cb), - GINT_TO_POINTER (priv->fd)); + self); #else sink = gst_element_factory_make("fdsink", "sink"); if (sink == NULL) @@ -886,6 +902,7 @@ i = g_strsplit(uri, "://", 0); if (strcmp(i[0], "fd") == 0) { priv->fd = atoi(i[1]); + fcntl (priv->fd, F_SETFL, O_ASYNC); } else if (strcmp(i[0], "file") == 0) { if (g_file_test (i[1], G_FILE_TEST_EXISTS)) { if (unlink (i[1]) != 0) { @@ -971,6 +988,9 @@ case GST_MESSAGE_EOS: priv->ready = FALSE; +#ifdef USE_MANUAL_SINK + _flush_queue (G_MENCODER (user_data)); +#endif g_signal_emit(user_data, g_mencoder_signals[EOS], 0); break; @@ -1054,23 +1074,80 @@ #ifdef USE_MANUAL_SINK +static gboolean +_send_buffer (gint fd, gpointer buff, gint size) +{ + gboolean ret = TRUE; + gchar *msg; + GByteArray *b_send; + + b_send = g_byte_array_new (); + msg = g_strdup_printf ("%x\r\n", size); + b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar)); + g_free (msg); + + b_send = g_byte_array_append (b_send, buff, size); + + msg = g_strdup ("\r\n"); + b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar)); + g_free (msg); + + if (send (fd, b_send->data, b_send->len, MSG_MORE) <= 0) + ret = FALSE; + g_byte_array_free (b_send, TRUE); + + return ret; +} + +static void +_flush_queue (GMencoder *self) +{ + gchar *end_msg; + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); + + if (BUFFER_SIZE == 0) + return; + + if (priv->queue->len > 0) { + _send_buffer (priv->fd, priv->queue->data, priv->queue->len); + priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len); + } + + end_msg = g_strdup ("0\r\n\r\n"); + write (priv->fd, (const guint8*) end_msg, strlen(end_msg) * sizeof(gchar)); + g_free (end_msg); + return; +} static void _buffer_arrive_cb (GstElement* object, GstBuffer* buff, GstPad* pad, gpointer user_data) { - static GByteArray *queue = NULL; - static guint offset = 0; + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data); - gint fd = GPOINTER_TO_INT (user_data); + if (BUFFER_SIZE == 0) { + if (_send_buffer (priv->fd, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE) + goto error; + return; + } - queue = g_byte_array_append (queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)); + priv->queue = g_byte_array_append (priv->queue, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)); + while (priv->queue->len >= BUFFER_SIZE) { + //g_usleep (0.2 * G_USEC_PER_SEC); + if (_send_buffer (priv->fd, priv->queue->data, BUFFER_SIZE) == FALSE) + goto error; + priv->queue = g_byte_array_remove_range (priv->queue, 0, BUFFER_SIZE); + } + return; - if (queue->len >= BUFFER_SIZE) { - write (fd, queue->data, BUFFER_SIZE); - queue = g_byte_array_remove_range (queue, 0, BUFFER_SIZE); +error: + if (priv->tick_id != 0) { + g_source_remove(priv->tick_id); + priv->tick_id = 0; } + priv->queue = g_byte_array_remove_range (priv->queue, 0, priv->queue->len); + g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Error on socket"); } #endif