morphbr@565: #!/usr/bin/env python
morphbr@565: 
morphbr@565: __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
morphbr@565: __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
morphbr@565: __license__ = "GPL"
morphbr@565: __version__ = "0.3"
morphbr@565: 
morphbr@565: import os
morphbr@565: import threading
morphbr@565: import SocketServer
morphbr@565: import BaseHTTPServer
morphbr@565: import socket
morphbr@565: import urlparse
morphbr@565: import cgi
morphbr@565: import lib.utils as utils
morphbr@565: import logging as log
morphbr@565: 
morphbr@565: __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
morphbr@565:            "load_plugins_transcoders")
morphbr@565: 
class Transcoder(object):
    """Base class for transcoder plugins.

    An instance is built from the parameters of one streaming request.
    The base implementation does no real work: ``start()`` and ``stop()``
    simply report success, so subclasses are expected to override them.
    """
    log = log.getLogger("gms.transcoder")
    priority = 0   # negative values have higher priorities
    name = None    # to be used in requests
    status = None  # shown on the status page as "Status: <status>%"

    def __init__(self, params):
        """Keep the request parameters for later lookups.

        ``params`` is indexed as ``params[key][0]``, i.e. a mapping from
        parameter name to a sequence of values (the request handler passes
        the parsed query string).
        """
        self.params = params
    # __init__()


    def params_first(self, key, default=None):
        """Return the first value stored under ``key``.

        When ``default`` is None a failed lookup propagates its exception
        (``KeyError`` for a plain dict).  When a ``default`` is given, it is
        returned instead if the key is missing or has no usable first value.
        """
        if default is None:
            return self.params[key][0]

        try:
            return self.params[key][0]
        except (KeyError, IndexError, TypeError):
            # Only lookup failures fall back to the default; a bare
            # ``except`` here would also hide KeyboardInterrupt/SystemExit.
            return default
    # params_first()


    def get_mimetype(self):
        """Return the Content-Type matching the requested ``mux``."""
        # NOTE(review): the fallback mux is "mpg" but only "mpeg" maps to
        # video/mpeg, so a request without "mux" gets the generic type.
        # Confirm whether "mpg" should be treated as MPEG as well.
        mux = self.params_first("mux", "mpg")

        if mux == "mpeg":
            return "video/mpeg"
        elif mux == "avi":
            return "video/x-msvideo"
        else:
            return "application/octet-stream"
    # get_mimetype()


    def start(self, outfile):
        """Start transcoding into ``outfile``; the base class does nothing.

        Returns True.
        """
        return True
    # start()


    def stop(self):
        """Stop transcoding; the base class does nothing.

        Returns True.
        """
        return True
    # stop()


    def __str__(self):
        """Describe the transcoder: class, uri, mux, raw params and repr."""
        return '%s("%s", mux="%s", params=%s, addr=%s)' % \
               (self.__class__.__name__,
                self.params_first("uri", "None"),
                self.params_first("mux", "mpg"),
                self.params,
                repr(self))
    # __str__()
