morphbr@565: #!/usr/bin/env python morphbr@565: morphbr@565: __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" morphbr@565: __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" morphbr@565: __license__ = "GPL" morphbr@565: __version__ = "0.3" morphbr@565: morphbr@565: import os morphbr@565: import threading morphbr@565: import SocketServer morphbr@565: import BaseHTTPServer morphbr@565: import socket morphbr@565: import urlparse morphbr@565: import cgi morphbr@565: import lib.utils as utils morphbr@565: import logging as log morphbr@565: morphbr@565: __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever", morphbr@565: "load_plugins_transcoders") morphbr@565: morphbr@565: class Transcoder(object): morphbr@577: log = log.getLogger("gms.transcoder") morphbr@565: priority = 0 # negative values have higher priorities morphbr@565: name = None # to be used in requests morphbr@595: status = None morphbr@565: morphbr@565: def __init__(self, params): morphbr@565: self.params = params morphbr@565: # __init__() morphbr@565: morphbr@565: morphbr@565: def params_first(self, key, default=None): morphbr@565: if default is None: morphbr@565: return self.params[key][0] morphbr@565: else: morphbr@565: try: morphbr@565: return self.params[key][0] morphbr@565: except: morphbr@565: return default morphbr@565: # params_first() morphbr@565: morphbr@565: morphbr@565: def get_mimetype(self): morphbr@565: mux = self.params_first("mux", "mpg") morphbr@565: morphbr@565: if mux == "mpeg": morphbr@565: return "video/mpeg" morphbr@565: elif mux == "avi": morphbr@565: return "video/x-msvideo" morphbr@565: else: morphbr@565: return "application/octet-stream" morphbr@565: # get_mimetype() morphbr@565: morphbr@565: morphbr@565: def start(self, outfile): morphbr@565: return True morphbr@565: # start() morphbr@565: morphbr@565: morphbr@565: def stop(self): morphbr@565: return Tru morphbr@565: # stop() morphbr@565: morphbr@565: morphbr@565: def __str__(self): morphbr@653: return '%s( params=%s )' % \ morphbr@565: (self.__class__.__name__, morphbr@653: self.params) morphbr@565: # __str__() morphbr@565: # Transcoder morphbr@565: morphbr@565: morphbr@565: morphbr@565: class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): morphbr@577: log = log.getLogger("gms.request") morphbr@565: def_transcoder = None morphbr@565: transcoders = utils.PluginSet(Transcoder) morphbr@565: morphbr@585: menu = { morphbr@638: "Stop": "/stop-transcoder.do", morphbr@585: "Status": "/status.do", morphbr@638: "Version": "/version.do", morphbr@585: "Shutdown": "/shutdown.do" morphbr@585: } morphbr@585: morphbr@565: @classmethod morphbr@565: def load_plugins_transcoders(cls, directory): morphbr@565: cls.transcoders.load_from_directory(directory) morphbr@565: morphbr@565: if cls.def_transcoder is None and cls.transcoders: morphbr@565: cls.def_transcoder = cls.transcoders[0].name morphbr@565: # load_plugins_transcoders() morphbr@565: morphbr@565: morphbr@565: def do_dispatch(self, body): morphbr@565: self.url = self.path morphbr@565: morphbr@565: pieces = urlparse.urlparse(self.path) morphbr@565: self.path = pieces[2] morphbr@565: self.query = cgi.parse_qs(pieces[4]) morphbr@565: morphbr@565: if self.path == "/": morphbr@565: self.serve_main(body) morphbr@565: elif self.path == "/shutdown.do": morphbr@565: self.serve_shutdown(body) morphbr@565: elif self.path == "/stop-transcoder.do": morphbr@565: self.serve_stop_transcoder(body) morphbr@565: elif self.path == "/status.do": morphbr@565: self.serve_status(body) morphbr@638: elif self.path == "/version.do": morphbr@638: self.serve_version(body) morphbr@565: elif self.path == "/stream.do": morphbr@565: self.serve_stream(body) morphbr@565: else: morphbr@628: action = self.query.get("action", None) morphbr@688: if "stream.do" in action: morphbr@628: self.serve_stream(body) morphbr@628: else: morphbr@628: self.send_error(404, "File not found") morphbr@565: # do_dispatch() morphbr@565: morphbr@565: morphbr@565: def do_GET(self): morphbr@565: self.do_dispatch(True) morphbr@565: # do_GET() morphbr@565: morphbr@565: morphbr@565: def do_HEAD(self): morphbr@565: self.do_dispatch(False) morphbr@565: # do_HEAD() morphbr@565: morphbr@565: morphbr@565: def _nav_items(self): morphbr@585: ret = "" morphbr@585: for name, url in self.menu.items(): morphbr@585: ret += utils.getHTML("menu", {"name": name, "url": url}) morphbr@585: morphbr@585: return ret morphbr@565: # _nav_items() morphbr@565: morphbr@565: def serve_main(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@585: self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()})) morphbr@565: # serve_main() morphbr@565: morphbr@638: def serve_version(self, body): morphbr@638: self.send_response(200) morphbr@638: self.send_header("Content-Type", "text/html") morphbr@638: self.send_header('Connection', 'close') morphbr@638: self.end_headers() morphbr@638: if body: morphbr@638: self.wfile.write("Version: %s" % __version__) morphbr@638: morphbr@565: morphbr@565: def serve_shutdown(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@585: self.wfile.write(utils.getHTML("shutdown")) morphbr@565: self.server.server_close() morphbr@565: # serve_shutdown() morphbr@565: morphbr@565: morphbr@565: def serve_stop_all_transcoders(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.server.stop_transcoders() morphbr@585: self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()})) morphbr@565: # serve_stop_all_transcoders() morphbr@565: morphbr@565: morphbr@565: def serve_stop_selected_transcoders(self, body, requests): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@585: opts = "" morphbr@565: if body: morphbr@565: transcoders = self.server.get_transcoders() morphbr@565: morphbr@565: for req in requests: morphbr@565: try: morphbr@565: host, port = req.split(":") morphbr@565: except IndexError: morphbr@565: continue morphbr@565: morphbr@565: port = int(port) morphbr@565: addr = (host, port) morphbr@565: morphbr@565: for t, r in transcoders: morphbr@565: if r.client_address == addr: morphbr@571: try: morphbr@571: t.stop() morphbr@571: except Exception, e: morphbr@571: self.log.info("Plugin already stopped") morphbr@571: morphbr@585: opts += self._create_html_item("%s: %s:%s" % ( morphbr@585: t, addr[0], addr[1])) morphbr@585: morphbr@565: break morphbr@585: morphbr@585: self.wfile.write(utils.getHTML("stop_selected", morphbr@585: {"menu": self._nav_items(), morphbr@585: "opts": opts})) morphbr@565: # serve_stop_selected_transcoders() morphbr@565: morphbr@565: morphbr@565: def serve_stop_transcoder(self, body): morphbr@565: req = self.query.get("request", None) morphbr@565: if req and "all" in req: morphbr@565: self.serve_stop_all_transcoders(body) morphbr@565: elif req: morphbr@565: self.serve_stop_selected_transcoders(body, req) morphbr@565: else: morphbr@565: self.serve_status(body) morphbr@565: # serve_stop_transcoder() morphbr@565: morphbr@565: morphbr@565: def serve_status(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@682: stopone = "" morphbr@585: morphbr@565: if body: morphbr@565: tl = self.server.get_transcoders() morphbr@565: if not tl: morphbr@585: running = "

