# HG changeset patch # User rosfran # Date 1164154593 0 # Node ID d0f379fc4173cd49a752244ca3f0b7efa3d2eb35 # Parent 3b97ffa0634c15aad2ffb8ccbf764ff3117e1e9e [svn r99] Removed GstAdapter, changed it to GByteArray. diff -r 3b97ffa0634c -r d0f379fc4173 gst-plugins-mythtv/src/gstmythtvsrc.c --- a/gst-plugins-mythtv/src/gstmythtvsrc.c Wed Nov 22 00:15:38 2006 +0000 +++ b/gst-plugins-mythtv/src/gstmythtvsrc.c Wed Nov 22 00:16:33 2006 +0000 @@ -24,6 +24,8 @@ #include #include +#include + #include #include @@ -42,12 +44,12 @@ //( 32*1024 ) /* 4*1024 ??? */ -#define MAX_READ_SIZE 6*1024 +#define MAX_READ_SIZE 4*1024 //( 32*1024 ) -#define GST_FLOW_ERROR_NO_DATA -101 +#define GST_FLOW_ERROR_NO_DATA -101 -#define INTERNAL_BUFFER_SIZE 30*1024 +#define INTERNAL_BUFFER_SIZE 64*1024 /* stablish a maximum iteration value to the IS_RECORDING message */ static guint wait_to_transfer = 0; @@ -81,20 +83,20 @@ static void gst_mythtv_src_finalize (GObject * gobject); -/* static GstFlowReturn gst_mythtv_src_create (GstBaseSrc * psrc, guint64 offset, guint size, GstBuffer ** outbuf); -*/ -//static GstFlowReturn gst_mythtv_src_chain ( GstPad* pad, GstBuffer* outbuf ); -static GstFlowReturn gst_mythtv_src_create ( GstPushSrc* psrc, GstBuffer** outbuf ); +//static GstFlowReturn gst_mythtv_src_create ( GstPushSrc* psrc, GstBuffer** outbuf ); static gboolean gst_mythtv_src_start (GstBaseSrc * bsrc); static gboolean gst_mythtv_src_stop (GstBaseSrc * bsrc); static gboolean gst_mythtv_src_get_size (GstBaseSrc * bsrc, guint64 * size); static gboolean gst_mythtv_src_is_seekable( GstBaseSrc *push_src ); -static gboolean gst_mythtv_src_do_seek( GstBaseSrc *base, GstSegment *segment ); +//static void gst_mythtv_src_get_times (GstBaseSrc * src, GstBuffer * buffer, +// GstClockTime * start, GstClockTime * end); + +//static gboolean gst_mythtv_src_do_seek( GstBaseSrc *base, GstSegment *segment ); static gboolean gst_mythtv_src_next_program_chain ( GstMythtvSrc *src ); @@ -135,8 +137,8 @@ //GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstBaseSrc, // GST_TYPE_BASE_SRC, _urihandler_init) -GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstPushSrc, - GST_TYPE_PUSH_SRC, _urihandler_init) +GST_BOILERPLATE_FULL (GstMythtvSrc, gst_mythtv_src, GstBaseSrc, + GST_TYPE_BASE_SRC, _urihandler_init) static void gst_mythtv_src_base_init (gpointer g_class) @@ -156,12 +158,12 @@ gst_mythtv_src_class_init (GstMythtvSrcClass * klass) { GObjectClass *gobject_class; - GstPushSrcClass *gstpushsrc_class; + //GstPushSrcClass *gstpushsrc_class; GstBaseSrcClass *gstbasesrc_class; gobject_class = (GObjectClass *) klass; gstbasesrc_class = (GstBaseSrcClass *) klass; - gstpushsrc_class = (GstPushSrcClass *) klass; + //gstpushsrc_class = (GstPushSrcClass *) klass; gobject_class->set_property = gst_mythtv_src_set_property; gobject_class->get_property = gst_mythtv_src_get_property; @@ -231,8 +233,10 @@ gstbasesrc_class->get_size = gst_mythtv_src_get_size; gstbasesrc_class->is_seekable = gst_mythtv_src_is_seekable; - gstbasesrc_class->do_seek = gst_mythtv_src_do_seek; - gstpushsrc_class->create = gst_mythtv_src_create; + //gstbasesrc_class->get_times = gst_mythtv_src_get_times; + + //gstbasesrc_class->do_seek = gst_mythtv_src_do_seek; + gstbasesrc_class->create = gst_mythtv_src_create; GST_DEBUG_CATEGORY_INIT (mythtvsrc_debug, "mythtvsrc", 0, "MythTV Client Source"); @@ -271,14 +275,14 @@ this->eos = FALSE; - this->adapter = NULL; + this->bytes_queue = NULL; //this->th_read_ahead = NULL; this->th_mutex = NULL; - this->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); - gst_element_add_pad (GST_ELEMENT (this), this->srcpad); + //this->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); + //gst_element_add_pad (GST_ELEMENT (this), this->srcpad); gst_base_src_set_format( GST_BASE_SRC( this ), GST_FORMAT_BYTES ); @@ -338,7 +342,6 @@ /* Loop sending the Myth File Transfer request: * Retry whilst authentication fails and we supply it. */ gint len = 0; - //*data_ptr = g_malloc0( size ); //while ( sizetoread > 0 ) { @@ -362,7 +365,7 @@ if ( len == GMYTHTV_FILE_TRANSFER_READ_ERROR ) { /* -314 */ src->update_prog_chain = TRUE; goto done; - } else if ( abs( src->content_size - src->bytes_read ) < GMYTHTV_TRANSFER_MAX_BUFFER ) { + } /*if ( abs( src->content_size - src->bytes_read ) < GMYTHTV_TRANSFER_MAX_BUFFER ) { src->update_prog_chain = TRUE; if ( src->enable_timing_position ) { gint64 size_tmp = 0; @@ -375,7 +378,7 @@ g_print( "\t[%s]\tGET_POSITION: file_position = %lld\n", __FUNCTION__, size_tmp ); } - } + }*/ goto done; } @@ -400,161 +403,6 @@ #if 0 static GstFlowReturn -gst_mythtv_src_create ( GstBaseSrc * psrc, guint64 offset, guint size, GstBuffer **outbuf) -{ - GstMythtvSrc *src; - GstFlowReturn ret = GST_FLOW_OK; - gint read = -1; - gint adapter_size = -1; - - guint max_adapter_rep = 40; - - src = GST_MYTHTV_SRC (psrc); - - /* The caller should know the number of bytes and not read beyond EOS. */ - if (G_UNLIKELY (src->eos)) - goto eos; - if ( G_UNLIKELY (src->update_prog_chain) ) - goto change_progchain; - - g_static_rec_mutex_lock( th_mutexth_mutex ); - - while ( ( ( adapter_size = gst_adapter_available_fast( src->adapter ) ) < size ) && - --max_adapter_rep > 0 ) - { - g_print ( "[%s] %d - Waiting for read_ahead task...\n", __FUNCTION__, max_adapter_rep ); - GST_TASK_WAIT( src->th_read_ahead ); - } - - g_static_rec_mutex_unlock( th_mutexth_mutex ); - - gint64 new_offset = -1; - /* just get from the adapter, no network effort... */ - if ( offset > src->adapter_offset && size <= adapter_size ) - { - - GstBuffer *buf = gst_adapter_take_buffer( src->adapter, size ); - *outbuf = gst_buffer_create_sub( buf, offset, size ); - src->read_offset = new_offset = offset; - read = size; - - gst_adapter_flush( src->adapter, size ); - - } else { - /* no data on adapter... do all these mythtv network calls! */ - - /* verify if it needs to seek */ - if ( src->read_offset != offset ) - { - - new_offset = gmyth_file_transfer_seek( src->file_transfer, offset, SEEK_SET ); - - g_print( "[%s] SRC Offset = %lld, NEW actual backend SEEK Offset = %lld.\n", - __FUNCTION__, src->read_offset, new_offset ); - if ( G_UNLIKELY (new_offset < 0 ) ) - { - if ( src->live_tv ) - goto change_progchain; - else - goto eos; - } - - } - - src->read_offset = offset; - - /* Create the buffer. */ - ret = gst_pad_alloc_buffer ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), - src->read_offset, size, - GST_PAD_CAPS ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)) ), outbuf); - - if (G_UNLIKELY (ret != GST_FLOW_OK)) { - if ( src->live_tv ) - goto change_progchain; - else - goto done; - } - - read = do_read_request_response ( src, src->read_offset, size, outbuf ); - - } - - if (G_UNLIKELY (src->update_prog_chain) ) - goto change_progchain; - - if (G_UNLIKELY (read <= 0) || *outbuf == NULL) { - if ( src->live_tv ) - goto change_progchain; - else - goto read_error; - } - - if ( read > 0 ) { - src->read_offset += read; - src->bytes_read += read; - - #if 0 - g_print( "[%s]\tBYTES READ (actual) = %d, BYTES READ (cumulative) = %llu, "\ - "OFFSET = %llu, CONTENT SIZE = %llu.\n", __FUNCTION__, read, src->bytes_read, - src->read_offset, src->content_size ); - - GST_BUFFER_SIZE (*outbuf) = read; //GST_BUFFER_SIZE (buffer) = read; - //GST_BUFFER_MALLOCDATA( *outbuf ) = g_malloc0( GST_BUFFER_SIZE (*outbuf) ); - //GST_BUFFER_DATA( *outbuf ) = GST_BUFFER_MALLOCDATA( *outbuf ); - //g_memmove( GST_BUFFER_DATA( *outbuf ), data_ptr, read ); - GST_BUFFER_OFFSET (*outbuf) = offset; //GST_BUFFER_OFFSET (buffer) = offset; - GST_BUFFER_OFFSET_END (*outbuf) = offset + read;//GST_BUFFER_OFFSET_END (buffer) = offset + read; - - g_print( "Got buffer: [%s]\t\tBUFFER --->SIZE = %d, OFFSET = %llu, "\ - "OFFSET_END = %llu.\n\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf) ); - #endif - - } - -done: - { - const gchar *reason = gst_flow_get_name (ret); - - GST_DEBUG_OBJECT (src, "DONE task, reason %s", reason); - return ret; - } -eos: - { - const gchar *reason = gst_flow_get_name (ret); - - GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); - return GST_FLOW_UNEXPECTED; - } - /* ERRORS */ -read_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, - (NULL), ("Could not read any bytes (%i, %s)", read, - src->uri_name)); - return GST_FLOW_ERROR; - } -change_progchain: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, - (NULL), ("Seek failed, go to the next program info... (%i, %s)", read, - src->uri_name)); - - gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (src)), - gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); - // go to the next program chain - src->unique_setup = FALSE; - src->update_prog_chain = TRUE; - - gst_mythtv_src_next_program_chain( src ); - - return GST_FLOW_ERROR_NO_DATA; - } - -} -#endif - -static GstFlowReturn gst_mythtv_src_create ( GstPushSrc* psrc, GstBuffer** outbuf ) { GstMythtvSrc *src; @@ -595,7 +443,9 @@ if ( ( src->buffer_remain = gst_adapter_available_fast( src->adapter ) ) < MAX_READ_SIZE ) { - buffer = gst_buffer_new_and_alloc( INTERNAL_BUFFER_SIZE - src->buffer_remain ); + ret = gst_pad_alloc_buffer ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + src->read_offset /*GST_BUFFER_OFFSET_NONE*/, INTERNAL_BUFFER_SIZE - src->buffer_remain, + GST_PAD_CAPS (GST_BASE_SRC_PAD (GST_BASE_SRC (psrc))), &buffer ); read = do_read_request_response( src, 0, INTERNAL_BUFFER_SIZE - src->buffer_remain, &(GST_BUFFER_DATA(buffer)) ); @@ -618,6 +468,11 @@ src->read_offset, src->content_size ); } + + if ( now == -1 ) + now = gst_element_get_base_time( GST_ELEMENT( src ) ); + //else + // now; size = ( src->buffer_remain < MAX_READ_SIZE) ? src->buffer_remain : MAX_READ_SIZE; @@ -629,6 +484,7 @@ GST_BUFFER_MALLOCDATA( *outbuf ) = g_malloc0( GST_BUFFER_SIZE (*outbuf) ); GST_BUFFER_DATA( *outbuf ) = GST_BUFFER_MALLOCDATA( *outbuf ); g_memmove( GST_BUFFER_DATA( (*outbuf) ), GST_BUFFER_DATA(buffer), GST_BUFFER_SIZE(*outbuf) ); + //GST_BUFFER_TIMESTAMP( *outbuf ) = now; GST_BUFFER_OFFSET (*outbuf) = src->read_offset; GST_BUFFER_OFFSET_END (*outbuf) = src->read_offset + GST_BUFFER_SIZE (*outbuf); @@ -690,6 +546,142 @@ } } +#endif + +static GstFlowReturn +gst_mythtv_src_create ( GstBaseSrc * psrc, guint64 offset, guint size, GstBuffer **outbuf ) +{ + GstMythtvSrc *src; + GstFlowReturn ret = GST_FLOW_OK; + gint read = -1; + gint adapter_size = 0; + guint max_adapter_rep = 1; + + src = GST_MYTHTV_SRC ( psrc ); + + /* The caller should know the number of bytes and not read beyond EOS. */ + if (G_UNLIKELY (src->eos)) + goto eos; + if ( G_UNLIKELY (src->update_prog_chain) ) + goto change_progchain; + + //g_static_rec_mutex_lock( &th_mutex ); + g_print ( "[%s] %d - Adapter size = (%d), offset = %llu, size = %d...\n", __FUNCTION__, + max_adapter_rep, adapter_size, offset, size ); + + /* just get from the adapter, no network effort... */ + //GstBuffer *buffer = NULL; + + g_print ( "[%s]\t\tCreate: buffer_remain: %d\n", __FUNCTION__, + (gint) src->buffer_remain); + + if ( ( src->buffer_remain = src->bytes_queue->len ) < MAX_READ_SIZE ) { + guint8 *buffer = g_malloc0( INTERNAL_BUFFER_SIZE - src->buffer_remain ); + + read = do_read_request_response( src, 0, INTERNAL_BUFFER_SIZE - src->buffer_remain, &(buffer) ); + + if (G_UNLIKELY (read < 0)) { + if ( src->live_tv ) + goto change_progchain; + else + goto read_error; + } + + if ( G_UNLIKELY (src->update_prog_chain) ) + goto change_progchain; + + src->bytes_queue = g_byte_array_append( src->bytes_queue, buffer, read ); + + src->buffer_remain = src->buffer_remain + read; + + g_print( "[%s]\tBYTES READ (actual) = %d, BYTES READ (cumulative) = %llu, "\ + "OFFSET = %llu, CONTENT SIZE = %llu.\n", __FUNCTION__, read, src->bytes_read, + src->read_offset, src->content_size ); + + } + + guint buffer_size = ( src->buffer_remain < MAX_READ_SIZE) ? src->buffer_remain : MAX_READ_SIZE; + + /* Create the buffer. */ + ret = gst_pad_alloc_buffer ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + offset, buffer_size, + GST_PAD_CAPS (GST_BASE_SRC_PAD (GST_BASE_SRC (psrc))), outbuf ); + + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + if ( src->live_tv ) + goto change_progchain; + else + goto done; + } + + guint8 *buf = g_memdup( src->bytes_queue->data, buffer_size ); + + g_print( "[%s] read = %d, buffer_remain = %d\n", __FUNCTION__, read, src->buffer_remain ); + //src->read_offset = offset; + + GST_BUFFER_SIZE (*outbuf) = buffer_size; + GST_BUFFER_MALLOCDATA( *outbuf ) = g_malloc0( GST_BUFFER_SIZE (*outbuf) ); + GST_BUFFER_DATA( *outbuf ) = GST_BUFFER_MALLOCDATA( *outbuf ); + g_memmove( GST_BUFFER_DATA( (*outbuf) ), buf, GST_BUFFER_SIZE(*outbuf) ); + GST_BUFFER_OFFSET (*outbuf) = offset; + GST_BUFFER_OFFSET_END (*outbuf) = offset + GST_BUFFER_SIZE (*outbuf); + + src->buffer_remain -= GST_BUFFER_SIZE (*outbuf); + + src->read_offset += GST_BUFFER_SIZE (*outbuf); + src->bytes_read += GST_BUFFER_SIZE (*outbuf); + g_print ( "[%s]\t\tBuffer output with size: %d\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf) ); + + src->bytes_queue = g_byte_array_remove_range( src->bytes_queue, 0, buffer_size ); + + g_print( "Got buffer: [%s]\t\tBUFFER --->SIZE = %d, OFFSET = %llu, "\ + "OFFSET_END = %llu.\n\n", __FUNCTION__, GST_BUFFER_SIZE (*outbuf), + GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf) ); + + /* just get from the adapter, no network effort... */ + + return ret; + +done: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_DEBUG_OBJECT (src, "DONE task, reason %s", reason); + return ret; + } +eos: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); + return GST_FLOW_UNEXPECTED; + } + /* ERRORS */ +read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("Could not read any bytes (%i, %s)", read, + src->uri_name)); + return GST_FLOW_ERROR; + } +change_progchain: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("Seek failed, go to the next program info... (%i, %s)", read, + src->uri_name)); + + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (psrc)), + gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); + // go to the next program chain + src->unique_setup = FALSE; + src->update_prog_chain = TRUE; + + gst_mythtv_src_next_program_chain( src ); + + return GST_FLOW_ERROR_NO_DATA; + } + +} gint64 gst_mythtv_src_get_position ( GstMythtvSrc* src ) @@ -717,6 +709,7 @@ } +#if 0 static gboolean gst_mythtv_src_do_seek( GstBaseSrc *base, GstSegment *segment ) { @@ -792,6 +785,7 @@ } } +#endif #if 0 static void @@ -871,6 +865,7 @@ } if ( src->live_tv ) { + gmyth_context_initialize(); src->spawn_livetv = gmyth_livetv_new( ); if ( gmyth_livetv_setup( src->spawn_livetv ) == FALSE ) { ret = FALSE; @@ -907,6 +902,8 @@ /*|| ( gmyth_file_transfer_get_file_position( src->file_transfer ) < ( src->content_size + 327680 ) )*/ ) ) g_usleep( 100 ); } + + //sleep( 30 ); /* sets the FileTransfer instance connection (video/audio download) */ ret = gmyth_file_transfer_setup( &(src->file_transfer), src->live_tv ); @@ -924,7 +921,7 @@ src->do_start = FALSE; //if ( src->live_tv ) { - src->adapter = gst_adapter_new(); + src->bytes_queue = g_byte_array_new(); //src->th_read_ahead = gst_task_create( (GstTaskFunction)gst_mythtv_src_read_ahead, src ); //gst_task_set_lock( src->th_read_ahead, &th_mutex ); //gst_task_start( src->th_read_ahead ); @@ -970,6 +967,16 @@ } } +#if 0 +static void +gst_mythtv_src_get_times (GstBaseSrc * src, GstBuffer * buffer, + GstClockTime * start, GstClockTime * end) +{ + *start = -1; + *end = -1; +} +#endif + /* create a new socket for connecting to the next program chain */ static gboolean gst_mythtv_src_next_program_chain ( GstMythtvSrc *src ) @@ -1101,8 +1108,11 @@ gboolean ret = TRUE; g_print( "[%s] Differs from previous content size: %d (max.: %d)\n", __FUNCTION__, abs( src->content_size - src->prev_content_size ), GMYTHTV_TRANSFER_MAX_BUFFER ); - - if (src->content_size == -1) { + + if ( src->live_tv ) + //src->content_size = (guint64)-1; + ret = FALSE; + else if (src->content_size == -1) { //ret= FALSE; } else if ( src->live_tv && ( abs( src->content_size - src->bytes_read ) < GMYTHTV_TRANSFER_MAX_BUFFER ) ) { @@ -1262,7 +1272,7 @@ static gboolean gst_mythtv_src_is_seekable( GstBaseSrc *push_src ) { - return TRUE; + return FALSE; } static gboolean @@ -1332,8 +1342,11 @@ g_print( "[%s] PLAYING to PAUSED called!\n", __FUNCTION__ ); case GST_STATE_CHANGE_PAUSED_TO_READY: g_print( "[%s] PAUSED to READY called!\n", __FUNCTION__ ); + if ( src->live_tv && src->update_prog_chain ) { + /* + gst_pad_push_event ( GST_BASE_SRC_PAD (GST_BASE_SRC (src)), gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, 0, -1, 0 ) ); @@ -1341,7 +1354,9 @@ src->bytes_read = 0; src->unique_setup = FALSE; gst_mythtv_src_next_program_chain( src ); + */ } + break; default: break; diff -r 3b97ffa0634c -r d0f379fc4173 gst-plugins-mythtv/src/gstmythtvsrc.h --- a/gst-plugins-mythtv/src/gstmythtvsrc.h Wed Nov 22 00:15:38 2006 +0000 +++ b/gst-plugins-mythtv/src/gstmythtvsrc.h Wed Nov 22 00:16:33 2006 +0000 @@ -17,7 +17,7 @@ #define __GST_MYTHTV_SRC_H__ #include -#include +#include #include #include @@ -48,7 +48,7 @@ } GstMythtvState; struct _GstMythtvSrc { - GstPushSrc element; + GstBaseSrc element; /* MythFileTransfer */ GMythFileTransfer *file_transfer; @@ -104,7 +104,7 @@ GStaticRecMutex *th_mutex; - GstAdapter *adapter; + GByteArray *bytes_queue; /* enable Myth TV debug messages */ gboolean mythtv_msgs_dbg; @@ -113,7 +113,7 @@ }; struct _GstMythtvSrcClass { - GstPushSrcClass parent_class; + GstBaseSrcClass parent_class; }; GType gst_mythtv_src_get_type (void);