diff -r 55c7f31b85aa -r 56e39289fa96 gst-plugins-mythtv/src/gstmythtvsrc.c --- a/gst-plugins-mythtv/src/gstmythtvsrc.c Tue Nov 07 22:53:53 2006 +0000 +++ b/gst-plugins-mythtv/src/gstmythtvsrc.c Mon Nov 13 19:58:37 2006 +0000 @@ -38,14 +38,14 @@ #define GMYTHTV_TRANSFER_MAX_WAITS 100 -#define GMYTHTV_TRANSFER_MAX_BUFFER 4*1024 +#define GMYTHTV_TRANSFER_MAX_BUFFER 128*1024 //( 32*1024 ) /* 4*1024 ??? */ -#define MAX_READ_SIZE 4*1024 +#define MAX_READ_SIZE 2*1024 //( 32*1024 ) -#define GST_FLOW_ERROR_NO_DATA -101 +#define GST_FLOW_ERROR_NO_DATA -101 /* stablish a maximum iteration value to the IS_RECORDING message */ static guint wait_to_transfer = 0; @@ -56,18 +56,11 @@ "Control and receive data as a client over the network via raw socket connections using the MythTV protocol", "Rosfran Borges " ); -static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ( "src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS ("video/x-nuv") ); - -/* -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_SOMETIMES, - GST_STATIC_CAPS ("video/x-nuv") ); -*/ - + enum { PROP_0, @@ -86,13 +79,18 @@ static void gst_mythtv_src_finalize (GObject * gobject); +/* static GstFlowReturn gst_mythtv_src_create (GstBaseSrc * psrc, guint64 offset, guint size, GstBuffer ** outbuf); +*/ + +//static GstFlowReturn gst_mythtv_src_chain ( GstPad* pad, GstBuffer* outbuf ); +static GstFlowReturn gst_mythtv_src_create ( GstPushSrc* psrc, GstBuffer** outbuf ); static gboolean gst_mythtv_src_start (GstBaseSrc * bsrc); static gboolean gst_mythtv_src_stop (GstBaseSrc * bsrc); static gboolean gst_mythtv_src_get_size (GstBaseSrc * bsrc, guint64 * size); -static gboolean gst_mythtv_src_is_seekable( GstBaseSrc *push_src ); +//static gboolean gst_mythtv_src_is_seekable( GstBaseSrc *push_src ); static gboolean gst_mythtv_src_next_program_chain ( GstMythtvSrc *src ); @@ -127,11 +125,11 @@ "MythTV src"); } -GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstBaseSrc, - GST_TYPE_BASE_SRC, _urihandler_init) +//GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstBaseSrc, +// GST_TYPE_BASE_SRC, _urihandler_init) -//GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstPushSrc, -// GST_TYPE_PUSH_SRC, _urihandler_init) +GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstPushSrc, + GST_TYPE_PUSH_SRC, _urihandler_init) static void gst_mythtv_src_base_init (gpointer g_class) @@ -140,21 +138,23 @@ gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&srctemplate)); - + gst_element_class_set_details (element_class, &gst_mythtv_src_details); element_class->change_state = gst_mythtv_src_change_state; + } static void gst_mythtv_src_class_init (GstMythtvSrcClass * klass) { GObjectClass *gobject_class; - //GstPushSrcClass *gstpushsrc_class; + GstPushSrcClass *gstpushsrc_class; GstBaseSrcClass *gstbasesrc_class; gobject_class = (GObjectClass *) klass; gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; gobject_class->set_property = gst_mythtv_src_set_property; gobject_class->get_property = gst_mythtv_src_get_property; @@ -222,9 +222,10 @@ gstbasesrc_class->start = gst_mythtv_src_start; gstbasesrc_class->stop = gst_mythtv_src_stop; gstbasesrc_class->get_size = gst_mythtv_src_get_size; - gstbasesrc_class->is_seekable = gst_mythtv_src_is_seekable; - - gstbasesrc_class->create = gst_mythtv_src_create; + //gstbasesrc_class->is_seekable = gst_mythtv_src_is_seekable; + + //gstbasesrc_class->create = gst_mythtv_src_create; + gstpushsrc_class->create = gst_mythtv_src_create; GST_DEBUG_CATEGORY_INIT (mythtvsrc_debug, "mythtvsrc", 0, "MythTV Client Source"); @@ -265,24 +266,21 @@ this->adapter = NULL; - //this->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); - //gst_element_add_pad (GST_ELEMENT (this), this->sinkpad); + //this->th_read_ahead = NULL; - /* + this->th_mutex = NULL; + this->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); + //gst_pad_set_chain_function ( GST_BASE_SRC_PAD( GST_BASE_SRC( this ) ), + // GST_DEBUG_FUNCPTR ( gst_mythtv_src_chain ) ); gst_element_add_pad (GST_ELEMENT (this), this->srcpad); - */ - gst_base_src_set_format( GST_BASE_SRC( this ), GST_FORMAT_BYTES ); + //gst_base_src_set_format( GST_BASE_SRC( this ), GST_FORMAT_BYTES ); gst_base_src_set_live ( GST_BASE_SRC( this ), TRUE ); - + // gst_pad_set_event_function ( GST_BASE_SRC_PAD(GST_BASE_SRC(this)), // gst_mythtv_src_handle_event ); -/* - gst_pad_set_query_function ( GST_BASE_SRC_PAD(GST_BASE_SRC(this)), - gst_mythtv_src_query ); -*/ } @@ -290,6 +288,16 @@ gst_mythtv_src_finalize (GObject * gobject) { GstMythtvSrc *this = GST_MYTHTV_SRC (gobject); + + if ( this->th_mutex != NULL ) { + g_static_rec_mutex_free( this->th_mutex ); + this->th_mutex = NULL; + } + + if ( this->th_read_ahead != NULL ) { + gst_task_stop( this->th_read_ahead ); + this->th_read_ahead = NULL; + } if (this->mythtv_caps) { gst_caps_unref (this->mythtv_caps); @@ -328,19 +336,20 @@ /* Loop sending the Myth File Transfer request: * Retry whilst authentication fails and we supply it. */ gint len = 0; + gint8 *data_ptr = g_malloc0( size ); - GST_OBJECT_LOCK(src); + //GST_OBJECT_LOCK(src); while ( sizetoread > 0 ) { len = gmyth_file_transfer_read( src->file_transfer, - GST_BUFFER_DATA( *outbuf ) + read, sizetoread, TRUE ); + data_ptr + read, sizetoread, TRUE ); if ( len > 0 ) { - read += len; + read += len; sizetoread -= len; } - else if ( len <= 0 ) + else if ( len <= 0 ) { read = -1; @@ -366,16 +375,15 @@ goto get_file_pos; g_print( "\t[%s]\tGET_POSITION: file_position = %lld\n", __FUNCTION__, size_tmp ); - } else if ( abs( src->content_size - src->bytes_read ) < GMYTHTV_TRANSFER_MAX_BUFFER ) { - src->prev_content_size = src->content_size; + } /*else { gint64 new_offset = gmyth_file_transfer_get_file_position( src->file_transfer ); if ( new_offset > 0 && src->content_size <= new_offset ) { src->content_size = new_offset; } else { src->update_prog_chain = TRUE; } - } - goto done; + src->prev_content_size = src->content_size; + }*/ } goto done; } @@ -389,19 +397,21 @@ if ( read > 0 ) { src->read_offset += read; src->bytes_read += read; - //src->content_size += src->bytes_read; g_print( "[%s]\tBYTES READ (actual) = %d, BYTES READ (cumulative) = %llu, "\ - "OFFSET = %llu, CONTENT SIZE = %llu.\n", __FUNCTION__, read, src->bytes_read, - src->read_offset, src->content_size ); + "OFFSET = %llu, CONTENT SIZE = %llu.\n", __FUNCTION__, read, src->bytes_read, + src->read_offset, src->content_size ); GST_BUFFER_SIZE (*outbuf) = read; //GST_BUFFER_SIZE (buffer) = read; + GST_BUFFER_MALLOCDATA( *outbuf ) = g_malloc0( GST_BUFFER_SIZE (*outbuf) ); + GST_BUFFER_DATA( *outbuf ) = GST_BUFFER_MALLOCDATA( *outbuf ); + g_memmove( GST_BUFFER_DATA( *outbuf ), data_ptr, read ); GST_BUFFER_OFFSET (*outbuf) = offset; //GST_BUFFER_OFFSET (buffer) = offset; GST_BUFFER_OFFSET_END (*outbuf) = offset + read;//GST_BUFFER_OFFSET_END (buffer) = offset + read; g_print( "Got buffer: [%s]\t\tBUFFER --->SIZE = %d, OFFSET = %llu, "\ - "OFFSET_END = %llu.\n\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf) ); + "OFFSET_END = %llu.\n\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf), + GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf) ); } else if ( !src->live_tv ) goto eos; @@ -412,58 +422,91 @@ src->eos = TRUE; done: - GST_OBJECT_UNLOCK(src); + //GST_OBJECT_UNLOCK(src); return read; } +#if 0 static GstFlowReturn gst_mythtv_src_create ( GstBaseSrc * psrc, guint64 offset, guint size, GstBuffer **outbuf) { GstMythtvSrc *src; GstFlowReturn ret = GST_FLOW_OK; gint read = -1; + gint adapter_size = -1; + + guint max_adapter_rep = 40; src = GST_MYTHTV_SRC (psrc); + /* The caller should know the number of bytes and not read beyond EOS. */ if (G_UNLIKELY (src->eos)) goto eos; if ( G_UNLIKELY (src->update_prog_chain) ) goto change_progchain; + + g_static_rec_mutex_lock( src->th_mutex ); - GST_OBJECT_LOCK(src); + while ( ( ( adapter_size = gst_adapter_available_fast( src->adapter ) ) < size ) && + --max_adapter_rep > 0 ) + { + g_print ( "[%s] %d - Waiting for read_ahead task...\n", __FUNCTION__, max_adapter_rep ); + GST_TASK_WAIT( src->th_read_ahead ); + } + + g_static_rec_mutex_unlock( src->th_mutex ); - if (G_UNLIKELY (src->read_offset != offset)) { - gint64 new_offset = gmyth_file_transfer_seek(src->file_transfer, offset, SEEK_SET); - g_print( "[%s] SRC Offset = %lld, NEW actual backend SEEK Offset = %lld.\n", - __FUNCTION__, src->read_offset, new_offset ); - if (G_UNLIKELY (new_offset < 0 ) )//|| new_offset != src->read_offset)) { - { - GST_OBJECT_UNLOCK(src); - if ( src->live_tv ) - goto change_progchain; - else - goto eos; - } - + gint64 new_offset = -1; + /* just get from the adapter, no network effort... */ + if ( offset > src->adapter_offset && size <= adapter_size ) + { + + GstBuffer *buf = gst_adapter_take_buffer( src->adapter, size ); + *outbuf = gst_buffer_create_sub( buf, offset, size ); + src->read_offset = new_offset = offset; + read = size; + + gst_adapter_flush( src->adapter, size ); + + } else { + /* no data on adapter... do all these mythtv network calls! */ + + /* verify if it needs to seek */ + if ( src->read_offset != offset ) + { + + new_offset = gmyth_file_transfer_seek( src->file_transfer, offset, SEEK_SET ); + + g_print( "[%s] SRC Offset = %lld, NEW actual backend SEEK Offset = %lld.\n", + __FUNCTION__, src->read_offset, new_offset ); + if ( G_UNLIKELY (new_offset < 0 ) ) + { + if ( src->live_tv ) + goto change_progchain; + else + goto eos; + } + + } + src->read_offset = offset; - } - GST_OBJECT_UNLOCK(src); - - /* Create the buffer. */ - ret = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), - src->read_offset, size, - //src->icy_caps ? src->icy_caps : - GST_PAD_CAPS (GST_BASE_SRC_PAD (GST_BASE_SRC (psrc))), outbuf); - - if (G_UNLIKELY (ret != GST_FLOW_OK)) { - if ( src->live_tv ) - goto change_progchain; - else - goto done; - } - - read = do_read_request_response ( src, src->read_offset, size, outbuf ); + + /* Create the buffer. */ + ret = gst_pad_alloc_buffer ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + src->read_offset, size, + GST_PAD_CAPS ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)) ), outbuf); + + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + if ( src->live_tv ) + goto change_progchain; + else + goto done; + } + + read = do_read_request_response ( src, src->read_offset, size, outbuf ); + + } if (G_UNLIKELY (src->update_prog_chain) ) goto change_progchain; @@ -474,6 +517,29 @@ else goto read_error; } + + if ( read > 0 ) { + src->read_offset += read; + src->bytes_read += read; + + #if 0 + g_print( "[%s]\tBYTES READ (actual) = %d, BYTES READ (cumulative) = %llu, "\ + "OFFSET = %llu, CONTENT SIZE = %llu.\n", __FUNCTION__, read, src->bytes_read, + src->read_offset, src->content_size ); + + GST_BUFFER_SIZE (*outbuf) = read; //GST_BUFFER_SIZE (buffer) = read; + //GST_BUFFER_MALLOCDATA( *outbuf ) = g_malloc0( GST_BUFFER_SIZE (*outbuf) ); + //GST_BUFFER_DATA( *outbuf ) = GST_BUFFER_MALLOCDATA( *outbuf ); + //g_memmove( GST_BUFFER_DATA( *outbuf ), data_ptr, read ); + GST_BUFFER_OFFSET (*outbuf) = offset; //GST_BUFFER_OFFSET (buffer) = offset; + GST_BUFFER_OFFSET_END (*outbuf) = offset + read;//GST_BUFFER_OFFSET_END (buffer) = offset + read; + + g_print( "Got buffer: [%s]\t\tBUFFER --->SIZE = %d, OFFSET = %llu, "\ + "OFFSET_END = %llu.\n\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf), + GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf) ); + #endif + + } done: { @@ -502,13 +568,101 @@ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Seek failed, go to the next program info... (%i, %s)", read, src->uri_name)); + + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (src)), + gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); // go to the next program chain src->unique_setup = FALSE; src->update_prog_chain = TRUE; - //GST_OBJECT_LOCK(src); gst_mythtv_src_next_program_chain( src ); - //GST_OBJECT_UNLOCK(src); + + return GST_FLOW_ERROR_NO_DATA; + } + +} +#endif + +static GstFlowReturn +gst_mythtv_src_create ( GstPushSrc* psrc, GstBuffer** outbuf ) +{ + GstMythtvSrc *src; + GstFlowReturn ret = GST_FLOW_OK; + gint read = -1; + + src = GST_MYTHTV_SRC ( psrc ); + + /* The caller should know the number of bytes and not read beyond EOS. */ + if (G_UNLIKELY (src->eos)) + goto eos; + if ( G_UNLIKELY (src->update_prog_chain) ) + goto change_progchain; + + //gint64 new_offset = -1; + /* just get from the adapter, no network effort... */ + //do + //{ + /* Create the buffer. */ + ret = gst_pad_alloc_buffer ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + GST_BUFFER_OFFSET_NONE, MAX_READ_SIZE, + GST_PAD_CAPS (GST_BASE_SRC_PAD (GST_BASE_SRC (psrc))), outbuf ); + + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + if ( src->live_tv ) + goto change_progchain; + else + goto done; + } + + read = do_read_request_response ( src, src->read_offset, MAX_READ_SIZE, outbuf ); + + if (G_UNLIKELY (src->update_prog_chain) ) + goto change_progchain; + + if (G_UNLIKELY (read <= 0) || *outbuf == NULL) { + if ( src->live_tv ) + goto change_progchain; + else + goto read_error; + } + + //} while ( TRUE ); + +done: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_DEBUG_OBJECT (src, "DONE task, reason %s", reason); + return ret; + } +eos: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); + return GST_FLOW_UNEXPECTED; + } + /* ERRORS */ +read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("Could not read any bytes (%i, %s)", read, + src->uri_name)); + return GST_FLOW_ERROR; + } +change_progchain: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("Seek failed, go to the next program info... (%i, %s)", read, + src->uri_name)); + + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); + // go to the next program chain + src->unique_setup = FALSE; + src->update_prog_chain = TRUE; + + gst_mythtv_src_next_program_chain( src ); return GST_FLOW_ERROR_NO_DATA; } @@ -541,6 +695,43 @@ } +#if 0 +static void +gst_mythtv_src_read_ahead ( void *data ) { + + GstMythtvSrc *src = NULL; + + GstBuffer *buffer = NULL; + + guint size = 512; + + gint read = 0; + + src = GST_MYTHTV_SRC( data ); + + GST_PAD_STREAM_TRYLOCK( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ); + + /*if ( gst_adapter_available( src->adapter ) ) && ( read < MAX_READ_SIZE )) */ + do { + buffer = gst_buffer_new_and_alloc( size ); + + read += do_read_request_response ( src, src->adapter_offset, size, &buffer ); + + gst_adapter_push( src->adapter, buffer ); + + //gst_buffer_unref( buffer ); + } while ( read < MAX_READ_SIZE ); + + GST_PAD_BLOCK_SIGNAL( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ); + + GST_PAD_STREAM_UNLOCK( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ); + + gst_object_unref( src ); + + return; +} +#endif + /* create a socket for connecting to remote server */ static gboolean gst_mythtv_src_start ( GstBaseSrc * bsrc ) @@ -560,13 +751,13 @@ goto done; } - GST_OBJECT_LOCK(src); + //GST_OBJECT_LOCK(src); if ( src->live_tv ) { src->spawn_livetv = gmyth_livetv_new( ); if ( gmyth_livetv_setup( src->spawn_livetv ) == FALSE ) { ret = FALSE; - GST_OBJECT_UNLOCK( src ); + //GST_OBJECT_UNLOCK( src ); goto init_failed; } @@ -585,7 +776,7 @@ g_string_new( src->uri_name ), -1, src->mythtv_version ); if ( src->file_transfer == NULL ) { - GST_OBJECT_UNLOCK(src); + //GST_OBJECT_UNLOCK(src); goto init_failed; } @@ -607,7 +798,7 @@ ret = gmyth_file_transfer_setup( &(src->file_transfer), src->live_tv ); if ( ret == FALSE ) { - GST_OBJECT_UNLOCK(src); + //GST_OBJECT_UNLOCK(src); #ifndef GST_DISABLE_GST_DEBUG if ( src->mythtv_msgs_dbg ) g_printerr( "MythTV FileTransfer request failed when setting up socket connection!\n" ); @@ -618,8 +809,14 @@ src->content_size = src->file_transfer->filesize; src->do_start = FALSE; + + if ( src->live_tv ) { + //src->adapter = gst_adapter_new(); + //g_static_rec_mutex_init( src->th_mutex ); + //src->th_read_ahead = gst_task_create( (GstTaskFunction)gst_mythtv_src_read_ahead, src ); + } - GST_OBJECT_UNLOCK(src); + //GST_OBJECT_UNLOCK(src); done: return TRUE; @@ -645,13 +842,15 @@ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Seek failed, go to the next program info... (%s)", src->uri_name)); + + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (src)), + gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); + // go to the next program chain src->unique_setup = FALSE; src->update_prog_chain = TRUE; - //GST_OBJECT_LOCK(src); gst_mythtv_src_next_program_chain( src ); - //GST_OBJECT_UNLOCK(src); return TRUE; } @@ -673,6 +872,8 @@ } else { goto done; } + + GST_PAD_STREAM_LOCK( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ); if (src->file_transfer) { g_object_unref (src->file_transfer); @@ -748,11 +949,13 @@ while ( src->content_size < GMYTHTV_TRANSFER_MAX_BUFFER*4 ) src->content_size = gst_mythtv_src_get_position( src ); - src->read_offset = 0; - - src->update_prog_chain = FALSE; + src->read_offset = 0; done: + src->update_prog_chain = FALSE; + + GST_PAD_STREAM_UNLOCK( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ); + return TRUE; /* ERRORS */ @@ -1007,11 +1210,13 @@ } #endif +#if 0 static gboolean gst_mythtv_src_is_seekable( GstBaseSrc *push_src ) { return TRUE; } +#endif #if 0 static GstFlowReturn @@ -1172,15 +1377,14 @@ case GST_STATE_CHANGE_PAUSED_TO_READY: g_print( "[%s] PAUSED to READY called!\n", __FUNCTION__ ); if ( src->live_tv && src->update_prog_chain ) { - GstPad *pad_peer; - gst_pad_push_event ( pad_peer = gst_pad_get_peer( GST_BASE_SRC_PAD (GST_BASE_SRC (src)) ), - gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_BYTES, 0, -1, 0 ) ); + + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (src)), + gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); + src->read_offset = 0; src->bytes_read = 0; - src->unique_setup = FALSE; - GST_OBJECT_LOCK( src ); + src->unique_setup = FALSE; gst_mythtv_src_next_program_chain( src ); - GST_OBJECT_UNLOCK( src ); } break; default: