1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/gst-plugins-mythtv/myth_file_transfer.c Thu Sep 21 15:58:58 2006 +0100
1.3 @@ -0,0 +1,960 @@
1.4 +/* vim: set sw=2: -*- Mode: C; tab-width: 2; indent-tabs-mode: t; c-basic-offset: 2; c-indent-level: 2-*- */
1.5 +/**
1.6 + * GStreamer plug-in properties:
1.7 + * - location (backend server hostname/URL) [ex.: myth://192.168.1.73:28722/1000_1092091.nuv]
1.8 + * - path (qurl - remote file to be opened)
1.9 + * - port number
1.10 + * @author Rosfran Lins Borges <rosfran.borges@indt.org.br>
1.11 + */
1.12 +
1.13 +#include "myth_file_transfer.h"
1.14 +#include "myth_uri.h"
1.15 +#include "myth_livetv.h"
1.16 +#include <gmyth/gmyth_util.h>
1.17 +#include <gmyth/gmyth_socket.h>
1.18 +#include <gmyth/gmyth_stringlist.h>
1.19 +
1.20 +#include <unistd.h>
1.21 +#include <glib.h>
1.22 +
1.23 +#include <arpa/inet.h>
1.24 +#include <sys/types.h>
1.25 +#include <sys/socket.h>
1.26 +#include <netdb.h>
1.27 +#include <errno.h>
1.28 +#include <stdlib.h>
1.29 +
1.30 +#define MYTHTV_QUERY_HEADER "QUERY_FILETRANSFER"
1.31 +#define MYTHTV_RECORDER_HEADER "QUERY_RECORDER"
1.32 +
1.33 +/* default values to the file transfer parameters */
1.34 +#define MYTHTV_USER_READ_AHEAD FALSE
1.35 +#define MYTHTV_RETRIES 1
1.36 +#define MYTHTV_FILE_SIZE -1
1.37 +
1.38 +#define MYTHTV_BUFFER_SIZE 2048
1.39 +
1.40 +#define MYTHTV_VERSION 30
1.41 +
1.42 +#define MYTHTV_TRANSFER_MAX_WAITS 700
1.43 +
1.44 +#ifdef MYTHTV_ENABLE_DEBUG
1.45 +#define MYTHTV_ENABLE_DEBUG 1
1.46 +#else
1.47 +#undef MYTHTV_ENABLE_DEBUG
1.48 +#endif
1.49 +
1.50 +/* this NDEBUG is to maintain compatibility with GMyth library */
1.51 +#ifndef NDEBUG
1.52 +#define MYTHTV_ENABLE_DEBUG 1
1.53 +#endif
1.54 +
1.55 +static guint wait_to_transfer = 0;
1.56 +
1.57 +enum myth_sock_types {
1.58 + MYTH_PLAYBACK_TYPE = 0,
1.59 + MYTH_MONITOR_TYPE,
1.60 + MYTH_FILETRANSFER_TYPE,
1.61 + MYTH_RINGBUFFER_TYPE
1.62 +};
1.63 +
1.64 +static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
1.65 +
1.66 +static void myth_file_transfer_class_init (MythFileTransferClass *klass);
1.67 +static void myth_file_transfer_init (MythFileTransfer *object);
1.68 +
1.69 +static void myth_file_transfer_dispose (GObject *object);
1.70 +static void myth_file_transfer_finalize (GObject *object);
1.71 +
1.72 +static GMythSocket *myth_connect_to_transfer_backend( MythFileTransfer **transfer, guint sock_type );
1.73 +static void* myth_init_io_watchers( void *data );
1.74 +
1.75 +void myth_file_transfer_close( MythFileTransfer *transfer );
1.76 +
1.77 +G_DEFINE_TYPE(MythFileTransfer, myth_file_transfer, G_TYPE_OBJECT)
1.78 +
1.79 +static guint64
1.80 +mmyth_util_decode_long_long( GMythStringList *strlist, guint offset )
1.81 +{
1.82 +
1.83 + guint64 ret_value = 0LL;
1.84 +
1.85 + g_return_val_if_fail( strlist != NULL, ret_value );
1.86 +
1.87 + if ( offset < gmyth_string_list_length( strlist ))
1.88 + g_printerr( "[%s] Offset is lower than the GMythStringList (offset = %d)!\n", __FUNCTION__, offset );
1.89 + g_return_val_if_fail( offset < gmyth_string_list_length( strlist ), ret_value );
1.90 +
1.91 + gint l1 = gmyth_string_list_get_int( strlist, offset );
1.92 + gint l2 = gmyth_string_list_get_int( strlist, offset + 1 );
1.93 +
1.94 + ret_value = ((guint64)(l2) & 0xffffffffLL) | ((guint64)(l1) << 32);
1.95 +
1.96 + return ret_value;
1.97 +
1.98 +}
1.99 +
1.100 +static void
1.101 +myth_file_transfer_class_init (MythFileTransferClass *klass)
1.102 +{
1.103 + GObjectClass *gobject_class;
1.104 +
1.105 + gobject_class = (GObjectClass *) klass;
1.106 +
1.107 + gobject_class->dispose = myth_file_transfer_dispose;
1.108 + gobject_class->finalize = myth_file_transfer_finalize;
1.109 +}
1.110 +
1.111 + static void
1.112 +myth_file_transfer_init (MythFileTransfer *myth_file_transfer)
1.113 +{
1.114 + g_return_if_fail( myth_file_transfer != NULL );
1.115 + myth_file_transfer->mythtv_version = MYTHTV_VERSION;
1.116 +}
1.117 +
1.118 +static void
1.119 +myth_file_transfer_dispose (GObject *object)
1.120 +{
1.121 + MythFileTransfer *myth_file_transfer = MYTH_FILE_TRANSFER(object);
1.122 +
1.123 + myth_file_transfer_close( myth_file_transfer );
1.124 +
1.125 + G_OBJECT_CLASS (myth_file_transfer_parent_class)->dispose (object);
1.126 +}
1.127 +
1.128 + static void
1.129 +myth_file_transfer_finalize (GObject *object)
1.130 +{
1.131 + g_signal_handlers_destroy (object);
1.132 +
1.133 + G_OBJECT_CLASS (myth_file_transfer_parent_class)->finalize (object);
1.134 +}
1.135 +
1.136 + MythFileTransfer*
1.137 +myth_file_transfer_new (gint num, GString *uri_str, gshort port, gint mythtv_version)
1.138 +{
1.139 + MythFileTransfer *transfer = MYTH_FILE_TRANSFER ( g_object_new (
1.140 + MYTH_FILE_TRANSFER_TYPE, FALSE ));
1.141 +
1.142 + if ( mythtv_version > 0 )
1.143 + transfer->mythtv_version = mythtv_version;
1.144 +
1.145 + transfer->card_id = num;
1.146 +
1.147 + transfer->rec_id = -1;
1.148 +
1.149 + transfer->recordernum = 0;
1.150 + transfer->uri = myth_uri_new ( uri_str->str );
1.151 +
1.152 + transfer->hostname = g_string_new( myth_uri_gethost(transfer->uri) );
1.153 + g_print( "\t--> transfer->hostname = %s\n", transfer->hostname->str );
1.154 +
1.155 + if ( port >= 0 )
1.156 + transfer->port = port;
1.157 + else
1.158 + transfer->port = myth_uri_getport( transfer->uri );
1.159 +
1.160 + g_print( "\t--> transfer->port = %d\n", transfer->port );
1.161 +
1.162 + transfer->readposition = 0;
1.163 + transfer->filesize = MYTHTV_FILE_SIZE;
1.164 + transfer->timeoutisfast = FALSE;
1.165 +
1.166 + transfer->userreadahead = MYTHTV_USER_READ_AHEAD;
1.167 + transfer->retries = MYTHTV_RETRIES;
1.168 +
1.169 + transfer->live_tv = FALSE;
1.170 +
1.171 + transfer->query = g_string_new( MYTHTV_QUERY_HEADER );
1.172 + g_string_append_printf ( transfer->query, " %d", transfer->recordernum );
1.173 + g_print( "\t--> transfer->query = %s\n", transfer->query->str );
1.174 +
1.175 + transfer->control_sock = NULL;
1.176 + transfer->event_sock = NULL;
1.177 + transfer->sock = NULL;
1.178 +
1.179 + return transfer;
1.180 +}
1.181 +
1.182 +gboolean
1.183 +myth_file_transfer_livetv_setup( MythFileTransfer **transfer, GMythSocket *live_socket )
1.184 +{
1.185 + (*transfer)->sock = live_socket;
1.186 + g_object_ref( live_socket );
1.187 +
1.188 + return TRUE;
1.189 +}
1.190 +
1.191 +gboolean
1.192 +myth_file_transfer_playback_setup( MythFileTransfer **transfer, gboolean live_tv )
1.193 +{
1.194 +
1.195 + gboolean ret = TRUE;
1.196 +
1.197 + (*transfer)->live_tv = live_tv;
1.198 +
1.199 + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
1.200 +
1.201 + /* configure the control socket */
1.202 + if ((*transfer)->control_sock == NULL) {
1.203 +
1.204 + if ( myth_connect_to_transfer_backend ( transfer, MYTH_PLAYBACK_TYPE ) == NULL ) {
1.205 + g_printerr( "Connection to backend failed (Control Socket).\n" );
1.206 + ret = FALSE;
1.207 + }
1.208 +
1.209 + } else {
1.210 + g_warning("Remote transfer control socket already created.\n");
1.211 + }
1.212 +
1.213 + return ret;
1.214 +
1.215 +}
1.216 +
1.217 +gboolean
1.218 +myth_file_transfer_setup( MythFileTransfer **transfer, gboolean live_tv )
1.219 +{
1.220 + GMythStringList *strlist = NULL;
1.221 +
1.222 + gboolean ret = TRUE;
1.223 +
1.224 + (*transfer)->live_tv = live_tv;
1.225 +
1.226 + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
1.227 +
1.228 +#if 0
1.229 + /* configure the event socket */
1.230 + if ((*transfer)->event_sock == NULL) {
1.231 +
1.232 + if ( myth_connect_to_transfer_backend ( transfer, MYTH_MONITOR_TYPE ) == NULL ) {
1.233 + g_printerr( "Connection to backend failed (Event Socket).\n" );
1.234 + ret = FALSE;
1.235 + }
1.236 +
1.237 + } else {
1.238 + g_warning("Remote transfer control socket already created.\n");
1.239 + }
1.240 +#endif
1.241 +
1.242 + /* configure the socket */
1.243 + if ( (*transfer)->sock == NULL ) {
1.244 +
1.245 + //if ( live_tv == FALSE ) {
1.246 +
1.247 + if ( myth_connect_to_transfer_backend ( transfer, MYTH_FILETRANSFER_TYPE ) == NULL ) {
1.248 + g_printerr ("Connection to backend failed (Raw Transfer Socket).\n");
1.249 + ret = FALSE;
1.250 + }
1.251 +
1.252 + if ( !(*transfer)->live_tv && (*transfer)->control_sock != NULL) {
1.253 + strlist = gmyth_string_list_new();
1.254 + g_string_printf ( (*transfer)->query, "%s %d", MYTHTV_QUERY_HEADER, (*transfer)->recordernum );
1.255 +
1.256 + gmyth_string_list_append_string( strlist, (*transfer)->query );
1.257 + gmyth_string_list_append_char_array( strlist, "IS_OPEN" );
1.258 +
1.259 + gmyth_socket_write_stringlist( (*transfer)->control_sock, strlist );
1.260 + gmyth_socket_read_stringlist( (*transfer)->control_sock, strlist );
1.261 +
1.262 + if ( strlist!=NULL && gmyth_string_list_get_int( strlist, 0 ) == 1 ) {
1.263 + g_print( "[%s] Remote Myth FileTransfer socket is open!\n", __FUNCTION__ );
1.264 + } else {
1.265 + g_print( "[%s] Remote Myth FileTransfer socket is CLOSED! See the MythTV Server Backend for configuration details...\n", __FUNCTION__ );
1.266 + ret = FALSE;
1.267 + }
1.268 + }
1.269 +
1.270 + } else {
1.271 + g_warning("Remote transfer (raw) socket already created.\n");
1.272 + }
1.273 +
1.274 + return ret;
1.275 +}
1.276 +
1.277 +static GMythSocket *
1.278 +myth_connect_to_transfer_backend( MythFileTransfer **transfer, guint sock_type )
1.279 +{
1.280 + GMythSocket *sock = NULL;
1.281 +
1.282 + g_return_val_if_fail( transfer != NULL && *transfer != NULL, NULL );
1.283 + g_return_val_if_fail( (*transfer)->uri != NULL, NULL );
1.284 +
1.285 + g_static_mutex_lock (&mutex);
1.286 +
1.287 + gchar *path_dir = myth_uri_getpath( (*transfer)->uri );
1.288 + //g_print( "\t--> %s: path_dir = %s\n", __FUNCTION__, path_dir );
1.289 +
1.290 + gchar *stype = g_strdup( "" );
1.291 +
1.292 + // if ( (*transfer)->live_tv == FALSE ) {
1.293 +
1.294 + sock = gmyth_socket_new();
1.295 +
1.296 + gmyth_socket_connect( &sock, (*transfer)->hostname->str, (*transfer)->port );
1.297 +
1.298 + /*
1.299 + } else {
1.300 + sock = (*transfer)->sock;
1.301 + }
1.302 + */
1.303 +#ifdef MYTHTV_ENABLE_DEBUG
1.304 +
1.305 + g_print( "[%s] --> Creating socket... (%s, %d)\n", __FUNCTION__, (*transfer)->hostname->str, (*transfer)->port );
1.306 +#endif
1.307 +
1.308 + GMythStringList *strlist = NULL;
1.309 +
1.310 + GString *hostname = g_string_new( myth_uri_gethost( (*transfer)->uri ) );
1.311 + GString *base_str = g_string_new( "" );
1.312 +
1.313 + if ( gmyth_socket_check_protocol_version_number (sock, (*transfer)->mythtv_version) ) {
1.314 +
1.315 + if (sock == NULL) {
1.316 + stype = (sock_type==MYTH_PLAYBACK_TYPE) ? "control socket" : "file data socket";
1.317 + g_printerr( "FileTransfer, open_socket(%s): \n"
1.318 + "\t\t\tCould not connect to server \"%s\" @ port %d\n", stype,
1.319 + (*transfer)->hostname->str, (*transfer)->port );
1.320 + g_object_unref(sock);
1.321 + g_static_mutex_unlock (&mutex);
1.322 + return NULL;
1.323 + }
1.324 +
1.325 + hostname = gmyth_socket_get_local_hostname();
1.326 +
1.327 + g_print( "[%s] local hostname = %s\n", __FUNCTION__, hostname->str );
1.328 +
1.329 + if ( sock_type == MYTH_PLAYBACK_TYPE )
1.330 + {
1.331 + (*transfer)->control_sock = sock;
1.332 + g_string_printf( base_str, "ANN Playback %s %d", hostname->str, FALSE );
1.333 +
1.334 + gmyth_socket_send_command( (*transfer)->control_sock, base_str );
1.335 + GString *resp = gmyth_socket_receive_response( (*transfer)->control_sock );
1.336 + g_print( "[%s] Got Playback response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.337 + }
1.338 + else if ( sock_type == MYTH_MONITOR_TYPE )
1.339 + {
1.340 + (*transfer)->event_sock = sock;
1.341 + g_string_printf( base_str, "ANN Monitor %s %d", hostname->str, TRUE );
1.342 +
1.343 + gmyth_socket_send_command( (*transfer)->event_sock, base_str );
1.344 + GString *resp = gmyth_socket_receive_response( (*transfer)->event_sock );
1.345 + g_print( "[%s] Got Monitor response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.346 + g_thread_create( myth_init_io_watchers, (void*)(*transfer), FALSE, NULL );
1.347 +
1.348 + g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
1.349 +
1.350 + }
1.351 + else if ( sock_type == MYTH_FILETRANSFER_TYPE )
1.352 + {
1.353 + (*transfer)->sock = sock;
1.354 + strlist = gmyth_string_list_new();
1.355 + //g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str,
1.356 + // transfer->userreadahead, transfer->retries );
1.357 + g_string_printf( base_str, "ANN FileTransfer %s", hostname->str );
1.358 +
1.359 + gmyth_string_list_append_string( strlist, base_str );
1.360 + gmyth_string_list_append_char_array( strlist, path_dir );
1.361 +
1.362 + gmyth_socket_write_stringlist( (*transfer)->sock, strlist );
1.363 + gmyth_socket_read_stringlist( (*transfer)->sock, strlist );
1.364 +
1.365 + /* socket number, where all the stream data comes from - got from the MythTV remote backend */
1.366 + (*transfer)->recordernum = gmyth_string_list_get_int( strlist, 1 );
1.367 +
1.368 + /* Myth URI stream file size - decoded using two 8-bytes sequences (64 bits/long long types) */
1.369 + (*transfer)->filesize = mmyth_util_decode_long_long( strlist, 2 );
1.370 +
1.371 + printf( "[%s] ***** Received: recordernum = %d, filesize = %" G_GUINT64_FORMAT "\n", __FUNCTION__,
1.372 + (*transfer)->recordernum, (*transfer)->filesize );
1.373 +
1.374 + if ( (*transfer)->filesize <= 0 ) {
1.375 + g_print( "[%s] Got filesize equals to %llu is lesser than 0 [invalid stream file]\n", __FUNCTION__, (*transfer)->filesize );
1.376 + g_object_unref(sock);
1.377 + sock = NULL;
1.378 + }
1.379 + }
1.380 + else if ( sock_type == MYTH_RINGBUFFER_TYPE )
1.381 + {
1.382 + (*transfer)->sock = sock;
1.383 + //myth_file_transfer_spawntv( (*transfer), NULL );
1.384 +
1.385 + strlist = gmyth_string_list_new();
1.386 + g_string_printf( base_str, "ANN RingBuffer %s %d", hostname->str, (*transfer)->card_id );
1.387 +
1.388 + gmyth_socket_send_command( (*transfer)->sock, base_str );
1.389 + GString *resp = gmyth_socket_receive_response( (*transfer)->sock );
1.390 + g_print( "[%s] Got RingBuffer response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.391 +
1.392 + }
1.393 +
1.394 + }
1.395 +
1.396 + printf("[%s] ANN %s sent: %s\n", (sock_type==MYTH_PLAYBACK_TYPE) ? "Playback" : (sock_type==MYTH_FILETRANSFER_TYPE) ? "FileTransfer" : "Monitor", __FUNCTION__, base_str->str);
1.397 +
1.398 + if ( strlist != NULL )
1.399 + g_object_unref( strlist );
1.400 +
1.401 + g_static_mutex_unlock (&mutex);
1.402 +
1.403 + return sock;
1.404 +}
1.405 +
1.406 +void
1.407 +myth_file_transfer_spawntv ( MythFileTransfer *file_transfer,
1.408 + GString *tvchain_id )
1.409 +{
1.410 + GMythStringList *str_list;
1.411 +
1.412 + g_debug ("myth_file_transfer_spawntv.\n");
1.413 +
1.414 + str_list = gmyth_string_list_new ();
1.415 +
1.416 + g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
1.417 + file_transfer->card_id );
1.418 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.419 + gmyth_string_list_append_string (str_list, g_string_new ("SPAWN_LIVETV"));
1.420 + if (tvchain_id!=NULL) {
1.421 + gmyth_string_list_append_string (str_list, tvchain_id);
1.422 + gmyth_string_list_append_int (str_list, FALSE); // PIP = FALSE (0)
1.423 + }
1.424 +
1.425 + gmyth_socket_sendreceive_stringlist ( file_transfer->sock, str_list );
1.426 +
1.427 + //GString *str = NULL;
1.428 +
1.429 + //if (str_list!=NULL && (str = gmyth_string_list_get_string( str_list, 0 )) != NULL && strcasecmp( str->str, "ok" ) != 0 ) {
1.430 + // g_print( "[%s]\t\tSpawnLiveTV is OK!\n", __FUNCTION__ );
1.431 + //}
1.432 + if (str_list!=NULL)
1.433 + g_object_unref (str_list);
1.434 +
1.435 +}
1.436 +
1.437 +gboolean
1.438 +myth_file_transfer_is_recording ( MythFileTransfer *file_transfer )
1.439 +{
1.440 + gboolean ret = TRUE;
1.441 +
1.442 + GMythStringList *str_list = gmyth_string_list_new ();
1.443 +
1.444 + g_debug ( "[%s]\n", __FUNCTION__ );
1.445 + g_static_mutex_lock (&mutex);
1.446 +
1.447 + g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
1.448 + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
1.449 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.450 + gmyth_string_list_append_string (str_list, g_string_new ("IS_RECORDING"));
1.451 +
1.452 + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
1.453 +
1.454 + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
1.455 + {
1.456 + GString *str = NULL;
1.457 + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp( str->str, "bad" )!= 0 ) {
1.458 + gint is_rec = gmyth_string_list_get_int( str_list, 0 );
1.459 + if ( is_rec != 0 )
1.460 + ret = TRUE;
1.461 + else
1.462 + ret = FALSE;
1.463 + }
1.464 + }
1.465 + g_print( "[%s] %s, stream is %s being recorded!\n", __FUNCTION__, ret ? "YES" : "NO", ret ? "" : "NOT" );
1.466 + g_static_mutex_unlock (&mutex);
1.467 +
1.468 + if ( str_list != NULL )
1.469 + g_object_unref (str_list);
1.470 +
1.471 + return ret;
1.472 +
1.473 +}
1.474 +
1.475 +guint64
1.476 +myth_file_transfer_get_file_position ( MythFileTransfer *file_transfer )
1.477 +{
1.478 + guint64 pos = 0;
1.479 +
1.480 + GMythStringList *str_list = gmyth_string_list_new ();
1.481 +
1.482 + g_debug ( "[%s]\n", __FUNCTION__ );
1.483 + g_static_mutex_lock (&mutex);
1.484 +
1.485 + g_string_printf( file_transfer->query, "%s %d", MYTHTV_RECORDER_HEADER,
1.486 + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
1.487 +
1.488 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.489 + gmyth_string_list_append_string (str_list, g_string_new ("GET_FILE_POSITION"));
1.490 +
1.491 + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
1.492 +
1.493 + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
1.494 + {
1.495 + GString *str = NULL;
1.496 + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp ( str->str, "bad" ) != 0 )
1.497 + pos = gmyth_util_decode_long_long( str_list, 0 );
1.498 + }
1.499 + g_static_mutex_unlock (&mutex);
1.500 +
1.501 +#ifndef MYTHTV_ENABLE_DEBUG
1.502 +
1.503 + g_print( "[%s] Got file position = %llu\n", __FUNCTION__, pos );
1.504 +#endif
1.505 + if (str_list!=NULL)
1.506 + g_object_unref (str_list);
1.507 +
1.508 + return pos;
1.509 +
1.510 +}
1.511 +
1.512 + glong
1.513 +myth_file_transfer_get_recordernum( MythFileTransfer *transfer )
1.514 +{
1.515 + return transfer->recordernum;
1.516 +}
1.517 +
1.518 + glong
1.519 +myth_file_transfer_get_filesize( MythFileTransfer *transfer )
1.520 +{
1.521 + return transfer->filesize;
1.522 +}
1.523 +
1.524 + gboolean
1.525 +myth_file_transfer_isopen( MythFileTransfer *transfer )
1.526 +{
1.527 + return (transfer->sock != NULL && transfer->control_sock != NULL);
1.528 +}
1.529 +
1.530 + void
1.531 +myth_file_transfer_close( MythFileTransfer *transfer )
1.532 +{
1.533 + GMythStringList *strlist;
1.534 +
1.535 + if (transfer->control_sock == NULL)
1.536 + return;
1.537 +
1.538 + strlist = gmyth_string_list_new( );
1.539 +
1.540 + g_string_printf( transfer->query, "%s %d", MYTHTV_QUERY_HEADER,
1.541 + transfer->recordernum );
1.542 + gmyth_string_list_append_string( strlist, transfer->query );
1.543 + gmyth_string_list_append_char_array( strlist, "DONE" );
1.544 +
1.545 +
1.546 + if ( gmyth_socket_sendreceive_stringlist(transfer->control_sock, strlist) <= 0 )
1.547 + {
1.548 + g_printerr( "Remote file timeout.\n" );
1.549 + }
1.550 +
1.551 + if (transfer->sock)
1.552 + {
1.553 + g_object_unref( transfer->sock );
1.554 + transfer->sock = NULL;
1.555 + }
1.556 +
1.557 + if (transfer->control_sock)
1.558 + {
1.559 + g_object_unref( transfer->control_sock );
1.560 + transfer->control_sock = NULL;
1.561 + }
1.562 +
1.563 +}
1.564 +
1.565 + void
1.566 +myth_file_transfer_reset_controlsock( MythFileTransfer *transfer )
1.567 +{
1.568 + if (transfer->control_sock == NULL)
1.569 + {
1.570 + g_printerr( "myth_file_transfer_reset_controlsock(): Called with no control socket" );
1.571 + return;
1.572 + }
1.573 +
1.574 + GString *str = gmyth_socket_receive_response( transfer->control_sock );
1.575 +
1.576 + g_string_free( str, TRUE );
1.577 +}
1.578 +
1.579 +void
1.580 +myth_file_transfer_reset_sock( MythFileTransfer *transfer )
1.581 +{
1.582 + if ( transfer->sock == NULL )
1.583 + {
1.584 + g_printerr( "myth_file_transfer_reset_sock(): Called with no raw socket" );
1.585 + return;
1.586 + }
1.587 +
1.588 + GString *str = gmyth_socket_receive_response( transfer->sock );
1.589 +
1.590 + g_string_free( str, TRUE );
1.591 +}
1.592 +
1.593 +void
1.594 +myth_file_transfer_reset( MythFileTransfer *transfer )
1.595 +{
1.596 + myth_file_transfer_reset_controlsock( transfer );
1.597 + myth_file_transfer_reset_sock( transfer );
1.598 +}
1.599 +
1.600 +guint64
1.601 +myth_file_transfer_seek(MythFileTransfer *transfer, guint64 pos, gint whence)
1.602 +{
1.603 + if (transfer->sock == NULL)
1.604 + {
1.605 + g_printerr( "[%s] myth_file_transfer_seek(): Called with no socket", __FUNCTION__ );
1.606 + return 0;
1.607 + }
1.608 +
1.609 + if (transfer->control_sock == NULL)
1.610 + return 0;
1.611 +
1.612 + // if (!controlSock->isOpen() || controlSock->error())
1.613 + // return 0;
1.614 +
1.615 + GMythStringList *strlist = gmyth_string_list_new();
1.616 + g_string_printf (transfer->query, "%s %d", MYTHTV_QUERY_HEADER, transfer->recordernum);
1.617 + gmyth_string_list_append_string( strlist, transfer->query );
1.618 + gmyth_string_list_append_char_array( strlist, "SEEK" );
1.619 + gmyth_string_list_append_uint64( strlist, pos );
1.620 + // gmyth_string_list_append_int( strlist, whence );
1.621 +
1.622 + if (pos > 0 )
1.623 + gmyth_string_list_append_uint64( strlist, pos );
1.624 + else
1.625 + gmyth_string_list_append_uint64( strlist, transfer->readposition );
1.626 +
1.627 + gmyth_socket_sendreceive_stringlist( transfer->control_sock, strlist );
1.628 +
1.629 + guint64 retval = gmyth_string_list_get_uint64(strlist, 0);
1.630 + transfer->readposition = retval;
1.631 + g_print( "[%s] got reading position pointer from the streaming = %llu\n",
1.632 + __FUNCTION__, retval );
1.633 +
1.634 + //myth_file_transfer_reset( transfer );
1.635 +
1.636 + return retval;
1.637 +}
1.638 +
1.639 +static gboolean
1.640 +myth_control_sock_listener( GIOChannel *source, GIOCondition condition, gpointer data )
1.641 +{
1.642 +
1.643 + GIOStatus ret;
1.644 + GError *err = NULL;
1.645 + gchar *msg = g_strdup("");
1.646 +
1.647 + gsize len;
1.648 + if (condition & G_IO_HUP)
1.649 + g_error ("Read end of pipe died!\n");
1.650 + ret = g_io_channel_read_line ( source, &msg, &len, NULL, &err);
1.651 + if ( ret == G_IO_STATUS_ERROR )
1.652 + g_error ("[%s] Error reading: %s\n", __FUNCTION__, err != NULL ? err->message : "" );
1.653 + g_print ("\n\n\n\n\n\n[%s]\t\tEVENT: Read %u bytes: %s\n\n\n\n\n", __FUNCTION__, len, msg != NULL ? msg : "" );
1.654 + if ( msg != NULL )
1.655 + g_free (msg);
1.656 +
1.657 + return TRUE;
1.658 +
1.659 +}
1.660 +
1.661 +static void*
1.662 +myth_init_io_watchers( void *data )
1.663 +{
1.664 + MythFileTransfer *transfer = (MythFileTransfer*)data;
1.665 + GMainContext *context = g_main_context_new();
1.666 + GMainLoop *loop = g_main_loop_new( NULL, FALSE );
1.667 +
1.668 + GSource *source = NULL;
1.669 +
1.670 + if ( transfer->event_sock->sd_io_ch != NULL )
1.671 + source = g_io_create_watch( transfer->event_sock->sd_io_ch, G_IO_IN | G_IO_HUP );
1.672 +
1.673 + g_source_set_callback ( source, (GSourceFunc)myth_control_sock_listener, NULL, NULL );
1.674 +
1.675 + g_source_attach( source, context );
1.676 +
1.677 + if (source==NULL)
1.678 + g_printerr( "[%s] Error adding watch listener function to the IO control channel!\n", __FUNCTION__ );
1.679 +
1.680 + g_main_loop_run( loop );
1.681 +
1.682 + g_source_unref( source );
1.683 +
1.684 + g_main_loop_unref( loop );
1.685 +
1.686 + g_main_context_unref( context );
1.687 +
1.688 + return NULL;
1.689 +}
1.690 +
1.691 + gint
1.692 +myth_file_transfer_read(MythFileTransfer *transfer, void *data, gint size, gboolean read_unlimited)
1.693 +{
1.694 + gint recv = 0;
1.695 + gsize bytes_read = 0;
1.696 +
1.697 + gint sent = 0;
1.698 + //guint zerocnt = 0;
1.699 + gboolean response = FALSE;
1.700 +
1.701 + GIOChannel *io_channel;
1.702 + GIOChannel *io_channel_control;
1.703 +
1.704 + GIOCondition io_cond;
1.705 + GIOCondition io_cond_control;
1.706 + GIOStatus io_status = G_IO_STATUS_NORMAL, io_status_control = G_IO_STATUS_NORMAL;
1.707 +
1.708 + gint buf_len = MYTHTV_BUFFER_SIZE;
1.709 +
1.710 + GMythStringList *strlist = NULL;
1.711 + GError *error = NULL;
1.712 +
1.713 + gchar *trash = g_strdup("");
1.714 +
1.715 + g_return_val_if_fail ( data != NULL, -2 );
1.716 +
1.717 + /* gets the size of the entire file, if the size requested is lesser than 0 */
1.718 + if ( size <= 0 )
1.719 + size = transfer->filesize;
1.720 +
1.721 + io_channel = transfer->sock->sd_io_ch;
1.722 + io_channel_control = transfer->control_sock->sd_io_ch;
1.723 +
1.724 + //g_io_channel_set_flags( io_channel, G_IO_FLAG_APPEND |
1.725 + // G_IO_STATUS_AGAIN | G_IO_FLAG_IS_READABLE | G_IO_FLAG_IS_WRITEABLE |
1.726 + // G_IO_FLAG_IS_SEEKABLE, NULL );
1.727 +
1.728 + io_status = g_io_channel_set_encoding( io_channel, NULL, &error );
1.729 + if ( io_status == G_IO_STATUS_NORMAL )
1.730 + g_print( "[%s] Setting encoding to binary data socket).\n", __FUNCTION__ );
1.731 +
1.732 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.733 +
1.734 + io_cond_control = g_io_channel_get_buffer_condition( io_channel );
1.735 +
1.736 + if ( transfer->sock == NULL || ( io_status == G_IO_STATUS_ERROR ) )
1.737 + {
1.738 + g_printerr( "myth_file_transfer_read(): Called with no raw socket.\n" );
1.739 + recv = -1;
1.740 + goto cleanup;
1.741 + }
1.742 +
1.743 + if ( transfer->control_sock == NULL || ( io_status_control == G_IO_STATUS_ERROR ) )
1.744 + {
1.745 + g_printerr( "myth_file_transfer_read(): Called with no control socket.\n" );
1.746 + recv = -1;
1.747 + goto cleanup;
1.748 + }
1.749 +
1.750 + /*
1.751 + if (!controlSock->isOpen() || controlSock->error())
1.752 + return -1;
1.753 + */
1.754 +
1.755 + if ( ( io_cond & G_IO_IN ) != 0 ) {
1.756 + do
1.757 + {
1.758 +
1.759 + io_status = g_io_channel_read_line( io_channel, &trash, &bytes_read, NULL, &error);
1.760 +
1.761 + g_print( "[%s] cleaning buffer on IO binary channel... (%s)\n", __FUNCTION__, trash );
1.762 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.763 +
1.764 + } while ( ( io_cond & G_IO_IN ) != 0 && ( io_status != G_IO_STATUS_ERROR ) );
1.765 +
1.766 + if ( trash!= NULL )
1.767 + g_free( trash );
1.768 + }
1.769 +
1.770 + if ( ( io_cond_control & G_IO_IN ) != 0 ) {
1.771 + GMythStringList *strlist_tmp = gmyth_string_list_new();
1.772 + gmyth_socket_read_stringlist( transfer->control_sock, strlist_tmp );
1.773 + g_object_unref( strlist_tmp );
1.774 + }
1.775 +
1.776 + wait_to_transfer = 0;
1.777 +
1.778 + while ( transfer->live_tv && ( myth_file_transfer_get_file_position( transfer ) < 4096 ) &&
1.779 + wait_to_transfer++ < MYTHTV_TRANSFER_MAX_WAITS )
1.780 + g_usleep( 1000*50 ); /* waits just for 2/10 second */
1.781 +
1.782 + //g_thread_create( myth_init_io_watchers, (void*)transfer, FALSE, NULL );
1.783 + //g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
1.784 +
1.785 + g_static_mutex_lock (&mutex);
1.786 + strlist = gmyth_string_list_new();
1.787 +
1.788 + g_string_printf ( transfer->query, "%s %d", /*transfer->live_tv ? MYTHTV_RECORDER_HEADER :*/ MYTHTV_QUERY_HEADER,
1.789 + /* transfer->live_tv ? transfer->card_id :*/ transfer->recordernum ); // transfer->recordernum
1.790 + g_print( "\t[%s] Transfer_query = %s\n", __FUNCTION__, transfer->query->str );
1.791 + strlist = gmyth_string_list_new();
1.792 +
1.793 + gmyth_string_list_append_string( strlist, transfer->query );
1.794 + gmyth_string_list_append_char_array( strlist, /*transfer->live_tv ? "REQUEST_BLOCK_RINGBUF" :*/ "REQUEST_BLOCK" );
1.795 + gmyth_string_list_append_int( strlist, size );
1.796 +
1.797 + gmyth_socket_write_stringlist( transfer->control_sock, strlist );
1.798 + sent = size;
1.799 +
1.800 + //data = (void*)g_new0( gchar, size );
1.801 +
1.802 + g_io_channel_flush( io_channel_control, NULL );
1.803 +// g_io_channel_flush( io_channel, NULL );
1.804 +
1.805 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.806 +
1.807 + while ( ( recv < sent ) )//&& ( io_cond & G_IO_IN ) != 0 )
1.808 + {
1.809 + do
1.810 + {
1.811 + //while ( ( io_cond & G_IO_IN ) == 0 ) {
1.812 + //usleep(200);
1.813 + //
1.814 + //io_cond = g_io_channel_get_buffer_condition( io_channel );
1.815 + //
1.816 +
1.817 + buf_len = ( sent - recv ) > MYTHTV_BUFFER_SIZE ? MYTHTV_BUFFER_SIZE : ( sent - recv );
1.818 +
1.819 + io_status = g_io_channel_read_chars( io_channel, data + recv,
1.820 + buf_len, &bytes_read, &error );
1.821 + /*
1.822 + GString *sss = g_string_new("");
1.823 + sss = g_string_append_len( sss, (gchar*)data+recv, bytes_read );
1.824 +
1.825 + g_print( "[%s] Reading buffer (length = %d)\n", __FUNCTION__, bytes_read);
1.826 + */
1.827 + if ( bytes_read > 0 )
1.828 + {
1.829 + if ( bytes_read <= buf_len )
1.830 + recv += bytes_read;
1.831 + }
1.832 +
1.833 + if ( io_status == G_IO_STATUS_EOF ) {
1.834 + g_printerr( "[%s] got EOS!", __FUNCTION__ );
1.835 + break;
1.836 + } else if ( io_status == G_IO_STATUS_ERROR ) {
1.837 + g_printerr( "[%s] myth_file_transfer_read(): socket error.\n", __FUNCTION__ );
1.838 + break;
1.839 + }
1.840 +
1.841 + /* increase buffer size, to allow get more data (do not obey to the buffer size) */
1.842 + if ( read_unlimited == TRUE ) {
1.843 + //if ( recv > buf_len )
1.844 + // sent += (bytes_read - buf_len) + 1;
1.845 + }
1.846 + if ( bytes_read == buf_len )
1.847 + break;
1.848 +
1.849 + /* verify if the input (read) buffer is ready to receive data */
1.850 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.851 +
1.852 + g_print( "[%s]\t io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.853 + ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.854 +
1.855 + } while ( recv < sent && ( ( io_cond & G_IO_IN ) != 0 ) && ( io_status == G_IO_STATUS_NORMAL ) );
1.856 +
1.857 + io_cond_control = g_io_channel_get_buffer_condition( io_channel_control );
1.858 + if ( ( io_status == G_IO_STATUS_EOF ) || ( ( io_cond_control & G_IO_IN ) != 0 ) )
1.859 + {
1.860 + gmyth_socket_read_stringlist( transfer->control_sock, strlist );
1.861 + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
1.862 + g_print( "[%s]\t sent = %d, io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.863 + sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.864 + response = TRUE;
1.865 + }
1.866 + }
1.867 +
1.868 + if ( ( ( error == NULL ) && ( response == FALSE ) ) ||
1.869 + ( io_status == G_IO_STATUS_EOF ) || ( ( io_cond & G_IO_IN ) == 0 ) )
1.870 + {
1.871 + if ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 )
1.872 + {
1.873 + if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 ) {
1.874 + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
1.875 + g_print( "[%s]\t sent = %d, io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.876 + sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.877 + }
1.878 + }
1.879 + else
1.880 + {
1.881 + g_printerr ( "myth_file_transfer_read(): No response from control socket.");
1.882 + sent = -1;
1.883 + }
1.884 +
1.885 + if ( error!=NULL ) {
1.886 + g_printerr( "[%s] Error occurred: (%d, %s)\n", __FUNCTION__, error->code, error->message );
1.887 + g_error_free( error );
1.888 + }
1.889 + }
1.890 +
1.891 +cleanup:
1.892 +
1.893 + if ( trash != NULL )
1.894 + g_free( trash );
1.895 +
1.896 + if ( strlist != NULL )
1.897 + g_object_unref( strlist );
1.898 +
1.899 + g_static_mutex_unlock (&mutex);
1.900 + g_print( "myth_file_transfer_read(): reqd=%d, rcvd=%d, rept=%d, "\
1.901 + "(rcvd and rept MUST be the same!)\n", size,
1.902 + recv, sent );
1.903 +
1.904 + //if ( sent != recv ) {
1.905 + // recv = -1;
1.906 + //}
1.907 +
1.908 + if ( error != NULL )
1.909 + g_printerr( "ERROR: %s [msg = %s, code = %d]\n", __FUNCTION__, error->message,
1.910 + error->code );
1.911 +
1.912 + return recv;
1.913 +}
1.914 +
1.915 + void
1.916 +myth_file_transfer_settimeout( MythFileTransfer *transfer, gboolean fast )
1.917 +{
1.918 +
1.919 + GMythStringList *strlist = NULL;
1.920 +
1.921 + if ( transfer->timeoutisfast == fast )
1.922 + return;
1.923 +
1.924 + if ( transfer->sock == NULL )
1.925 + {
1.926 + g_printerr( "myth_file_transfer_settimeout(): Called with no socket" );
1.927 + return;
1.928 + }
1.929 +
1.930 + if ( transfer->control_sock == NULL )
1.931 + return;
1.932 +
1.933 + strlist = gmyth_string_list_new();
1.934 + gmyth_string_list_append_string( strlist, transfer->query );
1.935 + gmyth_string_list_append_char_array( strlist, "SET_TIMEOUT" );
1.936 + gmyth_string_list_append_int( strlist, fast );
1.937 +
1.938 + gmyth_socket_write_stringlist( transfer->control_sock, strlist );
1.939 + gmyth_socket_read_stringlist( transfer->control_sock, strlist );
1.940 +
1.941 + transfer->timeoutisfast = fast;
1.942 +
1.943 +}
1.944 +
1.945 +#ifdef DO_TESTING
1.946 +
1.947 + int
1.948 +main( int argc, char *argv[] )
1.949 +{
1.950 + g_type_init();
1.951 +
1.952 + MythFileTransfer *file_transfer = myth_file_transfer_new( 1,
1.953 + g_string_new("myth://192.168.1.109:6543/jshks.nuv"), -1, MYTHTV_VERSION );
1.954 + myth_file_transfer_setup( &file_transfer );
1.955 + gchar *data = g_strdup("");
1.956 +
1.957 + gint num = myth_file_transfer_read( file_transfer, data, -1 );
1.958 +
1.959 + return 0;
1.960 +
1.961 +}
1.962 +
1.963 +#endif