1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/gst-plugins-mythtv/src/gmyth_file_transfer.c Mon Oct 23 15:42:46 2006 +0100
1.3 @@ -0,0 +1,1050 @@
1.4 +/* vim: set sw=2: -*- Mode: C; tab-width: 2; indent-tabs-mode: t; c-basic-offset: 2; c-indent-level: 2-*- */
1.5 +/**
1.6 + * GStreamer plug-in properties:
1.7 + * - location (backend server hostname/URL) [ex.: myth://192.168.1.73:28722/1000_1092091.nuv]
1.8 + * - path (qurl - remote file to be opened)
1.9 + * - port number
1.10 + * @author Rosfran Lins Borges <rosfran.borges@indt.org.br>
1.11 + */
1.12 +
1.13 +#include "gmyth_file_transfer.h"
1.14 +#include "gmyth_uri.h"
1.15 +#include "gmyth_livetv.h"
1.16 +#include <gmyth/gmyth_util.h>
1.17 +#include <gmyth/gmyth_socket.h>
1.18 +#include <gmyth/gmyth_stringlist.h>
1.19 +
1.20 +#include <unistd.h>
1.21 +#include <glib.h>
1.22 +
1.23 +#include <arpa/inet.h>
1.24 +#include <sys/types.h>
1.25 +#include <sys/socket.h>
1.26 +#include <netdb.h>
1.27 +#include <errno.h>
1.28 +#include <stdlib.h>
1.29 +
1.30 +#define GMYTHTV_QUERY_HEADER "QUERY_FILETRANSFER"
1.31 +#define GMYTHTV_RECORDER_HEADER "QUERY_RECORDER"
1.32 +
1.33 +/* default values to the file transfer parameters */
1.34 +#define GMYTHTV_USER_READ_AHEAD FALSE
1.35 +#define GMYTHTV_RETRIES 1
1.36 +#define GMYTHTV_FILE_SIZE -1
1.37 +
1.38 +#define GMYTHTV_BUFFER_SIZE 8*1024
1.39 +
1.40 +#define GMYTHTV_VERSION 30
1.41 +
1.42 +#define GMYTHTV_TRANSFER_MAX_WAITS 700
1.43 +
1.44 +#ifdef GMYTHTV_ENABLE_DEBUG
1.45 +#define GMYTHTV_ENABLE_DEBUG 1
1.46 +#else
1.47 +#undef GMYTHTV_ENABLE_DEBUG
1.48 +#endif
1.49 +
1.50 +/* this NDEBUG is to maintain compatibility with GMyth library */
1.51 +#ifndef NDEBUG
1.52 +#define GMYTHTV_ENABLE_DEBUG 1
1.53 +#endif
1.54 +
1.55 +static guint wait_to_transfer = 0;
1.56 +
1.57 +enum myth_sock_types {
1.58 + GMYTH_PLAYBACK_TYPE = 0,
1.59 + GMYTH_MONITOR_TYPE,
1.60 + GMYTH_FILETRANSFER_TYPE,
1.61 + GMYTH_RINGBUFFER_TYPE
1.62 +};
1.63 +
1.64 +static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
1.65 +
1.66 +static GMainContext *io_watcher_context = NULL;
1.67 +
1.68 +static void gmyth_file_transfer_class_init (GMythFileTransferClass *klass);
1.69 +static void gmyth_file_transfer_init (GMythFileTransfer *object);
1.70 +
1.71 +static void gmyth_file_transfer_dispose (GObject *object);
1.72 +static void gmyth_file_transfer_finalize (GObject *object);
1.73 +
1.74 +static GMythSocket *myth_connect_to_transfer_backend( GMythFileTransfer **transfer, guint sock_type );
1.75 +static void* myth_init_io_watchers( void *data );
1.76 +
1.77 +void gmyth_file_transfer_close( GMythFileTransfer *transfer );
1.78 +
1.79 +G_DEFINE_TYPE(GMythFileTransfer, gmyth_file_transfer, G_TYPE_OBJECT)
1.80 +
1.81 +#if 0
1.82 +static guint64
1.83 +mmyth_util_decode_long_long( GMythStringList *strlist, guint offset )
1.84 +{
1.85 +
1.86 + guint64 ret_value = 0LL;
1.87 +
1.88 + g_return_val_if_fail( strlist != NULL, ret_value );
1.89 +
1.90 + if ( offset < gmyth_string_list_length( strlist ))
1.91 + g_printerr( "[%s] Offset is lower than the GMythStringList (offset = %d)!\n", __FUNCTION__, offset );
1.92 + g_return_val_if_fail( offset < gmyth_string_list_length( strlist ), ret_value );
1.93 +
1.94 + gint l1 = gmyth_string_list_get_int( strlist, offset );
1.95 + gint l2 = gmyth_string_list_get_int( strlist, offset + 1 );
1.96 +
1.97 + ret_value = ((guint64)(l2) & 0xffffffffLL) | ((guint64)(l1) << 32);
1.98 +
1.99 + return ret_value;
1.100 +
1.101 +}
1.102 +#endif
1.103 +
1.104 +static void
1.105 +gmyth_file_transfer_class_init (GMythFileTransferClass *klass)
1.106 +{
1.107 + GObjectClass *gobject_class;
1.108 +
1.109 + gobject_class = (GObjectClass *) klass;
1.110 +
1.111 + gobject_class->dispose = gmyth_file_transfer_dispose;
1.112 + gobject_class->finalize = gmyth_file_transfer_finalize;
1.113 +}
1.114 +
1.115 + static void
1.116 +gmyth_file_transfer_init (GMythFileTransfer *gmyth_file_transfer)
1.117 +{
1.118 + g_return_if_fail( gmyth_file_transfer != NULL );
1.119 + gmyth_file_transfer->mythtv_version = GMYTHTV_VERSION;
1.120 +}
1.121 +
1.122 +static void
1.123 +gmyth_file_transfer_dispose (GObject *object)
1.124 +{
1.125 + GMythFileTransfer *gmyth_file_transfer = GMYTH_FILE_TRANSFER(object);
1.126 +
1.127 + gmyth_file_transfer_close( gmyth_file_transfer );
1.128 +
1.129 + G_OBJECT_CLASS (gmyth_file_transfer_parent_class)->dispose (object);
1.130 +}
1.131 +
1.132 + static void
1.133 +gmyth_file_transfer_finalize (GObject *object)
1.134 +{
1.135 + g_signal_handlers_destroy (object);
1.136 +
1.137 + G_OBJECT_CLASS (gmyth_file_transfer_parent_class)->finalize (object);
1.138 +}
1.139 +
1.140 + GMythFileTransfer*
1.141 +gmyth_file_transfer_new (gint num, GString *uri_str, gshort port, gint mythtv_version)
1.142 +{
1.143 + GMythFileTransfer *transfer = GMYTH_FILE_TRANSFER ( g_object_new (
1.144 + GMYTH_FILE_TRANSFER_TYPE, FALSE ));
1.145 +
1.146 + if ( mythtv_version > 0 )
1.147 + transfer->mythtv_version = mythtv_version;
1.148 +
1.149 + transfer->card_id = num;
1.150 +
1.151 + transfer->rec_id = -1;
1.152 +
1.153 + transfer->recordernum = num;
1.154 + transfer->uri = gmyth_uri_new ( uri_str->str );
1.155 +
1.156 + transfer->hostname = g_string_new( gmyth_uri_gethost(transfer->uri) );
1.157 + g_print( "\t--> transfer->hostname = %s\n", transfer->hostname->str );
1.158 +
1.159 + if ( port >= 0 )
1.160 + transfer->port = port;
1.161 + else
1.162 + transfer->port = gmyth_uri_getport( transfer->uri );
1.163 +
1.164 + g_print( "\t--> transfer->port = %d\n", transfer->port );
1.165 +
1.166 + transfer->readposition = 0;
1.167 + transfer->filesize = GMYTHTV_FILE_SIZE;
1.168 + transfer->timeoutisfast = FALSE;
1.169 +
1.170 + transfer->userreadahead = GMYTHTV_USER_READ_AHEAD;
1.171 + transfer->retries = GMYTHTV_RETRIES;
1.172 +
1.173 + transfer->live_tv = FALSE;
1.174 +
1.175 + transfer->query = g_string_new( GMYTHTV_QUERY_HEADER );
1.176 + g_string_append_printf ( transfer->query, " %d", transfer->recordernum );
1.177 + g_print( "\t--> transfer->query = %s\n", transfer->query->str );
1.178 +
1.179 + transfer->control_sock = NULL;
1.180 + transfer->event_sock = NULL;
1.181 + transfer->sock = NULL;
1.182 +
1.183 + return transfer;
1.184 +}
1.185 +
1.186 +gboolean
1.187 +gmyth_file_transfer_livetv_setup( GMythFileTransfer **transfer, GMythSocket *live_socket )
1.188 +{
1.189 + (*transfer)->sock = live_socket;
1.190 + g_object_ref( live_socket );
1.191 +
1.192 + return TRUE;
1.193 +}
1.194 +
1.195 +gboolean
1.196 +gmyth_file_transfer_playback_setup( GMythFileTransfer **transfer, gboolean live_tv )
1.197 +{
1.198 +
1.199 + gboolean ret = TRUE;
1.200 +
1.201 + (*transfer)->live_tv = live_tv;
1.202 +
1.203 + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
1.204 +
1.205 + /* configure the control socket */
1.206 + if ((*transfer)->control_sock == NULL) {
1.207 +
1.208 + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_PLAYBACK_TYPE ) == NULL ) {
1.209 + g_printerr( "Connection to backend failed (Control Socket).\n" );
1.210 + ret = FALSE;
1.211 + }
1.212 +
1.213 + } else {
1.214 + g_warning("Remote transfer control socket already created.\n");
1.215 + }
1.216 +
1.217 + return ret;
1.218 +
1.219 +}
1.220 +
1.221 +gboolean
1.222 +gmyth_file_transfer_setup( GMythFileTransfer **transfer, gboolean live_tv )
1.223 +{
1.224 + GMythStringList *strlist = NULL;
1.225 +
1.226 + gboolean ret = TRUE;
1.227 +
1.228 + (*transfer)->live_tv = live_tv;
1.229 +
1.230 + printf("[%s] Running config to the %s...\n", __FUNCTION__, live_tv ? "LiveTV" : "FileTransfer" );
1.231 +
1.232 +#if 0
1.233 + /* configure the control socket */
1.234 + if ((*transfer)->event_sock == NULL) {
1.235 +
1.236 + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_MONITOR_TYPE ) == NULL ) {
1.237 + g_printerr( "Connection to backend failed (Event Socket).\n" );
1.238 + ret = FALSE;
1.239 + }
1.240 +
1.241 + } else {
1.242 + g_warning("Remote transfer control socket already created.\n");
1.243 + }
1.244 +#endif
1.245 +
1.246 + /* configure the socket */
1.247 + if ( (*transfer)->sock == NULL ) {
1.248 +
1.249 + //if ( live_tv == FALSE ) {
1.250 +
1.251 + if ( myth_connect_to_transfer_backend ( transfer, GMYTH_FILETRANSFER_TYPE ) == NULL ) {
1.252 + g_printerr ("Connection to backend failed (Raw Transfer Socket).\n");
1.253 + ret = FALSE;
1.254 + }
1.255 +
1.256 + if ( !(*transfer)->live_tv && (*transfer)->control_sock != NULL) {
1.257 + strlist = gmyth_string_list_new();
1.258 + g_string_printf ( (*transfer)->query, "%s %d", GMYTHTV_QUERY_HEADER, (*transfer)->recordernum );
1.259 +
1.260 + gmyth_string_list_append_string( strlist, (*transfer)->query );
1.261 + gmyth_string_list_append_char_array( strlist, "IS_OPEN" );
1.262 +
1.263 + gmyth_socket_write_stringlist( (*transfer)->control_sock, strlist );
1.264 + gmyth_socket_read_stringlist( (*transfer)->control_sock, strlist );
1.265 +
1.266 + if ( strlist!=NULL && gmyth_string_list_get_int( strlist, 0 ) == 1 ) {
1.267 + g_print( "[%s] Remote Myth FileTransfer socket is open!\n", __FUNCTION__ );
1.268 + } else {
1.269 + g_print( "[%s] Remote Myth FileTransfer socket is CLOSED! See the MythTV Server Backend for configuration details...\n", __FUNCTION__ );
1.270 + ret = FALSE;
1.271 + }
1.272 + }
1.273 +
1.274 + } else {
1.275 + g_warning("Remote transfer (raw) socket already created.\n");
1.276 + }
1.277 +
1.278 + return ret;
1.279 +}
1.280 +
1.281 +static GMythSocket *
1.282 +myth_connect_to_transfer_backend( GMythFileTransfer **transfer, guint sock_type )
1.283 +{
1.284 + GMythSocket *sock = NULL;
1.285 +
1.286 + g_return_val_if_fail( transfer != NULL && *transfer != NULL, NULL );
1.287 + g_return_val_if_fail( (*transfer)->uri != NULL, NULL );
1.288 +
1.289 + g_static_mutex_lock (&mutex);
1.290 +
1.291 + gchar *path_dir = gmyth_uri_getpath( (*transfer)->uri );
1.292 + //g_print( "\t--> %s: path_dir = %s\n", __FUNCTION__, path_dir );
1.293 +
1.294 + gchar *stype = g_strdup( "" );
1.295 +
1.296 + // if ( (*transfer)->live_tv == FALSE ) {
1.297 +
1.298 + sock = gmyth_socket_new();
1.299 +
1.300 + gmyth_socket_connect( &sock, (*transfer)->hostname->str, (*transfer)->port );
1.301 +
1.302 + /*
1.303 + } else {
1.304 + sock = (*transfer)->sock;
1.305 + }
1.306 + */
1.307 +#ifdef GMYTHTV_ENABLE_DEBUG
1.308 +
1.309 + g_print( "[%s] --> Creating socket... (%s, %d)\n", __FUNCTION__, (*transfer)->hostname->str, (*transfer)->port );
1.310 +#endif
1.311 +
1.312 + GMythStringList *strlist = NULL;
1.313 +
1.314 + GString *hostname = g_string_new( gmyth_uri_gethost( (*transfer)->uri ) );
1.315 + GString *base_str = g_string_new( "" );
1.316 +
1.317 + if ( gmyth_socket_check_protocol_version_number (sock, (*transfer)->mythtv_version) ) {
1.318 +
1.319 + if (sock == NULL) {
1.320 + stype = (sock_type==GMYTH_PLAYBACK_TYPE) ? "control socket" : "file data socket";
1.321 + g_printerr( "FileTransfer, open_socket(%s): \n"
1.322 + "\t\t\tCould not connect to server \"%s\" @ port %d\n", stype,
1.323 + (*transfer)->hostname->str, (*transfer)->port );
1.324 + g_object_unref(sock);
1.325 + g_static_mutex_unlock (&mutex);
1.326 + return NULL;
1.327 + }
1.328 +
1.329 + hostname = gmyth_socket_get_local_hostname();
1.330 +
1.331 + g_print( "[%s] local hostname = %s\n", __FUNCTION__, hostname->str );
1.332 +
1.333 + if ( sock_type == GMYTH_PLAYBACK_TYPE )
1.334 + {
1.335 + (*transfer)->control_sock = sock;
1.336 + g_string_printf( base_str, "ANN Playback %s %d", hostname->str, TRUE );
1.337 +
1.338 + gmyth_socket_send_command( (*transfer)->control_sock, base_str );
1.339 + GString *resp = gmyth_socket_receive_response( (*transfer)->control_sock );
1.340 + g_print( "[%s] Got Playback response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.341 + }
1.342 + else if ( sock_type == GMYTH_MONITOR_TYPE )
1.343 + {
1.344 + (*transfer)->event_sock = sock;
1.345 + g_string_printf( base_str, "ANN Monitor %s %d", hostname->str, TRUE );
1.346 +
1.347 + gmyth_socket_send_command( (*transfer)->event_sock, base_str );
1.348 + GString *resp = gmyth_socket_receive_response( (*transfer)->event_sock );
1.349 + g_print( "[%s] Got Monitor response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.350 + //g_thread_create( myth_init_io_watchers, (void*)(*transfer), FALSE, NULL );
1.351 + myth_init_io_watchers ( (void*)(*transfer) );
1.352 +
1.353 + g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
1.354 +
1.355 + }
1.356 + else if ( sock_type == GMYTH_FILETRANSFER_TYPE )
1.357 + {
1.358 + (*transfer)->sock = sock;
1.359 + strlist = gmyth_string_list_new();
1.360 + //g_string_printf( base_str, "ANN FileTransfer %s %d %d", hostname->str,
1.361 + // transfer->userreadahead, transfer->retries );
1.362 + g_string_printf( base_str, "ANN FileTransfer %s", hostname->str );
1.363 +
1.364 + gmyth_string_list_append_string( strlist, base_str );
1.365 + gmyth_string_list_append_char_array( strlist, path_dir );
1.366 +
1.367 + gmyth_socket_write_stringlist( (*transfer)->sock, strlist );
1.368 + gmyth_socket_read_stringlist( (*transfer)->sock, strlist );
1.369 +
1.370 + /* socket number, where all the stream data comes from - got from the MythTV remote backend */
1.371 + (*transfer)->recordernum = gmyth_string_list_get_int( strlist, 1 );
1.372 +
1.373 + /* Myth URI stream file size - decoded using two 8-bytes sequences (64 bits/long long types) */
1.374 + (*transfer)->filesize = gmyth_util_decode_long_long( strlist, 2 );
1.375 +
1.376 + printf( "[%s] ***** Received: recordernum = %d, filesize = %" G_GUINT64_FORMAT "\n", __FUNCTION__,
1.377 + (*transfer)->recordernum, (*transfer)->filesize );
1.378 +
1.379 + if ( (*transfer)->filesize <= 0 ) {
1.380 + g_print( "[%s] Got filesize equals to %llu is lesser than 0 [invalid stream file]\n", __FUNCTION__, (*transfer)->filesize );
1.381 + g_object_unref(sock);
1.382 + sock = NULL;
1.383 + }
1.384 + }
1.385 + else if ( sock_type == GMYTH_RINGBUFFER_TYPE )
1.386 + {
1.387 + (*transfer)->sock = sock;
1.388 + //gmyth_file_transfer_spawntv( (*transfer), NULL );
1.389 +
1.390 + strlist = gmyth_string_list_new();
1.391 + g_string_printf( base_str, "ANN RingBuffer %s %d", hostname->str, (*transfer)->card_id );
1.392 +
1.393 + gmyth_socket_send_command( (*transfer)->sock, base_str );
1.394 + GString *resp = gmyth_socket_receive_response( (*transfer)->sock );
1.395 + g_print( "[%s] Got RingBuffer response from %s: %s\n", __FUNCTION__, base_str->str, resp->str );
1.396 +
1.397 + }
1.398 +
1.399 + }
1.400 +
1.401 + printf("[%s] ANN %s sent: %s\n", (sock_type==GMYTH_PLAYBACK_TYPE) ? "Playback" : (sock_type==GMYTH_FILETRANSFER_TYPE) ? "FileTransfer" : "Monitor", __FUNCTION__, base_str->str);
1.402 +
1.403 + if ( strlist != NULL )
1.404 + g_object_unref( strlist );
1.405 +
1.406 + g_static_mutex_unlock (&mutex);
1.407 +
1.408 + return sock;
1.409 +}
1.410 +
1.411 +void
1.412 +gmyth_file_transfer_spawntv ( GMythFileTransfer *file_transfer,
1.413 + GString *tvchain_id )
1.414 +{
1.415 + GMythStringList *str_list;
1.416 +
1.417 + g_debug ("gmyth_file_transfer_spawntv.\n");
1.418 +
1.419 + str_list = gmyth_string_list_new ();
1.420 +
1.421 + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER,
1.422 + file_transfer->card_id );
1.423 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.424 + gmyth_string_list_append_string (str_list, g_string_new ("SPAWN_LIVETV"));
1.425 + if (tvchain_id!=NULL) {
1.426 + gmyth_string_list_append_string (str_list, tvchain_id);
1.427 + gmyth_string_list_append_int (str_list, FALSE); // PIP = FALSE (0)
1.428 + }
1.429 +
1.430 + gmyth_socket_sendreceive_stringlist ( file_transfer->sock, str_list );
1.431 +
1.432 + //GString *str = NULL;
1.433 +
1.434 + //if (str_list!=NULL && (str = gmyth_string_list_get_string( str_list, 0 )) != NULL && strcasecmp( str->str, "ok" ) != 0 ) {
1.435 + // g_print( "[%s]\t\tSpawnLiveTV is OK!\n", __FUNCTION__ );
1.436 + //}
1.437 + if (str_list!=NULL)
1.438 + g_object_unref (str_list);
1.439 +
1.440 +}
1.441 +
1.442 +gboolean
1.443 +gmyth_file_transfer_is_recording ( GMythFileTransfer *file_transfer )
1.444 +{
1.445 + gboolean ret = TRUE;
1.446 +
1.447 + GMythStringList *str_list = gmyth_string_list_new ();
1.448 +
1.449 + g_debug ( "[%s]\n", __FUNCTION__ );
1.450 + g_static_mutex_lock (&mutex);
1.451 +
1.452 + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER,
1.453 + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
1.454 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.455 + gmyth_string_list_append_string (str_list, g_string_new ("IS_RECORDING"));
1.456 +
1.457 + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
1.458 +
1.459 + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
1.460 + {
1.461 + GString *str = NULL;
1.462 + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strcmp( str->str, "bad" )!= 0 ) {
1.463 + gint is_rec = gmyth_string_list_get_int( str_list, 0 );
1.464 + if ( is_rec != 0 )
1.465 + ret = TRUE;
1.466 + else
1.467 + ret = FALSE;
1.468 + }
1.469 + }
1.470 + g_print( "[%s] %s, stream is %s being recorded!\n", __FUNCTION__, ret ? "YES" : "NO", ret ? "" : "NOT" );
1.471 + g_static_mutex_unlock (&mutex);
1.472 +
1.473 + if ( str_list != NULL )
1.474 + g_object_unref (str_list);
1.475 +
1.476 + return ret;
1.477 +
1.478 +}
1.479 +
1.480 +gint64
1.481 +gmyth_file_transfer_get_file_position ( GMythFileTransfer *file_transfer )
1.482 +{
1.483 + gint64 pos = 0;
1.484 +
1.485 + GMythStringList *str_list = gmyth_string_list_new ();
1.486 +
1.487 + g_debug ( "[%s]\n", __FUNCTION__ );
1.488 + g_static_mutex_lock (&mutex);
1.489 +
1.490 + g_string_printf( file_transfer->query, "%s %d", GMYTHTV_RECORDER_HEADER,
1.491 + file_transfer->rec_id >= 0 ? file_transfer->rec_id : file_transfer->card_id );
1.492 +
1.493 + gmyth_string_list_append_string (str_list, file_transfer->query);
1.494 + gmyth_string_list_append_string (str_list, g_string_new ("GET_FILE_POSITION"));
1.495 +
1.496 + gmyth_socket_sendreceive_stringlist ( file_transfer->control_sock, str_list );
1.497 +
1.498 + if ( str_list != NULL && gmyth_string_list_length(str_list) > 0 )
1.499 + {
1.500 + GString *str = NULL;
1.501 + if ( ( str = gmyth_string_list_get_string( str_list, 0 ) ) != NULL && strstr ( str->str, "bad" ) == NULL )
1.502 + pos = gmyth_util_decode_long_long( str_list, 0 );
1.503 + }
1.504 + g_static_mutex_unlock (&mutex);
1.505 +
1.506 +#ifndef GMYTHTV_ENABLE_DEBUG
1.507 +
1.508 + g_print( "[%s] Got file position = %lld\n", __FUNCTION__, pos );
1.509 +#endif
1.510 + if (str_list!=NULL)
1.511 + g_object_unref (str_list);
1.512 +
1.513 + return pos;
1.514 +
1.515 +}
1.516 +
1.517 + glong
1.518 +gmyth_file_transfer_get_recordernum( GMythFileTransfer *transfer )
1.519 +{
1.520 + return transfer->recordernum;
1.521 +}
1.522 +
1.523 + glong
1.524 +gmyth_file_transfer_get_filesize( GMythFileTransfer *transfer )
1.525 +{
1.526 + return transfer->filesize;
1.527 +}
1.528 +
1.529 + gboolean
1.530 +gmyth_file_transfer_isopen( GMythFileTransfer *transfer )
1.531 +{
1.532 + return (transfer->sock != NULL && transfer->control_sock != NULL);
1.533 +}
1.534 +
1.535 + void
1.536 +gmyth_file_transfer_close( GMythFileTransfer *transfer )
1.537 +{
1.538 + GMythStringList *strlist;
1.539 +
1.540 + if (transfer->control_sock == NULL)
1.541 + return;
1.542 +
1.543 + strlist = gmyth_string_list_new( );
1.544 +
1.545 + g_string_printf( transfer->query, "%s %d", GMYTHTV_QUERY_HEADER,
1.546 + transfer->recordernum );
1.547 + gmyth_string_list_append_string( strlist, transfer->query );
1.548 + gmyth_string_list_append_char_array( strlist, "DONE" );
1.549 +
1.550 +
1.551 + if ( gmyth_socket_sendreceive_stringlist(transfer->control_sock, strlist) <= 0 )
1.552 + {
1.553 + g_printerr( "Remote file timeout.\n" );
1.554 + }
1.555 +
1.556 + if (transfer->sock)
1.557 + {
1.558 + g_object_unref( transfer->sock );
1.559 + transfer->sock = NULL;
1.560 + }
1.561 +
1.562 + if (transfer->control_sock)
1.563 + {
1.564 + g_object_unref( transfer->control_sock );
1.565 + transfer->control_sock = NULL;
1.566 + }
1.567 +
1.568 +}
1.569 +
1.570 + void
1.571 +gmyth_file_transfer_reset_controlsock( GMythFileTransfer *transfer )
1.572 +{
1.573 + if (transfer->control_sock == NULL)
1.574 + {
1.575 + g_printerr( "gmyth_file_transfer_reset_controlsock(): Called with no control socket" );
1.576 + return;
1.577 + }
1.578 +
1.579 + GString *str = gmyth_socket_receive_response( transfer->control_sock );
1.580 +
1.581 + g_string_free( str, TRUE );
1.582 +}
1.583 +
1.584 +void
1.585 +gmyth_file_transfer_reset_sock( GMythFileTransfer *transfer )
1.586 +{
1.587 + if ( transfer->sock == NULL )
1.588 + {
1.589 + g_printerr( "gmyth_file_transfer_reset_sock(): Called with no raw socket" );
1.590 + return;
1.591 + }
1.592 +
1.593 + GString *str = gmyth_socket_receive_response( transfer->sock );
1.594 +
1.595 + g_string_free( str, TRUE );
1.596 +}
1.597 +
1.598 +void
1.599 +gmyth_file_transfer_reset( GMythFileTransfer *transfer )
1.600 +{
1.601 + gmyth_file_transfer_reset_controlsock( transfer );
1.602 + gmyth_file_transfer_reset_sock( transfer );
1.603 +}
1.604 +
1.605 +gint64
1.606 +gmyth_file_transfer_seek(GMythFileTransfer *transfer, guint64 pos, gint whence)
1.607 +{
1.608 + if (transfer->sock == NULL)
1.609 + {
1.610 + g_printerr( "[%s] gmyth_file_transfer_seek(): Called with no socket", __FUNCTION__ );
1.611 + return 0;
1.612 + }
1.613 +
1.614 + if (transfer->control_sock == NULL)
1.615 + return 0;
1.616 +
1.617 + // if (!controlSock->isOpen() || controlSock->error())
1.618 + // return 0;
1.619 +
1.620 + GMythStringList *strlist = gmyth_string_list_new();
1.621 + g_string_printf (transfer->query, "%s %d", GMYTHTV_QUERY_HEADER, transfer->recordernum);
1.622 + gmyth_string_list_append_string( strlist, transfer->query );
1.623 + gmyth_string_list_append_char_array( strlist, "SEEK" );
1.624 + gmyth_string_list_append_uint64( strlist, pos );
1.625 +
1.626 + gmyth_string_list_append_int( strlist, whence );
1.627 +
1.628 + if (pos > 0 )
1.629 + gmyth_string_list_append_uint64( strlist, pos );
1.630 + else
1.631 + gmyth_string_list_append_uint64( strlist, transfer->readposition );
1.632 +
1.633 + gmyth_socket_sendreceive_stringlist( transfer->control_sock, strlist );
1.634 +
1.635 + gint64 retval = gmyth_string_list_get_int64(strlist, 0);
1.636 + transfer->readposition = retval;
1.637 + g_print( "[%s] got reading position pointer from the streaming = %lld\n",
1.638 + __FUNCTION__, retval );
1.639 +
1.640 + //gmyth_file_transfer_reset( transfer );
1.641 +
1.642 + return retval;
1.643 +}
1.644 +
1.645 +static gboolean
1.646 +myth_control_sock_listener( GIOChannel *source, GIOCondition condition, gpointer data )
1.647 +{
1.648 +
1.649 + GIOStatus ret;
1.650 + GError *err = NULL;
1.651 + gchar *msg = g_strdup("");
1.652 +
1.653 + g_static_mutex_lock( &mutex );
1.654 +
1.655 + gsize len;
1.656 + if (condition & G_IO_HUP)
1.657 + g_error ("Read end of pipe died!\n");
1.658 + ret = g_io_channel_read_line ( source, &msg, &len, NULL, &err);
1.659 + if ( ret == G_IO_STATUS_ERROR )
1.660 + g_error ("[%s] Error reading: %s\n", __FUNCTION__, err != NULL ? err->message : "" );
1.661 + g_print ("\n\n\n\n\n\n[%s]\t\tEVENT: Read %u bytes: %s\n\n\n\n\n", __FUNCTION__, len, msg != NULL ? msg : "" );
1.662 + if ( msg != NULL )
1.663 + g_free (msg);
1.664 +
1.665 + g_static_mutex_unlock( &mutex );
1.666 +
1.667 + return TRUE;
1.668 +
1.669 +}
1.670 +
1.671 +static void*
1.672 +myth_init_io_watchers( void *data )
1.673 +{
1.674 + GMythFileTransfer *transfer = (GMythFileTransfer*)data;
1.675 + io_watcher_context = g_main_context_new();
1.676 + GMainLoop *loop = g_main_loop_new( NULL, FALSE );
1.677 +
1.678 + GSource *source = NULL;
1.679 +
1.680 + if ( transfer->event_sock->sd_io_ch != NULL )
1.681 + source = g_io_create_watch( transfer->event_sock->sd_io_ch, G_IO_IN | G_IO_HUP );
1.682 + else
1.683 + goto cleanup;
1.684 +
1.685 + g_source_set_callback ( source, (GSourceFunc)myth_control_sock_listener, NULL, NULL );
1.686 +
1.687 + g_source_attach( source, io_watcher_context );
1.688 +
1.689 + if (source==NULL) {
1.690 + g_printerr( "[%s] Error adding watch listener function to the IO control channel!\n", __FUNCTION__ );
1.691 + goto cleanup;
1.692 + }
1.693 +
1.694 + g_print( "[%s]\tOK! Starting listener on the MONITOR event socket...\n", __FUNCTION__ );
1.695 +
1.696 + g_main_loop_run( loop );
1.697 +
1.698 +cleanup:
1.699 + if ( source != NULL )
1.700 + g_source_unref( source );
1.701 +
1.702 + g_main_loop_unref( loop );
1.703 +
1.704 + g_main_context_unref( io_watcher_context );
1.705 +
1.706 + return NULL;
1.707 +}
1.708 +
1.709 +
1.710 +gint
1.711 +gmyth_file_transfer_read(GMythFileTransfer *transfer, void *data, gint size, gboolean read_unlimited)
1.712 +{
1.713 + gint recv = 0;
1.714 + gsize bytes_read = 0;
1.715 + gint sent = 0;
1.716 + guint remaining = 0;
1.717 + gboolean response = FALSE;
1.718 +
1.719 + GIOChannel *io_channel;
1.720 + GIOChannel *io_channel_control;
1.721 +
1.722 + GIOCondition io_cond;
1.723 + GIOCondition io_cond_control;
1.724 + GIOStatus io_status = G_IO_STATUS_NORMAL, io_status_control = G_IO_STATUS_NORMAL;
1.725 +
1.726 + gint buf_len = GMYTHTV_BUFFER_SIZE;
1.727 +
1.728 + GMythStringList *strlist = NULL;
1.729 + GError *error = NULL;
1.730 +
1.731 + gchar *trash = g_strdup("");
1.732 +
1.733 + g_return_val_if_fail ( data != NULL, -2 );
1.734 +
1.735 + /* gets the size of the entire file, if the size requested is lesser than 0 */
1.736 + if ( size <= 0 )
1.737 + size = transfer->filesize;
1.738 +
1.739 + io_channel = transfer->sock->sd_io_ch;
1.740 + io_channel_control = transfer->control_sock->sd_io_ch;
1.741 +
1.742 + //g_io_channel_set_flags( io_channel, G_IO_FLAG_APPEND |
1.743 + // G_IO_STATUS_AGAIN | G_IO_FLAG_IS_READABLE | G_IO_FLAG_IS_WRITEABLE |
1.744 + // G_IO_FLAG_IS_SEEKABLE, NULL );
1.745 +
1.746 + io_status = g_io_channel_set_encoding( io_channel, NULL, &error );
1.747 + if ( io_status == G_IO_STATUS_NORMAL )
1.748 + g_print( "[%s] Setting encoding to binary data socket).\n", __FUNCTION__ );
1.749 +
1.750 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.751 +
1.752 + io_cond_control = g_io_channel_get_buffer_condition( io_channel );
1.753 +
1.754 + if ( transfer->sock == NULL || ( io_status == G_IO_STATUS_ERROR ) )
1.755 + {
1.756 + g_printerr( "gmyth_file_transfer_read(): Called with no raw socket.\n" );
1.757 + recv = -1;
1.758 + goto cleanup;
1.759 + }
1.760 +
1.761 + if ( transfer->control_sock == NULL || ( io_status_control == G_IO_STATUS_ERROR ) )
1.762 + {
1.763 + g_printerr( "gmyth_file_transfer_read(): Called with no control socket.\n" );
1.764 + recv = -1;
1.765 + goto cleanup;
1.766 + }
1.767 +
1.768 + /*
1.769 + if (!controlSock->isOpen() || controlSock->error())
1.770 + return -1;
1.771 + */
1.772 +
1.773 + if ( ( io_cond & G_IO_IN ) != 0 ) {
1.774 + do
1.775 + {
1.776 + trash = g_new0( gchar, GMYTHTV_BUFFER_SIZE );
1.777 +
1.778 + io_status = g_io_channel_read_chars( io_channel, trash,
1.779 + GMYTHTV_BUFFER_SIZE, &bytes_read, &error);
1.780 +
1.781 + g_print( "[%s] cleaning buffer on IO binary channel... %d bytes gone!\n",
1.782 + __FUNCTION__, bytes_read );
1.783 +
1.784 + if ( trash != NULL )
1.785 + g_free( trash );
1.786 +
1.787 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.788 +
1.789 + } while ( ( io_cond & G_IO_IN ) != 0 && ( io_status != G_IO_STATUS_ERROR ) && (error == NULL) );
1.790 +
1.791 + //if ( trash!= NULL )
1.792 + // g_free( trash );
1.793 + }
1.794 +
1.795 + if ( ( io_cond_control & G_IO_IN ) != 0 ) {
1.796 + GMythStringList *strlist_tmp = gmyth_string_list_new();
1.797 + gmyth_socket_read_stringlist( transfer->control_sock, strlist_tmp );
1.798 + g_object_unref( strlist_tmp );
1.799 + }
1.800 +
1.801 + wait_to_transfer = 0;
1.802 +
1.803 + //while ( transfer->live_tv && ( gmyth_file_transfer_get_file_position( transfer ) < 4096 ) &&
1.804 + // wait_to_transfer++ < GMYTHTV_TRANSFER_MAX_WAITS )
1.805 + // g_usleep( 1000*50 ); /* waits just for 2/10 second */
1.806 +
1.807 + //g_thread_create( myth_init_io_watchers, (void*)transfer, FALSE, NULL );
1.808 + //g_printerr( "[%s] Watch listener function to the IO control channel on thread %p.\n", __FUNCTION__, g_thread_self() );
1.809 +
1.810 + //g_static_mutex_lock (&mutex);
1.811 + //strlist = gmyth_string_list_new();
1.812 +
1.813 + g_string_printf ( transfer->query, "%s %d",
1.814 + /*transfer->live_tv ? GMYTHTV_RECORDER_HEADER :*/ GMYTHTV_QUERY_HEADER,
1.815 + /* transfer->live_tv ? transfer->card_id :*/ transfer->recordernum ); // transfer->recordernum
1.816 + g_print( "\t[%s] Transfer_query = %s\n", __FUNCTION__, transfer->query->str );
1.817 +
1.818 + sent = size;
1.819 + remaining = size - recv;
1.820 + //g_static_mutex_unlock( &mutex );
1.821 + //data = (void*)g_new0( gchar, size );
1.822 +
1.823 + //g_io_channel_flush( io_channel, NULL );
1.824 +
1.825 + //g_static_mutex_lock( &mutex );
1.826 +
1.827 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.828 +
1.829 + while ( recv < size && !response )//&& ( io_cond & G_IO_IN ) != 0 )
1.830 + {
1.831 + g_io_channel_flush( io_channel_control, NULL );
1.832 +
1.833 + strlist = gmyth_string_list_new();
1.834 + gmyth_string_list_append_char_array( strlist, transfer->query->str );
1.835 + gmyth_string_list_append_char_array( strlist,
1.836 + /*transfer->live_tv ? "REQUEST_BLOCK_RINGBUF" :*/ "REQUEST_BLOCK" );
1.837 + gmyth_string_list_append_int( strlist, remaining );
1.838 + gmyth_socket_write_stringlist( transfer->control_sock, strlist );
1.839 +
1.840 + guint count_bytes = 0;
1.841 +
1.842 + do
1.843 + {
1.844 + //buf_len = ( sent - recv ) > GMYTHTV_BUFFER_SIZE ? GMYTHTV_BUFFER_SIZE : ( sent - recv );
1.845 + if ( remaining > GMYTHTV_BUFFER_SIZE ) {
1.846 + buf_len = GMYTHTV_BUFFER_SIZE;
1.847 + } else {
1.848 + buf_len = remaining;
1.849 + }
1.850 +
1.851 + bytes_read = 0;
1.852 +
1.853 + io_status = g_io_channel_read_chars( io_channel, data + recv,
1.854 + buf_len, &bytes_read, &error );
1.855 +
1.856 + //g_static_mutex_unlock( &mutex );
1.857 + /*
1.858 + GString *sss = g_string_new("");
1.859 + sss = g_string_append_len( sss, (gchar*)data+recv, bytes_read );
1.860 +
1.861 + g_print( "[%s] Reading buffer (length = %d)\n", __FUNCTION__, bytes_read);
1.862 + */
1.863 + if ( bytes_read > 0 )
1.864 + {
1.865 + //if ( bytes_read <= buf_len )
1.866 + recv += bytes_read;
1.867 + count_bytes += bytes_read;
1.868 + remaining -= bytes_read;
1.869 + g_print( "[%s] Reading buffer (bytes read = %d, remaining = %d)\n", __FUNCTION__, bytes_read, remaining );
1.870 + if ( remaining == 0 ) {
1.871 + break;
1.872 + }
1.873 + } else {
1.874 + break;
1.875 + }
1.876 +
1.877 + //if ( remaining > 0 ) {
1.878 +
1.879 + if ( io_status == G_IO_STATUS_EOF ) {
1.880 + g_print( "[%s] got EOS!", __FUNCTION__ );
1.881 + break;
1.882 + } else if ( io_status == G_IO_STATUS_ERROR ) {
1.883 + g_print( "[%s] gmyth_file_transfer_read(): socket error.\n", __FUNCTION__ );
1.884 + break;
1.885 + }
1.886 + //}
1.887 +
1.888 + /* increase buffer size, to allow get more data (do not obey to the buffer size) */
1.889 + if ( read_unlimited == TRUE ) {
1.890 + // FOR NOW, DO NOTHING!!!
1.891 + //if ( recv > buf_len )
1.892 + // sent += (bytes_read - buf_len) + 1;
1.893 + }
1.894 +
1.895 + /* verify if the input (read) buffer is ready to receive data */
1.896 + io_cond = g_io_channel_get_buffer_condition( io_channel );
1.897 +
1.898 + g_print( "[%s]\t io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.899 + ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.900 +
1.901 + //if ( recv == size )
1.902 + //break;
1.903 +
1.904 + } while ( remaining > 0 );//&& ( io_status == G_IO_STATUS_NORMAL ) );
1.905 +
1.906 + // if ( ( recv < size ) ) {
1.907 + // finish_read = FALSE;
1.908 + //}
1.909 +
1.910 + io_cond_control = g_io_channel_get_buffer_condition( io_channel_control );
1.911 + if ( remaining == 0 )//( io_cond_control & G_IO_IN ) != 0 )
1.912 + {
1.913 + gmyth_socket_read_stringlist( transfer->control_sock, strlist );
1.914 + if ( strlist != NULL && gmyth_string_list_length( strlist ) > 0 )
1.915 + {
1.916 + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
1.917 + g_print( "[%s] got SENT buffer message = %d\n", __FUNCTION__, sent );
1.918 + if ( sent != 0 )
1.919 + {
1.920 + g_print( "[%s]\t received = %d bytes, backend says %d bytes sent, "\
1.921 + "io_cond %s prepared for reading! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.922 + recv, sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.923 +
1.924 + if ( sent == count_bytes )
1.925 + {
1.926 + response = ( recv == size );
1.927 + g_print( "[%s]\t\tsent %d, which is equals to bytes_read = %d\n\n",
1.928 + __FUNCTION__, sent, count_bytes );
1.929 + if ( response == TRUE )
1.930 + break;
1.931 + }
1.932 + else
1.933 + {
1.934 + g_print( "[%s]\t\tsent %d, which is NOT equals to bytes_read = %d\n\n",
1.935 + __FUNCTION__, sent, count_bytes );
1.936 + goto cleanup;
1.937 + //response = FALSE;
1.938 + //break;
1.939 + }
1.940 + } else {
1.941 + break;
1.942 + //goto cleanup;
1.943 + } // if
1.944 + } // if - reading control response from backend
1.945 + } else {
1.946 + response = FALSE;
1.947 + } // if - stringlist response
1.948 +
1.949 + } // while
1.950 +
1.951 + io_cond_control = g_io_channel_get_buffer_condition( io_channel_control );
1.952 + // io_cond = g_io_channel_get_buffer_condition( io_channel );
1.953 +
1.954 + if ( ( ( io_cond_control & G_IO_IN ) != 0 ) &&
1.955 + ( response || ( recv == size ) ) )
1.956 + {
1.957 + if ( gmyth_socket_read_stringlist( transfer->control_sock, strlist ) > 0 )
1.958 + {
1.959 + if ( strlist != NULL && gmyth_string_list_length(strlist) > 0 )
1.960 + {
1.961 + sent = gmyth_string_list_get_int( strlist, 0 ); // -1 on backend error
1.962 + g_print( "[%s]\t received = %d bytes -\tNOW returning from reading buffer I/O socket "\
1.963 + "[%s prepared for reading]! (G_IO_IN) !!!\n\n", __FUNCTION__,
1.964 + sent, ( ( io_cond & G_IO_IN ) != 0 ) ? "IS" : "IS NOT" );
1.965 + }
1.966 + }
1.967 + else
1.968 + {
1.969 + g_printerr ( "gmyth_file_transfer_read(): No response from control socket.");
1.970 + recv = -1;
1.971 + }
1.972 +
1.973 + }
1.974 + else if ( error != NULL )
1.975 + {
1.976 + g_printerr( "[%s] Error occurred: (%d, %s)\n", __FUNCTION__, error->code, error->message );
1.977 + }
1.978 +
1.979 +cleanup:
1.980 + //g_static_mutex_unlock (&mutex);
1.981 +
1.982 + if ( trash != NULL )
1.983 + g_free( trash );
1.984 +
1.985 + if ( strlist != NULL )
1.986 + g_object_unref( strlist );
1.987 +
1.988 + g_print( "gmyth_file_transfer_read(): reqd=%d, rcvd=%d, rept=%d, "\
1.989 + "(rcvd and rept MUST be the same!)\n", size,
1.990 + recv, sent );
1.991 +
1.992 + //if ( ( recv != size ) || ( sent != size ) ) {
1.993 + //recv = size;
1.994 + //}
1.995 +
1.996 + if ( error != NULL ) {
1.997 + g_printerr( "Cleaning-up ERROR: %s [msg = %s, code = %d]\n", __FUNCTION__, error->message,
1.998 + error->code );
1.999 + g_error_free( error );
1.1000 + }
1.1001 +
1.1002 + return recv;
1.1003 +}
1.1004 +
1.1005 +void
1.1006 +gmyth_file_transfer_settimeout( GMythFileTransfer *transfer, gboolean fast )
1.1007 +{
1.1008 +
1.1009 + GMythStringList *strlist = NULL;
1.1010 +
1.1011 + if ( transfer->timeoutisfast == fast )
1.1012 + return;
1.1013 +
1.1014 + if ( transfer->sock == NULL )
1.1015 + {
1.1016 + g_printerr( "gmyth_file_transfer_settimeout(): Called with no socket" );
1.1017 + return;
1.1018 + }
1.1019 +
1.1020 + if ( transfer->control_sock == NULL )
1.1021 + return;
1.1022 +
1.1023 + strlist = gmyth_string_list_new();
1.1024 + gmyth_string_list_append_string( strlist, transfer->query );
1.1025 + gmyth_string_list_append_char_array( strlist, "SET_TIMEOUT" );
1.1026 + gmyth_string_list_append_int( strlist, fast );
1.1027 +
1.1028 + gmyth_socket_write_stringlist( transfer->control_sock, strlist );
1.1029 + gmyth_socket_read_stringlist( transfer->control_sock, strlist );
1.1030 +
1.1031 + transfer->timeoutisfast = fast;
1.1032 +
1.1033 +}
1.1034 +
1.1035 +#ifdef DO_TESTING
1.1036 +
1.1037 + int
1.1038 +main( int argc, char *argv[] )
1.1039 +{
1.1040 + g_type_init();
1.1041 +
1.1042 + GMythFileTransfer *file_transfer = gmyth_file_transfer_new( 1,
1.1043 + g_string_new("myth://192.168.1.109:6543/jshks.nuv"), -1, GMYTHTV_VERSION );
1.1044 + gmyth_file_transfer_setup( &file_transfer );
1.1045 + gchar *data = g_strdup("");
1.1046 +
1.1047 + gint num = gmyth_file_transfer_read( file_transfer, data, -1 );
1.1048 +
1.1049 + return 0;
1.1050 +
1.1051 +}
1.1052 +
1.1053 +#endif