# HG changeset patch
# User morphbr
# Date 1180470288 -3600
# Node ID 3fbcd3d9b2d12139dd57ae2b6ddc3d1fd01a0caf
# Parent 24db164804565a11e9c953aad0365a24005ca735
[svn r724] * GMyth-Streamer version 0.3 released
- Improved Log architecture;
- Creation of a history for the transcoder's actions
- Creation of an id for each transcoder instanciated
- Also wrapps default actions for python's default logger
- Created new functions to make use of this new Log architecture;
- serve_new_id
- serve_get_log
- serve_get_all_log
- _Lot_ of small bug fixes;
- Inserted header for all files;
- Splited files with too many lines (more than 1 class per file)
in more files;
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/gms.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/gms.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+__author__ = "Artur Duque de Souza"
+__author_email__ = "artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.3"
+__thanks__ = "Gustavo Sverzut Barbieri"
+
+import sys
+import os
+import logging as log
+from lib.server import serve_forever, load_plugins_transcoders
+
+log_level = log.INFO
+for p in sys.argv[1:]:
+ if p == "-v" or p == "--verbose":
+ log_level -= 10
+
+log.basicConfig(level=log_level,
+ format=("### %(asctime)s %(name)-18s \t%(levelname)-8s "
+ "\t%(message)s"),
+ datefmt="%Y-%m-%d %H:%M:%S")
+
+pd = os.path.join("plugins", "transcoders")
+load_plugins_transcoders(pd)
+serve_forever()
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/index.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/index.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,9 @@
+
+
GMyth-Streamer Server
+
+Welcome to GMyth-Streamer Server
+
+
+
\ No newline at end of file
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/menu.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/menu.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,1 @@
+%(name)s
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/shutdown.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/shutdown.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,6 @@
+
+ GMyth-Streamer Server Exited
+
+ GMyth-Streamer is not running anymore
+
+
\ No newline at end of file
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/status.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/status.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,14 @@
+
+ GMyth-Streamer Server Status
+
+ GMyth-Streamer Status
+
+ %(running)s
+ %(stopall)s
+ %(stopone)s
+
+
+
+
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/stop_all.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/stop_all.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,9 @@
+
+ GMyth-Streamer Server Stopped Transcoders
+
+ GMyth-Streamer stopped running transcoders
+
+
+
\ No newline at end of file
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/html/stop_selected.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/html/stop_selected.html Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,12 @@
+
+ GMyth-Streamer Server Stopped Transcoders
+
+ GMyth-Streamer stopped running transcoders:
+
+
+
+
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/log.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/lib/log.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+__author__ = "Artur Duque de Souza"
+__author_email__ = "artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.1"
+
+import os
+import logging
+
+__all__ = ("Log", "log_structure")
+
+class log_structure(object):
+ """Structure to hold log info."""
+
+ def __init__(self, log=None):
+ self.log = log
+ self.history = []
+ # __init__()
+
+# log_structure()
+
+class Log(object):
+ """This class implements a log where we can store status of
+ all transcoders (even from those that are not running any more)."""
+
+ ## key = tid
+ ## item = ls
+ logs = {}
+
+ def insert(self, tid, name):
+ """Insert a given tid on the log structure"""
+ if not self.logs.has_key(tid):
+ self.logs[tid] = log_structure(logging.getLogger(name))
+ return True
+ else:
+ return False
+ # insert()
+
+ def remove(self, tid=None):
+ """Cleans up all log stored for a
+ given tid or for all transcodes."""
+ if not tid:
+ self.logs = {}
+ else:
+ del(self.logs[tid])
+
+ return True
+ # clean()
+
+ def get_status(self, tid=None, all=False):
+ """Get the status of all transcoders or
+ of just one of them if it's given an tid."""
+ if not tid:
+ ret = {}
+ for tids, logs in self.logs.items():
+ if len(logs.history) > 0:
+ if not all:
+ ret[tids] = logs.history[-1]
+ else:
+ ret[tids] = logs.history
+ return ret
+ elif self.logs.has_key(tid) and len(self.logs[tid].history) > 0:
+ if not all:
+ return self.logs[tid].history[-1]
+ else:
+ return self.logs[tid].history
+
+ return False
+ # get_status()
+
+ def _update_status(self, tid=None, msg=""):
+ """Update the status of a given tid. Private method that
+ is only called inside error/info/debug wrappers"""
+ if msg != "":
+ self.logs[tid].history.append(msg)
+ return True
+ else:
+ return False
+ # update_status()
+
+ def error(self, tid, msg):
+ """Python's Log.error wrapper"""
+ if self.logs.has_key(tid):
+ self.logs[tid].log.error("%s" % msg)
+ return self._update_status(tid, msg)
+ else:
+ return False
+ # error()
+
+ def info(self, tid, msg):
+ """Python's Log.info wrapper"""
+ if self.logs.has_key(tid):
+ self.logs[tid].log.info("%s" % msg)
+ self._update_status(tid, msg)
+ return True
+ else:
+ return False
+ # info()
+
+ def debug(self, tid, msg):
+ """Python's Log.debug wrapper"""
+ if self.logs.has_key(tid):
+ self.logs[tid].log.debug("%s" % msg)
+ self._update_status(tid, msg)
+ return True
+ else:
+ return False
+ # debug()
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/request_handler.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/lib/request_handler.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.4"
+
+import os
+import threading
+import SocketServer
+import BaseHTTPServer
+import socket
+import urlparse
+import cgi
+import lib.utils as utils
+import logging
+
+from log import Log
+import lib.transcoder as transcoder
+
+__all__ = ("RequestHandler")
+
+class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """Class that implements an HTTP request handler for our server."""
+ log = logging.getLogger("gms.request")
+ def_transcoder = None
+ transcoders = utils.PluginSet(transcoder.Transcoder)
+ transcoders_log = Log()
+ tid_queue = []
+
+ menu = {
+ "Log": "/get_log.do",
+ "Stop": "/stop-transcoder.do",
+ "Status": "/status.do",
+ "All Log": "/get_all_log.do",
+ "Version": "/version.do",
+ "Shutdown": "/shutdown.do"
+ }
+
+ @classmethod
+ def load_plugins_transcoders(cls, directory):
+ cls.transcoders.load_from_directory(directory)
+
+ if cls.def_transcoder is None and cls.transcoders:
+ cls.def_transcoder = cls.transcoders[0].name
+ # load_plugins_transcoders()
+
+
+ def do_dispatch(self, body):
+ self.url = self.path
+
+ pieces = urlparse.urlparse(self.path)
+ self.path = pieces[2]
+ self.query = cgi.parse_qs(pieces[4])
+
+ if self.path == "/":
+ self.serve_main(body)
+ elif self.path == "/shutdown.do":
+ self.serve_shutdown(body)
+ elif self.path == "/stop-transcoder.do":
+ self.serve_stop_transcoder(body)
+ elif self.path == "/status.do":
+ self.serve_status(body)
+ elif self.path == "/version.do":
+ self.serve_version(body)
+ elif self.path == "/new_id.do":
+ self.serve_new_id(body)
+ elif self.path == "/get_log.do":
+ self.serve_get_log(body)
+ elif self.path == "/get_all_log.do":
+ self.serve_get_all_log(body)
+ elif self.path == "/stream.do":
+ self.serve_stream(body)
+ else:
+ action = self.query.get("action", None)
+ if action and "stream.do" in action:
+ self.serve_stream(body)
+ else:
+ self.send_error(404, "File not found")
+ # do_dispatch()
+
+
+ def do_GET(self):
+ self.do_dispatch(True)
+ # do_GET()
+
+
+ def do_HEAD(self):
+ self.do_dispatch(False)
+ # do_HEAD()
+
+
+ def _nav_items(self):
+ ret = ""
+ for name, url in self.menu.items():
+ ret += utils.getHTML("menu", {"name": name, "url": url})
+
+ return ret
+ # _nav_items()
+
+ def serve_main(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
+ # serve_main()
+
+ def serve_version(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("Version: %s" % __version__)
+
+
+ def serve_shutdown(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write(utils.getHTML("shutdown"))
+ self.server.server_close()
+ # serve_shutdown()
+
+
+ def serve_stop_all_transcoders(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.server.stop_transcoders()
+ self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
+ # serve_stop_all_transcoders()
+
+
+ def serve_stop_selected_transcoders(self, body, tids=[]):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ opts = ""
+ if body:
+ transcoders = self.server.get_transcoders()
+
+ for tid in tids:
+ for t, r in transcoders:
+ if t.tid == int(tid):
+ try:
+ t.stop()
+ except Exception, e:
+ self.log.info("Plugin already stopped")
+
+ opts += utils._create_html_item("%s" % t)
+
+ break
+
+ self.wfile.write(utils.getHTML("stop_selected",
+ {"menu": self._nav_items(),
+ "opts": opts}))
+ # serve_stop_selected_transcoders()
+
+
+ def serve_stop_transcoder(self, body):
+ req = self.query.get("request", None)
+ tid = self.query.get("tid", None)
+ if req and "all" in req:
+ self.serve_stop_all_transcoders(body)
+ elif tid:
+ self.serve_stop_selected_transcoders(body, tid[0].split(";"))
+ else:
+ self.serve_status(body)
+ # serve_stop_transcoder()
+
+
+ def serve_status(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ stopone = ""
+
+ if body:
+ tl = self.server.get_transcoders()
+ if not tl:
+ running = "No running transcoder.
\n"
+ stopall = ""
+ stopone = ""
+
+ elif self.query.get("tid", None):
+ req_tid = int(self.query.get("tid")[0])
+ for transcoder, request in tl:
+ if transcoder.tid == req_tid:
+ self.wfile.write("Status: %s %%" % transcoder.status)
+ return True
+
+ return False
+
+ else:
+ running = "Running transcoders:
\n"
+ stopall = utils._create_html_item(""
+ "[STOP ALL]" %
+ self.menu["Stop"])
+
+ for transcoder, request in tl:
+ stopone += utils._create_html_item("%s;"
+ ""
+ " [STOP] ") % (
+ transcoder, self.menu["Stop"], transcoder.tid)
+
+ self.wfile.write(utils.getHTML("status",
+ {"menu": self._nav_items(),
+ "running": running,
+ "stopall": stopall,
+ "stopone": stopone}))
+ # serve_status()
+
+
+ def _get_transcoder(self):
+ # get transcoder option: mencoder is the default
+ request_transcoders = self.query.get("transcoder", ["mencoder"])
+
+ for t in request_transcoders:
+ transcoder = self.transcoders.get(t)
+ if transcoder:
+ return transcoder
+
+ if not transcoder:
+ return self.transcoders[self.def_transcoder]
+ # _get_transcoder()
+
+
+ def _get_new_id(self, tid):
+ self.server.last_tid = utils.create_tid(tid)
+ self.tid_queue.append(self.server.last_tid)
+ return self.server.last_tid
+ # _get_new_id()
+
+
+ def serve_new_id(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+
+ if body:
+ self.wfile.write("%s" % self._get_new_id(self.server.last_tid))
+ # serve_new_id()
+
+ def serve_get_log(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+
+ if body:
+ if self.query.get("tid", None):
+ tid = int(self.query.get("tid")[0])
+ stat = self.transcoders_log.get_status(tid)
+ self.wfile.write("%s" % stat)
+ else:
+ stat = self.transcoders_log.get_status()
+ for rtid, status in stat.iteritems():
+ self.wfile.write("%s: %s
" % (rtid, status))
+ # serve_get_log()
+
+ def serve_get_all_log(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+
+ if body:
+ if self.query.get("tid", None):
+ tid = int(self.query.get("tid")[0])
+ stat = self.transcoders_log.get_status(tid, True)
+ for status in stat:
+ self.wfile.write("%s
" % status)
+ else:
+ stat = self.transcoders_log.get_status(None, True)
+ for rtid, history in stat.iteritems():
+ for status in history:
+ self.wfile.write("%s: %s
" % (rtid, status))
+ self.wfile.write("
")
+ # serve_get_all_log()
+
+ def serve_stream(self, body):
+ transcoder = self._get_transcoder()
+ try:
+ obj = transcoder(self.query)
+ except Exception, e:
+ self.send_error(500, str(e))
+ return
+
+ self.send_response(200)
+ self.send_header("Content-Type", obj.get_mimetype())
+ self.send_header('Connection', 'close')
+ self.end_headers()
+
+ if body:
+ test_tid = int(self.query.get("tid", "0")[0])
+ if test_tid == 0 or test_tid not in self.tid_queue:
+ test_tid = self._get_new_id(self.server.last_tid)
+
+ self.transcoders_log.insert(test_tid, "gms.%s" % obj.name)
+ obj.tid = test_tid
+ obj.log = self.transcoders_log
+
+ self.server.add_transcoders(self, obj)
+ obj.start(self.wfile)
+ self.server.del_transcoders(self, obj)
+ # serve_stream()
+
+
+ def log_request(self, code='-', size='-'):
+ self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
+ # log_request()
+
+
+ def log_error(self, format, *args):
+ self.log.error("%s: %s" % (self.address_string(), format % args))
+ # log_error()
+
+
+ def log_message(self, format, *args):
+ self.log.info("%s: %s" % (self.address_string(), format % args))
+ # log_message()
+# RequestHandler
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/server.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/lib/server.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.4"
+
+import os
+import threading
+import SocketServer
+import BaseHTTPServer
+import socket
+import urlparse
+import cgi
+import lib.utils as utils
+import logging
+
+from log import Log
+from request_handler import RequestHandler
+
+__all__ = ("Server", "serve_forever", "load_plugins_transcoders")
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
+ log = logging.getLogger("gms.server")
+ last_tid = 0
+ run = True
+ _transcoders = {}
+ _lock = threading.RLock()
+
+ def serve_forever(self):
+ self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
+ self.socket.getsockname())
+ try:
+ while self.run:
+ self.handle_request()
+ except KeyboardInterrupt, e:
+ pass
+
+ self.log.debug("Stopping all remaining transcoders...")
+ self.stop_transcoders()
+ self.log.debug("Transcoders stopped!")
+ # serve_forever()
+
+
+ def get_request(self):
+ skt = self.socket
+ old = skt.gettimeout()
+ skt.settimeout(0.5)
+ while self.run:
+ try:
+ r = skt.accept()
+ skt.settimeout(old)
+ return r
+ except socket.timeout, e:
+ pass
+ raise socket.error("Not running")
+ # get_request()
+
+
+ def server_close(self):
+ self.run = False
+ self.stop_transcoders()
+
+ BaseHTTPServer.HTTPServer.server_close(self)
+ # server_close()
+
+
+ def stop_transcoders(self):
+ self._lock.acquire()
+ for transcoder, request in self._transcoders.iteritems():
+ self.log.info("Stop transcoder: %s, client=%s" %
+ (transcoder, request.client_address))
+ transcoder.stop()
+ self._lock.release()
+ # stop_transcoders()
+
+
+ def get_transcoders(self):
+ self._lock.acquire()
+ try:
+ return self._transcoders.items()
+ finally:
+ self._lock.release()
+ # get_transcoders()
+
+
+ def add_transcoders(self, request, transcoder):
+ self._lock.acquire()
+ try:
+ self._transcoders[transcoder] = request
+ finally:
+ self._lock.release()
+ # add_transcoders()
+
+
+ def del_transcoders(self, request, transcoder):
+ self._lock.acquire()
+ try:
+ del self._transcoders[transcoder]
+ finally:
+ self._lock.release()
+ # del_transcoders()
+# Server
+
+
+
+def serve_forever(host="0.0.0.0", port=40000):
+ addr = (host, port)
+ RequestHandler.protocol_version = "HTTP/1.0"
+ httpd = Server(addr, RequestHandler)
+ httpd.serve_forever()
+# serve_forever()
+
+
+def load_plugins_transcoders(directory):
+ RequestHandler.load_plugins_transcoders(directory)
+# load_plugins_transcoders()
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/transcoder.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/lib/transcoder.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.4"
+
+__all__ = ("Transcoder")
+
+class Transcoder(object):
+ """Transcoder's Class: parent class to implement
+ a plugin for transcoding data."""
+ priority = 0 # negative values have higher priorities
+ name = None # to be used in requests
+ status = None
+ log = None
+ tid = -1
+
+ def __init__(self, params):
+ self.params = params
+ # __init__()
+
+
+ def params_first(self, key, default=None):
+ if default is None:
+ return self.params[key][0]
+ else:
+ try:
+ return self.params[key][0]
+ except:
+ return default
+ # params_first()
+
+
+ def get_mimetype(self):
+ mux = self.params_first("mux", "mpg")
+
+ if mux == "mpeg":
+ return "video/mpeg"
+ elif mux == "avi":
+ return "video/x-msvideo"
+ else:
+ return "application/octet-stream"
+ # get_mimetype()
+
+ def start(self, outfile):
+ return True
+ # start()
+
+
+ def stop(self):
+ return True
+ # stop()
+
+ def __str__(self):
+ return '%s: %s( params=%s ) - Status: %s%%' % \
+ (self.__class__.__name__, self.tid,
+ self.params, self.status)
+ # __str__()
+# Transcoder
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/lib/utils.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/lib/utils.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,188 @@
+#!/usr/bin/env
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.3"
+
+import os
+import stat
+import sys
+import logging
+import urllib
+import gobject
+import imp
+
+log = logging.getLogger("gms.utils")
+
+__all__ = ("which", "load_plugins", "PluginSet", "getHTML")
+
+def which(app):
+ """Function to implement which(1) unix command"""
+ pl = os.environ["PATH"].split(os.pathsep)
+ for p in pl:
+ path = os.path.join(p, app)
+ if os.path.isfile(path):
+ st = os.stat(path)
+ if st[stat.ST_MODE] & 0111:
+ return path
+ return ""
+# which()
+
+
+def _load_module(pathlist, name):
+ fp, path, desc = imp.find_module(name, pathlist)
+ try:
+ module = imp.load_module(name, fp, path, desc)
+ return module
+ finally:
+ if fp:
+ fp.close()
+# _load_module()
+
+
+class PluginSet(object):
+ def __init__(self, basetype, *items):
+ self.basetype = basetype
+ self.map = {}
+ self.list = []
+
+ for i in items:
+ self._add(i)
+ self._sort()
+ # __init__()
+
+
+ def _add(self, item):
+ self.map[item.name] = item
+ self.list.append(item)
+ # _add()
+
+
+ def add(self, item):
+ self._add()
+ self._sort()
+ # add()
+
+
+ def __getitem__(self, spec):
+ if isinstance(spec, basestring):
+ return self.map[spec]
+ else:
+ return self.list[spec]
+ # __getitem__()
+
+
+ def get(self, name, default=None):
+ return self.map.get(name, default)
+ # get()
+
+
+ def __iter__(self):
+ return self.list.__iter__()
+ # __iter__()
+
+
+ def __len__(self):
+ return len(self.list)
+ # __len__()
+
+
+ def _sort(self):
+ self.list.sort(lambda a, b: cmp(a.priority, b.priority))
+ # _sort()
+
+
+ def update(self, pluginset):
+ self.map.update(pluginset.map)
+ self.list.extend(pluginset.list)
+ self._sort()
+ # update()
+
+
+ def load_from_directory(self, directory):
+ for i in load_plugins(directory, self.basetype):
+ self._add(i)
+ self._sort()
+ # load_from_directory()
+
+
+ def __str__(self):
+ lst = []
+ for o in self.list:
+ lst.append('"%s" (%s)' % (o.name, o.__name__))
+
+ return "%s(basetype=%s, items=[%s])" % \
+ (self.__class__.__name__,
+ self.basetype.__name__,
+ ", ".join(lst))
+ # __str__()
+# PluginSet
+
+
+def load_plugins(directory, basetype):
+ """Function to load plugins from a given directory"""
+ tn = basetype.__name__
+ log.debug("Loading plugins from %s, type=%s" % (directory, tn))
+
+
+ plugins = []
+ for d in os.listdir(directory):
+ if not d.endswith(".py"):
+ continue
+
+ name = d[0: -3]
+ if name == "__init__":
+ continue
+
+ directory.replace(os.path.sep, ".")
+ mod = _load_module([directory], name)
+ for sym in dir(mod):
+ cls = getattr(mod, sym)
+ if isinstance(cls, type) and issubclass(cls, basetype) and \
+ cls != basetype:
+ plugins.append(cls)
+ log.info("Loaded %s (%s) from %s" % \
+ (cls.__name__, tn, os.path.join(directory, d)))
+
+ return plugins
+# load_plugins()
+
+def getHTML(html_file, params={}):
+ """This function parses an html file with the given
+ parameters and returns a formated web-page"""
+ try:
+ filename = os.path.join(sys.path[0], "html", html_file + ".html")
+ html = open(filename).read() % params
+ return html
+ except Exception, e:
+ return "HTML format error. Wrong keys: %s" % e
+
+# getHTML
+
+def _create_html_item(opt):
+ """Create an item using HTML."""
+ return "%s\n" % opt
+# _create_html_item
+
+def progress_bar(log, value, max, barsize):
+ """Creates and displays a progressbar. By OSantana"""
+ chars = int(value * barsize / float(max))
+ percent = int((value / float(max)) * 100)
+ sys.stdout.write("#" * chars)
+ sys.stdout.write(" " * (barsize - chars + 2))
+ if value >= max:
+ sys.stdout.write("done.\n\n")
+ else:
+ sys.stdout.write("[%3i%%]\r" % (percent))
+ sys.stdout.flush()
+ return percent
+# progress_bar() by osantana
+
+def create_tid(last_tid):
+ """Function to generate TIDs (ids for transcoders).
+ At first it just do +1 on last_tid but can be implemented
+ to generate more sparse TIDs"""
+ tid = last_tid + 1
+ return tid
+# create_id()
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/gmencoder.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/plugins/transcoders/gmencoder.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+
+__author__ = "Renato Filho"
+__author_email__ = "renato.filho@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.1"
+
+import os
+import shlex
+import signal
+import subprocess
+import time
+
+import select
+
+import lib.utils as utils
+import lib.server as server
+import lib.transcoder as transcoder
+
+__all__ = ("TranscoderGMencoder",)
+
+class TranscoderGMencoder(transcoder.Transcoder):
+ gmencoder_path = utils.which("gmencoder")
+ name = "gmencoder"
+ priority = -1
+ proc = None
+
+ def __init__(self, params):
+ transcoder.Transcoder.__init__(self, params)
+ self.opts = []
+ # __init__()
+
+ def _insert_param (self, name, value):
+ if (value != ""):
+ self.opts.append(name)
+ self.opts.append(value)
+
+ def _parser_params (self):
+ self._insert_param("-i", \
+ "%s://%s" % (self.params_first("type", "file"), self.params_first("uri", "")))
+ self._insert_param("--video-encode", self.params_first("ve", ""))
+ self._insert_param("--video-opts", "bitrate=200,pass=2,quantizer=5")
+ self._insert_param("--video-fps", self.params_first("fps", ""))
+ self._insert_param("--video-width", self.params_first("width", ""))
+ self._insert_param("--video-height", self.params_first("height", ""))
+ self._insert_param("--audio-rate", "32000")
+ self._insert_param("--audio-encode", self.params_first("ae", ""))
+ # _parse_params
+
+ def start(self, outfd):
+ self.opts.append (self.gmencoder_path)
+ self._parser_params ()
+ self._insert_param ("-o", "fd://%d" % outfd.fileno())
+
+ cmd = " ".join(self.opts)
+ self.log.info ("GMencoder: %s", cmd)
+
+ try:
+ self.proc = subprocess.Popen(self.opts, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ except Exception, e:
+ self.log.error(self.tid, "Error executing GMencoder: %s" % e)
+ return False
+
+ try:
+ while (self.proc and self.proc.poll() == None):
+ r, w, x = select.select([self.proc.stdout], [], [], 0)
+ if self.proc.stdout in r:
+ progress = self.proc.stdout.readline()
+ self.log.info ("stdout %s" % progress)
+ if (progress.find ("PROGRESS") >= 0):
+ self.status = progress.split (":")[1]
+ #if (progress.find ("DONE") >= 0):
+ # break
+ self.log.info ("Process exit")
+ except Exception, e:
+ self.log.error(self.tid, "Problems handling data: %s" % e)
+ return False
+
+ return True
+ # start()
+
+
+ def stop(self):
+ if self.proc:
+ self.log.info ("STOPED GMencoder plugin")
+ try:
+ self.proc.stdin.write ("QUIT\n")
+ except Exception, e:
+ pass
+
+ try:
+ self.proc.wait()
+ except Exception, e:
+ pass
+
+ self.proc = None
+ # stop()
+
+# TranscoderGMencoder
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/mencoder.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/plugins/transcoders/mencoder.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,305 @@
+#!/usr/bin/env python
+
+__author__ = "Artur Duque de Souza"
+__author_email__ = "artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.1"
+
+import os
+import shlex
+import signal
+import subprocess
+import time
+import fcntl
+
+import lib.utils as utils
+import lib.server as server
+import plugins.transcoders.mencoder_lib.mythtv as mythtv
+
+from select import select
+import lib.transcoder as transcoder
+
+__all__ = ("TranscoderMencoder",)
+
+class TranscoderMencoder(transcoder.Transcoder):
+ """Transcoder class that implements a transcoder using Mencoder"""
+ mencoder_path = utils.which("mencoder")
+ name = "mencoder"
+ priority = -1
+ args = {}
+ proc = None
+ gmyth = None
+
+ # only works with avi container
+ status = 0
+
+ def _setup_params(self):
+ params_first = self.params_first
+
+ # general_opts
+ self.args["local"] = params_first("local", False)
+ self.args["language"] = params_first("language", False)
+ self.args["subtitle"] = params_first("subtitle", False)
+ self.args["format"] = params_first("format", "mpeg1")
+ self.args["outfile"] = params_first("outfile", "-")
+
+ # input_opt
+ self.args["type"] = params_first("type", "file")
+ self.args["input"] = params_first("uri", "-")
+
+ # audio_opts
+ self.args["acodec"] = params_first("acodec", "mp2")
+ self.args["abitrate"] = params_first("abitrate", 192)
+ self.args["volume"] = params_first("volume", 5)
+
+ # video_opts
+ self.args["mux"] = params_first("mux", "mpeg")
+ self.args["fps"] = params_first("fps", 25)
+ self.args["vcodec"] = params_first("vcodec", "mpeg1video")
+ self.args["vbitrate"] = params_first("vbitrate", 400)
+ self.args["width"] = params_first("width", 320)
+ self.args["height"] = params_first("height", 240)
+ # _setup_params()
+
+
+ def _setup_audio(self):
+ if self.args["acodec"] == "mp3lame":
+ audio = "-oac mp3lame -lameopts cbr:br=%s vol=%s" % (
+ self.args["abitrate"], self.args["volume"])
+ else:
+ audio = "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (
+ self.args["acodec"], self.args["abitrate"])
+
+ return audio
+ # _setup_audio()
+
+
+ def _setup_video(self):
+ video = " -of %s" % self.args["mux"]
+ video += " -ofps %s" % self.args["fps"]
+
+ vcodec = self.args["vcodec"]
+ if vcodec == "nuv" or vcodec == "xvid"\
+ or vcodec == "qtvideo" or vcodec == "copy":
+ video += " -ovc %s" % vcodec
+ else:
+ video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % (
+ vcodec, self.args["vbitrate"])
+
+ if self.args["mux"] == "mpeg":
+ video += " -mpegopts format=%s" % self.args["format"]
+ video += " -vf scale=%s:%s" % (self.args["width"], self.args["height"])
+
+ return video
+ # _setup_video()
+
+
+ def _arg_append(self, args, options):
+ for arg in shlex.split(options):
+ args.append(arg)
+ # arg_append()
+
+ def _setup_mencoder_opts(self, args):
+ args.append(self.mencoder_path)
+
+ if self.args["outfile"] == "-" and self.args["type"]:
+ args.append(self.args["input"])
+ else:
+ args.append("-")
+
+ if self.args["language"]:
+ self._arg_append(args, "-alang %s" % self.args["language"])
+
+ if self.args["subtitle"]:
+ self._arg_append(args, "-slang %s" % self.args["subtitle"])
+ self._arg_append(args, "-subfps %s" % self.args["fps"])
+
+ self._arg_append(args, "-idx")
+ self._arg_append(args, "-cache 1024")
+ self._arg_append(args, self._setup_audio())
+ self._arg_append(args, self._setup_video())
+
+ self._arg_append(args, "-really-quiet")
+ self._arg_append(args, "-o %s" % self.args["outfile"])
+ self._arg_append(args, "2>%s" % os.devnull)
+ # _setup_args()
+
+ def _setup_filename(self):
+ """This function setups the file to encode parsing the uri.
+ So, type can be:
+ * file
+ * dvd
+ * myth
+
+ If the last one is detected we have to parse the uri to find args.
+ Then we store all the args inside a dictionary: self.args['gmyth-cat']
+ """
+ _type = self.args["type"]
+
+ if _type == "file":
+ if not os.path.exists(self.args["input"]):
+ raise IOError,\
+ "File requested does not exist: %s." % self.args["input"]
+ else:
+ self.args["input"] = "file://%s" % self.args["input"]
+
+ elif _type == "dvd":
+ self.args["input"] = "dvd://".join(self.args["input"])
+
+ elif _type == "myth":
+ self.args["gmyth-cat"] = mythtv._setup_mythfilename(self)
+ # _setup_filename()
+
+
+ def __init__(self, params):
+ transcoder.Transcoder.__init__(self, params)
+ self.mencoder_opts = []
+
+ try:
+ self._setup_params()
+ self._setup_filename()
+ self._setup_mencoder_opts(self.mencoder_opts)
+ except Exception, e:
+ self.log.error(self.tid, e)
+ # __init__()
+
+
+ def _check_opened_file(self, stdw, _stdin):
+ loop = True
+ while loop:
+ try:
+ return open(self.args["outfile"])
+ except:
+ os.write(stdw, _stdin.read(1024))
+ # _check_opened_file
+
+
+ def _start_outfile(self, outfd):
+ finished = False
+
+ # fix this (not necessary)
+ outfd.write("OK")
+
+ # Configuring stdin
+ try:
+ _stdin = open(self.args["input"])
+ size = int(os.path.getsize(self.args["input"]))
+ except Exception, e:
+ self.log.error(self.tid, "Mencoder stdin setup error: %s" % e)
+ return False
+
+ self.status = 0
+ total_read = 0
+
+ # Configuring pipes
+ stdr, stdw = os.pipe()
+
+ if not self._run_mencoder(input=stdr):
+ return False
+
+ stdout = self._check_opened_file(stdw, _stdin)
+
+ try:
+ while self.proc and self.proc.poll() == None:
+ if not finished:
+ data_in = _stdin.read(4096)
+ if data_in != "":
+ os.write(stdw, data_in)
+ total_read += 4096
+ d = stdout.read(4096)
+ self.status = utils.progress_bar(self.log,
+ int(total_read),
+ int(size), 50)
+ else:
+ finished = True
+ os.close(stdw)
+
+ else:
+ d = stdout.read(4096)
+
+ except Exception, e:
+ self.log.error(self.tid, "Problems handling data: %s" % e)
+ self.stop()
+ return False
+
+ self.log.info(self.tid, "%s: Finished sending data to client" % repr(self))
+ return True
+ # _start_outfile()
+
+ def _start(self, outfd):
+ # Play a file on disk or DVD
+ if not self._run_mencoder(output=subprocess.PIPE):
+ return False
+
+ try:
+ while self.proc and self.proc.poll() == None:
+ d = self.proc.stdout.read(1024)
+ outfd.write(d)
+ except Exception, e:
+ self.log.error(self.tid, "Problems handling data: %s" % e)
+ return False
+
+ self.log.info(self.tid, "%s: Finished sending data to client" % repr(self))
+ return True
+ # _start()
+
+ def _run_mencoder(self, input=None, output=None):
+ try:
+ self.proc = subprocess.Popen(self.mencoder_opts, stdin=input,
+ stdout=output, close_fds=True)
+ except Exception, e:
+ self.log.error(self.tid, "Error executing mencoder: %s" % e)
+ return False
+
+ return True
+ # _run_mencoder()
+
+ def start(self, outfd):
+ cmd = " ".join(self.mencoder_opts)
+ self.log.debug(self.tid, "Plugin's tid: %s" % self.tid)
+ self.log.debug(self.tid, "Mencoder: %s" % cmd)
+ #fixme
+
+ ret = False
+
+ if self.args["outfile"] == "-" and \
+ self.args["type"] in ["file", "dvd"]:
+ ret = self._start(outfd)
+
+ elif self.args["type"] == "myth":
+ ret = mythtv.start_myth(self, outfd)
+
+ else:
+ ret = self._start_outfile(outfd)
+
+ self.stop()
+
+ if not ret:
+ self.log.error(self.tid, "Problems while starting streaming.")
+
+ return ret
+ # start()
+
+ def _aux_stop(self, obj, next=False):
+ if obj:
+ try:
+ os.kill(obj.pid, signal.SIGKILL)
+ if next:
+ os.kill(obj.pid+1, signal.SIGKILL)
+ except OSError, e:
+ pass
+
+ try:
+ obj.wait()
+ except Exception, e:
+ pass
+
+ obj = None
+ # _aux_stop
+
+ def stop(self):
+ self._aux_stop(self.proc, True)
+ self._aux_stop(self.gmyth)
+ # stop()
+
+# TranscoderMencoder
diff -r 24db16480456 -r 3fbcd3d9b2d1 gmyth-stream/server/0.3/plugins/transcoders/mencoder_lib/mythtv.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.3/plugins/transcoders/mencoder_lib/mythtv.py Tue May 29 21:24:48 2007 +0100
@@ -0,0 +1,106 @@
+import os
+import subprocess
+import fcntl
+
+import lib.utils as utils
+import lib.server as server
+
+from select import select
+
+def _setup_mythfilename(self):
+ # mythtv:mythtv@192.168.3.110:6543/1002_20070426230000.nuv
+ try:
+ _mysql = self.args["input"].split("@")[0].split(":")
+ except IndexError, e:
+ _mysql = ["mythtv", "mythtv"]
+
+ try:
+ _args = self.args["input"].split("@")[1].split(":")
+ except IndexError, e:
+ _args = self.args["input"].split(":")
+
+ gmyth_dict = {}
+ gmyth_dict["mysql"] = _mysql
+ gmyth_dict["backend"] = _args[0]
+ gmyth_dict["port"] = _args[1].split("/", 1)[0]
+
+ _tmp_file = _args[1].split("/", 1)[1]
+
+ if _tmp_file.find("channel") >= 0:
+ gmyth_dict["kind"] = "c"
+ gmyth_dict["cfile"] = _tmp_file.split("=")[1]
+ else:
+ gmyth_dict["kind"] = "f"
+ gmyth_dict["cfile"] = _tmp_file
+
+ self.args["input"] = "-"
+ return gmyth_dict
+# _setup_mythfilename
+
+def _setup_mythfile(err):
+ size = err.readline().split("Size:")[1]
+ flags = fcntl.fcntl (err, fcntl.F_GETFL, 0) | os.O_NONBLOCK
+ fcntl.fcntl(err, fcntl.F_SETFL, flags)
+ return size
+# _setup_mythfile
+
+def _setup_gmythcat(self):
+ gmyth_cat = utils.which("gmyth-cat")
+ if self.args.has_key("gmyth-cat"):
+ return [ utils.which("gmyth-cat"),
+ "-h", self.args["gmyth-cat"]["backend"],
+ "-p", self.args["gmyth-cat"]["port"],
+ "-" + self.args["gmyth-cat"]["kind"],
+ self.args["gmyth-cat"]["cfile"]
+ ]
+ else:
+ self.log.error(self.tid, "URI error")
+ return []
+# _setup_gmythcat
+
+def start_myth(self, outfd):
+ opts = _setup_gmythcat(self)
+ try:
+ self.gmyth = subprocess.Popen(opts, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True)
+ except Exception, e:
+ self.log.error(self.tid, "Error executing gmyth-cat: %s" % e)
+ return False
+
+ if not self._run_mencoder(input=self.gmyth.stdout,
+ output=subprocess.PIPE):
+ return False
+
+ if self.args["gmyth-cat"]["kind"] == "f":
+ try:
+ size = _setup_mythfile(self.gmyth.stderr)
+ self.log.debug(self.tid, "Size of file: %s" % size)
+ except Exception, e:
+ self.log.error(self.tid, "Problems getting size of file: %s" % e)
+ return False
+
+ try:
+ while self.proc and self.proc.poll() == None:
+ r, w, x = select([self.gmyth.stderr, self.proc.stdout],
+ [], [], 0)
+ if self.proc.stdout in r:
+ d = self.proc.stdout.read(4096)
+ outfd.write(d)
+
+ if self.gmyth.stderr in r:
+ partial = self.gmyth.stderr.read(50).split("\n")[-2]
+ if partial != "":
+ self.status = utils.progress_bar(self.log,
+ int(partial),
+ int(size), 50)
+
+ except IndexError, e:
+ pass
+ except Exception, e:
+ self.log.error(self.tid, "Problems handling data: %s" % e)
+ return False
+
+ self.log.info(self.tid, "Finished sending data")
+ return True
+# _start_myth()