# HG changeset patch # User morphbr # Date 1175896686 -3600 # Node ID 16312d0021cb6f7a6c2a57941174c17065b14182 # Parent 4abc2d465d80f81dcb8af61934bbafd2f6fe9794 [svn r516] GMyth-Streamer: Barbieri's patches diff -r 4abc2d465d80 -r 16312d0021cb gmyth-stream/server/lib.py --- a/gmyth-stream/server/lib.py Thu Apr 05 19:58:40 2007 +0100 +++ b/gmyth-stream/server/lib.py Fri Apr 06 22:58:06 2007 +0100 @@ -1,9 +1,25 @@ import time +import logging +import os +import stat def now(): return time.strftime("%Y-%m-%d %H:%M:%S"); def log(msg): + logging.log(logging.DEBUG, msg) new_msg = "[%s] %s" % (now(), msg) - print new_msg return new_msg + + +bin_path_list = os.environ["PATH"].split(os.pathsep) +def which(prg): + for d in bin_path_list: + path = os.path.join(d, prg) + if os.path.exits(path): + st = os.stat(path) + if st[stat.ST_MODE] & 0111: + return path + return "" + + diff -r 4abc2d465d80 -r 16312d0021cb gmyth-stream/server/main.py --- a/gmyth-stream/server/main.py Thu Apr 05 19:58:40 2007 +0100 +++ b/gmyth-stream/server/main.py Fri Apr 06 22:58:06 2007 +0100 @@ -3,23 +3,36 @@ import os import lib import sys +import imp import ConfigParser +import logging as log + +log.basicConfig(level=log.DEBUG, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt='%Y-%m-%d %H:%M:%S') config = ConfigParser.ConfigParser() config.read("stream.conf") +def load_module(pathlist, name): + fp, path, desc = imp.find_module(name, pathlist) + try: + module = imp.load_module(name, fp, path, desc) + return module + finally: + if fp: + fp.close() + + media_plugin = config.get("Media", "engine") -exec("from plugins.media.%s import *" % media_plugin) - -media = Media(config) +media_plugin_module = load_module(["./plugins/media"], media_plugin) +media = media_plugin_module.Media(config) comm_plugin = config.get("Comm", "engine") -exec("from plugins.comm.%s import *" % comm_plugin) +comm_plugin_module = load_module(["./plugins/comm"], comm_plugin) +server = comm_plugin_module.Server(config) -# Start Our Server: -server = Server(config) - -lib.log("Starting GMyth-Stream server") +log.info("Starting GMyth-Stream server") ''' @@ -52,66 +65,83 @@ ''' nextport = 0 -while (server.finish == 0): +def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate, + width, height, *options): + global nextport + nextport += 1 + ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec, + abitrate, width, height, nextport, options) + if ret == 0: + server.sendOk() + else: + server.sendNotOk(ret) + return True + +def do_play(server): + media.play() + server.sendOk("%d" % nextport) + return True + + +def do_stop(server): + media.stop() + server.sendOk() + return True + +def do_quit(server): + server.finish = 1 + media.stop() + server.sendOk() + return True + + +mapping = { + "SETUP": do_setup, + "PLAY": do_play, + "STOP": do_stop, + "QUIT": do_quit, + } + +def dispatch(server, msg): + pieces = msg.split() + if len(pieces) < 1: + log.error("Invalid client command format: %r" % msg) + server.sendNotOk("Invalid Format") + return False + + cmd = pieces[0] + f = mapping.get(cmd, None) + if not f: + log.error("Unknow command: %r" % msg) + server.sendNotOk("Unknow Command") + return False + + try: + return f(server, *pieces[1:]) + except Exception, e: + log.error("Could not execute %r: %s" % (msg, e)) + server.sendNotOk(str(e)) + return False + + + +while not server.finish: conn, client, port = server.getRequest() - server.sendMsg("Welcome to GMyth-Streamer Master") if nextport == 0: nextport = port - while True: - msg = server.getMsg(1024).strip() - - if not msg: break - - lib.log("Received %s from: %s" % (msg, client) ) - - if msg == "SETUP": - setup = server.getMsg(1024).strip().split(" ") - size = len(setup) - options = [] - - if size < 10: - server.sendMsg(lib.log("Wrong SETUP command from: %s" % client[0])) - - else: - - if size > 10: - i = 10 - while (i < size): - options.append(setup[i]) - i += 1 - - nextport += 1 - ret = media.setup(setup[0], setup[1], setup[2], \ - setup[3], setup[4], setup[5], - setup[6], setup[7], setup[8], - nextport, options) - - if ret == 0: - server.Ack("SETUP") - else: - server.sendMsg(lib.log(ret)) - - - elif msg == "PLAY": - media.play() - server.Ack("PLAY") - server.sendMsg("STREAM PORT=%d" % nextport) - - elif msg == "STOP": - media.stop() - server.Ack("STOP") - - elif msg == "QUIT": - server.finish = 1 - media.stop() - server.Ack("QUIT") + while not server.finish: + msg = server.getMsg() + if not msg: break - lib.log("Closing connection with %s" % client[0]) + log.info("Client %s sent command: %r" % (client, msg)) + dispatch(server, msg) + + + log.info("Closing connection with %s" % (client,)) server.disconnect_client(conn) server.stop() -del(server) -lib.log("Server stopped. Closing...") +log.info("Server stopped. Closing...") diff -r 4abc2d465d80 -r 16312d0021cb gmyth-stream/server/plugins/comm/tcp.py --- a/gmyth-stream/server/plugins/comm/tcp.py Thu Apr 05 19:58:40 2007 +0100 +++ b/gmyth-stream/server/plugins/comm/tcp.py Fri Apr 06 22:58:06 2007 +0100 @@ -1,36 +1,79 @@ import lib import time import socket +import logging as log class Server(object): def __init__(self, config): - self.host = '' + self.host = '0.0.0.0' self.port = int(config.get("Comm", "port")) self.finish = 0 + addr = (self.host, self.port) + log.debug("Setup TCP server at %s:%s" % addr) self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.tcp.bind( (self.host, self.port) ) + self.tcp.bind(addr) self.tcp.listen(1) + log.info("TCP server listening at %s:%s (sock=%d)" % + (self.host, self.port, self.tcp.fileno())) - def getMsg(self, size): - return self.con.recv(size) + def getMsg(self): + bytes = [] + try: + while 1: + c = self.con.recv(1) + bytes.append(c) + if not c or c == "\n": + break + except Exception, e: + log.error("Error reading message from client: %s" % e) + return None + + if not bytes or bytes[-1] != "\n": + msg = "".join(bytes) + log.error("Invalid message from client: %r" % msg) + return None + + # remove \n and \r + bytes.pop() + if bytes[-1] == "\r": + bytes.pop() + + msg = "".join(bytes) + log.debug("RECV: %r" % msg) + return msg def sendMsg(self, msg): + log.debug("SEND: %r" % msg) self.con.send(msg + "\n") - def Ack(self, command): - msg = "[%s] Command %s received" % (lib.now(), command) - self.sendMsg(msg) + def sendOk(self, payload=None): + self.sendMsg("OK %d" % bool(payload is not None)) + if payload is not None: + if not isinstance(payload, (tuple, list)): + payload = (payload,) + for e in payload: + self.sendMsg("+%s" % e) + self.sendMsg(".") + + def sendNotOk(self, reason=""): + self.sendMsg("NOTOK %r" % reason) def getRequest(self): + log.debug("Wait for client request at %s:%s (sock=%d)" % + (self.host, self.port, self.tcp.fileno())) self.con, self.client = self.tcp.accept() - print "[%s] Received request from ip=%s" % (lib.now(), self.client ) + log.info("Incoming request from %s (con=%s)" % + (self.client, self.con.fileno())) return (self.con, self.client, self.port) def disconnect_client(self, connection): + log.info("Closed request from %s (con=%s)" % + (self.client, self.con.fileno())) connection.close() def stop(self): + log.debug("Stop") self.tcp.close() diff -r 4abc2d465d80 -r 16312d0021cb gmyth-stream/server/plugins/media/mencoder.py --- a/gmyth-stream/server/plugins/media/mencoder.py Thu Apr 05 19:58:40 2007 +0100 +++ b/gmyth-stream/server/plugins/media/mencoder.py Fri Apr 06 22:58:06 2007 +0100 @@ -35,7 +35,7 @@ for opt in options: if opt == "local": - self.mplayer = os.popen("which mplayer").read().strip() + self.mplayer = lib.which("mplayer") elif opt.find("language=") >= 0: try: @@ -57,7 +57,7 @@ msg = "dvd://" + msg self.mplayer += " " + msg - self.mplayer_pid = Popen(self.mplayer, shell=True) + self.mplayer_pid = Popen(self.mplayer, shell=True, close_fds=True) def setup_mencoder(self): self.path = self.config.get("Mencoder", "path") @@ -169,7 +169,8 @@ lib.log("Starting Mencoder: %s %s" % (self.path, self.args) ) # exec Mencoder if self.mencoder_old: - self.mencoder_pid = Popen(self.path + self.args, shell=True) + self.mencoder_pid = Popen(self.path + self.args, shell=True, + close_fds=True) self.pout = open(self.fifo) else: self.path += self.args