# HG changeset patch # User morphbr # Date 1180470288 -3600 # Node ID 3fbcd3d9b2d12139dd57ae2b6ddc3d1fd01a0caf # Parent 24db164804565a11e9c953aad0365a24005ca735 [svn r724] * GMyth-Streamer version 0.3 released - Improved Log architecture; - Creation of a history for the transcoder's actions - Creation of an id for each transcoder instanciated - Also wrapps default actions for python's default logger - Created new functions to make use of this new Log architecture; - serve_new_id - serve_get_log - serve_get_all_log - _Lot_ of small bug fixes; - Inserted header for all files; - Splited files with too many lines (more than 1 class per file) in more files; diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/gms.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/gms.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +__author__ = "Artur Duque de Souza" +__author_email__ = "artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.3" +__thanks__ = "Gustavo Sverzut Barbieri" + +import sys +import os +import logging as log +from lib.server import serve_forever, load_plugins_transcoders + +log_level = log.INFO +for p in sys.argv[1:]: + if p == "-v" or p == "--verbose": + log_level -= 10 + +log.basicConfig(level=log_level, + format=("### %(asctime)s %(name)-18s \t%(levelname)-8s " + "\t%(message)s"), + datefmt="%Y-%m-%d %H:%M:%S") + +pd = os.path.join("plugins", "transcoders") +load_plugins_transcoders(pd) +serve_forever() diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/index.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/index.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,9 @@ + + GMyth-Streamer Server + +

Welcome to GMyth-Streamer Server

