# HG changeset patch # User rosfran # Date 1164154538 0 # Node ID 3b97ffa0634c15aad2ffb8ccbf764ff3117e1e9e # Parent 28041df0da6e152d0e2efb4fc0fa5c1b00733596 [svn r98] Fixes some minor bugs on synchronization with monitor socket. diff -r 28041df0da6e -r 3b97ffa0634c gmyth/src/gmyth_file_transfer.c --- a/gmyth/src/gmyth_file_transfer.c Fri Nov 17 20:07:38 2006 +0000 +++ b/gmyth/src/gmyth_file_transfer.c Wed Nov 22 00:15:38 2006 +0000 @@ -53,8 +53,8 @@ #define GMYTHTV_RECORDER_HEADER "QUERY_RECORDER" /* default values to the file transfer parameters */ -#define GMYTHTV_USER_READ_AHEAD FALSE -#define GMYTHTV_RETRIES 1 +#define GMYTHTV_USER_READ_AHEAD TRUE +#define GMYTHTV_RETRIES -1 #define GMYTHTV_FILE_SIZE 0 #define GMYTHTV_BUFFER_SIZE 16*1024 @@ -76,6 +76,8 @@ static guint wait_to_transfer = 0; +static gboolean has_io_access = TRUE; + enum myth_sock_types { GMYTH_PLAYBACK_TYPE = 0, GMYTH_MONITOR_TYPE, @@ -83,7 +85,11 @@ GMYTH_RINGBUFFER_TYPE }; -static GStaticMutex mutex = G_STATIC_MUTEX_INIT; +//static GStaticMutex mutex = G_STATIC_MUTEX_INIT; + +static GMutex* mutex = NULL; + +static GCond* io_watcher_cond = NULL; static GMainContext *io_watcher_context = NULL; @@ -94,10 +100,14 @@ static void gmyth_file_transfer_finalize (GObject *object); static GMythSocket *gmyth_connect_to_transfer_backend( GMythFileTransfer **transfer, guint sock_type ); -static void* myth_init_io_watchers( void *data ); +static gboolean myth_init_io_watchers( GMythFileTransfer *transfer ); void gmyth_file_transfer_close( GMythFileTransfer *transfer ); +static gboolean myth_control_acquire_context( gboolean do_wait ); + +static gboolean myth_control_release_context( ); + G_DEFINE_TYPE(GMythFileTransfer, gmyth_file_transfer, G_TYPE_OBJECT) #if 0 @@ -139,6 +149,12 @@ { g_return_if_fail( gmyth_file_transfer != NULL ); gmyth_file_transfer->mythtv_version = GMYTHTV_VERSION; + + /* it is used for signalizing the event socket consumer thread */ + io_watcher_cond = g_cond_new(); + + /* mutex to control access to the event socket consumer thread */ + mutex = g_mutex_new(); } static void @@ -265,6 +281,7 @@ } else { g_warning("Remote transfer control socket already created.\n"); } + #endif /* configure the socket */ @@ -310,7 +327,7 @@ g_return_val_if_fail( transfer != NULL && *transfer != NULL, NULL ); g_return_val_if_fail( (*transfer)->uri != NULL, NULL ); - g_static_mutex_lock (&mutex); + //myth_control_acquire_context (TRUE); gchar *path_dir = gmyth_uri_getpath( (*transfer)->uri ); //g_print( "\t--> %s: path_dir = %s\n", __FUNCTION__, path_dir ); @@ -346,7 +363,7 @@ "\t\t\tCould not connect to server \"%s\" @ port %d\n", stype, (*transfer)->hostname->str, (*transfer)->port ); g_object_unref(sock); - g_static_mutex_unlock (&mutex); + //myth_control_release_context( ); return NULL; } @@ -357,11 +374,12 @@ if ( sock_type == GMYTH_PLAYBACK_TYPE ) { (*transfer)->control_sock = sock; - g_string_printf( base_str, "ANN Playback %s %d", hostname->str, TRUE ); + g_string_printf( base_str, "ANN Playback %s %d", hostname->str, FALSE ); gmyth_socket_send_command( (*transfer)->control_sock, base_str ); GString *resp = gmyth_socket_receive_response( (*transfer)->control_sock ); g_print( "[%s] Got Playback response from %s: %s\n", __FUNCTION__, base_str->str, resp->str ); + //myth_init_io_watchers ( (*transfer) ); } else if ( sock_type == GMYTH_MONITOR_TYPE ) { @@ -372,7 +390,7 @@ GString *resp = gmyth_socket_receive_response( (*transfer)->event_sock ); g_print( "[%s] Got Monitor response from %s: %s\n", __FUNCTION__, base_str->str, resp->str ); //g_thread_create( myth_init_io_watchers, (void*)(*transfer), FALSE, NULL ); - myth_init_io_watchers ( (void*)(*transfer) ); + g_thread_create( myth_init_io_watchers, (*transfer), FALSE, NULL ); g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() ); @@ -381,9 +399,9 @@ { (*transfer)->sock = sock; strlist = gmyth_string_list_new(); - //g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str, - // transfer->userreadahead, transfer->retries ); - g_string_printf( base_str, "ANN FileTransfer %s", hostname->str ); + g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str, + (*transfer)->userreadahead, (*transfer)->retries ); + //g_string_printf( base_str, "ANN FileTransfer %s", hostname->str ); gmyth_string_list_append_string( strlist, base_str ); gmyth_string_list_append_char_array( strlist, path_dir ); @@ -427,7 +445,7 @@ if ( strlist != NULL ) g_object_unref( strlist ); - g_static_mutex_unlock (&mutex); + //myth_control_release_context( ); return sock; } @@ -471,7 +489,7 @@ GMythStringList *str_list = gmyth_string_list_new (); g_debug ( "[%s]\n", __FUNCTION__ ); - g_static_mutex_lock (&mutex); + myth_control_acquire_context (TRUE); g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER, file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id ); @@ -492,7 +510,7 @@ } } g_print( "[%s] %s, stream is %s being recorded!\n", __FUNCTION__, ret ? "YES" : "NO", ret ? "" : "NOT" ); - g_static_mutex_unlock (&mutex); + myth_control_release_context( ); if ( str_list != NULL ) g_object_unref (str_list); @@ -509,7 +527,7 @@ GMythStringList *str_list = gmyth_string_list_new (); g_debug ( "[%s]\n", __FUNCTION__ ); - g_static_mutex_lock (&mutex); + myth_control_acquire_context (TRUE); g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER, file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id ); @@ -525,7 +543,7 @@ if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strstr ( str->str, "bad" ) == NULL ) pos = gmyth_util_decode_long_long( str_list, 0 ); } - g_static_mutex_unlock (&mutex); + myth_control_release_context( ); #ifndef GMYTHTV_ENABLE_DEBUG @@ -640,6 +658,8 @@ // if (!controlSock->isOpen() || controlSock->error()) // return 0; + + myth_control_acquire_context( TRUE ); GMythStringList *strlist = gmyth_string_list_new(); g_string_printf (transfer->query, "%s %d", GMYTHTV_QUERY_HEADER, transfer->recordernum); @@ -661,45 +681,131 @@ g_print( "[%s] got reading position pointer from the streaming = %lld\n", __FUNCTION__, retval ); - //gmyth_file_transfer_reset( transfer ); + myth_control_release_context( ); return retval; } +static gboolean +myth_control_acquire_context( gboolean do_wait ) +{ + + gboolean ret = TRUE; + + //g_mutex_lock( mutex ); + + //while ( !has_io_access ) + // g_cond_wait( io_watcher_cond, mutex ); + + //has_io_access = FALSE; + + //myth_control_acquire_context (FALSE); + + /* + g_mutex_lock( mutex ); + + if ( do_wait ) { + if ( !g_main_context_wait( io_watcher_context, io_watcher_cond, mutex ) ) + ret = FALSE; + } else if ( !g_main_context_acquire( io_watcher_context ) ) + ret = FALSE; + */ + + return ret; + +} + +static gboolean +myth_control_release_context( ) +{ + + gboolean ret = TRUE; + + //g_main_context_release( io_watcher_context ); + + //g_mutex_unlock( mutex ); + + //has_io_access = TRUE; + + //g_cond_broadcast( io_watcher_cond ); + + //g_mutex_unlock( mutex ); + + + return ret; + +} + static gboolean -myth_control_sock_listener( GIOChannel *source, GIOCondition condition, gpointer data ) +myth_control_sock_listener( GIOChannel *io_channel, GIOCondition condition, gpointer data ) { - GIOStatus ret; - GError *err = NULL; - gchar *msg = g_strdup(""); + GIOStatus io_status; + GError *error = NULL; + GIOCondition io_cond; + gchar *trash = g_new0( gchar, GMYTHTV_BUFFER_SIZE*10 ); + guint recv = 0; + //gchar *msg = g_strdup(""); - g_static_mutex_lock( &mutex ); + while ( !has_io_access ) + g_cond_wait( io_watcher_cond, mutex ); + + //myth_control_acquire_context (TRUE); + + has_io_access = FALSE; + + g_mutex_lock( mutex ); + + gsize len = 0; + if (condition & G_IO_HUP) + g_print ("Read end of pipe died!\n"); + + if ( ( condition & G_IO_IN ) != 0 ) { + io_status = g_io_channel_set_encoding( io_channel, NULL, &error ); + do + { + //trash = g_new0( gchar, GMYTHTV_BUFFER_SIZE ); - gsize len; - if (condition & G_IO_HUP) - g_error ("Read end of pipe died!\n"); - ret = g_io_channel_read_line ( source, &msg, &len, NULL, &err); - if ( ret == G_IO_STATUS_ERROR ) - g_error ("[%s] Error reading: %s\n", __FUNCTION__, err != NULL ? err->message : "" ); - g_print ("\n\n\n\n\n\n[%s]\t\tEVENT: Read %u bytes: %s\n\n\n\n\n", __FUNCTION__, len, msg != NULL ? msg : "" ); - if ( msg != NULL ) - g_free (msg); + io_status = g_io_channel_read_chars( io_channel, trash + recv, + GMYTHTV_BUFFER_SIZE, &len, &error); + + g_print( "[%s] Received data buffer from IO binary channel... %d bytes gone!\n", + __FUNCTION__, len ); + + recv += len; + + //msg = g_strconcat( msg, g_strdup(trash), NULL ); + + //if ( trash != NULL ) + // g_free( trash ); + + io_cond = g_io_channel_get_buffer_condition( io_channel ); + + } while ( ( io_cond & G_IO_IN ) != 0 && ( io_status != G_IO_STATUS_ERROR ) ); + } + //ret = g_io_channel_read_chars ( source, &msg, &len, NULL, &err); + if ( io_status == G_IO_STATUS_ERROR ) + g_print ("[%s] Error reading: %s\n", __FUNCTION__, error != NULL ? error->message : "" ); + g_print ("\n[%s]\tEVENT: Read %d bytes: %s\n\n", __FUNCTION__, len, trash != NULL ? trash : "[no event data]" ); + + has_io_access = TRUE; - g_static_mutex_unlock( &mutex ); - + //myth_control_release_context( ); + g_cond_broadcast( io_watcher_cond ); + + g_mutex_unlock( mutex ); + return TRUE; } -static void* -myth_init_io_watchers( void *data ) +static gboolean +myth_init_io_watchers( GMythFileTransfer *transfer ) { - GMythFileTransfer *transfer = (GMythFileTransfer*)data; - io_watcher_context = g_main_context_new(); - GMainLoop *loop = g_main_loop_new( NULL, FALSE ); + io_watcher_context = g_main_context_default(); + //GMainLoop *loop = g_main_loop_new( io_watcher_context, FALSE ); - GSource *source = NULL; + GSource *source; if ( transfer->event_sock->sd_io_ch != NULL ) source = g_io_create_watch( transfer->event_sock->sd_io_ch, G_IO_IN | G_IO_HUP ); @@ -716,19 +822,15 @@ } g_print( "[%s]\tOK! Starting listener on the MONITOR event socket...\n", __FUNCTION__ ); - - g_main_loop_run( loop ); + + //g_main_loop_run( loop ); cleanup: if ( source != NULL ) g_source_unref( source ); - g_main_loop_unref( loop ); - - g_main_context_unref( io_watcher_context ); - - return NULL; -} + return TRUE; +} static gboolean gmyth_file_transfer_is_backend_message( GMythFileTransfer *transfer, @@ -779,6 +881,17 @@ gchar *trash = g_strdup(""); g_return_val_if_fail ( data != NULL, -2 ); + + #if 0 + while ( !has_io_access ) + g_cond_wait( io_watcher_cond, mutex ); + + has_io_access = FALSE; + #endif + + myth_control_acquire_context (FALSE); + + //g_mutex_lock( mutex ); /* gets the size of the entire file, if the size requested is lesser than 0 */ if ( size <= 0 ) @@ -848,14 +961,10 @@ wait_to_transfer = 0; - //while ( transfer->live_tv && ( gmyth_file_transfer_get_file_position( transfer ) < 4096 ) && - // wait_to_transfer++ < GMYTHTV_TRANSFER_MAX_WAITS ) - // g_usleep( 1000*50 ); /* waits just for 2/10 second */ - //g_thread_create( myth_init_io_watchers, (void*)transfer, FALSE, NULL ); //g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() ); - //g_static_mutex_lock (&mutex); + //g_mutex_lock (mutex); //strlist = gmyth_string_list_new(); g_string_printf ( transfer->query, "%s %d", @@ -865,27 +974,39 @@ sent = size; remaining = size - recv; - //g_static_mutex_unlock( &mutex ); + //g_mutex_unlock( mutex ); //data = (void*)g_new0( gchar, size ); - g_io_channel_flush( io_channel_control, NULL ); - - //g_static_mutex_lock( &mutex ); + //g_mutex_lock( mutex ); io_cond = g_io_channel_get_buffer_condition( io_channel ); + + if ( ( io_cond_control & G_IO_IN ) != 0 ) { + g_print( "[%s] Finishind reading function: cleaning all data on the I/O control socket...\n", __FUNCTION__ ); + while ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 ) + { + if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 ) + { + gint backend_code = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error + g_print( "[%s]\t backend code = %d -\t"\ + "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__, + backend_code, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + } + } + } while ( recv < size && !response )//&& ( io_cond & G_IO_IN ) != 0 ) { //g_io_channel_flush( io_channel_control, NULL ); - + gint backend_msg_count = 1; + +resend_request: strlist = gmyth_string_list_new(); gmyth_string_list_append_char_array( strlist, transfer->query->str ); gmyth_string_list_append_char_array( strlist, /*transfer->live_tv ? "REQUEST_BLOCK_RINGBUF" :*/ "REQUEST_BLOCK" ); gmyth_string_list_append_int( strlist, remaining ); gmyth_socket_write_stringlist( transfer->control_sock, strlist ); - - gint backend_msg_count = 1; /* iterates until find some non-MythTV backend message */ do { @@ -907,25 +1028,35 @@ if ( sent != 0 ) { g_print( "[%s]\t received = %d bytes, backend says %d bytes sent, "\ - "io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__, - recv, sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + "io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__, + recv, sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); if ( sent == remaining ) { - //response = ( recv == size ); g_print( "[%s]\t\tsent %d, which is equals to requested size = %d\n\n", __FUNCTION__, sent, remaining ); + break; } else { g_print( "[%s]\t\tsent %d, which is NOT equals to requested size = %d\n\n", __FUNCTION__, sent, remaining ); - remaining = sent; - break; - + size = remaining = sent; + if ( sent < 0 ) { + if ( --backend_msg_count > 0 ) + goto resend_request; + else + goto cleanup; + } else + break; } } else { - goto cleanup; + if ( --backend_msg_count > 0 ) + { + g_usleep( 200 ); + goto resend_request; + } else + goto cleanup; } // if } } @@ -946,12 +1077,6 @@ io_status = g_io_channel_read_chars( io_channel, data + recv, buf_len, &bytes_read, &error ); - /* - GString *sss = g_string_new(""); - sss = g_string_append_len( sss, (gchar*)data+recv, bytes_read ); - - g_print( "[%s] Reading buffer (length = %d)\n", __FUNCTION__, bytes_read); - */ if ( bytes_read > 0 ) { recv += bytes_read; @@ -984,7 +1109,7 @@ g_print( "[%s]\t io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); - } while ( remaining > 0 );//&& ( io_status == G_IO_STATUS_NORMAL ) ); + } while ( remaining > 0 );//&& ( ( io_cond & G_IO_IN ) != 0 ) ); //io_cond_control = g_io_channel_get_buffer_condition( io_channel_control ); if ( remaining == 0 )//( io_cond_control & G_IO_IN ) != 0 ) @@ -992,31 +1117,33 @@ response = TRUE; break; } + + if ( ( io_cond_control & G_IO_IN ) != 0 ) { + g_print( "[%s] Finishind reading function: cleaning all data on the I/O control socket...\n", __FUNCTION__ ); + while ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 ) + { + if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 ) + { + gint backend_code = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error + g_print( "[%s]\t backend code = %d -\t"\ + "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__, + backend_code, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + } + } + } + g_io_channel_flush( io_channel_control, NULL ); } // while - - //io_cond_control = g_io_channel_get_buffer_condition( io_channel_control ); - - if ( ( ( io_cond_control & G_IO_IN ) != 0 ) /*&& - ( response || ( recv == size ) ) ) - ( recv <= 0 || sent <= 0 ) */ ) - { - if ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 ) - { - if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 ) - { - gint backend_code = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error - g_print( "[%s]\t backend code = %d -\t"\ - "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__, - backend_code, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); - } - } - } cleanup: - //g_static_mutex_unlock (&mutex); - //g_io_channel_flush( io_channel_control, NULL ); + myth_control_release_context (mutex); + + //has_io_access = TRUE; + //g_cond_broadcast( io_watcher_cond ); + + //g_mutex_unlock( mutex ); + if ( trash != NULL ) g_free( trash );