[svn r38] Added program chain backend events.
1 /* vim: set sw=2: -*- Mode: C; tab-width: 2; indent-tabs-mode: t; c-basic-offset: 2; c-indent-level: 2-*- */
3 * GStreamer plug-in properties:
4 * - location (backend server hostname/URL) [ex.: myth://192.168.1.73:28722/1000_1092091.nuv]
5 * - path (qurl - remote file to be opened)
7 * @author Rosfran Lins Borges <rosfran.borges@indt.org.br>
10 #include "myth_file_transfer.h"
12 #include "myth_livetv.h"
13 #include <gmyth/gmyth_util.h>
14 #include <gmyth/gmyth_socket.h>
15 #include <gmyth/gmyth_stringlist.h>
20 #include <arpa/inet.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
/* Command prefixes sent to the backend. The code in this file formats
 * queries as "<header> <number>" with g_string_printf(). */
27 #define MYTHTV_QUERY_HEADER "QUERY_FILETRANSFER"
28 #define MYTHTV_RECORDER_HEADER "QUERY_RECORDER"
30 /* default values to the file transfer parameters */
31 #define MYTHTV_USER_READ_AHEAD FALSE
32 #define MYTHTV_RETRIES 1
33 #define MYTHTV_FILE_SIZE -1
/* NOTE(review): the expansion is not parenthesized; it is only used as a
 * whole argument/initializer in the visible code, but an expression such as
 * `x / MYTHTV_BUFFER_SIZE` would not group as expected. */
35 #define MYTHTV_BUFFER_SIZE 8*1024
/* Default protocol version stored in a freshly initialized transfer object. */
37 #define MYTHTV_VERSION 30
/* In the visible code this is referenced only from commented-out lines in
 * myth_file_transfer_read(). */
39 #define MYTHTV_TRANSFER_MAX_WAITS 700
/* NOTE(review): the embedded line numbers skip here (43, 45, 46, 48, 50), so
 * the #else/#endif/#ifndef lines that pair with these directives are
 * presumably missing from this listing — confirm against the real file. */
41 #ifdef MYTHTV_ENABLE_DEBUG
42 #define MYTHTV_ENABLE_DEBUG 1
44 #undef MYTHTV_ENABLE_DEBUG
47 /* this NDEBUG is to maintain compatibility with GMyth library */
49 #define MYTHTV_ENABLE_DEBUG 1
/* Reset to 0 in myth_file_transfer_read(); the only increment visible in
 * this file is inside commented-out code. */
52 static guint wait_to_transfer = 0;
/* Socket roles passed as `sock_type` to myth_connect_to_transfer_backend().
 * NOTE(review): MYTH_MONITOR_TYPE and MYTH_RINGBUFFER_TYPE are used later in
 * this file, but their enumerators (and the closing brace) are not visible
 * in this listing. */
54 enum myth_sock_types {
55 MYTH_PLAYBACK_TYPE = 0,
57 MYTH_FILETRANSFER_TYPE,
/* File-wide lock taken by myth_connect_to_transfer_backend(),
 * myth_file_transfer_is_recording(), myth_file_transfer_get_file_position()
 * and myth_control_sock_listener(). */
61 static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
/* Created in myth_init_io_watchers(); the event-socket watch source is
 * attached to it. */
63 static GMainContext *io_watcher_context = NULL;
/* GObject boilerplate: class/instance initializers and teardown handlers. */
65 static void myth_file_transfer_class_init (MythFileTransferClass *klass);
66 static void myth_file_transfer_init (MythFileTransfer *object);
68 static void myth_file_transfer_dispose (GObject *object);
69 static void myth_file_transfer_finalize (GObject *object);
/* Internal helpers defined further down in this file. */
71 static GMythSocket *myth_connect_to_transfer_backend( MythFileTransfer **transfer, guint sock_type );
72 static void* myth_init_io_watchers( void *data );
/* Declared here because dispose() calls it before its definition. */
74 void myth_file_transfer_close( MythFileTransfer *transfer );
76 G_DEFINE_TYPE(MythFileTransfer, myth_file_transfer, G_TYPE_OBJECT)
/*
 * Builds a 64-bit value from two consecutive integer entries of `strlist`:
 * the entry at `offset` supplies the high 32 bits and the entry at
 * `offset + 1` the low 32 bits.
 *
 * Returns 0 (the initial ret_value) when `strlist` is NULL or `offset` is
 * not below the list length.
 *
 * NOTE(review): the return type, braces and return statement are not visible
 * in this listing. The visible callers in this file use
 * gmyth_util_decode_long_long(), not this "mmyth_"-prefixed function.
 */
80 mmyth_util_decode_long_long( GMythStringList *strlist, guint offset )
83 guint64 ret_value = 0LL;
85 g_return_val_if_fail( strlist != NULL, ret_value );
/* NOTE(review): this diagnostic fires when offset IS inside the list (the
 * same condition the guard below requires to proceed); the message text
 * suggests the opposite comparison was intended — confirm. */
87 if ( offset < gmyth_string_list_length( strlist ))
88 g_printerr( "[%s] Offset is lower than the GMythStringList (offset = %d)!\n", __FUNCTION__, offset );
89 g_return_val_if_fail( offset < gmyth_string_list_length( strlist ), ret_value );
/* NOTE(review): only `offset` is range-checked; `offset + 1` is read without
 * a bounds check. */
91 gint l1 = gmyth_string_list_get_int( strlist, offset );
92 gint l2 = gmyth_string_list_get_int( strlist, offset + 1 );
/* Mask the low word so a negative l2 cannot sign-extend into the high half. */
94 ret_value = ((guint64)(l2) & 0xffffffffLL) | ((guint64)(l1) << 32);
/*
 * Class initializer: installs this file's dispose and finalize handlers on
 * the GObjectClass vtable.
 */
102 myth_file_transfer_class_init (MythFileTransferClass *klass)
104 GObjectClass *gobject_class;
106 gobject_class = (GObjectClass *) klass;
108 gobject_class->dispose = myth_file_transfer_dispose;
109 gobject_class->finalize = myth_file_transfer_finalize;
/*
 * Instance initializer: sets the protocol version to the compiled-in default
 * (MYTHTV_VERSION). myth_file_transfer_new() may override it afterwards.
 */
113 myth_file_transfer_init (MythFileTransfer *myth_file_transfer)
115 g_return_if_fail( myth_file_transfer != NULL );
116 myth_file_transfer->mythtv_version = MYTHTV_VERSION;
/*
 * GObject dispose handler: closes the transfer (see
 * myth_file_transfer_close()) and then chains up to the parent class.
 */
120 myth_file_transfer_dispose (GObject *object)
122 MythFileTransfer *myth_file_transfer = MYTH_FILE_TRANSFER(object);
124 myth_file_transfer_close( myth_file_transfer );
126 G_OBJECT_CLASS (myth_file_transfer_parent_class)->dispose (object);
/*
 * GObject finalize handler: destroys the object's signal handlers and chains
 * up to the parent class.
 * NOTE(review): no release of uri, hostname or query (allocated in
 * myth_file_transfer_new()) is visible here — confirm where they are freed.
 */
130 myth_file_transfer_finalize (GObject *object)
132 g_signal_handlers_destroy (object);
134 G_OBJECT_CLASS (myth_file_transfer_parent_class)->finalize (object);
/*
 * Creates a MythFileTransfer and fills in its connection parameters.
 *
 * num            - stored as both card_id and recordernum.
 * uri_str        - backend URI; parsed with myth_uri_new() and used to derive
 *                  the hostname. Dereferenced without a NULL check.
 * port           - explicit port (see the note at the port assignments).
 * mythtv_version - overrides the default protocol version when > 0.
 *
 * All three socket pointers start out NULL; they are created later by the
 * *_setup() functions.
 * NOTE(review): the return type and return statement are not visible in this
 * listing.
 */
138 myth_file_transfer_new (gint num, GString *uri_str, gshort port, gint mythtv_version)
/* FALSE here occupies the "first property name" slot of g_object_new(),
 * i.e. it acts as the NULL terminator of an empty property list. */
140 MythFileTransfer *transfer = MYTH_FILE_TRANSFER ( g_object_new (
141 MYTH_FILE_TRANSFER_TYPE, FALSE ));
143 if ( mythtv_version > 0 )
144 transfer->mythtv_version = mythtv_version;
146 transfer->card_id = num;
148 transfer->rec_id = -1;
150 transfer->recordernum = num;
151 transfer->uri = myth_uri_new ( uri_str->str );
153 transfer->hostname = g_string_new( myth_uri_gethost(transfer->uri) );
154 g_print( "\t--> transfer->hostname = %s\n", transfer->hostname->str );
/* NOTE(review): both assignments are shown, but the condition choosing
 * between the caller's port and the URI's port is not visible in this
 * listing — confirm. */
157 transfer->port = port;
159 transfer->port = myth_uri_getport( transfer->uri );
161 g_print( "\t--> transfer->port = %d\n", transfer->port );
163 transfer->readposition = 0;
164 transfer->filesize = MYTHTV_FILE_SIZE;
165 transfer->timeoutisfast = FALSE;
167 transfer->userreadahead = MYTHTV_USER_READ_AHEAD;
168 transfer->retries = MYTHTV_RETRIES;
170 transfer->live_tv = FALSE;
/* Initial query string: "QUERY_FILETRANSFER <recordernum>". */
172 transfer->query = g_string_new( MYTHTV_QUERY_HEADER );
173 g_string_append_printf ( transfer->query, " %d", transfer->recordernum );
174 g_print( "\t--> transfer->query = %s\n", transfer->query->str );
176 transfer->control_sock = NULL;
177 transfer->event_sock = NULL;
178 transfer->sock = NULL;
/*
 * Adopts an already-open LiveTV socket as the transfer's data socket and
 * takes an extra reference on it.
 * NOTE(review): neither `transfer` nor `live_socket` is NULL-checked in the
 * visible lines, and any previous (*transfer)->sock is overwritten without
 * being unreferenced.
 */
184 myth_file_transfer_livetv_setup( MythFileTransfer **transfer, GMythSocket *live_socket )
186 (*transfer)->sock = live_socket;
187 g_object_ref( live_socket );
/*
 * Records the live_tv flag and, if no control socket exists yet, opens one by
 * announcing to the backend as a Playback client (MYTH_PLAYBACK_TYPE).
 * NOTE(review): the return type, the `else` that presumably guards the
 * "already created" warning, and the return statements are not visible in
 * this listing.
 */
193 myth_file_transfer_playback_setup( MythFileTransfer **transfer, gboolean live_tv )
198 (*transfer)->live_tv = live_tv;
200 printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
202 /* configure the control socket */
203 if ((*transfer)->control_sock == NULL) {
205 if ( myth_connect_to_transfer_backend ( transfer, MYTH_PLAYBACK_TYPE ) == NULL ) {
206 g_printerr( "Connection to backend failed (Control Socket).\n" );
211 g_warning("Remote transfer control socket already created.\n");
/*
 * Opens the event (monitor) socket and the raw data socket if they do not
 * exist yet. For non-LiveTV transfers that already have a control socket, it
 * then asks the backend whether the remote file transfer is open (IS_OPEN).
 *
 * transfer - in/out handle; (*transfer)->live_tv is set from `live_tv`.
 * live_tv  - TRUE for a LiveTV session, FALSE for a plain file transfer.
 *
 * NOTE(review): the return type, `else` branches, closing braces and return
 * statements are not visible in this listing.
 */
219 myth_file_transfer_setup( MythFileTransfer **transfer, gboolean live_tv )
221 GMythStringList *strlist = NULL;
225 (*transfer)->live_tv = live_tv;
227 printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
230 /* configure the event (monitor) socket */
231 if ((*transfer)->event_sock == NULL) {
233 if ( myth_connect_to_transfer_backend ( transfer, MYTH_MONITOR_TYPE ) == NULL ) {
234 g_printerr( "Connection to backend failed (Event Socket).\n" );
/* NOTE(review): this warning text says "control socket" although the branch
 * concerns event_sock. */
239 g_warning("Remote transfer control socket already created.\n");
243 /* configure the socket */
244 if ( (*transfer)->sock == NULL ) {
246 //if ( live_tv == FALSE ) {
248 if ( myth_connect_to_transfer_backend ( transfer, MYTH_FILETRANSFER_TYPE ) == NULL ) {
249 g_printerr ("Connection to backend failed (Raw Transfer Socket).\n");
/* The IS_OPEN probe is skipped unless a control socket was created
 * beforehand (e.g. by myth_file_transfer_playback_setup()). */
253 if ( !(*transfer)->live_tv && (*transfer)->control_sock != NULL) {
254 strlist = gmyth_string_list_new();
/* recordernum was updated by the FileTransfer announce, so rebuild the query. */
255 g_string_printf ( (*transfer)->query, "%s %d", MYTHTV_QUERY_HEADER, (*transfer)->recordernum );
257 gmyth_string_list_append_string( strlist, (*transfer)->query );
258 gmyth_string_list_append_char_array( strlist, "IS_OPEN" );
/* The same list object carries the request and receives the reply. */
260 gmyth_socket_write_stringlist( (*transfer)->control_sock, strlist );
261 gmyth_socket_read_stringlist( (*transfer)->control_sock, strlist );
263 if ( strlist!=NULL && gmyth_string_list_get_int( strlist, 0 ) == 1 ) {
264 g_print( "[%s] Remote Myth FileTransfer socket is open!\n", __FUNCTION__ );
266 g_print( "[%s] Remote Myth FileTransfer socket is CLOSED! See the MythTV Server Backend for configuration details...\n", __FUNCTION__ );
/* NOTE(review): no g_object_unref( strlist ) is visible in this function. */
272 g_warning("Remote transfer (raw) socket already created.\n");
/*
 * Opens a socket to the backend and announces it in the role given by
 * `sock_type`, storing the socket in the matching field of *transfer:
 *   MYTH_PLAYBACK_TYPE     -> control_sock, "ANN Playback <host> 1"
 *   MYTH_MONITOR_TYPE      -> event_sock,   "ANN Monitor <host> 1"
 *   MYTH_FILETRANSFER_TYPE -> sock,         "ANN FileTransfer <host>" + path;
 *                             the reply updates recordernum and filesize
 *   MYTH_RINGBUFFER_TYPE   -> sock,         "ANN RingBuffer <host> <card_id>"
 *
 * Returns NULL when `transfer`, `*transfer` or the URI is NULL (the
 * g_return_val_if_fail guards). Callers compare the result against NULL to
 * detect failure. The whole body runs under the file-wide `mutex`.
 *
 * NOTE(review): braces, `else` lines and return statements are not visible
 * in this listing, so branch structure below is partly inferred.
 */
279 myth_connect_to_transfer_backend( MythFileTransfer **transfer, guint sock_type )
281 GMythSocket *sock = NULL;
283 g_return_val_if_fail( transfer != NULL && *transfer != NULL, NULL );
284 g_return_val_if_fail( (*transfer)->uri != NULL, NULL );
286 g_static_mutex_lock (&mutex);
288 gchar *path_dir = myth_uri_getpath( (*transfer)->uri );
289 //g_print( "\t--> %s: path_dir = %s\n", __FUNCTION__, path_dir );
/* NOTE(review): this heap string is later replaced by a string literal
 * (see the error path), so the g_strdup() result is never freed. */
291 gchar *stype = g_strdup( "" );
293 // if ( (*transfer)->live_tv == FALSE ) {
295 sock = gmyth_socket_new();
/* NOTE(review): the result of gmyth_socket_connect() is not checked. */
297 gmyth_socket_connect( &sock, (*transfer)->hostname->str, (*transfer)->port );
/* NOTE(review): appears to belong to an alternative branch whose guard is
 * not visible; as listed it would overwrite the new socket. */
301 sock = (*transfer)->sock;
304 #ifdef MYTHTV_ENABLE_DEBUG
306 g_print( "[%s] --> Creating socket... (%s, %d)\n", __FUNCTION__, (*transfer)->hostname->str, (*transfer)->port );
309 GMythStringList *strlist = NULL;
/* NOTE(review): this GString is replaced below by the result of
 * gmyth_socket_get_local_hostname() without being freed. */
311 GString *hostname = g_string_new( myth_uri_gethost( (*transfer)->uri ) );
312 GString *base_str = g_string_new( "" );
/* Protocol handshake. NOTE(review): the lines between this test and the
 * error report are missing, so which outcome leads to the failure path
 * cannot be confirmed from this listing. */
314 if ( gmyth_socket_check_protocol_version_number (sock, (*transfer)->mythtv_version) ) {
317 stype = (sock_type==MYTH_PLAYBACK_TYPE) ? "control socket" : "file data socket";
318 g_printerr( "FileTransfer, open_socket(%s): \n"
319 "\t\t\tCould not connect to server \"%s\" @ port %d\n", stype,
320 (*transfer)->hostname->str, (*transfer)->port );
/* Failure path: drop the socket and release the lock before leaving. */
321 g_object_unref(sock);
322 g_static_mutex_unlock (&mutex);
326 hostname = gmyth_socket_get_local_hostname();
328 g_print( "[%s] local hostname = %s\n", __FUNCTION__, hostname->str );
330 if ( sock_type == MYTH_PLAYBACK_TYPE )
332 (*transfer)->control_sock = sock;
333 g_string_printf( base_str, "ANN Playback %s %d", hostname->str, TRUE );
335 gmyth_socket_send_command( (*transfer)->control_sock, base_str );
/* NOTE(review): `resp` is dereferenced without a NULL check and no
 * g_string_free() for it is visible (same for the other branches). */
336 GString *resp = gmyth_socket_receive_response( (*transfer)->control_sock );
337 g_print( "[%s] Got Playback response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
339 else if ( sock_type == MYTH_MONITOR_TYPE )
341 (*transfer)->event_sock = sock;
342 g_string_printf( base_str, "ANN Monitor %s %d", hostname->str, TRUE );
344 gmyth_socket_send_command( (*transfer)->event_sock, base_str );
345 GString *resp = gmyth_socket_receive_response( (*transfer)->event_sock );
346 g_print( "[%s] Got Monitor response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
347 //g_thread_create( myth_init_io_watchers, (void*)(*transfer), FALSE, NULL );
/* NOTE(review): called synchronously while `mutex` is held; the callee runs
 * a GMainLoop, so this presumably blocks here — confirm. */
348 myth_init_io_watchers ( (void*)(*transfer) );
350 g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
353 else if ( sock_type == MYTH_FILETRANSFER_TYPE )
355 (*transfer)->sock = sock;
356 strlist = gmyth_string_list_new();
357 //g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str,
358 // transfer->userreadahead, transfer->retries );
359 g_string_printf( base_str, "ANN FileTransfer %s", hostname->str );
361 gmyth_string_list_append_string( strlist, base_str );
362 gmyth_string_list_append_char_array( strlist, path_dir );
/* The same list is reused to receive the backend's reply. */
364 gmyth_socket_write_stringlist( (*transfer)->sock, strlist );
365 gmyth_socket_read_stringlist( (*transfer)->sock, strlist );
367 /* socket number, where all the stream data comes from - got from the MythTV remote backend */
368 (*transfer)->recordernum = gmyth_string_list_get_int( strlist, 1 );
370 /* Myth URI stream file size - decoded using two 8-bytes sequences (64 bits/long long types) */
371 (*transfer)->filesize = gmyth_util_decode_long_long( strlist, 2 );
373 printf( "[%s] ***** Received: recordernum = %d, filesize = %" G_GUINT64_FORMAT "\n", __FUNCTION__,
374 (*transfer)->recordernum, (*transfer)->filesize );
/* NOTE(review): filesize is printed with unsigned formats; if the field is
 * unsigned, `<= 0` only matches 0 — confirm the field's type. */
376 if ( (*transfer)->filesize <= 0 ) {
377 g_print( "[%s] Got filesize equals to %llu is lesser than 0 [invalid stream file]\n", __FUNCTION__, (*transfer)->filesize );
/* NOTE(review): (*transfer)->sock still points at the socket unreferenced
 * here, and no unlock is visible on this path. */
378 g_object_unref(sock);
382 else if ( sock_type == MYTH_RINGBUFFER_TYPE )
384 (*transfer)->sock = sock;
385 //myth_file_transfer_spawntv( (*transfer), NULL );
387 strlist = gmyth_string_list_new();
388 g_string_printf( base_str, "ANN RingBuffer %s %d", hostname->str, (*transfer)->card_id );
390 gmyth_socket_send_command( (*transfer)->sock, base_str );
391 GString *resp = gmyth_socket_receive_response( (*transfer)->sock );
392 g_print( "[%s] Got RingBuffer response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
/* NOTE(review): the format is "[%s] ANN %s sent" but the role name is passed
 * first and __FUNCTION__ second — the two appear swapped relative to every
 * other message in this file. */
398 printf("[%s] ANN %s sent: %s\n", (sock_type==MYTH_PLAYBACK_TYPE) ? "Playback" : (sock_type==MYTH_FILETRANSFER_TYPE) ? "FileTransfer" : "Monitor", __FUNCTION__, base_str->str);
400 if ( strlist != NULL )
401 g_object_unref( strlist );
/* NOTE(review): no release of base_str or hostname is visible before the
 * unlock. */
403 g_static_mutex_unlock (&mutex);
/*
 * Sends "QUERY_RECORDER <card_id>" / "SPAWN_LIVETV" on the transfer's data
 * socket, optionally followed by the TV chain id and a PIP flag of 0.
 *
 * file_transfer - transfer whose `query` buffer is overwritten and whose
 *                 `sock` is used for the exchange.
 * tvchain_id    - optional chain identifier; the extra fields are appended
 *                 only when it is non-NULL.
 *
 * The reply is read into str_list but not inspected in the visible code.
 * NOTE(review): the return type and return statement are not visible in this
 * listing.
 */
409 myth_file_transfer_spawntv ( MythFileTransfer *file_transfer,
410 GString *tvchain_id )
412 GMythStringList *str_list;
414 g_debug ("myth_file_transfer_spawntv.\n");
416 str_list = gmyth_string_list_new ();
418 g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
419 file_transfer->card_id );
420 gmyth_string_list_append_string (str_list, file_transfer->query);
/* NOTE(review): a temporary GString is created for the argument; whether
 * the list takes ownership or copies it is not visible here — possible leak. */
421 gmyth_string_list_append_string (str_list, g_string_new ("SPAWN_LIVETV"));
422 if (tvchain_id!=NULL) {
423 gmyth_string_list_append_string (str_list, tvchain_id);
424 gmyth_string_list_append_int (str_list, FALSE); // PIP = FALSE (0)
427 gmyth_socket_sendreceive_stringlist ( file_transfer->sock, str_list );
429 //GString *str = NULL;
431 //if (str_list!=NULL && (str = gmyth_string_list_get_string( str_list, 0 )) != NULL && strcasecmp( str->str, "ok" ) != 0 ) {
432 // g_print( "[%s]\t\tSpawnLiveTV is OK!\n", __FUNCTION__ );
435 g_object_unref (str_list);
/*
 * Asks the backend, over the control socket, whether the recorder is
 * currently recording ("QUERY_RECORDER <id>" / "IS_RECORDING"). The id is
 * rec_id when it is >= 0, otherwise card_id.
 *
 * The exchange runs under the file-wide `mutex`. A reply whose first entry is
 * the string "bad" is not parsed as an integer.
 *
 * NOTE(review): the declarations of `ret` and `str`, the use of `is_rec`,
 * and the return statement are not visible in this listing.
 */
440 myth_file_transfer_is_recording ( MythFileTransfer *file_transfer )
444 GMythStringList *str_list = gmyth_string_list_new ();
446 g_debug ( "[%s]\n", __FUNCTION__ );
447 g_static_mutex_lock (&mutex);
449 g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
450 file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
451 gmyth_string_list_append_string (str_list, file_transfer->query);
/* NOTE(review): temporary GString; ownership after the append is not visible
 * here. */
452 gmyth_string_list_append_string (str_list, g_string_new ("IS_RECORDING"));
454 gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
456 if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
459 if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp( str->str, "bad" )!= 0 ) {
460 gint is_rec = gmyth_string_list_get_int( str_list, 0 );
467 g_print( "[%s] %s, stream is %s being recorded!\n", __FUNCTION__, ret ? "YES" : "NO", ret ? "" : "NOT" );
468 g_static_mutex_unlock (&mutex);
470 if ( str_list != NULL )
471 g_object_unref (str_list);
/*
 * Queries the backend, over the control socket, for the recorder's current
 * file position ("QUERY_RECORDER <id>" / "GET_FILE_POSITION"). The id is
 * rec_id when it is >= 0, otherwise card_id.
 *
 * The position is decoded from the reply starting at entry 0, unless that
 * entry is the string "bad". The exchange runs under the file-wide `mutex`.
 *
 * NOTE(review): the declarations of `pos` and `str` and the return statement
 * are not visible in this listing.
 */
478 myth_file_transfer_get_file_position ( MythFileTransfer *file_transfer )
482 GMythStringList *str_list = gmyth_string_list_new ();
484 g_debug ( "[%s]\n", __FUNCTION__ );
485 g_static_mutex_lock (&mutex);
487 g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
488 file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
490 gmyth_string_list_append_string (str_list, file_transfer->query);
/* NOTE(review): temporary GString; ownership after the append is not visible
 * here. */
491 gmyth_string_list_append_string (str_list, g_string_new ("GET_FILE_POSITION"));
493 gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
495 if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
498 if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp ( str->str, "bad" ) != 0 )
499 pos = gmyth_util_decode_long_long( str_list, 0 );
501 g_static_mutex_unlock (&mutex);
/* NOTE(review): this is #ifndef, whereas the debug print in
 * myth_connect_to_transfer_backend() is guarded by #ifdef — the print below
 * is compiled only when debugging is DISABLED; confirm intent. */
503 #ifndef MYTHTV_ENABLE_DEBUG
505 g_print( "[%s] Got file position = %llu\n", __FUNCTION__, pos );
508 g_object_unref (str_list);
/* Accessor: returns the transfer's recordernum field (no NULL check). */
515 myth_file_transfer_get_recordernum( MythFileTransfer *transfer )
517 return transfer->recordernum;
/* Accessor: returns the transfer's filesize field (no NULL check). */
521 myth_file_transfer_get_filesize( MythFileTransfer *transfer )
523 return transfer->filesize;
/* Reports whether both the data socket and the control socket exist. This
 * only tests the pointers; it does not query the backend. */
527 myth_file_transfer_isopen( MythFileTransfer *transfer )
529 return (transfer->sock != NULL && transfer->control_sock != NULL);
/*
 * Tells the backend the transfer is finished ("QUERY_FILETRANSFER <n>" /
 * "DONE" on the control socket), then drops the references to the data and
 * control sockets and clears both pointers.
 *
 * NOTE(review): the statements following the first NULL test and the guard
 * (if any) around the unref of transfer->sock are not visible in this
 * listing. No unref of `strlist` or of event_sock is visible either.
 */
533 myth_file_transfer_close( MythFileTransfer *transfer )
535 GMythStringList *strlist;
537 if (transfer->control_sock == NULL)
540 strlist = gmyth_string_list_new( );
542 g_string_printf( transfer->query, "%s %d", MYTHTV_QUERY_HEADER,
543 transfer->recordernum );
544 gmyth_string_list_append_string( strlist, transfer->query );
545 gmyth_string_list_append_char_array( strlist, "DONE" );
/* A non-positive result is reported as a timeout. */
548 if ( gmyth_socket_sendreceive_stringlist(transfer->control_sock, strlist) <= 0 )
550 g_printerr( "Remote file timeout.\n" );
555 g_object_unref( transfer->sock );
556 transfer->sock = NULL;
559 if (transfer->control_sock)
561 g_object_unref( transfer->control_sock );
562 transfer->control_sock = NULL;
/*
 * Reads one pending response from the control socket and discards it.
 * Reports an error when there is no control socket.
 * NOTE(review): the statement that leaves the function after the error
 * message is not visible in this listing.
 */
568 myth_file_transfer_reset_controlsock( MythFileTransfer *transfer )
570 if (transfer->control_sock == NULL)
572 g_printerr( "myth_file_transfer_reset_controlsock(): Called with no control socket" );
576 GString *str = gmyth_socket_receive_response( transfer->control_sock );
578 g_string_free( str, TRUE );
/*
 * Reads one pending response from the raw data socket and discards it.
 * Reports an error when there is no data socket.
 * NOTE(review): the statement that leaves the function after the error
 * message is not visible in this listing.
 */
582 myth_file_transfer_reset_sock( MythFileTransfer *transfer )
584 if ( transfer->sock == NULL )
586 g_printerr( "myth_file_transfer_reset_sock(): Called with no raw socket" );
590 GString *str = gmyth_socket_receive_response( transfer->sock );
592 g_string_free( str, TRUE );
/* Drains one pending response from the control socket, then one from the
 * data socket. */
596 myth_file_transfer_reset( MythFileTransfer *transfer )
598 myth_file_transfer_reset_controlsock( transfer );
599 myth_file_transfer_reset_sock( transfer );
/*
 * Asks the backend to reposition the remote stream ("QUERY_FILETRANSFER <n>"
 * / "SEEK") over the control socket and stores the position it reports in
 * transfer->readposition.
 *
 * transfer - needs both a data socket and a control socket.
 * pos      - target offset.
 * whence   - seek origin, forwarded to the backend unchanged.
 *
 * NOTE(review): the return type, the early-exit statements after the NULL
 * tests, and the return statement are not visible in this listing. No unref
 * of `strlist` is visible.
 */
603 myth_file_transfer_seek(MythFileTransfer *transfer, guint64 pos, gint whence)
605 if (transfer->sock == NULL)
607 g_printerr( "[%s] myth_file_transfer_seek(): Called with no socket", __FUNCTION__ );
611 if (transfer->control_sock == NULL)
614 // if (!controlSock->isOpen() || controlSock->error())
617 GMythStringList *strlist = gmyth_string_list_new();
618 g_string_printf (transfer->query, "%s %d", MYTHTV_QUERY_HEADER, transfer->recordernum);
619 gmyth_string_list_append_string( strlist, transfer->query );
620 gmyth_string_list_append_char_array( strlist, "SEEK" );
621 gmyth_string_list_append_uint64( strlist, pos );
623 gmyth_string_list_append_int( strlist, whence );
/* NOTE(review): `pos` is appended a second time here; any condition guarding
 * this against the readposition append below is not visible — confirm the
 * intended field layout. */
626 gmyth_string_list_append_uint64( strlist, pos );
628 gmyth_string_list_append_uint64( strlist, transfer->readposition );
630 gmyth_socket_sendreceive_stringlist( transfer->control_sock, strlist );
/* The first reply entry is taken as the new read position. */
632 guint64 retval = gmyth_string_list_get_uint64(strlist, 0);
633 transfer->readposition = retval;
634 g_print( "[%s] got reading position pointer from the streaming = %llu\n",
635 __FUNCTION__, retval );
637 //myth_file_transfer_reset( transfer );
/*
 * GIOChannel watch callback: reads one line from `source` under the
 * file-wide `mutex` and prints it as an EVENT message.
 *
 * source    - channel that became ready.
 * condition - triggering condition; G_IO_HUP is reported through g_error().
 * data      - user data (registered as NULL by myth_init_io_watchers()).
 *
 * NOTE(review): the declarations of `ret`, `len` and `err` and the return
 * statement are not visible in this listing. g_error() is GLib's fatal log
 * level — confirm that aborting on HUP / read errors is intended.
 */
643 myth_control_sock_listener( GIOChannel *source, GIOCondition condition, gpointer data )
/* NOTE(review): this allocation is overwritten by g_io_channel_read_line()
 * below (it stores a new buffer through &msg), and no g_free( msg ) is
 * visible. */
648 gchar *msg = g_strdup("");
650 g_static_mutex_lock( &mutex );
653 if (condition & G_IO_HUP)
654 g_error ("Read end of pipe died!\n");
655 ret = g_io_channel_read_line ( source, &msg, &len, NULL, &err);
656 if ( ret == G_IO_STATUS_ERROR )
657 g_error ("[%s] Error reading: %s\n", __FUNCTION__, err != NULL ? err->message : "" );
658 g_print ("\n\n\n\n\n\n[%s]\t\tEVENT: Read %u bytes: %s\n\n\n\n\n", __FUNCTION__, len, msg != NULL ? msg : "" );
662 g_static_mutex_unlock( &mutex );
/*
 * Installs myth_control_sock_listener() as a watch (G_IO_IN | G_IO_HUP) on
 * the event socket's IO channel and runs a main loop.
 *
 * data - the MythFileTransfer whose event_sock is watched. The signature
 *        matches a thread entry point; the visible caller invokes it directly.
 *
 * NOTE(review): braces, `else` lines and the return statement are not
 * visible in this listing.
 */
669 myth_init_io_watchers( void *data )
671 MythFileTransfer *transfer = (MythFileTransfer*)data;
672 io_watcher_context = g_main_context_new();
/* NOTE(review): the loop is created with a NULL context (the default main
 * context), while the source below is attached to io_watcher_context; a loop
 * on a different context would not dispatch that source — confirm. */
673 GMainLoop *loop = g_main_loop_new( NULL, FALSE );
675 GSource *source = NULL;
677 if ( transfer->event_sock->sd_io_ch != NULL )
678 source = g_io_create_watch( transfer->event_sock->sd_io_ch, G_IO_IN | G_IO_HUP );
/* The callback receives NULL user data, not the transfer. */
682 g_source_set_callback ( source, (GSourceFunc)myth_control_sock_listener, NULL, NULL );
684 g_source_attach( source, io_watcher_context );
687 g_printerr( "[%s] Error adding watch listener function to the IO control channel!\n", __FUNCTION__ );
691 g_print( "[%s]\tOK! Starting listener on the MONITOR event socket...\n", __FUNCTION__ );
/* Blocks until the loop is quit; no g_main_loop_quit() call is visible in
 * this file. */
693 g_main_loop_run( loop );
696 if ( source != NULL )
697 g_source_unref( source );
699 g_main_loop_unref( loop );
701 g_main_context_unref( io_watcher_context );
/*
 * Requests data from the backend over the control socket (REQUEST_BLOCK) and
 * reads it from the raw data socket into `data`.
 *
 * transfer       - needs both `sock` and `control_sock`.
 * data           - destination buffer; bytes are stored at data + recv.
 *                  Must not be NULL (the guard returns -2).
 * size           - number of bytes wanted; per the comment below, the whole
 *                  file size is used when the request is negative.
 * read_unlimited - checked once further down, where the branch body holds
 *                  only comments.
 *
 * Outline of the visible code:
 *   1. switch the data channel to binary (NULL encoding);
 *   2. drain any bytes already buffered on the data channel;
 *   3. discard a pending string list on the control socket;
 *   4. loop: send REQUEST_BLOCK <remaining>, read chunks of at most
 *      MYTHTV_BUFFER_SIZE bytes, then read the backend's "sent" count and
 *      compare it with the bytes counted locally.
 *
 * NOTE(review): many lines are missing from this listing — the return type,
 * the declarations of `recv`, `sent` and `remaining`, the `do {` openers,
 * the update of `recv`, `break`/`else` lines and the return statement.
 * Control flow below is therefore only partly visible.
 */
708 myth_file_transfer_read(MythFileTransfer *transfer, void *data, gint size, gboolean read_unlimited)
711 gsize bytes_read = 0;
714 gboolean response = FALSE;
716 GIOChannel *io_channel;
717 GIOChannel *io_channel_control;
719 GIOCondition io_cond;
720 GIOCondition io_cond_control;
721 GIOStatus io_status = G_IO_STATUS_NORMAL, io_status_control = G_IO_STATUS_NORMAL;
723 gint buf_len = MYTHTV_BUFFER_SIZE;
725 GMythStringList *strlist = NULL;
726 GError *error = NULL;
/* NOTE(review): this allocation is replaced by g_new0() in the drain step
 * below, and no g_free( trash ) is visible. */
728 gchar *trash = g_strdup("");
730 g_return_val_if_fail ( data != NULL, -2 );
732 /* gets the size of the entire file, if the size requested is lesser than 0 */
734 size = transfer->filesize;
/* NOTE(review): transfer->sock and transfer->control_sock are dereferenced
 * here, before the NULL checks on them further down. */
736 io_channel = transfer->sock->sd_io_ch;
737 io_channel_control = transfer->control_sock->sd_io_ch;
739 //g_io_channel_set_flags( io_channel, G_IO_FLAG_APPEND |
740 // G_IO_STATUS_AGAIN | G_IO_FLAG_IS_READABLE | G_IO_FLAG_IS_WRITEABLE |
741 // G_IO_FLAG_IS_SEEKABLE, NULL );
/* A NULL encoding puts the channel in raw/binary mode. */
743 io_status = g_io_channel_set_encoding( io_channel, NULL, &error );
744 if ( io_status == G_IO_STATUS_NORMAL )
745 g_print( "[%s] Setting encoding to binary data socket).\n", __FUNCTION__ );
747 io_cond = g_io_channel_get_buffer_condition( io_channel );
/* NOTE(review): the control condition is sampled from io_channel (the data
 * channel), not io_channel_control — likely a copy/paste slip; confirm. */
749 io_cond_control = g_io_channel_get_buffer_condition( io_channel );
751 if ( transfer->sock == NULL || ( io_status == G_IO_STATUS_ERROR ) )
753 g_printerr( "myth_file_transfer_read(): Called with no raw socket.\n" );
/* io_status_control is never assigned after its initializer in the visible
 * code, so only the pointer test can trigger here. */
758 if ( transfer->control_sock == NULL || ( io_status_control == G_IO_STATUS_ERROR ) )
760 g_printerr( "myth_file_transfer_read(): Called with no control socket.\n" );
/* NOTE(review): the next line is not valid C in this file (`controlSock` is
 * not declared anywhere visible); it is presumably inside a comment block
 * whose delimiters are missing from this listing. */
766 if (!controlSock->isOpen() || controlSock->error())
/* Step 2: discard whatever is already buffered on the data channel. */
770 if ( ( io_cond & G_IO_IN ) != 0 ) {
773 trash = g_new0( gchar, MYTHTV_BUFFER_SIZE );
775 io_status = g_io_channel_read_chars( io_channel, trash,
776 MYTHTV_BUFFER_SIZE, &bytes_read, &error);
778 g_print( "[%s] cleaning buffer on IO binary channel... %d bytes gone!\n",
779 __FUNCTION__, bytes_read );
784 io_cond = g_io_channel_get_buffer_condition( io_channel );
786 } while ( ( io_cond & G_IO_IN ) != 0 && ( io_status != G_IO_STATUS_ERROR ) && (error == NULL) );
788 //if ( trash!= NULL )
/* Step 3: discard a pending reply on the control socket. */
792 if ( ( io_cond_control & G_IO_IN ) != 0 ) {
793 GMythStringList *strlist_tmp = gmyth_string_list_new();
794 gmyth_socket_read_stringlist( transfer->control_sock, strlist_tmp );
795 g_object_unref( strlist_tmp );
798 wait_to_transfer = 0;
800 //while ( transfer->live_tv && ( myth_file_transfer_get_file_position( transfer ) < 4096 ) &&
801 // wait_to_transfer++ < MYTHTV_TRANSFER_MAX_WAITS )
802 // g_usleep( 1000*50 ); /* waits just for 2/10 second */
804 //g_thread_create( myth_init_io_watchers, (void*)transfer, FALSE, NULL );
805 //g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
807 //g_static_mutex_lock (&mutex);
808 //strlist = gmyth_string_list_new();
/* Query prefix for the block requests: "QUERY_FILETRANSFER <recordernum>". */
810 g_string_printf ( transfer->query, "%s %d",
811 /*transfer->live_tv ? MYTHTV_RECORDER_HEADER :*/ MYTHTV_QUERY_HEADER,
812 /* transfer->live_tv ? transfer->card_id :*/ transfer->recordernum ); // transfer->recordernum
813 g_print( "\t[%s] Transfer_query = %s\n", __FUNCTION__, transfer->query->str );
816 remaining = size - recv;
817 //g_static_mutex_unlock( &mutex );
818 //data = (void*)g_new0( gchar, size );
820 //g_io_channel_flush( io_channel, NULL );
822 //g_static_mutex_lock( &mutex );
824 io_cond = g_io_channel_get_buffer_condition( io_channel );
/* Step 4: request/receive loop. */
826 while ( recv < size && !response )//&& ( io_cond & G_IO_IN ) != 0 )
828 g_io_channel_flush( io_channel_control, NULL );
/* NOTE(review): a new list is created on every pass; only the final
 * g_object_unref( strlist ) near the end of the function is visible. */
830 strlist = gmyth_string_list_new();
831 gmyth_string_list_append_char_array( strlist, transfer->query->str );
832 gmyth_string_list_append_char_array( strlist,
833 /*transfer->live_tv ? "REQUEST_BLOCK_RINGBUF" :*/ "REQUEST_BLOCK" );
834 gmyth_string_list_append_int( strlist, remaining );
835 gmyth_socket_write_stringlist( transfer->control_sock, strlist );
/* Bytes received for this request; compared with the backend's count below. */
837 guint count_bytes = 0;
841 //buf_len = ( sent - recv ) > MYTHTV_BUFFER_SIZE ? MYTHTV_BUFFER_SIZE : ( sent - recv );
/* Cap each read at MYTHTV_BUFFER_SIZE. */
842 if ( remaining > MYTHTV_BUFFER_SIZE ) {
843 buf_len = MYTHTV_BUFFER_SIZE;
/* `data + recv` is arithmetic on a void pointer (a GNU C extension). */
850 io_status = g_io_channel_read_chars( io_channel, data + recv,
851 buf_len, &bytes_read, &error );
853 //g_static_mutex_unlock( &mutex );
/* NOTE(review): `sss` is allocated on every iteration; no use or
 * g_string_free() of it is visible. */
855 GString *sss = g_string_new("");
856 sss = g_string_append_len( sss, (gchar*)data+recv, bytes_read );
858 g_print( "[%s] Reading buffer (length = %d)\n", __FUNCTION__, bytes_read);
860 if ( bytes_read > 0 )
862 //if ( bytes_read <= buf_len )
864 count_bytes += bytes_read;
865 remaining -= bytes_read;
866 g_print( "[%s] Reading buffer (bytes read = %d, remaining = %d)\n", __FUNCTION__, bytes_read, remaining );
867 if ( remaining == 0 ) {
874 //if ( remaining > 0 ) {
876 if ( io_status == G_IO_STATUS_EOF ) {
877 g_print( "[%s] got EOS!", __FUNCTION__ );
879 } else if ( io_status == G_IO_STATUS_ERROR ) {
880 g_print( "[%s] myth_file_transfer_read(): socket error.\n", __FUNCTION__ );
885 /* increase buffer size, to allow get more data (do not obey to the buffer size) */
886 if ( read_unlimited == TRUE ) {
887 // FOR NOW, DO NOTHING!!!
888 //if ( recv > buf_len )
889 // sent += (bytes_read - buf_len) + 1;
892 /* verify if the input (read) buffer is ready to receive data */
893 io_cond = g_io_channel_get_buffer_condition( io_channel );
895 g_print( "[%s]\t io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
896 ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
898 //if ( recv == size )
901 } while ( remaining > 0 );//&& ( io_status == G_IO_STATUS_NORMAL ) );
903 // if ( ( recv < size ) ) {
904 // finish_read = FALSE;
/* After the chunk loop: read the backend's report of how many bytes it sent. */
907 io_cond_control = g_io_channel_get_buffer_condition( io_channel_control );
908 if ( remaining == 0 )//( io_cond_control & G_IO_IN ) != 0 )
910 gmyth_socket_read_stringlist( transfer->control_sock, strlist );
911 if ( strlist != NULL && gmyth_string_list_length( strlist ) > 0 )
913 sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
914 g_print( "[%s] got SENT buffer message = %d\n", __FUNCTION__, sent );
917 g_print( "[%s]\t received = %d bytes, backend says %d bytes sent, "\
918 "io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
919 recv, sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
/* The request is complete only when the backend's count matches ours and
 * the full size has been received. */
921 if ( sent == count_bytes )
923 response = ( recv == size );
924 g_print( "[%s]\t\tsent %d, which is equals to bytes_read = %d\n\n",
925 __FUNCTION__, sent, count_bytes );
926 if ( response == TRUE )
931 g_print( "[%s]\t\tsent %d, which is NOT equals to bytes_read = %d\n\n",
932 __FUNCTION__, sent, count_bytes );
941 } // if - reading control response from backend
944 } // if - stringlist response
/* Final check for a trailing reply on the control socket. */
948 io_cond_control = g_io_channel_get_buffer_condition( io_channel_control );
949 // io_cond = g_io_channel_get_buffer_condition( io_channel );
951 if ( ( ( io_cond_control & G_IO_IN ) != 0 ) &&
952 ( response || ( recv == size ) ) )
954 if ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 )
956 if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 )
958 sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
959 g_print( "[%s]\t received = %d bytes -\tNOW returning from reading buffer I/O socket "\
960 "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__,
961 sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
966 g_printerr ( "myth_file_transfer_read(): No response from control socket.");
971 else if ( error != NULL )
973 g_printerr( "[%s] Error occurred: (%d, %s)\n", __FUNCTION__, error->code, error->message );
977 //g_static_mutex_unlock (&mutex);
/* Cleanup: release the last request list and any pending GError. */
982 if ( strlist != NULL )
983 g_object_unref( strlist );
985 g_print( "myth_file_transfer_read(): reqd=%d, rcvd=%d, rept=%d, "\
986 "(rcvd and rept MUST be the same!)\n", size,
989 //if ( ( recv != size ) || ( sent != size ) ) {
993 if ( error != NULL ) {
994 g_printerr( "Cleaning-up ERROR: %s [msg = %s, code = %d]\n", __FUNCTION__, error->message,
996 g_error_free( error );
/*
 * Switches the backend's transfer timeout mode ("<query>" / "SET_TIMEOUT" /
 * <fast>) over the control socket and records the new mode in
 * transfer->timeoutisfast.
 *
 * transfer - needs both a data socket and a control socket.
 * fast     - requested mode; a test against the current mode precedes the
 *            request (its body is not visible in this listing).
 *
 * Uses transfer->query as-is, i.e. whatever the last caller formatted into it.
 *
 * NOTE(review): the early-exit statements after the three tests and the
 * return statement are not visible in this listing. No unref of `strlist` is
 * visible, and the reply is not inspected.
 */
1003 myth_file_transfer_settimeout( MythFileTransfer *transfer, gboolean fast )
1006 GMythStringList *strlist = NULL;
1008 if ( transfer->timeoutisfast == fast )
1011 if ( transfer->sock == NULL )
1013 g_printerr( "myth_file_transfer_settimeout(): Called with no socket" );
1017 if ( transfer->control_sock == NULL )
1020 strlist = gmyth_string_list_new();
1021 gmyth_string_list_append_string( strlist, transfer->query );
1022 gmyth_string_list_append_char_array( strlist, "SET_TIMEOUT" );
1023 gmyth_string_list_append_int( strlist, fast );
1025 gmyth_socket_write_stringlist( transfer->control_sock, strlist );
1026 gmyth_socket_read_stringlist( transfer->control_sock, strlist );
1028 transfer->timeoutisfast = fast;
1035 main( int argc, char *argv[] )
1039 MythFileTransfer *file_transfer = myth_file_transfer_new( 1,
1040 g_string_new("myth://192.168.1.109:6543/jshks.nuv"), -1, MYTHTV_VERSION );
1041 myth_file_transfer_setup( &file_transfer );
1042 gchar *data = g_strdup("");
1044 gint num = myth_file_transfer_read( file_transfer, data, -1 );