gmyth-stream/server/0.2/lib/server.py
author rosfran
Fri May 25 18:25:50 2007 +0100 (2007-05-25)
branchtrunk
changeset 715 e01c60057f07
parent 686 b29ea6deb6f8
child 717 24db16480456
permissions -rw-r--r--
[svn r721] Added GStreamer-like indentation.
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s( params=%s )' % \
    67                (self.__class__.__name__,
    68                 self.params)
    69     # __str__()
    70 # Transcoder
    71 
    72 
    73 
    74 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    75     log = log.getLogger("gms.request")
    76     def_transcoder = None
    77     transcoders = utils.PluginSet(Transcoder)
    78 
    79     menu = {
    80         "Stop": "/stop-transcoder.do",
    81         "Status": "/status.do",
    82         "Version": "/version.do",
    83         "Shutdown": "/shutdown.do"
    84         }
    85 
    86     @classmethod
    87     def load_plugins_transcoders(cls, directory):
    88         cls.transcoders.load_from_directory(directory)
    89 
    90         if cls.def_transcoder is None and cls.transcoders:
    91             cls.def_transcoder = cls.transcoders[0].name
    92     # load_plugins_transcoders()
    93 
    94 
    95     def do_dispatch(self, body):
    96         self.url = self.path
    97 
    98         pieces = urlparse.urlparse(self.path)
    99         self.path = pieces[2]
   100         self.query = cgi.parse_qs(pieces[4])
   101 
   102         if self.path == "/":
   103             self.serve_main(body)
   104         elif self.path == "/shutdown.do":
   105             self.serve_shutdown(body)
   106         elif self.path == "/stop-transcoder.do":
   107             self.serve_stop_transcoder(body)
   108         elif self.path == "/status.do":
   109             self.serve_status(body)
   110         elif self.path == "/version.do":
   111             self.serve_version(body)
   112         elif self.path == "/stream.do":
   113             self.serve_stream(body)
   114         else:
   115             action = self.query.get("action", None)
   116             if "stream.do" in action:
   117                 self.serve_stream(body)
   118             else:
   119                 self.send_error(404, "File not found")
   120     # do_dispatch()
   121 
   122 
   123     def do_GET(self):
   124         self.do_dispatch(True)
   125     # do_GET()
   126 
   127 
   128     def do_HEAD(self):
   129         self.do_dispatch(False)
   130     # do_HEAD()
   131 
   132 
   133     def _nav_items(self):
   134         ret = ""
   135         for name, url in self.menu.items():
   136             ret += utils.getHTML("menu", {"name": name, "url": url})
   137 
   138         return ret
   139     # _nav_items()
   140 
   141     def serve_main(self, body):
   142         self.send_response(200)
   143         self.send_header("Content-Type", "text/html")
   144         self.send_header('Connection', 'close')
   145         self.end_headers()
   146         if body:
   147             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   148     # serve_main()
   149 
   150     def serve_version(self, body):
   151         self.send_response(200)
   152         self.send_header("Content-Type", "text/html")
   153         self.send_header('Connection', 'close')
   154         self.end_headers()
   155         if body:
   156             self.wfile.write("Version: %s" %  __version__)
   157 
   158 
   159     def serve_shutdown(self, body):
   160         self.send_response(200)
   161         self.send_header("Content-Type", "text/html")
   162         self.send_header('Connection', 'close')
   163         self.end_headers()
   164         if body:
   165             self.wfile.write(utils.getHTML("shutdown"))
   166         self.server.server_close()
   167     # serve_shutdown()
   168 
   169 
   170     def serve_stop_all_transcoders(self, body):
   171         self.send_response(200)
   172         self.send_header("Content-Type", "text/html")
   173         self.send_header('Connection', 'close')
   174         self.end_headers()
   175         if body:
   176             self.server.stop_transcoders()
   177             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   178     # serve_stop_all_transcoders()
   179 
   180 
   181     def serve_stop_selected_transcoders(self, body, requests):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         opts = ""
   187         if body:
   188             transcoders = self.server.get_transcoders()
   189 
   190             for req in requests:
   191                 try:
   192                     host, port = req.split(":")
   193                 except IndexError:
   194                     continue
   195 
   196                 port = int(port)
   197                 addr = (host, port)
   198 
   199                 for t, r in transcoders:
   200                     if r.client_address == addr:
   201                         try:
   202                             t.stop()
   203                         except Exception, e:
   204                             self.log.info("Plugin already stopped")
   205 
   206                         opts += self._create_html_item("%s: %s:%s" % (
   207                             t, addr[0], addr[1]))
   208 
   209                         break
   210 
   211                 self.wfile.write(utils.getHTML("stop_selected",
   212                                                {"menu": self._nav_items(),
   213                                                 "opts": opts}))
   214     # serve_stop_selected_transcoders()
   215 
   216 
   217     def serve_stop_transcoder(self, body):
   218         req = self.query.get("request", None)
   219         if req and "all" in req:
   220             self.serve_stop_all_transcoders(body)
   221         elif req:
   222             self.serve_stop_selected_transcoders(body, req)
   223         else:
   224             self.serve_status(body)
   225     # serve_stop_transcoder()
   226 
   227 
   228     def serve_status(self, body):
   229         self.send_response(200)
   230         self.send_header("Content-Type", "text/html")
   231         self.send_header('Connection', 'close')
   232         self.end_headers()
   233         stopone = ""
   234 
   235         if body:
   236             tl = self.server.get_transcoders()
   237             if not tl:
   238                 running = "<p>No running transcoder.</p>\n"
   239                 stopall = ""
   240 
   241             elif self.query.get("ip") and self.query.get("file"):
   242                 for transcoder, request in tl:
   243                     filename = "%s" % self.query.get("file")[0]
   244                     tfilename = "%s" % transcoder.params_first("uri")
   245 
   246                     if tfilename.find(filename) >= 0 and \
   247                            request.client_address[0] == self.query.get("ip")[0]:
   248                         self.wfile.write("Status: %s %%" % transcoder.status)
   249                         return True
   250 
   251                 return False
   252 
   253             else:
   254                 running = "<p>Running transcoders:</p>\n"
   255                 stopall = utils._create_html_item("<a href='%s?request=all'>"
   256                                                  "[STOP ALL]</a>" %
   257                                                  self.menu["Stop"])
   258 
   259                 for transcoder, request in tl:
   260                     stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
   261                                                        "request=%s:%s'>"
   262                                                        "[STOP]</a> - "
   263                                                        "Status: %s%%"\
   264                                                      % (
   265                         transcoder, request.client_address[0],
   266                         request.client_address[1],
   267                         self.menu["Stop"], request.client_address[0],
   268                         request.client_address[1],
   269                         transcoder.status) )
   270 
   271                 self.wfile.write(utils.getHTML("status",
   272                                                {"menu": self._nav_items(),
   273                                                 "running": running,
   274                                                 "stopall": stopall,
   275                                                 "stopone": stopone}))
   276     # serve_status()
   277 
   278 
   279     def _get_transcoder(self):
   280         # get transcoder option: mencoder is the default
   281         request_transcoders = self.query.get("transcoder", ["mencoder"])
   282 
   283         for t in request_transcoders:
   284             transcoder = self.transcoders.get(t)
   285             if transcoder:
   286                 return transcoder
   287 
   288         if not transcoder:
   289             return self.transcoders[self.def_transcoder]
   290     # _get_transcoder()
   291 
   292 
   293     def serve_stream(self, body):
   294         transcoder = self._get_transcoder()
   295         try:
   296             obj = transcoder(self.query)
   297         except Exception, e:
   298             self.send_error(500, str(e))
   299             return
   300 
   301         self.send_response(200)
   302         self.send_header("Content-Type", obj.get_mimetype())
   303         self.send_header('Connection', 'close')
   304         self.end_headers()
   305 
   306         if body:
   307             self.server.add_transcoders(self, obj)
   308             obj.start(self.wfile)
   309             self.server.del_transcoders(self, obj)
   310     # serve_stream()
   311 
   312 
   313     def log_request(self, code='-', size='-'):
   314         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   315     # log_request()
   316 
   317 
   318     def log_error(self, format, *args):
   319         self.log.error("%s: %s" % (self.address_string(), format % args))
   320     # log_error()
   321 
   322 
   323     def log_message(self, format, *args):
   324         self.log.info("%s: %s" % (self.address_string(), format % args))
   325     # log_message()
   326 # RequestHandler
   327 
   328 
   329 
   330 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   331     log = log.getLogger("gms.server")
   332     run = True
   333     _transcoders = {}
   334     _lock = threading.RLock()
   335 
   336     def serve_forever(self):
   337         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   338                       self.socket.getsockname())
   339         try:
   340             while self.run:
   341                 self.handle_request()
   342         except KeyboardInterrupt, e:
   343             pass
   344 
   345         self.log.debug("Stopping all remaining transcoders...")
   346         self.stop_transcoders()
   347         self.log.debug("Transcoders stopped!")
   348     # serve_forever()
   349 
   350 
   351     def get_request(self):
   352         skt = self.socket
   353         old = skt.gettimeout()
   354         skt.settimeout(0.5)
   355         while self.run:
   356             try:
   357                 r = skt.accept()
   358                 skt.settimeout(old)
   359                 return r
   360             except socket.timeout, e:
   361                 pass
   362         raise socket.error("Not running")
   363     # get_request()
   364 
   365 
   366     def server_close(self):
   367         self.run = False
   368         self.stop_transcoders()
   369 
   370         BaseHTTPServer.HTTPServer.server_close(self)
   371     # server_close()
   372 
   373 
   374     def stop_transcoders(self):
   375         self._lock.acquire()
   376         for transcoder, request in self._transcoders.iteritems():
   377             self.log.info("Stop transcoder: %s, client=%s" %
   378                           (transcoder, request.client_address))
   379             transcoder.stop()
   380         self._lock.release()
   381     # stop_transcoders()
   382 
   383 
   384     def get_transcoders(self):
   385         self._lock.acquire()
   386         try:
   387             return self._transcoders.items()
   388         finally:
   389             self._lock.release()
   390     # get_transcoders()
   391 
   392 
   393     def add_transcoders(self, request, transcoder):
   394         self._lock.acquire()
   395         try:
   396             self._transcoders[transcoder] = request
   397         finally:
   398             self._lock.release()
   399     # add_transcoders()
   400 
   401 
   402     def del_transcoders(self, request, transcoder):
   403         self._lock.acquire()
   404         try:
   405             del self._transcoders[transcoder]
   406         finally:
   407             self._lock.release()
   408     # del_transcoders()
   409 # Server
   410 
   411 
   412 
   413 def serve_forever(host="0.0.0.0", port=40000):
   414     addr = (host, port)
   415     RequestHandler.protocol_version = "HTTP/1.0"
   416     httpd = Server(addr, RequestHandler)
   417     httpd.serve_forever()
   418 # serve_forever()
   419 
   420 
   421 def load_plugins_transcoders(directory):
   422     RequestHandler.load_plugins_transcoders(directory)
   423 # load_plugins_transcoders()