# Transcoder
morphbr@565: 
morphbr@565: 
morphbr@565: 
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP handler for the streamer's control pages and the stream itself.

    GET and HEAD requests are routed by path (see ``_routes``).  HEAD
    requests send headers only; handlers receive ``body=False`` and skip
    both page rendering and any action tied to it.
    """
    log = log.getLogger("gms.request")
    def_transcoder = None  # name of the fallback transcoder plugin
    transcoders = utils.PluginSet(Transcoder)

    menu = {
        "Status": "/status.do",
        "Stop": "/stop-transcoder.do",
        "Shutdown": "/shutdown.do"
        }

    # Request path -> name of the method that serves it.
    _routes = {
        "/": "serve_main",
        "/shutdown.do": "serve_shutdown",
        "/stop-transcoder.do": "serve_stop_transcoder",
        "/status.do": "serve_status",
        "/play.do": "serve_play",
        "/stream.do": "serve_stream",
        }

    @classmethod
    def load_plugins_transcoders(cls, directory):
        """Load transcoder plugins from ``directory``.

        If no default transcoder is set yet, the first loaded plugin
        becomes the default.
        """
        cls.transcoders.load_from_directory(directory)

        if cls.def_transcoder is None and cls.transcoders:
            cls.def_transcoder = cls.transcoders[0].name
    # load_plugins_transcoders()


    def do_dispatch(self, body):
        """Parse the request URL and call the handler for its path.

        Stores the raw request target in ``self.url``, replaces
        ``self.path`` with the path component only and puts the parsed
        query string in ``self.query``.  Unknown paths get a 404.
        """
        self.url = self.path

        pieces = urlparse.urlparse(self.path)
        self.path = pieces[2]
        self.query = cgi.parse_qs(pieces[4])

        handler_name = self._routes.get(self.path)
        if handler_name is None:
            self.send_error(404, "File not found")
            return

        getattr(self, handler_name)(body)
    # do_dispatch()


    def do_GET(self):
        """Serve a GET request (headers and body)."""
        self.do_dispatch(True)
    # do_GET()


    def do_HEAD(self):
        """Serve a HEAD request (headers only)."""
        self.do_dispatch(False)
    # do_HEAD()


    def _send_html_headers(self):
        """Send the 200 response headers shared by every HTML page."""
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.send_header('Connection', 'close')
        self.end_headers()
    # _send_html_headers()


    def _nav_items(self):
        """Return the navigation menu markup, one "menu" snippet per entry."""
        # utils.getHTML is project-local; presumably it renders the named
        # template with the given substitutions -- confirm in lib/utils.
        return "".join([utils.getHTML("menu", {"name": name, "url": url})
                        for name, url in self.menu.items()])
    # _nav_items()

    def _create_html_item(self, opt):
        """Wrap ``opt`` in a list item.  ``opt`` is inserted verbatim."""
        return "<li>%s</li>\n" % opt
    # _create_html_item

    def serve_main(self, body):
        """Serve the index page."""
        self._send_html_headers()
        if body:
            self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
    # serve_main()


    def serve_shutdown(self, body):
        """Serve the shutdown page, then close the server."""
        self._send_html_headers()
        if body:
            self.wfile.write(utils.getHTML("shutdown"))
        self.server.server_close()
    # serve_shutdown()


    def serve_stop_all_transcoders(self, body):
        """Stop every running transcoder and serve the confirmation page."""
        self._send_html_headers()
        if body:
            self.server.stop_transcoders()
            self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
    # serve_stop_all_transcoders()


    def serve_stop_selected_transcoders(self, body, requests):
        """Stop the transcoders serving the given clients.

        ``requests`` is a list of "host:port" strings; each one is matched
        against the client address of the running transcoders.  Malformed
        entries are skipped.  The result page is written once, listing
        every transcoder that was stopped.
        """
        self._send_html_headers()
        if not body:
            return

        transcoders = self.server.get_transcoders()
        stopped = []

        for req in requests:
            try:
                host, port = req.split(":")
                addr = (host, int(port))
            except ValueError:
                # Wrong number of ":" separated fields or non-numeric port.
                continue

            for t, r in transcoders:
                if r.client_address != addr:
                    continue

                try:
                    t.stop()
                except Exception:
                    self.log.info("Plugin already stopped")

                # The description embeds client-supplied query parameters,
                # so it must be escaped before it is put into the page.
                stopped.append(self._create_html_item(
                    "%s: %s:%s" % (cgi.escape(str(t)), addr[0], addr[1])))
                break

        self.wfile.write(utils.getHTML("stop_selected",
                                       {"menu": self._nav_items(),
                                        "opts": "".join(stopped)}))
    # serve_stop_selected_transcoders()


    def serve_stop_transcoder(self, body):
        """Dispatch a stop request according to its "request" parameter.

        "all" stops everything, any other value is treated as a list of
        "host:port" clients, and a missing parameter shows the status page.
        """
        req = self.query.get("request", None)
        if req and "all" in req:
            self.serve_stop_all_transcoders(body)
        elif req:
            self.serve_stop_selected_transcoders(body, req)
        else:
            self.serve_status(body)
    # serve_stop_transcoder()


    def serve_status(self, body):
        """Serve the status page listing every running transcoder."""
        self._send_html_headers()
        if not body:
            return

        tl = self.server.get_transcoders()
        if not tl:
            running = "<p>No running transcoder.</p>\n"
            stopall = ""
            stopone = ""
        else:
            running = "<p>Running transcoders:</p>\n"
            stop_url = self.menu["Stop"]
            stopall = self._create_html_item(
                "<a href='%s?request=all'>[STOP ALL]</a>" % stop_url)

            # One entry per transcoder; collect them all instead of
            # keeping only the last one.
            entries = []
            for transcoder, request in tl:
                host = request.client_address[0]
                port = request.client_address[1]
                entries.append(self._create_html_item(
                    "%s: %s:%s<a href='%s?request=%s:%s'>"
                    "[STOP]</a> - Status: %s%%" % (
                        # Escaped: contains client-supplied parameters.
                        cgi.escape(str(transcoder)), host, port,
                        stop_url, host, port,
                        transcoder.status)))
            stopone = "".join(entries)

        self.wfile.write(utils.getHTML("status",
                                       {"menu": self._nav_items(),
                                        "running": running,
                                        "stopall": stopall,
                                        "stopone": stopone}))
    # serve_status()


    def _get_transcoder(self):
        """Return the transcoder plugin selected by the request.

        The first name in the "transcoder" query parameter that matches a
        loaded plugin wins; otherwise the default plugin is returned.
        """
        # get transcoder option: mencoder is the default
        for t in self.query.get("transcoder", ["mencoder"]):
            transcoder = self.transcoders.get(t)
            if transcoder:
                return transcoder

        return self.transcoders[self.def_transcoder]
    # _get_transcoder()


    def serve_stream(self, body):
        """Instantiate the requested transcoder and stream its output.

        A failure while constructing the transcoder is reported as a 500.
        While streaming, the transcoder is registered with the server so
        it shows up on the status page and can be stopped remotely.
        """
        import sys

        transcoder = self._get_transcoder()
        try:
            obj = transcoder(self.query)
        except Exception:
            # sys.exc_info() instead of binding the exception in the
            # except clause keeps this line valid on every Python version.
            self.send_error(500, str(sys.exc_info()[1]))
            return

        self.send_response(200)
        self.send_header("Content-Type", obj.get_mimetype())
        self.send_header('Connection', 'close')
        self.end_headers()

        if body:
            self.server.add_transcoders(self, obj)
            try:
                obj.start(self.wfile)
            finally:
                # Always unregister, even if start() raises; otherwise a
                # dead transcoder stays listed as running.
                self.server.del_transcoders(self, obj)
    # serve_stream()


    def log_request(self, code='-', size='-'):
        """Log the request line, status code and size at INFO level."""
        self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
    # log_request()


    def log_error(self, format, *args):
        """Log a formatted error message prefixed with the client address."""
        self.log.error("%s: %s" % (self.address_string(), format % args))
    # log_error()


    def log_message(self, format, *args):
        """Log a formatted message prefixed with the client address."""
        self.log.info("%s: %s" % (self.address_string(), format % args))
    # log_message()
# RequestHandler
morphbr@565: 
morphbr@565: 
morphbr@565: 
class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """Threaded HTTP server that tracks the running transcoders.

    Each request is handled in its own thread (ThreadingMixIn), so the
    transcoder registry is only touched while holding ``_lock``.
    """
    log = log.getLogger("gms.server")
    run = True          # cleared by server_close() to end the serve loop
    # Registry: transcoder -> request handler serving it.  Both the
    # registry and its lock are class attributes, so they are shared by
    # every Server instance in the process.
    _transcoders = {}
    _lock = threading.RLock()

    def serve_forever(self):
        """Handle requests until ``run`` is cleared or Ctrl-C is pressed.

        Any transcoder still running when the loop ends is stopped.
        """
        self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
                      self.socket.getsockname())
        try:
            while self.run:
                self.handle_request()
        except KeyboardInterrupt:
            pass

        self.log.debug("Stopping all remaining transcoders...")
        self.stop_transcoders()
        self.log.debug("Transcoders stopped!")
    # serve_forever()


    def get_request(self):
        """Accept a connection, polling so that ``run`` is rechecked.

        The listening socket gets a 0.5s timeout while waiting; its
        previous timeout is restored once a connection is accepted.
        Raises ``socket.error`` when the server stops running before a
        connection arrives.
        """
        skt = self.socket
        old = skt.gettimeout()
        skt.settimeout(0.5)
        while self.run:
            try:
                r = skt.accept()
            except socket.timeout:
                # Nothing arrived in this slice; recheck self.run.
                continue
            skt.settimeout(old)
            return r
        raise socket.error("Not running")
    # get_request()


    def server_close(self):
        """Stop serving, stop every transcoder and close the socket."""
        self.run = False
        self.stop_transcoders()

        BaseHTTPServer.HTTPServer.server_close(self)
    # server_close()


    def stop_transcoders(self):
        """Call ``stop()`` on every registered transcoder.

        An exception raised by a transcoder's ``stop()`` propagates to the
        caller, but the registry lock is always released first.
        """
        self._lock.acquire()
        try:
            # Iterate over a snapshot so the registry itself is never
            # walked while it could be modified.
            for transcoder, request in list(self._transcoders.items()):
                self.log.info("Stop transcoder: %s, client=%s" %
                              (transcoder, request.client_address))
                transcoder.stop()
        finally:
            self._lock.release()
    # stop_transcoders()


    def get_transcoders(self):
        """Return a list snapshot of ``(transcoder, request)`` pairs."""
        self._lock.acquire()
        try:
            return list(self._transcoders.items())
        finally:
            self._lock.release()
    # get_transcoders()


    def add_transcoders(self, request, transcoder):
        """Register ``transcoder`` as being served by ``request``."""
        self._lock.acquire()
        try:
            self._transcoders[transcoder] = request
        finally:
            self._lock.release()
    # add_transcoders()


    def del_transcoders(self, request, transcoder):
        """Unregister ``transcoder``.

        ``request`` is not used.  Raises ``KeyError`` if the transcoder
        is not registered.
        """
        self._lock.acquire()
        try:
            del self._transcoders[transcoder]
        finally:
            self._lock.release()
    # del_transcoders()
# Server
morphbr@565: 
morphbr@565: 
morphbr@565: 
def serve_forever(host="0.0.0.0", port=40000):
    """Create a Server bound to ``host``:``port`` and run it.

    Responses are sent as HTTP/1.0.  The call returns only when the
    server's own serve_forever() loop ends.
    """
    RequestHandler.protocol_version = "HTTP/1.0"
    Server((host, port), RequestHandler).serve_forever()
# serve_forever()
morphbr@565: 
morphbr@565: 
def load_plugins_transcoders(directory):
    """Load the transcoder plugins found in ``directory``.

    Module-level shortcut for RequestHandler.load_plugins_transcoders().
    """
    handler_class = RequestHandler
    handler_class.load_plugins_transcoders(directory)
# load_plugins_transcoders()