# HG changeset patch # User morphbr # Date 1175224372 -3600 # Node ID 57833200a4150bdcd90b66635e5f8c0b603afcbe # Parent 28879368706bba619f7acee33465c0987fcd51b5 [svn r475] Regarding GMyth-Stream: - Bug fixes - Better error handling - Return messages to client - Created a lib.py with common functions - Improved API of comm plugins diff -r 28879368706b -r 57833200a415 gmyth-stream/lib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/lib.py Fri Mar 30 04:12:52 2007 +0100 @@ -0,0 +1,7 @@ +import time + +def now(): + return time.strftime("%Y-%m-%d %H:%M:%S"); + +def log(msg): + print "[%s] %s" % (now(), msg) diff -r 28879368706b -r 57833200a415 gmyth-stream/main.py --- a/gmyth-stream/main.py Thu Mar 29 23:14:32 2007 +0100 +++ b/gmyth-stream/main.py Fri Mar 30 04:12:52 2007 +0100 @@ -1,13 +1,10 @@ #!/usr/bin/python +import os +import lib import sys -import os import ConfigParser -def now(): - return time.strftime("%Y-%m-%d %H:%M:%S"); - - config = ConfigParser.ConfigParser() config.read("stream.conf") @@ -22,7 +19,7 @@ # Start Our Server: server = Server(config) -print "--> Starting the server..." +lib.log("Starting GMyth-Stream server") while (server.finish == 0): con, client = server.getRequest() @@ -32,29 +29,39 @@ if not msg: break - elif (msg == "SETUP"): + lib.log("Received %s from: %s" % (msg, client) ) + + if (msg == "SETUP"): setup = server.getMsg(1024).strip().split(" ") - media.setup(setup[0], setup[1], setup[2], \ - setup[3], setup[4], setup[5], - setup[6], setup[7], setup[8], - setup[9]) + if ( len(setup) == 10 ): + media.setup(setup[0], setup[1], setup[2], \ + setup[3], setup[4], setup[5], + setup[6], setup[7], setup[8], + setup[9]) + + server.Ack("SETUP") + + else: + lib.log("Wrong SETUP command from: %s" % client) elif (msg == "PLAY"): media.play() + server.Ack("PLAY") elif (msg == "STOP"): media.stop() + server.Ack("STOP") elif (msg == "CLOSE"): server.finish = 1 media.stop() + server.Ack("CLOSE") break - print "[%s] %s: %s" % (now(), client, msg) - - print "[%s] Closing connection with %s" % (now(), client) + lib.log("Closing connection with %s" % client[0]) server.disconnect_client(con) server.stop() del(server) -print "--> Server stopped..." +lib.log("Server stopped. Closing...") + diff -r 28879368706b -r 57833200a415 gmyth-stream/plugins/comm/tcp.py --- a/gmyth-stream/plugins/comm/tcp.py Thu Mar 29 23:14:32 2007 +0100 +++ b/gmyth-stream/plugins/comm/tcp.py Fri Mar 30 04:12:52 2007 +0100 @@ -1,3 +1,4 @@ +import lib import time import socket @@ -12,17 +13,20 @@ self.tcp.bind( (self.host, self.port) ) self.tcp.listen(1) - def now(self): - return time.strftime("%Y-%m-%d %H:%M:%S"); + def getMsg(self, size): + return self.con.recv(size) - def getMsg(self, size): - con = self.data[0] - return con.recv(size) + def sendMsg(self, msg): + self.con.send(msg) + + def Ack(self, command): + msg = "[%s] Command %s received" % (lib.now(), command) + self.sendMsg(msg + "\n") def getRequest(self): - self.data = self.tcp.accept() - print "[%s] Received request from ip=%s" % (self.now(), self.data[1] ) - return self.data + self.con, self.client = self.tcp.accept() + print "[%s] Received request from ip=%s" % (lib.now(), self.client ) + return (self.con, self.client) def disconnect_client(self, connection): connection.close() diff -r 28879368706b -r 57833200a415 gmyth-stream/plugins/comm/xmlrpc.py --- a/gmyth-stream/plugins/comm/xmlrpc.py Thu Mar 29 23:14:32 2007 +0100 +++ b/gmyth-stream/plugins/comm/xmlrpc.py Fri Mar 30 04:12:52 2007 +0100 @@ -1,74 +1,58 @@ - ''' - # GMyth-Stream - # - # @file plugins/comm/xmlrpc.py - # - # @brief

