diff -r 324e04989738 -r 686549eb5657 gst-plugins-mythtv/src/gmyth_file_transfer.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gst-plugins-mythtv/src/gmyth_file_transfer.c Mon Oct 23 16:02:15 2006 +0100 @@ -0,0 +1,1050 @@ +/* vim: set sw=2: -*- Mode: C; tab-width: 2; indent-tabs-mode: t; c-basic-offset: 2; c-indent-level: 2-*- */ +/** + * GStreamer plug-in properties: + * - location (backend server hostname/URL) [ex.: myth://192.168.1.73:28722/1000_1092091.nuv] + * - path (qurl - remote file to be opened) + * - port number + * @author Rosfran Lins Borges + */ + +#include "gmyth_file_transfer.h" +#include "gmyth_uri.h" +#include "gmyth_livetv.h" +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#define GMYTHTV_QUERY_HEADER "QUERY_FILETRANSFER" +#define GMYTHTV_RECORDER_HEADER "QUERY_RECORDER" + +/* default values to the file transfer parameters */ +#define GMYTHTV_USER_READ_AHEAD FALSE +#define GMYTHTV_RETRIES 1 +#define GMYTHTV_FILE_SIZE -1 + +#define GMYTHTV_BUFFER_SIZE 8*1024 + +#define GMYTHTV_VERSION 30 + +#define GMYTHTV_TRANSFER_MAX_WAITS 700 + +#ifdef GMYTHTV_ENABLE_DEBUG +#define GMYTHTV_ENABLE_DEBUG 1 +#else +#undef GMYTHTV_ENABLE_DEBUG +#endif + +/* this NDEBUG is to maintain compatibility with GMyth library */ +#ifndef NDEBUG +#define GMYTHTV_ENABLE_DEBUG 1 +#endif + +static guint wait_to_transfer = 0; + +enum myth_sock_types { + GMYTH_PLAYBACK_TYPE = 0, + GMYTH_MONITOR_TYPE, + GMYTH_FILETRANSFER_TYPE, + GMYTH_RINGBUFFER_TYPE +}; + +static GStaticMutex mutex = G_STATIC_MUTEX_INIT; + +static GMainContext *io_watcher_context = NULL; + +static void gmyth_file_transfer_class_init (GMythFileTransferClass *klass); +static void gmyth_file_transfer_init (GMythFileTransfer *object); + +static void gmyth_file_transfer_dispose (GObject *object); +static void gmyth_file_transfer_finalize (GObject *object); + +static GMythSocket *myth_connect_to_transfer_backend( GMythFileTransfer **transfer, guint sock_type ); +static void* myth_init_io_watchers( void *data ); + +void gmyth_file_transfer_close( GMythFileTransfer *transfer ); + +G_DEFINE_TYPE(GMythFileTransfer, gmyth_file_transfer, G_TYPE_OBJECT) + +#if 0 +static guint64 +mmyth_util_decode_long_long( GMythStringList *strlist, guint offset ) +{ + + guint64 ret_value = 0LL; + + g_return_val_if_fail( strlist != NULL, ret_value ); + + if ( offset < gmyth_string_list_length( strlist )) + g_printerr( "[%s] Offset is lower than the GMythStringList (offset = %d)!\n", __FUNCTION__, offset ); + g_return_val_if_fail( offset < gmyth_string_list_length( strlist ), ret_value ); + + gint l1 = gmyth_string_list_get_int( strlist, offset ); + gint l2 = gmyth_string_list_get_int( strlist, offset + 1 ); + + ret_value = ((guint64)(l2) & 0xffffffffLL) | ((guint64)(l1) << 32); + + return ret_value; + +} +#endif + +static void +gmyth_file_transfer_class_init (GMythFileTransferClass *klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->dispose = gmyth_file_transfer_dispose; + gobject_class->finalize = gmyth_file_transfer_finalize; +} + + static void +gmyth_file_transfer_init (GMythFileTransfer *gmyth_file_transfer) +{ + g_return_if_fail( gmyth_file_transfer != NULL ); + gmyth_file_transfer->mythtv_version = GMYTHTV_VERSION; +} + +static void +gmyth_file_transfer_dispose (GObject *object) +{ + GMythFileTransfer *gmyth_file_transfer = GMYTH_FILE_TRANSFER(object); + + gmyth_file_transfer_close( gmyth_file_transfer ); + + G_OBJECT_CLASS (gmyth_file_transfer_parent_class)->dispose (object); +} + + static void +gmyth_file_transfer_finalize (GObject *object) +{ + g_signal_handlers_destroy (object); + + G_OBJECT_CLASS (gmyth_file_transfer_parent_class)->finalize (object); +} + + GMythFileTransfer* +gmyth_file_transfer_new (gint num, GString *uri_str, gshort port, gint mythtv_version) +{ + GMythFileTransfer *transfer = GMYTH_FILE_TRANSFER ( g_object_new ( + GMYTH_FILE_TRANSFER_TYPE, FALSE )); + + if ( mythtv_version > 0 ) + transfer->mythtv_version = mythtv_version; + + transfer->card_id = num; + + transfer->rec_id = -1; + + transfer->recordernum = num; + transfer->uri = gmyth_uri_new ( uri_str->str ); + + transfer->hostname = g_string_new( gmyth_uri_gethost(transfer->uri) ); + g_print( "\t--> transfer->hostname = %s\n", transfer->hostname->str ); + + if ( port >= 0 ) + transfer->port = port; + else + transfer->port = gmyth_uri_getport( transfer->uri ); + + g_print( "\t--> transfer->port = %d\n", transfer->port ); + + transfer->readposition = 0; + transfer->filesize = GMYTHTV_FILE_SIZE; + transfer->timeoutisfast = FALSE; + + transfer->userreadahead = GMYTHTV_USER_READ_AHEAD; + transfer->retries = GMYTHTV_RETRIES; + + transfer->live_tv = FALSE; + + transfer->query = g_string_new( GMYTHTV_QUERY_HEADER ); + g_string_append_printf ( transfer->query, " %d", transfer->recordernum ); + g_print( "\t--> transfer->query = %s\n", transfer->query->str ); + + transfer->control_sock = NULL; + transfer->event_sock = NULL; + transfer->sock = NULL; + + return transfer; +} + +gboolean +gmyth_file_transfer_livetv_setup( GMythFileTransfer **transfer, GMythSocket *live_socket ) +{ + (*transfer)->sock = live_socket; + g_object_ref( live_socket ); + + return TRUE; +} + +gboolean +gmyth_file_transfer_playback_setup( GMythFileTransfer **transfer, gboolean live_tv ) +{ + + gboolean ret = TRUE; + + (*transfer)->live_tv = live_tv; + + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" ); + + /* configure the control socket */ + if ((*transfer)->control_sock == NULL) { + + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_PLAYBACK_TYPE ) == NULL ) { + g_printerr( "Connection to backend failed (Control Socket).\n" ); + ret = FALSE; + } + + } else { + g_warning("Remote transfer control socket already created.\n"); + } + + return ret; + +} + +gboolean +gmyth_file_transfer_setup( GMythFileTransfer **transfer, gboolean live_tv ) +{ + GMythStringList *strlist = NULL; + + gboolean ret = TRUE; + + (*transfer)->live_tv = live_tv; + + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" ); + +#if 0 + /* configure the control socket */ + if ((*transfer)->event_sock == NULL) { + + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_MONITOR_TYPE ) == NULL ) { + g_printerr( "Connection to backend failed (Event Socket).\n" ); + ret = FALSE; + } + + } else { + g_warning("Remote transfer control socket already created.\n"); + } +#endif + + /* configure the socket */ + if ( (*transfer)->sock == NULL ) { + + //if ( live_tv == FALSE ) { + + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_FILETRANSFER_TYPE ) == NULL ) { + g_printerr ("Connection to backend failed (Raw Transfer Socket).\n"); + ret = FALSE; + } + + if ( !(*transfer)->live_tv && (*transfer)->control_sock != NULL) { + strlist = gmyth_string_list_new(); + g_string_printf ( (*transfer)->query, "%s %d", GMYTHTV_QUERY_HEADER, (*transfer)->recordernum ); + + gmyth_string_list_append_string( strlist, (*transfer)->query ); + gmyth_string_list_append_char_array( strlist, "IS_OPEN" ); + + gmyth_socket_write_stringlist( (*transfer)->control_sock, strlist ); + gmyth_socket_read_stringlist( (*transfer)->control_sock, strlist ); + + if ( strlist!=NULL && gmyth_string_list_get_int( strlist, 0 ) == 1 ) { + g_print( "[%s] Remote Myth FileTransfer socket is open!\n", __FUNCTION__ ); + } else { + g_print( "[%s] Remote Myth FileTransfer socket is CLOSED! See the MythTV Server Backend for configuration details...\n", __FUNCTION__ ); + ret = FALSE; + } + } + + } else { + g_warning("Remote transfer (raw) socket already created.\n"); + } + + return ret; +} + +static GMythSocket * +myth_connect_to_transfer_backend( GMythFileTransfer **transfer, guint sock_type ) +{ + GMythSocket *sock = NULL; + + g_return_val_if_fail( transfer != NULL && *transfer != NULL, NULL ); + g_return_val_if_fail( (*transfer)->uri != NULL, NULL ); + + g_static_mutex_lock (&mutex); + + gchar *path_dir = gmyth_uri_getpath( (*transfer)->uri ); + //g_print( "\t--> %s: path_dir = %s\n", __FUNCTION__, path_dir ); + + gchar *stype = g_strdup( "" ); + + // if ( (*transfer)->live_tv == FALSE ) { + + sock = gmyth_socket_new(); + + gmyth_socket_connect( &sock, (*transfer)->hostname->str, (*transfer)->port ); + + /* + } else { + sock = (*transfer)->sock; + } + */ +#ifdef GMYTHTV_ENABLE_DEBUG + + g_print( "[%s] --> Creating socket... (%s, %d)\n", __FUNCTION__, (*transfer)->hostname->str, (*transfer)->port ); +#endif + + GMythStringList *strlist = NULL; + + GString *hostname = g_string_new( gmyth_uri_gethost( (*transfer)->uri ) ); + GString *base_str = g_string_new( "" ); + + if ( gmyth_socket_check_protocol_version_number (sock, (*transfer)->mythtv_version) ) { + + if (sock == NULL) { + stype = (sock_type==GMYTH_PLAYBACK_TYPE) ? "control socket" : "file data socket"; + g_printerr( "FileTransfer, open_socket(%s): \n" + "\t\t\tCould not connect to server \"%s\" @ port %d\n", stype, + (*transfer)->hostname->str, (*transfer)->port ); + g_object_unref(sock); + g_static_mutex_unlock (&mutex); + return NULL; + } + + hostname = gmyth_socket_get_local_hostname(); + + g_print( "[%s] local hostname = %s\n", __FUNCTION__, hostname->str ); + + if ( sock_type == GMYTH_PLAYBACK_TYPE ) + { + (*transfer)->control_sock = sock; + g_string_printf( base_str, "ANN Playback %s %d", hostname->str, TRUE ); + + gmyth_socket_send_command( (*transfer)->control_sock, base_str ); + GString *resp = gmyth_socket_receive_response( (*transfer)->control_sock ); + g_print( "[%s] Got Playback response from %s: %s\n", __FUNCTION__, base_str->str, resp->str ); + } + else if ( sock_type == GMYTH_MONITOR_TYPE ) + { + (*transfer)->event_sock = sock; + g_string_printf( base_str, "ANN Monitor %s %d", hostname->str, TRUE ); + + gmyth_socket_send_command( (*transfer)->event_sock, base_str ); + GString *resp = gmyth_socket_receive_response( (*transfer)->event_sock ); + g_print( "[%s] Got Monitor response from %s: %s\n", __FUNCTION__, base_str->str, resp->str ); + //g_thread_create( myth_init_io_watchers, (void*)(*transfer), FALSE, NULL ); + myth_init_io_watchers ( (void*)(*transfer) ); + + g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() ); + + } + else if ( sock_type == GMYTH_FILETRANSFER_TYPE ) + { + (*transfer)->sock = sock; + strlist = gmyth_string_list_new(); + //g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str, + // transfer->userreadahead, transfer->retries ); + g_string_printf( base_str, "ANN FileTransfer %s", hostname->str ); + + gmyth_string_list_append_string( strlist, base_str ); + gmyth_string_list_append_char_array( strlist, path_dir ); + + gmyth_socket_write_stringlist( (*transfer)->sock, strlist ); + gmyth_socket_read_stringlist( (*transfer)->sock, strlist ); + + /* socket number, where all the stream data comes from - got from the MythTV remote backend */ + (*transfer)->recordernum = gmyth_string_list_get_int( strlist, 1 ); + + /* Myth URI stream file size - decoded using two 8-bytes sequences (64 bits/long long types) */ + (*transfer)->filesize = gmyth_util_decode_long_long( strlist, 2 ); + + printf( "[%s] ***** Received: recordernum = %d, filesize = %" G_GUINT64_FORMAT "\n", __FUNCTION__, + (*transfer)->recordernum, (*transfer)->filesize ); + + if ( (*transfer)->filesize <= 0 ) { + g_print( "[%s] Got filesize equals to %llu is lesser than 0 [invalid stream file]\n", __FUNCTION__, (*transfer)->filesize ); + g_object_unref(sock); + sock = NULL; + } + } + else if ( sock_type == GMYTH_RINGBUFFER_TYPE ) + { + (*transfer)->sock = sock; + //gmyth_file_transfer_spawntv( (*transfer), NULL ); + + strlist = gmyth_string_list_new(); + g_string_printf( base_str, "ANN RingBuffer %s %d", hostname->str, (*transfer)->card_id ); + + gmyth_socket_send_command( (*transfer)->sock, base_str ); + GString *resp = gmyth_socket_receive_response( (*transfer)->sock ); + g_print( "[%s] Got RingBuffer response from %s: %s\n", __FUNCTION__, base_str->str, resp->str ); + + } + + } + + printf("[%s] ANN %s sent: %s\n", (sock_type==GMYTH_PLAYBACK_TYPE) ? "Playback" : (sock_type==GMYTH_FILETRANSFER_TYPE) ? "FileTransfer" : "Monitor", __FUNCTION__, base_str->str); + + if ( strlist != NULL ) + g_object_unref( strlist ); + + g_static_mutex_unlock (&mutex); + + return sock; +} + +void +gmyth_file_transfer_spawntv ( GMythFileTransfer *file_transfer, + GString *tvchain_id ) +{ + GMythStringList *str_list; + + g_debug ("gmyth_file_transfer_spawntv.\n"); + + str_list = gmyth_string_list_new (); + + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER, + file_transfer->card_id ); + gmyth_string_list_append_string (str_list, file_transfer->query); + gmyth_string_list_append_string (str_list, g_string_new ("SPAWN_LIVETV")); + if (tvchain_id!=NULL) { + gmyth_string_list_append_string (str_list, tvchain_id); + gmyth_string_list_append_int (str_list, FALSE); // PIP = FALSE (0) + } + + gmyth_socket_sendreceive_stringlist ( file_transfer->sock, str_list ); + + //GString *str = NULL; + + //if (str_list!=NULL && (str = gmyth_string_list_get_string( str_list, 0 )) != NULL && strcasecmp( str->str, "ok" ) != 0 ) { + // g_print( "[%s]\t\tSpawnLiveTV is OK!\n", __FUNCTION__ ); + //} + if (str_list!=NULL) + g_object_unref (str_list); + +} + +gboolean +gmyth_file_transfer_is_recording ( GMythFileTransfer *file_transfer ) +{ + gboolean ret = TRUE; + + GMythStringList *str_list = gmyth_string_list_new (); + + g_debug ( "[%s]\n", __FUNCTION__ ); + g_static_mutex_lock (&mutex); + + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER, + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id ); + gmyth_string_list_append_string (str_list, file_transfer->query); + gmyth_string_list_append_string (str_list, g_string_new ("IS_RECORDING")); + + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list ); + + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 ) + { + GString *str = NULL; + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp( str->str, "bad" )!= 0 ) { + gint is_rec = gmyth_string_list_get_int( str_list, 0 ); + if ( is_rec != 0 ) + ret = TRUE; + else + ret = FALSE; + } + } + g_print( "[%s] %s, stream is %s being recorded!\n", __FUNCTION__, ret ? "YES" : "NO", ret ? "" : "NOT" ); + g_static_mutex_unlock (&mutex); + + if ( str_list != NULL ) + g_object_unref (str_list); + + return ret; + +} + +gint64 +gmyth_file_transfer_get_file_position ( GMythFileTransfer *file_transfer ) +{ + gint64 pos = 0; + + GMythStringList *str_list = gmyth_string_list_new (); + + g_debug ( "[%s]\n", __FUNCTION__ ); + g_static_mutex_lock (&mutex); + + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER, + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id ); + + gmyth_string_list_append_string (str_list, file_transfer->query); + gmyth_string_list_append_string (str_list, g_string_new ("GET_FILE_POSITION")); + + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list ); + + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 ) + { + GString *str = NULL; + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strstr ( str->str, "bad" ) == NULL ) + pos = gmyth_util_decode_long_long( str_list, 0 ); + } + g_static_mutex_unlock (&mutex); + +#ifndef GMYTHTV_ENABLE_DEBUG + + g_print( "[%s] Got file position = %lld\n", __FUNCTION__, pos ); +#endif + if (str_list!=NULL) + g_object_unref (str_list); + + return pos; + +} + + glong +gmyth_file_transfer_get_recordernum( GMythFileTransfer *transfer ) +{ + return transfer->recordernum; +} + + glong +gmyth_file_transfer_get_filesize( GMythFileTransfer *transfer ) +{ + return transfer->filesize; +} + + gboolean +gmyth_file_transfer_isopen( GMythFileTransfer *transfer ) +{ + return (transfer->sock != NULL && transfer->control_sock != NULL); +} + + void +gmyth_file_transfer_close( GMythFileTransfer *transfer ) +{ + GMythStringList *strlist; + + if (transfer->control_sock == NULL) + return; + + strlist = gmyth_string_list_new( ); + + g_string_printf( transfer->query, "%s %d", GMYTHTV_QUERY_HEADER, + transfer->recordernum ); + gmyth_string_list_append_string( strlist, transfer->query ); + gmyth_string_list_append_char_array( strlist, "DONE" ); + + + if ( gmyth_socket_sendreceive_stringlist(transfer->control_sock, strlist) <= 0 ) + { + g_printerr( "Remote file timeout.\n" ); + } + + if (transfer->sock) + { + g_object_unref( transfer->sock ); + transfer->sock = NULL; + } + + if (transfer->control_sock) + { + g_object_unref( transfer->control_sock ); + transfer->control_sock = NULL; + } + +} + + void +gmyth_file_transfer_reset_controlsock( GMythFileTransfer *transfer ) +{ + if (transfer->control_sock == NULL) + { + g_printerr( "gmyth_file_transfer_reset_controlsock(): Called with no control socket" ); + return; + } + + GString *str = gmyth_socket_receive_response( transfer->control_sock ); + + g_string_free( str, TRUE ); +} + +void +gmyth_file_transfer_reset_sock( GMythFileTransfer *transfer ) +{ + if ( transfer->sock == NULL ) + { + g_printerr( "gmyth_file_transfer_reset_sock(): Called with no raw socket" ); + return; + } + + GString *str = gmyth_socket_receive_response( transfer->sock ); + + g_string_free( str, TRUE ); +} + +void +gmyth_file_transfer_reset( GMythFileTransfer *transfer ) +{ + gmyth_file_transfer_reset_controlsock( transfer ); + gmyth_file_transfer_reset_sock( transfer ); +} + +gint64 +gmyth_file_transfer_seek(GMythFileTransfer *transfer, guint64 pos, gint whence) +{ + if (transfer->sock == NULL) + { + g_printerr( "[%s] gmyth_file_transfer_seek(): Called with no socket", __FUNCTION__ ); + return 0; + } + + if (transfer->control_sock == NULL) + return 0; + + // if (!controlSock->isOpen() || controlSock->error()) + // return 0; + + GMythStringList *strlist = gmyth_string_list_new(); + g_string_printf (transfer->query, "%s %d", GMYTHTV_QUERY_HEADER, transfer->recordernum); + gmyth_string_list_append_string( strlist, transfer->query ); + gmyth_string_list_append_char_array( strlist, "SEEK" ); + gmyth_string_list_append_uint64( strlist, pos ); + + gmyth_string_list_append_int( strlist, whence ); + + if (pos > 0 ) + gmyth_string_list_append_uint64( strlist, pos ); + else + gmyth_string_list_append_uint64( strlist, transfer->readposition ); + + gmyth_socket_sendreceive_stringlist( transfer->control_sock, strlist ); + + gint64 retval = gmyth_string_list_get_int64(strlist, 0); + transfer->readposition = retval; + g_print( "[%s] got reading position pointer from the streaming = %lld\n", + __FUNCTION__, retval ); + + //gmyth_file_transfer_reset( transfer ); + + return retval; +} + +static gboolean +myth_control_sock_listener( GIOChannel *source, GIOCondition condition, gpointer data ) +{ + + GIOStatus ret; + GError *err = NULL; + gchar *msg = g_strdup(""); + + g_static_mutex_lock( &mutex ); + + gsize len; + if (condition & G_IO_HUP) + g_error ("Read end of pipe died!\n"); + ret = g_io_channel_read_line ( source, &msg, &len, NULL, &err); + if ( ret == G_IO_STATUS_ERROR ) + g_error ("[%s] Error reading: %s\n", __FUNCTION__, err != NULL ? err->message : "" ); + g_print ("\n\n\n\n\n\n[%s]\t\tEVENT: Read %u bytes: %s\n\n\n\n\n", __FUNCTION__, len, msg != NULL ? msg : "" ); + if ( msg != NULL ) + g_free (msg); + + g_static_mutex_unlock( &mutex ); + + return TRUE; + +} + +static void* +myth_init_io_watchers( void *data ) +{ + GMythFileTransfer *transfer = (GMythFileTransfer*)data; + io_watcher_context = g_main_context_new(); + GMainLoop *loop = g_main_loop_new( NULL, FALSE ); + + GSource *source = NULL; + + if ( transfer->event_sock->sd_io_ch != NULL ) + source = g_io_create_watch( transfer->event_sock->sd_io_ch, G_IO_IN | G_IO_HUP ); + else + goto cleanup; + + g_source_set_callback ( source, (GSourceFunc)myth_control_sock_listener, NULL, NULL ); + + g_source_attach( source, io_watcher_context ); + + if (source==NULL) { + g_printerr( "[%s] Error adding watch listener function to the IO control channel!\n", __FUNCTION__ ); + goto cleanup; + } + + g_print( "[%s]\tOK! Starting listener on the MONITOR event socket...\n", __FUNCTION__ ); + + g_main_loop_run( loop ); + +cleanup: + if ( source != NULL ) + g_source_unref( source ); + + g_main_loop_unref( loop ); + + g_main_context_unref( io_watcher_context ); + + return NULL; +} + + +gint +gmyth_file_transfer_read(GMythFileTransfer *transfer, void *data, gint size, gboolean read_unlimited) +{ + gint recv = 0; + gsize bytes_read = 0; + gint sent = 0; + guint remaining = 0; + gboolean response = FALSE; + + GIOChannel *io_channel; + GIOChannel *io_channel_control; + + GIOCondition io_cond; + GIOCondition io_cond_control; + GIOStatus io_status = G_IO_STATUS_NORMAL, io_status_control = G_IO_STATUS_NORMAL; + + gint buf_len = GMYTHTV_BUFFER_SIZE; + + GMythStringList *strlist = NULL; + GError *error = NULL; + + gchar *trash = g_strdup(""); + + g_return_val_if_fail ( data != NULL, -2 ); + + /* gets the size of the entire file, if the size requested is lesser than 0 */ + if ( size <= 0 ) + size = transfer->filesize; + + io_channel = transfer->sock->sd_io_ch; + io_channel_control = transfer->control_sock->sd_io_ch; + + //g_io_channel_set_flags( io_channel, G_IO_FLAG_APPEND | + // G_IO_STATUS_AGAIN | G_IO_FLAG_IS_READABLE | G_IO_FLAG_IS_WRITEABLE | + // G_IO_FLAG_IS_SEEKABLE, NULL ); + + io_status = g_io_channel_set_encoding( io_channel, NULL, &error ); + if ( io_status == G_IO_STATUS_NORMAL ) + g_print( "[%s] Setting encoding to binary data socket).\n", __FUNCTION__ ); + + io_cond = g_io_channel_get_buffer_condition( io_channel ); + + io_cond_control = g_io_channel_get_buffer_condition( io_channel ); + + if ( transfer->sock == NULL || ( io_status == G_IO_STATUS_ERROR ) ) + { + g_printerr( "gmyth_file_transfer_read(): Called with no raw socket.\n" ); + recv = -1; + goto cleanup; + } + + if ( transfer->control_sock == NULL || ( io_status_control == G_IO_STATUS_ERROR ) ) + { + g_printerr( "gmyth_file_transfer_read(): Called with no control socket.\n" ); + recv = -1; + goto cleanup; + } + + /* + if (!controlSock->isOpen() || controlSock->error()) + return -1; + */ + + if ( ( io_cond & G_IO_IN ) != 0 ) { + do + { + trash = g_new0( gchar, GMYTHTV_BUFFER_SIZE ); + + io_status = g_io_channel_read_chars( io_channel, trash, + GMYTHTV_BUFFER_SIZE, &bytes_read, &error); + + g_print( "[%s] cleaning buffer on IO binary channel... %d bytes gone!\n", + __FUNCTION__, bytes_read ); + + if ( trash != NULL ) + g_free( trash ); + + io_cond = g_io_channel_get_buffer_condition( io_channel ); + + } while ( ( io_cond & G_IO_IN ) != 0 && ( io_status != G_IO_STATUS_ERROR ) && (error == NULL) ); + + //if ( trash!= NULL ) + // g_free( trash ); + } + + if ( ( io_cond_control & G_IO_IN ) != 0 ) { + GMythStringList *strlist_tmp = gmyth_string_list_new(); + gmyth_socket_read_stringlist( transfer->control_sock, strlist_tmp ); + g_object_unref( strlist_tmp ); + } + + wait_to_transfer = 0; + + //while ( transfer->live_tv && ( gmyth_file_transfer_get_file_position( transfer ) < 4096 ) && + // wait_to_transfer++ < GMYTHTV_TRANSFER_MAX_WAITS ) + // g_usleep( 1000*50 ); /* waits just for 2/10 second */ + + //g_thread_create( myth_init_io_watchers, (void*)transfer, FALSE, NULL ); + //g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() ); + + //g_static_mutex_lock (&mutex); + //strlist = gmyth_string_list_new(); + + g_string_printf ( transfer->query, "%s %d", + /*transfer->live_tv ? GMYTHTV_RECORDER_HEADER :*/ GMYTHTV_QUERY_HEADER, + /* transfer->live_tv ? transfer->card_id :*/ transfer->recordernum ); // transfer->recordernum + g_print( "\t[%s] Transfer_query = %s\n", __FUNCTION__, transfer->query->str ); + + sent = size; + remaining = size - recv; + //g_static_mutex_unlock( &mutex ); + //data = (void*)g_new0( gchar, size ); + + //g_io_channel_flush( io_channel, NULL ); + + //g_static_mutex_lock( &mutex ); + + io_cond = g_io_channel_get_buffer_condition( io_channel ); + + while ( recv < size && !response )//&& ( io_cond & G_IO_IN ) != 0 ) + { + g_io_channel_flush( io_channel_control, NULL ); + + strlist = gmyth_string_list_new(); + gmyth_string_list_append_char_array( strlist, transfer->query->str ); + gmyth_string_list_append_char_array( strlist, + /*transfer->live_tv ? "REQUEST_BLOCK_RINGBUF" :*/ "REQUEST_BLOCK" ); + gmyth_string_list_append_int( strlist, remaining ); + gmyth_socket_write_stringlist( transfer->control_sock, strlist ); + + guint count_bytes = 0; + + do + { + //buf_len = ( sent - recv ) > GMYTHTV_BUFFER_SIZE ? GMYTHTV_BUFFER_SIZE : ( sent - recv ); + if ( remaining > GMYTHTV_BUFFER_SIZE ) { + buf_len = GMYTHTV_BUFFER_SIZE; + } else { + buf_len = remaining; + } + + bytes_read = 0; + + io_status = g_io_channel_read_chars( io_channel, data + recv, + buf_len, &bytes_read, &error ); + + //g_static_mutex_unlock( &mutex ); + /* + GString *sss = g_string_new(""); + sss = g_string_append_len( sss, (gchar*)data+recv, bytes_read ); + + g_print( "[%s] Reading buffer (length = %d)\n", __FUNCTION__, bytes_read); + */ + if ( bytes_read > 0 ) + { + //if ( bytes_read <= buf_len ) + recv += bytes_read; + count_bytes += bytes_read; + remaining -= bytes_read; + g_print( "[%s] Reading buffer (bytes read = %d, remaining = %d)\n", __FUNCTION__, bytes_read, remaining ); + if ( remaining == 0 ) { + break; + } + } else { + break; + } + + //if ( remaining > 0 ) { + + if ( io_status == G_IO_STATUS_EOF ) { + g_print( "[%s] got EOS!", __FUNCTION__ ); + break; + } else if ( io_status == G_IO_STATUS_ERROR ) { + g_print( "[%s] gmyth_file_transfer_read(): socket error.\n", __FUNCTION__ ); + break; + } + //} + + /* increase buffer size, to allow get more data (do not obey to the buffer size) */ + if ( read_unlimited == TRUE ) { + // FOR NOW, DO NOTHING!!! + //if ( recv > buf_len ) + // sent += (bytes_read - buf_len) + 1; + } + + /* verify if the input (read) buffer is ready to receive data */ + io_cond = g_io_channel_get_buffer_condition( io_channel ); + + g_print( "[%s]\t io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__, + ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + + //if ( recv == size ) + //break; + + } while ( remaining > 0 );//&& ( io_status == G_IO_STATUS_NORMAL ) ); + + // if ( ( recv < size ) ) { + // finish_read = FALSE; + //} + + io_cond_control = g_io_channel_get_buffer_condition( io_channel_control ); + if ( remaining == 0 )//( io_cond_control & G_IO_IN ) != 0 ) + { + gmyth_socket_read_stringlist( transfer->control_sock, strlist ); + if ( strlist != NULL && gmyth_string_list_length( strlist ) > 0 ) + { + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error + g_print( "[%s] got SENT buffer message = %d\n", __FUNCTION__, sent ); + if ( sent != 0 ) + { + g_print( "[%s]\t received = %d bytes, backend says %d bytes sent, "\ + "io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__, + recv, sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + + if ( sent == count_bytes ) + { + response = ( recv == size ); + g_print( "[%s]\t\tsent %d, which is equals to bytes_read = %d\n\n", + __FUNCTION__, sent, count_bytes ); + if ( response == TRUE ) + break; + } + else + { + g_print( "[%s]\t\tsent %d, which is NOT equals to bytes_read = %d\n\n", + __FUNCTION__, sent, count_bytes ); + goto cleanup; + //response = FALSE; + //break; + } + } else { + break; + //goto cleanup; + } // if + } // if - reading control response from backend + } else { + response = FALSE; + } // if - stringlist response + + } // while + + io_cond_control = g_io_channel_get_buffer_condition( io_channel_control ); + // io_cond = g_io_channel_get_buffer_condition( io_channel ); + + if ( ( ( io_cond_control & G_IO_IN ) != 0 ) && + ( response || ( recv == size ) ) ) + { + if ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 ) + { + if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 ) + { + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error + g_print( "[%s]\t received = %d bytes -\tNOW returning from reading buffer I/O socket "\ + "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__, + sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" ); + } + } + else + { + g_printerr ( "gmyth_file_transfer_read(): No response from control socket."); + recv = -1; + } + + } + else if ( error != NULL ) + { + g_printerr( "[%s] Error occurred: (%d, %s)\n", __FUNCTION__, error->code, error->message ); + } + +cleanup: + //g_static_mutex_unlock (&mutex); + + if ( trash != NULL ) + g_free( trash ); + + if ( strlist != NULL ) + g_object_unref( strlist ); + + g_print( "gmyth_file_transfer_read(): reqd=%d, rcvd=%d, rept=%d, "\ + "(rcvd and rept MUST be the same!)\n", size, + recv, sent ); + + //if ( ( recv != size ) || ( sent != size ) ) { + //recv = size; + //} + + if ( error != NULL ) { + g_printerr( "Cleaning-up ERROR: %s [msg = %s, code = %d]\n", __FUNCTION__, error->message, + error->code ); + g_error_free( error ); + } + + return recv; +} + +void +gmyth_file_transfer_settimeout( GMythFileTransfer *transfer, gboolean fast ) +{ + + GMythStringList *strlist = NULL; + + if ( transfer->timeoutisfast == fast ) + return; + + if ( transfer->sock == NULL ) + { + g_printerr( "gmyth_file_transfer_settimeout(): Called with no socket" ); + return; + } + + if ( transfer->control_sock == NULL ) + return; + + strlist = gmyth_string_list_new(); + gmyth_string_list_append_string( strlist, transfer->query ); + gmyth_string_list_append_char_array( strlist, "SET_TIMEOUT" ); + gmyth_string_list_append_int( strlist, fast ); + + gmyth_socket_write_stringlist( transfer->control_sock, strlist ); + gmyth_socket_read_stringlist( transfer->control_sock, strlist ); + + transfer->timeoutisfast = fast; + +} + +#ifdef DO_TESTING + + int +main( int argc, char *argv[] ) +{ + g_type_init(); + + GMythFileTransfer *file_transfer = gmyth_file_transfer_new( 1, + g_string_new("myth://192.168.1.109:6543/jshks.nuv"), -1, GMYTHTV_VERSION ); + gmyth_file_transfer_setup( &file_transfer ); + gchar *data = g_strdup(""); + + gint num = gmyth_file_transfer_read( file_transfer, data, -1 ); + + return 0; + +} + +#endif