morphbr@565: #!/usr/bin/env python
morphbr@565:
morphbr@565: __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
morphbr@565: __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
morphbr@565: __license__ = "GPL"
morphbr@565: __version__ = "0.3"
morphbr@565:
morphbr@565: import os
morphbr@565: import threading
morphbr@565: import SocketServer
morphbr@565: import BaseHTTPServer
morphbr@565: import socket
morphbr@565: import urlparse
morphbr@565: import cgi
morphbr@565: import lib.utils as utils
morphbr@565: import logging as log
morphbr@565:
morphbr@565: __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
morphbr@565: "load_plugins_transcoders")
morphbr@565:
morphbr@565: class Transcoder(object):
morphbr@565: log = log.getLogger("gmyth-stream.transcoder")
morphbr@565: priority = 0 # negative values have higher priorities
morphbr@565: name = None # to be used in requests
morphbr@565:
morphbr@565: def __init__(self, params):
morphbr@565: self.params = params
morphbr@565: # __init__()
morphbr@565:
morphbr@565:
morphbr@565: def params_first(self, key, default=None):
morphbr@565: if default is None:
morphbr@565: return self.params[key][0]
morphbr@565: else:
morphbr@565: try:
morphbr@565: return self.params[key][0]
morphbr@565: except:
morphbr@565: return default
morphbr@565: # params_first()
morphbr@565:
morphbr@565:
morphbr@565: def get_mimetype(self):
morphbr@565: mux = self.params_first("mux", "mpg")
morphbr@565:
morphbr@565: if mux == "mpeg":
morphbr@565: return "video/mpeg"
morphbr@565: elif mux == "avi":
morphbr@565: return "video/x-msvideo"
morphbr@565: else:
morphbr@565: return "application/octet-stream"
morphbr@565: # get_mimetype()
morphbr@565:
morphbr@565:
morphbr@565: def start(self, outfile):
morphbr@565: return True
morphbr@565: # start()
morphbr@565:
morphbr@565:
morphbr@565: def stop(self):
morphbr@565: return Tru
morphbr@565: # stop()
morphbr@565:
morphbr@565:
morphbr@565: def __str__(self):
morphbr@565: return '%s("%s", mux="%s", params=%s)' % \
morphbr@565: (self.__class__.__name__,
morphbr@565: self.params_first("uri", "None"),
morphbr@565: self.params_first("mux", "mpg"),
morphbr@565: self.params)
morphbr@565: # __str__()
morphbr@565: # Transcoder
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
morphbr@565: log = log.getLogger("gmyth-stream.request")
morphbr@565: def_transcoder = None
morphbr@565: transcoders = utils.PluginSet(Transcoder)
morphbr@565:
morphbr@565: @classmethod
morphbr@565: def load_plugins_transcoders(cls, directory):
morphbr@565: cls.transcoders.load_from_directory(directory)
morphbr@565:
morphbr@565: if cls.def_transcoder is None and cls.transcoders:
morphbr@565: cls.def_transcoder = cls.transcoders[0].name
morphbr@565: # load_plugins_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def do_dispatch(self, body):
morphbr@565: self.url = self.path
morphbr@565:
morphbr@565: pieces = urlparse.urlparse(self.path)
morphbr@565: self.path = pieces[2]
morphbr@565: self.query = cgi.parse_qs(pieces[4])
morphbr@565:
morphbr@565: if self.path == "/":
morphbr@565: self.serve_main(body)
morphbr@565: elif self.path == "/shutdown.do":
morphbr@565: self.serve_shutdown(body)
morphbr@565: elif self.path == "/stop-transcoder.do":
morphbr@565: self.serve_stop_transcoder(body)
morphbr@565: elif self.path == "/status.do":
morphbr@565: self.serve_status(body)
morphbr@565: elif self.path == "/play.do":
morphbr@565: self.serve_play(body)
morphbr@565: elif self.path == "/stream.do":
morphbr@565: self.serve_stream(body)
morphbr@565: else:
morphbr@565: self.send_error(404, "File not found")
morphbr@565: # do_dispatch()
morphbr@565:
morphbr@565:
morphbr@565: def do_GET(self):
morphbr@565: self.do_dispatch(True)
morphbr@565: # do_GET()
morphbr@565:
morphbr@565:
morphbr@565: def do_HEAD(self):
morphbr@565: self.do_dispatch(False)
morphbr@565: # do_HEAD()
morphbr@565:
morphbr@565:
morphbr@565: def _nav_items(self):
morphbr@565: self.wfile.write("""\
morphbr@565:
Play
morphbr@565: Status
morphbr@565: Stop transcoders
morphbr@565: Shutdown Server
morphbr@565: """)
morphbr@565: # _nav_items()
morphbr@565:
morphbr@565:
morphbr@565: def serve_main(self, body):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server
morphbr@565:
morphbr@565: Welcome to Catota Server
morphbr@565:
morphbr@565: """)
morphbr@565: self._nav_items()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: # serve_main()
morphbr@565:
morphbr@565:
morphbr@565: def serve_play(self, body):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server
morphbr@565:
morphbr@565: Play
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: self._nav_items()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: # serve_play()
morphbr@565:
morphbr@565:
morphbr@565: def serve_shutdown(self, body):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server Exited
morphbr@565:
morphbr@565: Catota is not running anymore
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: self.server.server_close()
morphbr@565: # serve_shutdown()
morphbr@565:
morphbr@565:
morphbr@565: def serve_stop_all_transcoders(self, body):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.server.stop_transcoders()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server Stopped Transcoders
morphbr@565:
morphbr@565: Catota stopped running transcoders
morphbr@565:
morphbr@565: """)
morphbr@565: self._nav_items()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: # serve_stop_all_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def serve_stop_selected_transcoders(self, body, requests):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server Stopped Transcoders
morphbr@565:
morphbr@565: Catota stopped running transcoders:
morphbr@565:
morphbr@565: """)
morphbr@565: transcoders = self.server.get_transcoders()
morphbr@565:
morphbr@565: for req in requests:
morphbr@565: try:
morphbr@565: host, port = req.split(":")
morphbr@565: except IndexError:
morphbr@565: continue
morphbr@565:
morphbr@565: port = int(port)
morphbr@565: addr = (host, port)
morphbr@565:
morphbr@565: for t, r in transcoders:
morphbr@565: if r.client_address == addr:
morphbr@565: t.stop()
morphbr@565: self.server.del_transcoders(self, t)
morphbr@565: self.wfile.write("""\
morphbr@565: - %s: %s:%s
morphbr@565: """ % (t, addr[0], addr[1]))
morphbr@565: t.stop()
morphbr@565: break
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: self._nav_items()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: # serve_stop_selected_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def serve_stop_transcoder(self, body):
morphbr@565: req = self.query.get("request", None)
morphbr@565: if req and "all" in req:
morphbr@565: self.serve_stop_all_transcoders(body)
morphbr@565: elif req:
morphbr@565: self.serve_stop_selected_transcoders(body, req)
morphbr@565: else:
morphbr@565: self.serve_status(body)
morphbr@565: # serve_stop_transcoder()
morphbr@565:
morphbr@565:
morphbr@565: def serve_status(self, body):
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", "text/html")
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565: if body:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: Catota Server Status
morphbr@565:
morphbr@565: Catota Status
morphbr@565: """)
morphbr@565: tl = self.server.get_transcoders()
morphbr@565: if not tl:
morphbr@565: self.wfile.write("No running transcoder.
\n")
morphbr@565: else:
morphbr@565: self.wfile.write("Running transcoders:
\n")
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565: - [STOP ALL]
morphbr@565: """)
morphbr@565: for transcoder, request in tl:
morphbr@565: self.wfile.write("""\
morphbr@565: - %s: %s:%s [STOP]
morphbr@565: """ % (transcoder, request.client_address[0], request.client_address[1],
morphbr@565: request.client_address[0], request.client_address[1]))
morphbr@565:
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: self._nav_items()
morphbr@565: self.wfile.write("""\
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: """)
morphbr@565: # serve_status()
morphbr@565:
morphbr@565:
morphbr@565: def _get_transcoder(self):
morphbr@565: # get transcoder option: mencoder is the default
morphbr@565: request_transcoders = self.query.get("transcoder", ["mencoder"])
morphbr@565:
morphbr@565: for t in request_transcoders:
morphbr@565: transcoder = self.transcoders.get(t)
morphbr@565: if transcoder:
morphbr@565: return transcoder
morphbr@565:
morphbr@565: if not transcoder:
morphbr@565: return self.transcoders[self.def_transcoder]
morphbr@565: # _get_transcoder()
morphbr@565:
morphbr@565:
morphbr@565: def serve_stream(self, body):
morphbr@565: transcoder = self._get_transcoder()
morphbr@565: try:
morphbr@565: obj = transcoder(self.query)
morphbr@565: except Exception, e:
morphbr@565: self.send_error(500, str(e))
morphbr@565: return
morphbr@565:
morphbr@565: self.send_response(200)
morphbr@565: self.send_header("Content-Type", obj.get_mimetype())
morphbr@565: self.send_header('Connection', 'close')
morphbr@565: self.end_headers()
morphbr@565:
morphbr@565: if body:
morphbr@565: self.server.add_transcoders(self, obj)
morphbr@565: obj.start(self.wfile)
morphbr@565: self.server.del_transcoders(self, obj)
morphbr@565: # serve_stream()
morphbr@565:
morphbr@565:
morphbr@565: def log_request(self, code='-', size='-'):
morphbr@565: self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
morphbr@565: # log_request()
morphbr@565:
morphbr@565:
morphbr@565: def log_error(self, format, *args):
morphbr@565: self.log.error("%s: %s" % (self.address_string(), format % args))
morphbr@565: # log_error()
morphbr@565:
morphbr@565:
morphbr@565: def log_message(self, format, *args):
morphbr@565: self.log.info("%s: %s" % (self.address_string(), format % args))
morphbr@565: # log_message()
morphbr@565: # RequestHandler
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
morphbr@565: log = log.getLogger("gmyth-streamer.server")
morphbr@565: run = True
morphbr@565: _transcoders = {}
morphbr@565: _lock = threading.RLock()
morphbr@565:
morphbr@565: def serve_forever(self):
morphbr@565: self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
morphbr@565: self.socket.getsockname())
morphbr@565: try:
morphbr@565: while self.run:
morphbr@565: self.handle_request()
morphbr@565: except KeyboardInterrupt, e:
morphbr@565: pass
morphbr@565:
morphbr@565: self.log.debug("Stopping all remaining transcoders...")
morphbr@565: self.stop_transcoders()
morphbr@565: self.log.debug("Transcoders stopped!")
morphbr@565: # serve_forever()
morphbr@565:
morphbr@565:
morphbr@565: def get_request(self):
morphbr@565: skt = self.socket
morphbr@565: old = skt.gettimeout()
morphbr@565: skt.settimeout(0.5)
morphbr@565: while self.run:
morphbr@565: try:
morphbr@565: r = skt.accept()
morphbr@565: skt.settimeout(old)
morphbr@565: return r
morphbr@565: except socket.timeout, e:
morphbr@565: pass
morphbr@565: raise socket.error("Not running")
morphbr@565: # get_request()
morphbr@565:
morphbr@565:
morphbr@565: def server_close(self):
morphbr@565: self.run = False
morphbr@565: self.stop_transcoders()
morphbr@565:
morphbr@565: BaseHTTPServer.HTTPServer.server_close(self)
morphbr@565: # server_close()
morphbr@565:
morphbr@565:
morphbr@565: def stop_transcoders(self):
morphbr@565: self._lock.acquire()
morphbr@565: for transcoder, request in self._transcoders.iteritems():
morphbr@565: self.log.info("Stop transcoder: %s, client=%s" %
morphbr@565: (transcoder, request.client_address))
morphbr@565: transcoder.stop()
morphbr@565: self._lock.release()
morphbr@565: # stop_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def get_transcoders(self):
morphbr@565: self._lock.acquire()
morphbr@565: try:
morphbr@565: return self._transcoders.items()
morphbr@565: finally:
morphbr@565: self._lock.release()
morphbr@565: # get_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def add_transcoders(self, request, transcoder):
morphbr@565: self._lock.acquire()
morphbr@565: try:
morphbr@565: self._transcoders[transcoder] = request
morphbr@565: finally:
morphbr@565: self._lock.release()
morphbr@565: # add_transcoders()
morphbr@565:
morphbr@565:
morphbr@565: def del_transcoders(self, request, transcoder):
morphbr@565: self._lock.acquire()
morphbr@565: try:
morphbr@565: del self._transcoders[transcoder]
morphbr@565: finally:
morphbr@565: self._lock.release()
morphbr@565: # del_transcoders()
morphbr@565: # Server
morphbr@565:
morphbr@565:
morphbr@565:
morphbr@565: def serve_forever(host="0.0.0.0", port=40000):
morphbr@565: addr = (host, port)
morphbr@565:
morphbr@565: RequestHandler.protocol_version = "HTTP/1.0"
morphbr@565: httpd = Server(addr, RequestHandler)
morphbr@565: httpd.serve_forever()
morphbr@565: # serve_forever()
morphbr@565:
morphbr@565:
morphbr@565: def load_plugins_transcoders(directory):
morphbr@565: RequestHandler.load_plugins_transcoders(directory)
morphbr@565: # load_plugins_transcoders()