Plugin for GMyth-Stream - # - # Copyright (C) 2007 INdT - Instituto Nokia de Tecnologia. - # @author Artur Duque de Souza - # - # - # This program is free software; you can redistribute it and/or modify - # it under the terms of the GNU Lesser General Public License as published by - # the Free Software Foundation; either version 2 of the License, or - # (at your option) any later version. - # - # This program is distributed in the hope that it will be useful, - # but WITHOUT ANY WARRANTY; without even the implied warranty of - # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - # GNU General Public License for more details. - # - # You should have received a copy of the GNU Lesser General Public License - # along with this program; if not, write to the Free Software - # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - # - ''' - +import lib import SimpleXMLRPCServer class Handler: - def __init__(self, pool): - self.pool = pool + def __init__(self, recv_pool, send_pool): + self.recv_pool = recv_pool + self.send_pool = send_pool + self.getMsg = self.sendMsg def _listMethods(self): - return ['Setup', 'Play', 'Stop', 'Close'] + return ['setup', 'play', 'stop', 'close', 'getMsg'] def _methodHelp(self, method): - if method == 'Setup': - return "Setup the Media: Setup(filename,codec,bitrate,widht,height,port)" - elif method == 'Play': - return "Play the Media: Play()" - elif method == 'Stop': - return "Stop the Media: Stop()" - elif method == 'Close': - return "Close the connection: Close()" + if method == 'setup': + return "Setup the Media: setup( filename, mux, vcodec, vbitrate, fps, acodec, abitrate, width, height, port" + elif method == 'play': + return "Play the Media: play()" + elif method == 'stop': + return "Stop the Media: stop()" + elif method == 'close': + return "Close the connection: close()" + elif method == 'getMsg': + return "Return the first message in the pool: getMsg()" else: # By convention, return empty # string if no help is available return "" + def setup(self, filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port): + self.recv_pool.append("SETUP") + self.recv_pool.append("%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port) + return self.sendMsg() - def Setup(self, filename, codec, bitrate, width, height, port): - self.pool.append("SETUP") - self.pool.append("%s %s %s %s %s %s" % (filename, codec, bitrate,\ - width, height, port)) - return 0 + def play(self): + self.recv_pool.append("PLAY") + return self.sendMsg() - def Play(self): - self.pool.append("PLAY") - return 0 + def stop(self): + self.recv_pool.append("STOP") + return self.sendMsg() - def Stop(self): - self.pool.append("STOP") - return 0 + def close(self): + self.recv_pool.append("CLOSE") + return self.sendMsg() - def Close(self): - self.pool.append("CLOSE") - return 0 + def sendMsg(self): + if self.send_pool != []: + return self.send_pool.pop(0) + else: + return "" class Server: @@ -77,20 +61,28 @@ self.host = 'localhost' self.port = int(config.get("Comm", "port")) self.finish = 0 - self.pool = [] + self.recv_pool = [] + self.send_pool = [] - self.handler = Handler(self.pool) + self.handler = Handler(self.recv_pool, self.send_pool) self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port)) self.xmlrpc.register_instance(self.handler) def getMsg(self, size): - try: - return self.pool.pop(0) - except IndexError: + if self.recv_pool != []: + return self.recv_pool.pop(0) + else: return "" + def sendMsg(self, msg): + self.send_pool.append(msg) + + def Ack(self, command): + msg = "[%s] Command %s received" % (lib.now(), command) + self.sendMsg(msg + "\n") + def getRequest(self): self.xmlrpc.handle_request() return (0, "RPC Client") diff -r 28879368706b -r 57833200a415 gmyth-stream/plugins/media/ffmpeg.py --- a/gmyth-stream/plugins/media/ffmpeg.py Thu Mar 29 23:14:32 2007 +0100 +++ b/gmyth-stream/plugins/media/ffmpeg.py Fri Mar 30 04:12:52 2007 +0100 @@ -1,5 +1,6 @@ import os import sys +import lib import time import socket import ConfigParser @@ -9,6 +10,8 @@ def __init__(self, config): self.config = config + self.socket = None + self.child_pid = None def setup(self, filename, mux, vcodec, vbitrate,\ fps, acodec, abitrate, width, height, port): @@ -27,18 +30,22 @@ # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000 self.path = self.config.get("FFmpeg", "path") - self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % ( + self.path += " -i %s -bufsize 4096 -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % ( self.filename, self.mux, self.vcodec, self.vbitrate,\ self.fps, self.acodec, self.abitrate, self.width, self.height) + if (self.socket != None): + del(self.socket) + self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.socket.bind( ('', self.port) ) + self.socket.settimeout(10) def play(self): self.socket.listen(1) - print "Starting FFmpeg: %s" % self.path + lib.log("Starting FFmpeg: %s" % self.path) # exec FFmpeg and get stdout child_stdin, child_stdout = os.popen2(self.path) @@ -49,24 +56,35 @@ #child conn,addr= self.socket.accept() - print "--> Sending Data..." - data = child_stdout.read(1024) + lib.log("Sending Data to client: %s" % addr[0]) + data = child_stdout.read(4096) + conn.settimeout(5) + retry = 0 - while( data != ""): + while( data != "" and retry < 5): try: conn.send(data) except socket.error: - break + lib.log("Socket error (maybe timeout ?)") + retry = retry + 1 + data = child_stdout.read(1024) - print "--> Finished sending data..." - conn.close() + if (retry < 5): + lib.log("Finished sending Data to client: %s" % addr[0]) + else: + lib.log("Client timed out") + + child_stdout.close() + #conn.close() + #sys.exit() def stop(self): - print "--> Trying to stop FFmpeg process..." + lib.log("Closing socket") self.socket.close() - os.kill(self.child_pid, 9) - + lib.log("Trying to stop FFmpeg process") + if (self.child_pid != None): + os.kill(self.child_pid, 9) diff -r 28879368706b -r 57833200a415 gmyth-stream/tests/client_ffmpeg.py --- a/gmyth-stream/tests/client_ffmpeg.py Thu Mar 29 23:14:32 2007 +0100 +++ b/gmyth-stream/tests/client_ffmpeg.py Fri Mar 30 04:12:52 2007 +0100 @@ -1,24 +1,43 @@ import os +import sys import time import socket -HOST='localhost' -PORT=5000 -s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) -s.settimeout(10) +if len(sys.argv) < 2: + HOST = 'localhost' + PORT = 5000 +elif len(sys.argv) == 2: + HOST = sys.argv[1] + PORT = 5000 +else: + HOST = sys.argv[1] + PORT = int(sys.argv[2]) -arq = open('/tmp/dvb.mpg','r') +socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) +socket.settimeout(10) + +try: + socket.connect( (HOST,PORT) ) +except: + print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT) + sys.exit(-1) + mplayer = os.popen("which mplayer").read().strip() -mplayer += " -noidx -" +mplayer += " - 1> /dev/null" pin, pout = os.popen2(mplayer) -s.connect((HOST,PORT)) -i = 0 - -data = s.recv(1024) +data = socket.recv(1024) while (data != ""): pin.write(data) - data = s.recv(1024) + data = socket.recv(1024) +pin.close() +pout.close() + + +# from select import select +# r, w, x = select([pout], []. [], 0) +# if pout in r: +# pout.read(32)