gmyth-stream/server/0.2/lib/server.py
author morphbr
Thu May 17 14:07:27 2007 +0100 (2007-05-17)
branch trunk
changeset 682 367d791aeb57
parent 653 3433df0d6ae3
child 684 cabd7221c449
permissions -rw-r--r--
[svn r688] * GMyth-Streamer
- Bug in server_status (list more than one transcode)
     1 #!/usr/bin/env python
     2 
# Module metadata: authorship, contact addresses, license and version.
# __version__ is the string reported by RequestHandler.serve_version().
__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
__license__ = "GPL"
__version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
# Names exported by a star-import of this module.
__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
           "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s( params=%s )' % \
    67                (self.__class__.__name__,
    68                 self.params)
    69     # __str__()
    70 # Transcoder
    71 
    72 
    73 
    74 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    75     log = log.getLogger("gms.request")
    76     def_transcoder = None
    77     transcoders = utils.PluginSet(Transcoder)
    78 
    79     menu = {
    80         "Stop": "/stop-transcoder.do",
    81         "Status": "/status.do",
    82         "Version": "/version.do",
    83         "Shutdown": "/shutdown.do"
    84         }
    85 
    86     @classmethod
    87     def load_plugins_transcoders(cls, directory):
    88         cls.transcoders.load_from_directory(directory)
    89 
    90         if cls.def_transcoder is None and cls.transcoders:
    91             cls.def_transcoder = cls.transcoders[0].name
    92     # load_plugins_transcoders()
    93 
    94 
    95     def do_dispatch(self, body):
    96         self.url = self.path
    97 
    98         pieces = urlparse.urlparse(self.path)
    99         self.path = pieces[2]
   100         self.query = cgi.parse_qs(pieces[4])
   101 
   102         if self.path == "/":
   103             self.serve_main(body)
   104         elif self.path == "/shutdown.do":
   105             self.serve_shutdown(body)
   106         elif self.path == "/stop-transcoder.do":
   107             self.serve_stop_transcoder(body)
   108         elif self.path == "/status.do":
   109             self.serve_status(body)
   110         elif self.path == "/version.do":
   111             self.serve_version(body)
   112         elif self.path == "/stream.do":
   113             self.serve_stream(body)
   114         else:
   115             action = self.query.get("action", None)
   116             if action is not None:
   117                 self.serve_stream(body)
   118             else:
   119                 self.send_error(404, "File not found")
   120     # do_dispatch()
   121 
   122 
   123     def do_GET(self):
   124         self.do_dispatch(True)
   125     # do_GET()
   126 
   127 
   128     def do_HEAD(self):
   129         self.do_dispatch(False)
   130     # do_HEAD()
   131 
   132 
   133     def _nav_items(self):
   134         ret = ""
   135         for name, url in self.menu.items():
   136             ret += utils.getHTML("menu", {"name": name, "url": url})
   137 
   138         return ret
   139     # _nav_items()
   140 
   141     def serve_main(self, body):
   142         self.send_response(200)
   143         self.send_header("Content-Type", "text/html")
   144         self.send_header('Connection', 'close')
   145         self.end_headers()
   146         if body:
   147             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   148     # serve_main()
   149 
   150     def serve_version(self, body):
   151         self.send_response(200)
   152         self.send_header("Content-Type", "text/html")
   153         self.send_header('Connection', 'close')
   154         self.end_headers()
   155         if body:
   156             self.wfile.write("Version: %s" %  __version__)
   157 
   158 
   159     def serve_shutdown(self, body):
   160         self.send_response(200)
   161         self.send_header("Content-Type", "text/html")
   162         self.send_header('Connection', 'close')
   163         self.end_headers()
   164         if body:
   165             self.wfile.write(utils.getHTML("shutdown"))
   166         self.server.server_close()
   167     # serve_shutdown()
   168 
   169 
   170     def serve_stop_all_transcoders(self, body):
   171         self.send_response(200)
   172         self.send_header("Content-Type", "text/html")
   173         self.send_header('Connection', 'close')
   174         self.end_headers()
   175         if body:
   176             self.server.stop_transcoders()
   177             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   178     # serve_stop_all_transcoders()
   179 
   180 
   181     def serve_stop_selected_transcoders(self, body, requests):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         opts = ""
   187         if body:
   188             transcoders = self.server.get_transcoders()
   189 
   190             for req in requests:
   191                 try:
   192                     host, port = req.split(":")
   193                 except IndexError:
   194                     continue
   195 
   196                 port = int(port)
   197                 addr = (host, port)
   198 
   199                 for t, r in transcoders:
   200                     if r.client_address == addr:
   201                         try:
   202                             t.stop()
   203                         except Exception, e:
   204                             self.log.info("Plugin already stopped")
   205 
   206                         opts += self._create_html_item("%s: %s:%s" % (
   207                             t, addr[0], addr[1]))
   208 
   209                         break
   210 
   211                 self.wfile.write(utils.getHTML("stop_selected",
   212                                                {"menu": self._nav_items(),
   213                                                 "opts": opts}))
   214     # serve_stop_selected_transcoders()
   215 
   216 
   217     def serve_stop_transcoder(self, body):
   218         req = self.query.get("request", None)
   219         if req and "all" in req:
   220             self.serve_stop_all_transcoders(body)
   221         elif req:
   222             self.serve_stop_selected_transcoders(body, req)
   223         else:
   224             self.serve_status(body)
   225     # serve_stop_transcoder()
   226 
   227 
   228     def serve_status(self, body):
   229         self.send_response(200)
   230         self.send_header("Content-Type", "text/html")
   231         self.send_header('Connection', 'close')
   232         self.end_headers()
   233         stopone = ""
   234 
   235         if body:
   236             tl = self.server.get_transcoders()
   237             if not tl:
   238                 running = "<p>No running transcoder.</p>\n"
   239                 stopall = ""
   240 
   241             elif self.query.get("ip") and self.query.get("file"):
   242                 for transcoder, request in tl:
   243                     filename = "%s" % self.query.get("file")[0]
   244                     tfilename = "%s" % transcoder.params_first("uri")
   245 
   246                     if tfilename.find(filename) >= 0 and \
   247                            request.client_address[0] == self.query.get("ip")[0]:
   248                         self.wfile.write("Status: %s %%" % transcoder.status)
   249                     return
   250 
   251             else:
   252                 running = "<p>Running transcoders:</p>\n"
   253                 stopall = utils._create_html_item("<a href='%s?request=all'>"
   254                                                  "[STOP ALL]</a>" %
   255                                                  self.menu["Stop"])
   256 
   257                 for transcoder, request in tl:
   258                     stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
   259                                                        "request=%s:%s'>"
   260                                                        "[STOP]</a> - "
   261                                                        "Status: %s%%"\
   262                                                      % (
   263                         transcoder, request.client_address[0],
   264                         request.client_address[1],
   265                         self.menu["Stop"], request.client_address[0],
   266                         request.client_address[1],
   267                         transcoder.status) )
   268 
   269                 self.wfile.write(utils.getHTML("status",
   270                                                {"menu": self._nav_items(),
   271                                                 "running": running,
   272                                                 "stopall": stopall,
   273                                                 "stopone": stopone}))
   274     # serve_status()
   275 
   276 
   277     def _get_transcoder(self):
   278         # get transcoder option: mencoder is the default
   279         request_transcoders = self.query.get("transcoder", ["mencoder"])
   280 
   281         for t in request_transcoders:
   282             transcoder = self.transcoders.get(t)
   283             if transcoder:
   284                 return transcoder
   285 
   286         if not transcoder:
   287             return self.transcoders[self.def_transcoder]
   288     # _get_transcoder()
   289 
   290 
   291     def serve_stream(self, body):
   292         transcoder = self._get_transcoder()
   293         try:
   294             obj = transcoder(self.query)
   295         except Exception, e:
   296             self.send_error(500, str(e))
   297             return
   298 
   299         self.send_response(200)
   300         self.send_header("Content-Type", obj.get_mimetype())
   301         self.send_header('Connection', 'close')
   302         self.end_headers()
   303 
   304         if body:
   305             self.server.add_transcoders(self, obj)
   306             obj.start(self.wfile)
   307             self.server.del_transcoders(self, obj)
   308     # serve_stream()
   309 
   310 
   311     def log_request(self, code='-', size='-'):
   312         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   313     # log_request()
   314 
   315 
   316     def log_error(self, format, *args):
   317         self.log.error("%s: %s" % (self.address_string(), format % args))
   318     # log_error()
   319 
   320 
   321     def log_message(self, format, *args):
   322         self.log.info("%s: %s" % (self.address_string(), format % args))
   323     # log_message()
   324 # RequestHandler
   325 
   326 
   327 
   328 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   329     log = log.getLogger("gms.server")
   330     run = True
   331     _transcoders = {}
   332     _lock = threading.RLock()
   333 
   334     def serve_forever(self):
   335         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   336                       self.socket.getsockname())
   337         try:
   338             while self.run:
   339                 self.handle_request()
   340         except KeyboardInterrupt, e:
   341             pass
   342 
   343         self.log.debug("Stopping all remaining transcoders...")
   344         self.stop_transcoders()
   345         self.log.debug("Transcoders stopped!")
   346     # serve_forever()
   347 
   348 
   349     def get_request(self):
   350         skt = self.socket
   351         old = skt.gettimeout()
   352         skt.settimeout(0.5)
   353         while self.run:
   354             try:
   355                 r = skt.accept()
   356                 skt.settimeout(old)
   357                 return r
   358             except socket.timeout, e:
   359                 pass
   360         raise socket.error("Not running")
   361     # get_request()
   362 
   363 
   364     def server_close(self):
   365         self.run = False
   366         self.stop_transcoders()
   367 
   368         BaseHTTPServer.HTTPServer.server_close(self)
   369     # server_close()
   370 
   371 
   372     def stop_transcoders(self):
   373         self._lock.acquire()
   374         for transcoder, request in self._transcoders.iteritems():
   375             self.log.info("Stop transcoder: %s, client=%s" %
   376                           (transcoder, request.client_address))
   377             transcoder.stop()
   378         self._lock.release()
   379     # stop_transcoders()
   380 
   381 
   382     def get_transcoders(self):
   383         self._lock.acquire()
   384         try:
   385             return self._transcoders.items()
   386         finally:
   387             self._lock.release()
   388     # get_transcoders()
   389 
   390 
   391     def add_transcoders(self, request, transcoder):
   392         self._lock.acquire()
   393         try:
   394             self._transcoders[transcoder] = request
   395         finally:
   396             self._lock.release()
   397     # add_transcoders()
   398 
   399 
   400     def del_transcoders(self, request, transcoder):
   401         self._lock.acquire()
   402         try:
   403             del self._transcoders[transcoder]
   404         finally:
   405             self._lock.release()
   406     # del_transcoders()
   407 # Server
   408 
   409 
   410 
   411 def serve_forever(host="0.0.0.0", port=40000):
   412     addr = (host, port)
   413     RequestHandler.protocol_version = "HTTP/1.0"
   414     httpd = Server(addr, RequestHandler)
   415     httpd.serve_forever()
   416 # serve_forever()
   417 
   418 
   419 def load_plugins_transcoders(directory):
   420     RequestHandler.load_plugins_transcoders(directory)
   421 # load_plugins_transcoders()