morphbr@565: #!/usr/bin/env python morphbr@565: morphbr@565: __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" morphbr@565: __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" morphbr@565: __license__ = "GPL" morphbr@565: __version__ = "0.3" morphbr@565: morphbr@565: import os morphbr@565: import threading morphbr@565: import SocketServer morphbr@565: import BaseHTTPServer morphbr@565: import socket morphbr@565: import urlparse morphbr@565: import cgi morphbr@565: import lib.utils as utils morphbr@565: import logging as log morphbr@565: morphbr@565: __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever", morphbr@565: "load_plugins_transcoders") morphbr@565: morphbr@565: class Transcoder(object): morphbr@565: log = log.getLogger("gmyth-stream.transcoder") morphbr@565: priority = 0 # negative values have higher priorities morphbr@565: name = None # to be used in requests morphbr@565: morphbr@565: def __init__(self, params): morphbr@565: self.params = params morphbr@565: # __init__() morphbr@565: morphbr@565: morphbr@565: def params_first(self, key, default=None): morphbr@565: if default is None: morphbr@565: return self.params[key][0] morphbr@565: else: morphbr@565: try: morphbr@565: return self.params[key][0] morphbr@565: except: morphbr@565: return default morphbr@565: # params_first() morphbr@565: morphbr@565: morphbr@565: def get_mimetype(self): morphbr@565: mux = self.params_first("mux", "mpg") morphbr@565: morphbr@565: if mux == "mpeg": morphbr@565: return "video/mpeg" morphbr@565: elif mux == "avi": morphbr@565: return "video/x-msvideo" morphbr@565: else: morphbr@565: return "application/octet-stream" morphbr@565: # get_mimetype() morphbr@565: morphbr@565: morphbr@565: def start(self, outfile): morphbr@565: return True morphbr@565: # start() morphbr@565: morphbr@565: morphbr@565: def stop(self): morphbr@565: return Tru morphbr@565: # stop() morphbr@565: morphbr@565: morphbr@565: def __str__(self): morphbr@565: return '%s("%s", mux="%s", params=%s)' % \ morphbr@565: (self.__class__.__name__, morphbr@565: self.params_first("uri", "None"), morphbr@565: self.params_first("mux", "mpg"), morphbr@565: self.params) morphbr@565: # __str__() morphbr@565: # Transcoder morphbr@565: morphbr@565: morphbr@565: morphbr@565: class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): morphbr@565: log = log.getLogger("gmyth-stream.request") morphbr@565: def_transcoder = None morphbr@565: transcoders = utils.PluginSet(Transcoder) morphbr@565: morphbr@565: @classmethod morphbr@565: def load_plugins_transcoders(cls, directory): morphbr@565: cls.transcoders.load_from_directory(directory) morphbr@565: morphbr@565: if cls.def_transcoder is None and cls.transcoders: morphbr@565: cls.def_transcoder = cls.transcoders[0].name morphbr@565: # load_plugins_transcoders() morphbr@565: morphbr@565: morphbr@565: def do_dispatch(self, body): morphbr@565: self.url = self.path morphbr@565: morphbr@565: pieces = urlparse.urlparse(self.path) morphbr@565: self.path = pieces[2] morphbr@565: self.query = cgi.parse_qs(pieces[4]) morphbr@565: morphbr@565: if self.path == "/": morphbr@565: self.serve_main(body) morphbr@565: elif self.path == "/shutdown.do": morphbr@565: self.serve_shutdown(body) morphbr@565: elif self.path == "/stop-transcoder.do": morphbr@565: self.serve_stop_transcoder(body) morphbr@565: elif self.path == "/status.do": morphbr@565: self.serve_status(body) morphbr@565: elif self.path == "/play.do": morphbr@565: self.serve_play(body) morphbr@565: elif self.path == "/stream.do": morphbr@565: self.serve_stream(body) morphbr@565: else: morphbr@565: self.send_error(404, "File not found") morphbr@565: # do_dispatch() morphbr@565: morphbr@565: morphbr@565: def do_GET(self): morphbr@565: self.do_dispatch(True) morphbr@565: # do_GET() morphbr@565: morphbr@565: morphbr@565: def do_HEAD(self): morphbr@565: self.do_dispatch(False) morphbr@565: # do_HEAD() morphbr@565: morphbr@565: morphbr@565: def _nav_items(self): morphbr@565: self.wfile.write("""\ morphbr@565:
  • Play
  • morphbr@565:
  • Status
  • morphbr@565:
  • Stop transcoders
  • morphbr@565:
  • Shutdown Server
  • morphbr@565: """) morphbr@565: # _nav_items() morphbr@565: morphbr@565: morphbr@565: def serve_main(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server morphbr@565: morphbr@565:

    Welcome to Catota Server

    morphbr@565: morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: # serve_main() morphbr@565: morphbr@565: morphbr@565: def serve_play(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server morphbr@565: morphbr@565:

    Play

    morphbr@565:
    morphbr@565: :// morphbr@565: morphbr@565:
    morphbr@565: morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: # serve_play() morphbr@565: morphbr@565: morphbr@565: def serve_shutdown(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server Exited morphbr@565: morphbr@565:

    Catota is not running anymore

    morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: self.server.server_close() morphbr@565: # serve_shutdown() morphbr@565: morphbr@565: morphbr@565: def serve_stop_all_transcoders(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.server.stop_transcoders() morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server Stopped Transcoders morphbr@565: morphbr@565:

    Catota stopped running transcoders

    morphbr@565: morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: # serve_stop_all_transcoders() morphbr@565: morphbr@565: morphbr@565: def serve_stop_selected_transcoders(self, body, requests): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server Stopped Transcoders morphbr@565: morphbr@565:

    Catota stopped running transcoders:

    morphbr@565: morphbr@565: morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: # serve_stop_selected_transcoders() morphbr@565: morphbr@565: morphbr@565: def serve_stop_transcoder(self, body): morphbr@565: req = self.query.get("request", None) morphbr@565: if req and "all" in req: morphbr@565: self.serve_stop_all_transcoders(body) morphbr@565: elif req: morphbr@565: self.serve_stop_selected_transcoders(body, req) morphbr@565: else: morphbr@565: self.serve_status(body) morphbr@565: # serve_stop_transcoder() morphbr@565: morphbr@565: morphbr@565: def serve_status(self, body): morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", "text/html") morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: if body: morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: Catota Server Status morphbr@565: morphbr@565:

    Catota Status

    morphbr@565: """) morphbr@565: tl = self.server.get_transcoders() morphbr@565: if not tl: morphbr@565: self.wfile.write("

    No running transcoder.

    \n") morphbr@565: else: morphbr@565: self.wfile.write("

    Running transcoders:

    \n") morphbr@565: self.wfile.write("""\ morphbr@565: morphbr@565: morphbr@565: morphbr@565: morphbr@565: """) morphbr@565: # serve_status() morphbr@565: morphbr@565: morphbr@565: def _get_transcoder(self): morphbr@565: # get transcoder option: mencoder is the default morphbr@565: request_transcoders = self.query.get("transcoder", ["mencoder"]) morphbr@565: morphbr@565: for t in request_transcoders: morphbr@565: transcoder = self.transcoders.get(t) morphbr@565: if transcoder: morphbr@565: return transcoder morphbr@565: morphbr@565: if not transcoder: morphbr@565: return self.transcoders[self.def_transcoder] morphbr@565: # _get_transcoder() morphbr@565: morphbr@565: morphbr@565: def serve_stream(self, body): morphbr@565: transcoder = self._get_transcoder() morphbr@565: try: morphbr@565: obj = transcoder(self.query) morphbr@565: except Exception, e: morphbr@565: self.send_error(500, str(e)) morphbr@565: return morphbr@565: morphbr@565: self.send_response(200) morphbr@565: self.send_header("Content-Type", obj.get_mimetype()) morphbr@565: self.send_header('Connection', 'close') morphbr@565: self.end_headers() morphbr@565: morphbr@565: if body: morphbr@565: self.server.add_transcoders(self, obj) morphbr@565: obj.start(self.wfile) morphbr@565: self.server.del_transcoders(self, obj) morphbr@565: # serve_stream() morphbr@565: morphbr@565: morphbr@565: def log_request(self, code='-', size='-'): morphbr@565: self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) morphbr@565: # log_request() morphbr@565: morphbr@565: morphbr@565: def log_error(self, format, *args): morphbr@565: self.log.error("%s: %s" % (self.address_string(), format % args)) morphbr@565: # log_error() morphbr@565: morphbr@565: morphbr@565: def log_message(self, format, *args): morphbr@565: self.log.info("%s: %s" % (self.address_string(), format % args)) morphbr@565: # log_message() morphbr@565: # RequestHandler morphbr@565: morphbr@565: morphbr@565: morphbr@565: class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): morphbr@565: log = log.getLogger("gmyth-streamer.server") morphbr@565: run = True morphbr@565: _transcoders = {} morphbr@565: _lock = threading.RLock() morphbr@565: morphbr@565: def serve_forever(self): morphbr@565: self.log.info("GMyth-Streamer serving HTTP on %s:%s" % morphbr@565: self.socket.getsockname()) morphbr@565: try: morphbr@565: while self.run: morphbr@565: self.handle_request() morphbr@565: except KeyboardInterrupt, e: morphbr@565: pass morphbr@565: morphbr@565: self.log.debug("Stopping all remaining transcoders...") morphbr@565: self.stop_transcoders() morphbr@565: self.log.debug("Transcoders stopped!") morphbr@565: # serve_forever() morphbr@565: morphbr@565: morphbr@565: def get_request(self): morphbr@565: skt = self.socket morphbr@565: old = skt.gettimeout() morphbr@565: skt.settimeout(0.5) morphbr@565: while self.run: morphbr@565: try: morphbr@565: r = skt.accept() morphbr@565: skt.settimeout(old) morphbr@565: return r morphbr@565: except socket.timeout, e: morphbr@565: pass morphbr@565: raise socket.error("Not running") morphbr@565: # get_request() morphbr@565: morphbr@565: morphbr@565: def server_close(self): morphbr@565: self.run = False morphbr@565: self.stop_transcoders() morphbr@565: morphbr@565: BaseHTTPServer.HTTPServer.server_close(self) morphbr@565: # server_close() morphbr@565: morphbr@565: morphbr@565: def stop_transcoders(self): morphbr@565: self._lock.acquire() morphbr@565: for transcoder, request in self._transcoders.iteritems(): morphbr@565: self.log.info("Stop transcoder: %s, client=%s" % morphbr@565: (transcoder, request.client_address)) morphbr@565: transcoder.stop() morphbr@565: self._lock.release() morphbr@565: # stop_transcoders() morphbr@565: morphbr@565: morphbr@565: def get_transcoders(self): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: return self._transcoders.items() morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # get_transcoders() morphbr@565: morphbr@565: morphbr@565: def add_transcoders(self, request, transcoder): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: self._transcoders[transcoder] = request morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # add_transcoders() morphbr@565: morphbr@565: morphbr@565: def del_transcoders(self, request, transcoder): morphbr@565: self._lock.acquire() morphbr@565: try: morphbr@565: del self._transcoders[transcoder] morphbr@565: finally: morphbr@565: self._lock.release() morphbr@565: # del_transcoders() morphbr@565: # Server morphbr@565: morphbr@565: morphbr@565: morphbr@565: def serve_forever(host="0.0.0.0", port=40000): morphbr@565: addr = (host, port) morphbr@565: morphbr@565: RequestHandler.protocol_version = "HTTP/1.0" morphbr@565: httpd = Server(addr, RequestHandler) morphbr@565: httpd.serve_forever() morphbr@565: # serve_forever() morphbr@565: morphbr@565: morphbr@565: def load_plugins_transcoders(directory): morphbr@565: RequestHandler.load_plugins_transcoders(directory) morphbr@565: # load_plugins_transcoders()