# HG changeset patch # User renatofilho # Date 1187271971 -3600 # Node ID add4025ca67811e32178614334c12faee7409eae # Parent d35b50f4d77e0482b7e3661d9e5e14ef5ce26ff7 [svn r813] created a timeout function for invalid channels diff -r d35b50f4d77e -r add4025ca678 gmyth-stream/gmemcoder/src/gmencoder.c --- a/gmyth-stream/gmemcoder/src/gmencoder.c Thu Aug 16 14:45:22 2007 +0100 +++ b/gmyth-stream/gmemcoder/src/gmencoder.c Thu Aug 16 14:46:11 2007 +0100 @@ -18,6 +18,9 @@ #define G_MENCODER_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), G_TYPE_MENCODER, GMencoderPrivate)) +#define USE_MANUAL_SINK +#define GMENCODER_TIMEOUT 5000 + typedef struct _GMencoderPrivate GMencoderPrivate; typedef struct _SetupInfo SetupInfo; @@ -42,7 +45,7 @@ GstElement *sink; GstElement *src; - gint out_fd; + GnomeVFSHandle *handle; gboolean ready; SetupInfo *info; @@ -52,7 +55,7 @@ gint tick_id; gint64 duration; gboolean send_chunked; - + gint timeout_id; //V4l info GstElement *v4lsrc; @@ -110,10 +113,16 @@ gchar ** audio_encode_prop, guint audio_rate, gboolean deinterlace); +static gboolean _process_timeout_cb (gpointer user_data); +#ifdef USE_MANUAL_SINK static void _flush_queue (GMencoder *self); static void _buffer_arrive_cb (GstElement* object, GstBuffer* buff, + GstPad* pad, gpointer user_data); +#endif + + static gboolean _tick_cb(gpointer data); static guint g_mencoder_signals[LAST_SIGNAL] = { 0 }; @@ -638,7 +647,15 @@ g_return_if_fail(priv->ready == FALSE); priv->ready = TRUE; gst_element_set_state(priv->pipe, GST_STATE_PLAYING); + if (priv->tick_id != 0) { + g_source_remove (priv->tick_id); + } priv->tick_id = g_timeout_add(500, _tick_cb, self); + + if (priv->timeout_id != 0) { + g_source_remove (priv->timeout_id); + } + priv->timeout_id = g_timeout_add(GMENCODER_TIMEOUT, _process_timeout_cb, self); } void @@ -659,6 +676,11 @@ priv->tick_id = 0; } + if (priv->timeout_id != 0) { + g_source_remove (priv->timeout_id); + priv->timeout_id = 0; + } + if (priv->pipe != NULL) { // TODO: fixe pipeline dispose //gst_element_set_state (priv->pipe, GST_STATE_NULL); @@ -670,6 +692,7 @@ priv->pipe = NULL; priv->abin = NULL; priv->vbin = NULL; + priv->sink = NULL; } priv->ready = FALSE; } @@ -694,10 +717,8 @@ GstElement *abin = NULL; GstElement *vbin = NULL; GstElement *queue = NULL; - GstElement *identity = NULL; GstPad *aux_pad = NULL; GstPad *mux_pad = NULL; - GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE (self); pipe = gst_pipeline_new("pipe"); @@ -708,10 +729,11 @@ goto error; queue = gst_element_factory_make("queue", "queueu_sink"); - identity = gst_element_factory_make ("identity", "identity"); - sink = gst_element_factory_make("fdsink", "sink"); - g_object_set (sink, "fd", priv->out_fd, NULL); - g_signal_connect (G_OBJECT (identity), + + + sink = gst_element_factory_make("fakesink", "sink"); + g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL); + g_signal_connect (G_OBJECT (sink), "handoff", G_CALLBACK (_buffer_arrive_cb), self); @@ -727,7 +749,7 @@ goto error; // Finish Pipe - gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, identity, sink, NULL); + gst_bin_add_many(GST_BIN(pipe), abin, vbin, mux, queue, sink, NULL); // Link bins with mux @@ -767,12 +789,11 @@ mux_pad = NULL; // Link mux with sink - gst_element_link_many(mux, queue, identity, sink, NULL); + gst_element_link_many(mux, queue, sink, NULL); bus = gst_pipeline_get_bus(GST_PIPELINE(pipe)); gst_bus_add_watch(bus, _pipeline_bus_cb, self); gst_object_unref(bus); - return pipe; error: @@ -928,19 +949,24 @@ _open_output(GMencoder * self, const gchar * uri) { gchar **i; + GnomeVFSResult result; GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); - priv->out_fd = 0; - i = g_strsplit(uri, "://", 0); if (strcmp(i[0], "fd") == 0) { - priv->out_fd = atoi (i[1]); + result = gnome_vfs_open_fd (&priv->handle, atoi(i[1])); } else { - priv->out_fd = open (i[1], O_CREAT|O_WRONLY|O_TRUNC, 0666); + if (g_file_test (i[1], G_FILE_TEST_EXISTS) == FALSE) { + result = gnome_vfs_create (&priv->handle, uri, GNOME_VFS_OPEN_WRITE, TRUE, + GNOME_VFS_PERM_USER_WRITE | GNOME_VFS_PERM_USER_READ | GNOME_VFS_PERM_GROUP_READ); + } else { + result = gnome_vfs_open (&priv->handle, uri, + GNOME_VFS_OPEN_WRITE | GNOME_VFS_OPEN_TRUNCATE); + } } g_strfreev(i); - return (priv->out_fd > 0); + return (result == GNOME_VFS_OK); } static gboolean @@ -1003,7 +1029,9 @@ case GST_MESSAGE_EOS: priv->ready = FALSE; +#ifdef USE_MANUAL_SINK _flush_queue (G_MENCODER (user_data)); +#endif g_signal_emit(user_data, g_mencoder_signals[EOS], 0); break; @@ -1085,40 +1113,50 @@ return TRUE; } +static gboolean +_process_timeout_cb (gpointer user_data) +{ + GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data); -static void -_chunke_buffer (GstBuffer *buf) + g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "timeout"); + priv->timeout_id = 0; + return FALSE; +} + + +#ifdef USE_MANUAL_SINK +static gboolean +_send_buffer (GnomeVFSHandle *handle, gpointer buff, gint size) { gchar *msg; GByteArray *b_send; + GnomeVFSResult result; + GnomeVFSFileSize bytes_written; b_send = g_byte_array_new (); - msg = g_strdup_printf ("%x\r\n", GST_BUFFER_SIZE(buf)); + msg = g_strdup_printf ("%x\r\n", size); b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar)); g_free (msg); - b_send = g_byte_array_append (b_send, GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf)); + b_send = g_byte_array_append (b_send, buff, size); msg = g_strdup ("\r\n"); b_send = g_byte_array_append (b_send, (const guint8*) msg, strlen (msg) * sizeof (gchar)); g_free (msg); - g_free (GST_BUFFER_DATA(buf)); + result = gnome_vfs_write (handle, b_send->data, b_send->len, &bytes_written); + g_byte_array_free (b_send, TRUE); - GST_BUFFER_SIZE(buf)=b_send->len; - GST_BUFFER_DATA(buf)=b_send->data; - - g_byte_array_free (b_send, FALSE); + return (result == GNOME_VFS_OK); } static void _flush_queue (GMencoder *self) { - /* GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(self); - if (priv->send_chunked) { + GnomeVFSFileSize bytes_written; gchar *end_msg; end_msg = g_strdup ("0\r\n\r\n"); gnome_vfs_write (priv->handle, @@ -1127,17 +1165,46 @@ &bytes_written); g_free (end_msg); } - */ } static void _buffer_arrive_cb (GstElement* object, - GstBuffer* buf, + GstBuffer* buff, + GstPad* pad, gpointer user_data) { GMencoderPrivate *priv = G_MENCODER_GET_PRIVATE(user_data); + if (priv->timeout_id != 0) { + g_source_remove (priv->timeout_id); + priv->timeout_id = 0; + } + if (priv->send_chunked) { - _chunke_buffer (buf); + if (_send_buffer (priv->handle, GST_BUFFER_DATA (buff), GST_BUFFER_SIZE (buff)) == FALSE) + goto error; + } else { + GnomeVFSResult result; + GnomeVFSFileSize bytes_written; + + result = gnome_vfs_write (priv->handle, + GST_BUFFER_DATA (buff), + GST_BUFFER_SIZE (buff), + &bytes_written); + + if (result != GNOME_VFS_OK) + goto error; } + + return; + +error: + if (priv->tick_id != 0) { + g_source_remove(priv->tick_id); + priv->tick_id = 0; + } + g_signal_emit(user_data, g_mencoder_signals[ERROR], 0, "Fail to write on socket"); + gst_element_set_state (priv->pipe, GST_STATE_PAUSED); } + +#endif