+ + + \ No newline at end of file diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/menu.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/menu.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,1 @@ +
  • %(name)s
  • diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/shutdown.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/shutdown.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,6 @@ + + GMyth-Streamer Server Exited + +

    GMyth-Streamer is not running anymore

    + + \ No newline at end of file diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/status.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/status.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,14 @@ + + GMyth-Streamer Server Status + +

    GMyth-Streamer Status

    + + + + diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/stop_all.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/stop_all.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,9 @@ + + GMyth-Streamer Server Stopped Transcoders + +

    GMyth-Streamer stopped running transcoders

    + + + \ No newline at end of file diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/stop_selected.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/html/stop_selected.html Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,12 @@ + + GMyth-Streamer Server Stopped Transcoders + +

    GMyth-Streamer stopped running transcoders:

    + + + + diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/log.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/lib/log.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,109 @@ +#!/usr/bin/env python + +__author__ = "Artur Duque de Souza" +__author_email__ = "artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.1" + +import os +import logging + +__all__ = ("Log", "log_structure") + +class log_structure(object): + """Structure to hold log info.""" + + def __init__(self, log=None): + self.log = log + self.history = [] + # __init__() + +# log_structure() + +class Log(object): + """This class implements a log where we can store status of + all transcoders (even from those that are not running any more).""" + + ## key = tid + ## item = ls + logs = {} + + def insert(self, tid, name): + """Insert a given tid on the log structure""" + if not self.logs.has_key(tid): + self.logs[tid] = log_structure(logging.getLogger(name)) + return True + else: + return False + # insert() + + def remove(self, tid=None): + """Cleans up all log stored for a + given tid or for all transcodes.""" + if not tid: + self.logs = {} + else: + del(self.logs[tid]) + + return True + # clean() + + def get_status(self, tid=None, all=False): + """Get the status of all transcoders or + of just one of them if it's given an tid.""" + if not tid: + ret = {} + for tids, logs in self.logs.items(): + if len(logs.history) > 0: + if not all: + ret[tids] = logs.history[-1] + else: + ret[tids] = logs.history + return ret + elif self.logs.has_key(tid) and len(self.logs[tid].history) > 0: + if not all: + return self.logs[tid].history[-1] + else: + return self.logs[tid].history + + return False + # get_status() + + def _update_status(self, tid=None, msg=""): + """Update the status of a given tid. Private method that + is only called inside error/info/debug wrappers""" + if msg != "": + self.logs[tid].history.append(msg) + return True + else: + return False + # update_status() + + def error(self, tid, msg): + """Python's Log.error wrapper""" + if self.logs.has_key(tid): + self.logs[tid].log.error("%s" % msg) + return self._update_status(tid, msg) + else: + return False + # error() + + def info(self, tid, msg): + """Python's Log.info wrapper""" + if self.logs.has_key(tid): + self.logs[tid].log.info("%s" % msg) + self._update_status(tid, msg) + return True + else: + return False + # info() + + def debug(self, tid, msg): + """Python's Log.debug wrapper""" + if self.logs.has_key(tid): + self.logs[tid].log.debug("%s" % msg) + self._update_status(tid, msg) + return True + else: + return False + # debug() diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/request_handler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/lib/request_handler.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,332 @@ +#!/usr/bin/env python + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.4" + +import os +import threading +import SocketServer +import BaseHTTPServer +import socket +import urlparse +import cgi +import lib.utils as utils +import logging + +from log import Log +import lib.transcoder as transcoder + +__all__ = ("RequestHandler") + +class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + """Class that implements an HTTP request handler for our server.""" + log = logging.getLogger("gms.request") + def_transcoder = None + transcoders = utils.PluginSet(transcoder.Transcoder) + transcoders_log = Log() + tid_queue = [] + + menu = { + "Log": "/get_log.do", + "Stop": "/stop-transcoder.do", + "Status": "/status.do", + "All Log": "/get_all_log.do", + "Version": "/version.do", + "Shutdown": "/shutdown.do" + } + + @classmethod + def load_plugins_transcoders(cls, directory): + cls.transcoders.load_from_directory(directory) + + if cls.def_transcoder is None and cls.transcoders: + cls.def_transcoder = cls.transcoders[0].name + # load_plugins_transcoders() + + + def do_dispatch(self, body): + self.url = self.path + + pieces = urlparse.urlparse(self.path) + self.path = pieces[2] + self.query = cgi.parse_qs(pieces[4]) + + if self.path == "/": + self.serve_main(body) + elif self.path == "/shutdown.do": + self.serve_shutdown(body) + elif self.path == "/stop-transcoder.do": + self.serve_stop_transcoder(body) + elif self.path == "/status.do": + self.serve_status(body) + elif self.path == "/version.do": + self.serve_version(body) + elif self.path == "/new_id.do": + self.serve_new_id(body) + elif self.path == "/get_log.do": + self.serve_get_log(body) + elif self.path == "/get_all_log.do": + self.serve_get_all_log(body) + elif self.path == "/stream.do": + self.serve_stream(body) + else: + action = self.query.get("action", None) + if action and "stream.do" in action: + self.serve_stream(body) + else: + self.send_error(404, "File not found") + # do_dispatch() + + + def do_GET(self): + self.do_dispatch(True) + # do_GET() + + + def do_HEAD(self): + self.do_dispatch(False) + # do_HEAD() + + + def _nav_items(self): + ret = "" + for name, url in self.menu.items(): + ret += utils.getHTML("menu", {"name": name, "url": url}) + + return ret + # _nav_items() + + def serve_main(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()})) + # serve_main() + + def serve_version(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("Version: %s" % __version__) + + + def serve_shutdown(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write(utils.getHTML("shutdown")) + self.server.server_close() + # serve_shutdown() + + + def serve_stop_all_transcoders(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.server.stop_transcoders() + self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()})) + # serve_stop_all_transcoders() + + + def serve_stop_selected_transcoders(self, body, tids=[]): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + opts = "" + if body: + transcoders = self.server.get_transcoders() + + for tid in tids: + for t, r in transcoders: + if t.tid == int(tid): + try: + t.stop() + except Exception, e: + self.log.info("Plugin already stopped") + + opts += utils._create_html_item("%s" % t) + + break + + self.wfile.write(utils.getHTML("stop_selected", + {"menu": self._nav_items(), + "opts": opts})) + # serve_stop_selected_transcoders() + + + def serve_stop_transcoder(self, body): + req = self.query.get("request", None) + tid = self.query.get("tid", None) + if req and "all" in req: + self.serve_stop_all_transcoders(body) + elif tid: + self.serve_stop_selected_transcoders(body, tid[0].split(";")) + else: + self.serve_status(body) + # serve_stop_transcoder() + + + def serve_status(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + stopone = "" + + if body: + tl = self.server.get_transcoders() + if not tl: + running = "

    No running transcoder.

    \n" + stopall = "" + stopone = "" + + elif self.query.get("tid", None): + req_tid = int(self.query.get("tid")[0]) + for transcoder, request in tl: + if transcoder.tid == req_tid: + self.wfile.write("Status: %s %%" % transcoder.status) + return True + + return False + + else: + running = "

    Running transcoders:

    \n" + stopall = utils._create_html_item("" + "[STOP ALL]" % + self.menu["Stop"]) + + for transcoder, request in tl: + stopone += utils._create_html_item("%s;" + "" + " [STOP] ") % ( + transcoder, self.menu["Stop"], transcoder.tid) + + self.wfile.write(utils.getHTML("status", + {"menu": self._nav_items(), + "running": running, + "stopall": stopall, + "stopone": stopone})) + # serve_status() + + + def _get_transcoder(self): + # get transcoder option: mencoder is the default + request_transcoders = self.query.get("transcoder", ["mencoder"]) + + for t in request_transcoders: + transcoder = self.transcoders.get(t) + if transcoder: + return transcoder + + if not transcoder: + return self.transcoders[self.def_transcoder] + # _get_transcoder() + + + def _get_new_id(self, tid): + self.server.last_tid = utils.create_tid(tid) + self.tid_queue.append(self.server.last_tid) + return self.server.last_tid + # _get_new_id() + + + def serve_new_id(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + + if body: + self.wfile.write("%s" % self._get_new_id(self.server.last_tid)) + # serve_new_id() + + def serve_get_log(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + + if body: + if self.query.get("tid", None): + tid = int(self.query.get("tid")[0]) + stat = self.transcoders_log.get_status(tid) + self.wfile.write("%s" % stat) + else: + stat = self.transcoders_log.get_status() + for rtid, status in stat.iteritems(): + self.wfile.write("%s: %s

    " % (rtid, status)) + # serve_get_log() + + def serve_get_all_log(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + + if body: + if self.query.get("tid", None): + tid = int(self.query.get("tid")[0]) + stat = self.transcoders_log.get_status(tid, True) + for status in stat: + self.wfile.write("%s

    " % status) + else: + stat = self.transcoders_log.get_status(None, True) + for rtid, history in stat.iteritems(): + for status in history: + self.wfile.write("%s: %s
    " % (rtid, status)) + self.wfile.write("

    ") + # serve_get_all_log() + + def serve_stream(self, body): + transcoder = self._get_transcoder() + try: + obj = transcoder(self.query) + except Exception, e: + self.send_error(500, str(e)) + return + + self.send_response(200) + self.send_header("Content-Type", obj.get_mimetype()) + self.send_header('Connection', 'close') + self.end_headers() + + if body: + test_tid = int(self.query.get("tid", "0")[0]) + if test_tid == 0 or test_tid not in self.tid_queue: + test_tid = self._get_new_id(self.server.last_tid) + + self.transcoders_log.insert(test_tid, "gms.%s" % obj.name) + obj.tid = test_tid + obj.log = self.transcoders_log + + self.server.add_transcoders(self, obj) + obj.start(self.wfile) + self.server.del_transcoders(self, obj) + # serve_stream() + + + def log_request(self, code='-', size='-'): + self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) + # log_request() + + + def log_error(self, format, *args): + self.log.error("%s: %s" % (self.address_string(), format % args)) + # log_error() + + + def log_message(self, format, *args): + self.log.info("%s: %s" % (self.address_string(), format % args)) + # log_message() +# RequestHandler diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/server.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/lib/server.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.4" + +import os +import threading +import SocketServer +import BaseHTTPServer +import socket +import urlparse +import cgi +import lib.utils as utils +import logging + +from log import Log +from request_handler import RequestHandler + +__all__ = ("Server", "serve_forever", "load_plugins_transcoders") + +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): + log = logging.getLogger("gms.server") + last_tid = 0 + run = True + _transcoders = {} + _lock = threading.RLock() + + def serve_forever(self): + self.log.info("GMyth-Streamer serving HTTP on %s:%s" % + self.socket.getsockname()) + try: + while self.run: + self.handle_request() + except KeyboardInterrupt, e: + pass + + self.log.debug("Stopping all remaining transcoders...") + self.stop_transcoders() + self.log.debug("Transcoders stopped!") + # serve_forever() + + + def get_request(self): + skt = self.socket + old = skt.gettimeout() + skt.settimeout(0.5) + while self.run: + try: + r = skt.accept() + skt.settimeout(old) + return r + except socket.timeout, e: + pass + raise socket.error("Not running") + # get_request() + + + def server_close(self): + self.run = False + self.stop_transcoders() + + BaseHTTPServer.HTTPServer.server_close(self) + # server_close() + + + def stop_transcoders(self): + self._lock.acquire() + for transcoder, request in self._transcoders.iteritems(): + self.log.info("Stop transcoder: %s, client=%s" % + (transcoder, request.client_address)) + transcoder.stop() + self._lock.release() + # stop_transcoders() + + + def get_transcoders(self): + self._lock.acquire() + try: + return self._transcoders.items() + finally: + self._lock.release() + # get_transcoders() + + + def add_transcoders(self, request, transcoder): + self._lock.acquire() + try: + self._transcoders[transcoder] = request + finally: + self._lock.release() + # add_transcoders() + + + def del_transcoders(self, request, transcoder): + self._lock.acquire() + try: + del self._transcoders[transcoder] + finally: + self._lock.release() + # del_transcoders() +# Server + + + +def serve_forever(host="0.0.0.0", port=40000): + addr = (host, port) + RequestHandler.protocol_version = "HTTP/1.0" + httpd = Server(addr, RequestHandler) + httpd.serve_forever() +# serve_forever() + + +def load_plugins_transcoders(directory): + RequestHandler.load_plugins_transcoders(directory) +# load_plugins_transcoders() diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/transcoder.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/lib/transcoder.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.4" + +__all__ = ("Transcoder") + +class Transcoder(object): + """Transcoder's Class: parent class to implement + a plugin for transcoding data.""" + priority = 0 # negative values have higher priorities + name = None # to be used in requests + status = None + log = None + tid = -1 + + def __init__(self, params): + self.params = params + # __init__() + + + def params_first(self, key, default=None): + if default is None: + return self.params[key][0] + else: + try: + return self.params[key][0] + except: + return default + # params_first() + + + def get_mimetype(self): + mux = self.params_first("mux", "mpg") + + if mux == "mpeg": + return "video/mpeg" + elif mux == "avi": + return "video/x-msvideo" + else: + return "application/octet-stream" + # get_mimetype() + + def start(self, outfile): + return True + # start() + + + def stop(self): + return True + # stop() + + def __str__(self): + return '%s: %s( params=%s ) - Status: %s%%' % \ + (self.__class__.__name__, self.tid, + self.params, self.status) + # __str__() +# Transcoder diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/lib/utils.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,188 @@ +#!/usr/bin/env + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.3" + +import os +import stat +import sys +import logging +import urllib +import gobject +import imp + +log = logging.getLogger("gms.utils") + +__all__ = ("which", "load_plugins", "PluginSet", "getHTML") + +def which(app): + """Function to implement which(1) unix command""" + pl = os.environ["PATH"].split(os.pathsep) + for p in pl: + path = os.path.join(p, app) + if os.path.isfile(path): + st = os.stat(path) + if st[stat.ST_MODE] & 0111: + return path + return "" +# which() + + +def _load_module(pathlist, name): + fp, path, desc = imp.find_module(name, pathlist) + try: + module = imp.load_module(name, fp, path, desc) + return module + finally: + if fp: + fp.close() +# _load_module() + + +class PluginSet(object): + def __init__(self, basetype, *items): + self.basetype = basetype + self.map = {} + self.list = [] + + for i in items: + self._add(i) + self._sort() + # __init__() + + + def _add(self, item): + self.map[item.name] = item + self.list.append(item) + # _add() + + + def add(self, item): + self._add() + self._sort() + # add() + + + def __getitem__(self, spec): + if isinstance(spec, basestring): + return self.map[spec] + else: + return self.list[spec] + # __getitem__() + + + def get(self, name, default=None): + return self.map.get(name, default) + # get() + + + def __iter__(self): + return self.list.__iter__() + # __iter__() + + + def __len__(self): + return len(self.list) + # __len__() + + + def _sort(self): + self.list.sort(lambda a, b: cmp(a.priority, b.priority)) + # _sort() + + + def update(self, pluginset): + self.map.update(pluginset.map) + self.list.extend(pluginset.list) + self._sort() + # update() + + + def load_from_directory(self, directory): + for i in load_plugins(directory, self.basetype): + self._add(i) + self._sort() + # load_from_directory() + + + def __str__(self): + lst = [] + for o in self.list: + lst.append('"%s" (%s)' % (o.name, o.__name__)) + + return "%s(basetype=%s, items=[%s])" % \ + (self.__class__.__name__, + self.basetype.__name__, + ", ".join(lst)) + # __str__() +# PluginSet + + +def load_plugins(directory, basetype): + """Function to load plugins from a given directory""" + tn = basetype.__name__ + log.debug("Loading plugins from %s, type=%s" % (directory, tn)) + + + plugins = [] + for d in os.listdir(directory): + if not d.endswith(".py"): + continue + + name = d[0: -3] + if name == "__init__": + continue + + directory.replace(os.path.sep, ".") + mod = _load_module([directory], name) + for sym in dir(mod): + cls = getattr(mod, sym) + if isinstance(cls, type) and issubclass(cls, basetype) and \ + cls != basetype: + plugins.append(cls) + log.info("Loaded %s (%s) from %s" % \ + (cls.__name__, tn, os.path.join(directory, d))) + + return plugins +# load_plugins() + +def getHTML(html_file, params={}): + """This function parses an html file with the given + parameters and returns a formated web-page""" + try: + filename = os.path.join(sys.path[0], "html", html_file + ".html") + html = open(filename).read() % params + return html + except Exception, e: + return "HTML format error. Wrong keys: %s" % e + +# getHTML + +def _create_html_item(opt): + """Create an
  • item using HTML.""" + return "
  • %s
  • \n" % opt +# _create_html_item + +def progress_bar(log, value, max, barsize): + """Creates and displays a progressbar. By OSantana""" + chars = int(value * barsize / float(max)) + percent = int((value / float(max)) * 100) + sys.stdout.write("#" * chars) + sys.stdout.write(" " * (barsize - chars + 2)) + if value >= max: + sys.stdout.write("done.\n\n") + else: + sys.stdout.write("[%3i%%]\r" % (percent)) + sys.stdout.flush() + return percent +# progress_bar() by osantana + +def create_tid(last_tid): + """Function to generate TIDs (ids for transcoders). + At first it just do +1 on last_tid but can be implemented + to generate more sparse TIDs""" + tid = last_tid + 1 + return tid +# create_id() diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/gmencoder.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/plugins/transcoders/gmencoder.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,99 @@ +#!/usr/bin/env python + +__author__ = "Renato Filho" +__author_email__ = "renato.filho@indt.org.br" +__license__ = "GPL" +__version__ = "0.1" + +import os +import shlex +import signal +import subprocess +import time + +import select + +import lib.utils as utils +import lib.server as server +import lib.transcoder as transcoder + +__all__ = ("TranscoderGMencoder",) + +class TranscoderGMencoder(transcoder.Transcoder): + gmencoder_path = utils.which("gmencoder") + name = "gmencoder" + priority = -1 + proc = None + + def __init__(self, params): + transcoder.Transcoder.__init__(self, params) + self.opts = [] + # __init__() + + def _insert_param (self, name, value): + if (value != ""): + self.opts.append(name) + self.opts.append(value) + + def _parser_params (self): + self._insert_param("-i", \ + "%s://%s" % (self.params_first("type", "file"), self.params_first("uri", ""))) + self._insert_param("--video-encode", self.params_first("ve", "")) + self._insert_param("--video-opts", "bitrate=200,pass=2,quantizer=5") + self._insert_param("--video-fps", self.params_first("fps", "")) + self._insert_param("--video-width", self.params_first("width", "")) + self._insert_param("--video-height", self.params_first("height", "")) + self._insert_param("--audio-rate", "32000") + self._insert_param("--audio-encode", self.params_first("ae", "")) + # _parse_params + + def start(self, outfd): + self.opts.append (self.gmencoder_path) + self._parser_params () + self._insert_param ("-o", "fd://%d" % outfd.fileno()) + + cmd = " ".join(self.opts) + self.log.info ("GMencoder: %s", cmd) + + try: + self.proc = subprocess.Popen(self.opts, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + except Exception, e: + self.log.error(self.tid, "Error executing GMencoder: %s" % e) + return False + + try: + while (self.proc and self.proc.poll() == None): + r, w, x = select.select([self.proc.stdout], [], [], 0) + if self.proc.stdout in r: + progress = self.proc.stdout.readline() + self.log.info ("stdout %s" % progress) + if (progress.find ("PROGRESS") >= 0): + self.status = progress.split (":")[1] + #if (progress.find ("DONE") >= 0): + # break + self.log.info ("Process exit") + except Exception, e: + self.log.error(self.tid, "Problems handling data: %s" % e) + return False + + return True + # start() + + + def stop(self): + if self.proc: + self.log.info ("STOPED GMencoder plugin") + try: + self.proc.stdin.write ("QUIT\n") + except Exception, e: + pass + + try: + self.proc.wait() + except Exception, e: + pass + + self.proc = None + # stop() + +# TranscoderGMencoder diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/mencoder.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/plugins/transcoders/mencoder.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,305 @@ +#!/usr/bin/env python + +__author__ = "Artur Duque de Souza" +__author_email__ = "artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.1" + +import os +import shlex +import signal +import subprocess +import time +import fcntl + +import lib.utils as utils +import lib.server as server +import plugins.transcoders.mencoder_lib.mythtv as mythtv + +from select import select +import lib.transcoder as transcoder + +__all__ = ("TranscoderMencoder",) + +class TranscoderMencoder(transcoder.Transcoder): + """Transcoder class that implements a transcoder using Mencoder""" + mencoder_path = utils.which("mencoder") + name = "mencoder" + priority = -1 + args = {} + proc = None + gmyth = None + + # only works with avi container + status = 0 + + def _setup_params(self): + params_first = self.params_first + + # general_opts + self.args["local"] = params_first("local", False) + self.args["language"] = params_first("language", False) + self.args["subtitle"] = params_first("subtitle", False) + self.args["format"] = params_first("format", "mpeg1") + self.args["outfile"] = params_first("outfile", "-") + + # input_opt + self.args["type"] = params_first("type", "file") + self.args["input"] = params_first("uri", "-") + + # audio_opts + self.args["acodec"] = params_first("acodec", "mp2") + self.args["abitrate"] = params_first("abitrate", 192) + self.args["volume"] = params_first("volume", 5) + + # video_opts + self.args["mux"] = params_first("mux", "mpeg") + self.args["fps"] = params_first("fps", 25) + self.args["vcodec"] = params_first("vcodec", "mpeg1video") + self.args["vbitrate"] = params_first("vbitrate", 400) + self.args["width"] = params_first("width", 320) + self.args["height"] = params_first("height", 240) + # _setup_params() + + + def _setup_audio(self): + if self.args["acodec"] == "mp3lame": + audio = "-oac mp3lame -lameopts cbr:br=%s vol=%s" % ( + self.args["abitrate"], self.args["volume"]) + else: + audio = "-oac lavc -lavcopts acodec=%s:abitrate=%s" % ( + self.args["acodec"], self.args["abitrate"]) + + return audio + # _setup_audio() + + + def _setup_video(self): + video = " -of %s" % self.args["mux"] + video += " -ofps %s" % self.args["fps"] + + vcodec = self.args["vcodec"] + if vcodec == "nuv" or vcodec == "xvid"\ + or vcodec == "qtvideo" or vcodec == "copy": + video += " -ovc %s" % vcodec + else: + video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % ( + vcodec, self.args["vbitrate"]) + + if self.args["mux"] == "mpeg": + video += " -mpegopts format=%s" % self.args["format"] + video += " -vf scale=%s:%s" % (self.args["width"], self.args["height"]) + + return video + # _setup_video() + + + def _arg_append(self, args, options): + for arg in shlex.split(options): + args.append(arg) + # arg_append() + + def _setup_mencoder_opts(self, args): + args.append(self.mencoder_path) + + if self.args["outfile"] == "-" and self.args["type"]: + args.append(self.args["input"]) + else: + args.append("-") + + if self.args["language"]: + self._arg_append(args, "-alang %s" % self.args["language"]) + + if self.args["subtitle"]: + self._arg_append(args, "-slang %s" % self.args["subtitle"]) + self._arg_append(args, "-subfps %s" % self.args["fps"]) + + self._arg_append(args, "-idx") + self._arg_append(args, "-cache 1024") + self._arg_append(args, self._setup_audio()) + self._arg_append(args, self._setup_video()) + + self._arg_append(args, "-really-quiet") + self._arg_append(args, "-o %s" % self.args["outfile"]) + self._arg_append(args, "2>%s" % os.devnull) + # _setup_args() + + def _setup_filename(self): + """This function setups the file to encode parsing the uri. + So, type can be: + * file + * dvd + * myth + + If the last one is detected we have to parse the uri to find args. + Then we store all the args inside a dictionary: self.args['gmyth-cat'] + """ + _type = self.args["type"] + + if _type == "file": + if not os.path.exists(self.args["input"]): + raise IOError,\ + "File requested does not exist: %s." % self.args["input"] + else: + self.args["input"] = "file://%s" % self.args["input"] + + elif _type == "dvd": + self.args["input"] = "dvd://".join(self.args["input"]) + + elif _type == "myth": + self.args["gmyth-cat"] = mythtv._setup_mythfilename(self) + # _setup_filename() + + + def __init__(self, params): + transcoder.Transcoder.__init__(self, params) + self.mencoder_opts = [] + + try: + self._setup_params() + self._setup_filename() + self._setup_mencoder_opts(self.mencoder_opts) + except Exception, e: + self.log.error(self.tid, e) + # __init__() + + + def _check_opened_file(self, stdw, _stdin): + loop = True + while loop: + try: + return open(self.args["outfile"]) + except: + os.write(stdw, _stdin.read(1024)) + # _check_opened_file + + + def _start_outfile(self, outfd): + finished = False + + # fix this (not necessary) + outfd.write("OK") + + # Configuring stdin + try: + _stdin = open(self.args["input"]) + size = int(os.path.getsize(self.args["input"])) + except Exception, e: + self.log.error(self.tid, "Mencoder stdin setup error: %s" % e) + return False + + self.status = 0 + total_read = 0 + + # Configuring pipes + stdr, stdw = os.pipe() + + if not self._run_mencoder(input=stdr): + return False + + stdout = self._check_opened_file(stdw, _stdin) + + try: + while self.proc and self.proc.poll() == None: + if not finished: + data_in = _stdin.read(4096) + if data_in != "": + os.write(stdw, data_in) + total_read += 4096 + d = stdout.read(4096) + self.status = utils.progress_bar(self.log, + int(total_read), + int(size), 50) + else: + finished = True + os.close(stdw) + + else: + d = stdout.read(4096) + + except Exception, e: + self.log.error(self.tid, "Problems handling data: %s" % e) + self.stop() + return False + + self.log.info(self.tid, "%s: Finished sending data to client" % repr(self)) + return True + # _start_outfile() + + def _start(self, outfd): + # Play a file on disk or DVD + if not self._run_mencoder(output=subprocess.PIPE): + return False + + try: + while self.proc and self.proc.poll() == None: + d = self.proc.stdout.read(1024) + outfd.write(d) + except Exception, e: + self.log.error(self.tid, "Problems handling data: %s" % e) + return False + + self.log.info(self.tid, "%s: Finished sending data to client" % repr(self)) + return True + # _start() + + def _run_mencoder(self, input=None, output=None): + try: + self.proc = subprocess.Popen(self.mencoder_opts, stdin=input, + stdout=output, close_fds=True) + except Exception, e: + self.log.error(self.tid, "Error executing mencoder: %s" % e) + return False + + return True + # _run_mencoder() + + def start(self, outfd): + cmd = " ".join(self.mencoder_opts) + self.log.debug(self.tid, "Plugin's tid: %s" % self.tid) + self.log.debug(self.tid, "Mencoder: %s" % cmd) + #fixme + + ret = False + + if self.args["outfile"] == "-" and \ + self.args["type"] in ["file", "dvd"]: + ret = self._start(outfd) + + elif self.args["type"] == "myth": + ret = mythtv.start_myth(self, outfd) + + else: + ret = self._start_outfile(outfd) + + self.stop() + + if not ret: + self.log.error(self.tid, "Problems while starting streaming.") + + return ret + # start() + + def _aux_stop(self, obj, next=False): + if obj: + try: + os.kill(obj.pid, signal.SIGKILL) + if next: + os.kill(obj.pid+1, signal.SIGKILL) + except OSError, e: + pass + + try: + obj.wait() + except Exception, e: + pass + + obj = None + # _aux_stop + + def stop(self): + self._aux_stop(self.proc, True) + self._aux_stop(self.gmyth) + # stop() + +# TranscoderMencoder diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/mencoder_lib/mythtv.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.3/plugins/transcoders/mencoder_lib/mythtv.py Tue May 29 21:24:48 2007 +0100 @@ -0,0 +1,106 @@ +import os +import subprocess +import fcntl + +import lib.utils as utils +import lib.server as server + +from select import select + +def _setup_mythfilename(self): + # mythtv:mythtv@192.168.3.110:6543/1002_20070426230000.nuv + try: + _mysql = self.args["input"].split("@")[0].split(":") + except IndexError, e: + _mysql = ["mythtv", "mythtv"] + + try: + _args = self.args["input"].split("@")[1].split(":") + except IndexError, e: + _args = self.args["input"].split(":") + + gmyth_dict = {} + gmyth_dict["mysql"] = _mysql + gmyth_dict["backend"] = _args[0] + gmyth_dict["port"] = _args[1].split("/", 1)[0] + + _tmp_file = _args[1].split("/", 1)[1] + + if _tmp_file.find("channel") >= 0: + gmyth_dict["kind"] = "c" + gmyth_dict["cfile"] = _tmp_file.split("=")[1] + else: + gmyth_dict["kind"] = "f" + gmyth_dict["cfile"] = _tmp_file + + self.args["input"] = "-" + return gmyth_dict +# _setup_mythfilename + +def _setup_mythfile(err): + size = err.readline().split("Size:")[1] + flags = fcntl.fcntl (err, fcntl.F_GETFL, 0) | os.O_NONBLOCK + fcntl.fcntl(err, fcntl.F_SETFL, flags) + return size +# _setup_mythfile + +def _setup_gmythcat(self): + gmyth_cat = utils.which("gmyth-cat") + if self.args.has_key("gmyth-cat"): + return [ utils.which("gmyth-cat"), + "-h", self.args["gmyth-cat"]["backend"], + "-p", self.args["gmyth-cat"]["port"], + "-" + self.args["gmyth-cat"]["kind"], + self.args["gmyth-cat"]["cfile"] + ] + else: + self.log.error(self.tid, "URI error") + return [] +# _setup_gmythcat + +def start_myth(self, outfd): + opts = _setup_gmythcat(self) + try: + self.gmyth = subprocess.Popen(opts, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True) + except Exception, e: + self.log.error(self.tid, "Error executing gmyth-cat: %s" % e) + return False + + if not self._run_mencoder(input=self.gmyth.stdout, + output=subprocess.PIPE): + return False + + if self.args["gmyth-cat"]["kind"] == "f": + try: + size = _setup_mythfile(self.gmyth.stderr) + self.log.debug(self.tid, "Size of file: %s" % size) + except Exception, e: + self.log.error(self.tid, "Problems getting size of file: %s" % e) + return False + + try: + while self.proc and self.proc.poll() == None: + r, w, x = select([self.gmyth.stderr, self.proc.stdout], + [], [], 0) + if self.proc.stdout in r: + d = self.proc.stdout.read(4096) + outfd.write(d) + + if self.gmyth.stderr in r: + partial = self.gmyth.stderr.read(50).split("\n")[-2] + if partial != "": + self.status = utils.progress_bar(self.log, + int(partial), + int(size), 50) + + except IndexError, e: + pass + except Exception, e: + self.log.error(self.tid, "Problems handling data: %s" % e) + return False + + self.log.info(self.tid, "Finished sending data") + return True +# _start_myth()