No running transcoder.

\n" morphbr@585: stopall = "" morphbr@653: morphbr@653: elif self.query.get("ip") and self.query.get("file"): morphbr@653: for transcoder, request in tl: morphbr@653: filename = "%s" % self.query.get("file")[0] morphbr@653: tfilename = "%s" % transcoder.params_first("uri") morphbr@653: morphbr@653: if tfilename.find(filename) >= 0 and \ morphbr@653: request.client_address[0] == self.query.get("ip")[0]: morphbr@653: self.wfile.write("Status: %s %%" % transcoder.status) morphbr@684: return True morphbr@684: morphbr@684: return False morphbr@653: morphbr@565: else: morphbr@585: running = "

Running transcoders:

\n" morphbr@682: stopall = utils._create_html_item("" morphbr@653: "[STOP ALL]" % morphbr@585: self.menu["Stop"]) morphbr@585: morphbr@565: for transcoder, request in tl: morphbr@682: stopone += utils._create_html_item("%s: %s:%s" morphbr@682: "[STOP] - " morphbr@682: "Status: %s%%"\ morphbr@653: % ( morphbr@653: transcoder, request.client_address[0], morphbr@653: request.client_address[1], morphbr@653: self.menu["Stop"], request.client_address[0], morphbr@653: request.client_address[1], morphbr@595: transcoder.status) ) morphbr@565: morphbr@682: self.wfile.write(utils.getHTML("status", morphbr@682: {"menu": self._nav_items(), morphbr@682: "running": running, morphbr@682: "stopall": stopall, morphbr@682: "stopone": stopone})) morphbr@565: # serve_status() morphbr@565: morphbr@565: morphbr@565: def _get_transcoder(self): morphbr@565: # get transcoder option: mencoder is the default morphbr@565: request_transcoders = self.query.get("transcoder", ["mencoder"]) morphbr@565: morphbr@565: for t in request_transcoders: morphbr@565: transcoder = self.transcoders.get(t) morphbr@565: if transcoder: morphbr@565: return transcoder morphbr@565: morphbr@565: if not transcoder: morphbr@565: return self.transcoders[self.def_transcoder] morphbr@565: # _get_transcoder() morphbr@565: morphbr@565: morphbr@565: def serve_stream(self, body): morphbr@565: transcoder = self._get_transcoder() morphbr@565: try: morphbr@572: obj = transcoder(self.query) morphbr@565: except Exception, e: morphbr@565: self.send_error(500, str(e)) morphbr@565: return morphbr@565: morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", obj.get_mimetype()) morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: morphbr@565: if body: morphbr@565: self.server.add_transcoders(self, obj) morphbr@565: obj.start(self.wfile) morphbr@565: self.server.del_transcoders(self, obj) morphbr@565: # serve_stream() morphbr@565: morphbr@565: morphbr@565: def log_request(self, code='-', size='-'): morphbr@565: self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) morphbr@565: # log_request() morphbr@565: morphbr@565: morphbr@565: def log_error(self, format, *args): morphbr@565: self.log.error("%s: %s" % (self.address_string(), format % args)) morphbr@565: # log_error() morphbr@565: morphbr@565: morphbr@565: def log_message(self, format, *args): morphbr@565: self.log.info("%s: %s" % (self.address_string(), format % args)) morphbr@565: # log_message() morphbr@565: # RequestHandler morphbr@565: morphbr@565: morphbr@565: morphbr@565: class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): morphbr@577: log = log.getLogger("gms.server") morphbr@565: run = True morphbr@565: _transcoders = {} morphbr@565: _lock = threading.RLock() morphbr@565: morphbr@565: def serve_forever(self): morphbr@565: self.log.info("GMyth-Streamer serving HTTP on %s:%s" % morphbr@565: self.socket.getsockname()) morphbr@565: try: morphbr@565: while self.run: morphbr@565: self.handle_request() morphbr@565: except KeyboardInterrupt, e: morphbr@565: pass morphbr@565: morphbr@565: self.log.debug("Stopping all remaining transcoders...") morphbr@565: self.stop_transcoders() morphbr@565: self.log.debug("Transcoders stopped!") morphbr@565: # serve_forever() morphbr@565: morphbr@565: morphbr@565: def get_request(self): morphbr@565: skt = self.socket morphbr@565: old = skt.gettimeout() morphbr@565: skt.settimeout(0.5) morphbr@565: while self.run: morphbr@565: try: morphbr@565: r = skt.accept() morphbr@565: skt.settimeout(old) morphbr@565: return r morphbr@565: except socket.timeout, e: morphbr@565: pass morphbr@565: raise socket.error("Not running") morphbr@565: # get_request() morphbr@565: morphbr@565: morphbr@565: def server_close(self): morphbr@565: self.run = False morphbr@565: self.stop_transcoders() morphbr@565: morphbr@565: BaseHTTPServer.HTTPServer.server_close(self) morphbr@565: # server_close() morphbr@565: morphbr@565: morphbr@565: def stop_transcoders(self): morphbr@565: self._lock.acquire() morphbr@565: for transcoder, request in self._transcoders.iteritems(): morphbr@565: self.log.info("Stop transcoder: %s, client=%s" % morphbr@565: (transcoder, request.client_address)) morphbr@565: transcoder.stop() morphbr@565: self._lock.release() morphbr@565: # stop_transcoders() morphbr@565: morphbr@565: morphbr@565: def get_transcoders(self): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: return self._transcoders.items() morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # get_transcoders() morphbr@565: morphbr@565: morphbr@565: def add_transcoders(self, request, transcoder): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: self._transcoders[transcoder] = request morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # add_transcoders() morphbr@565: morphbr@565: morphbr@565: def del_transcoders(self, request, transcoder): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: del self._transcoders[transcoder] morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # del_transcoders() morphbr@565: # Server morphbr@565: morphbr@565: morphbr@565: morphbr@565: def serve_forever(host="0.0.0.0", port=40000): morphbr@565: addr = (host, port) morphbr@565: RequestHandler.protocol_version = "HTTP/1.0" morphbr@565: httpd = Server(addr, RequestHandler) morphbr@565: httpd.serve_forever() morphbr@565: # serve_forever() morphbr@565: morphbr@565: morphbr@565: def load_plugins_transcoders(directory): morphbr@565: RequestHandler.load_plugins_transcoders(directory) morphbr@565: # load_plugins_transcoders()