gmyth-stream/server/0.2/lib/server.py
author morphbr
Thu May 17 19:22:03 2007 +0100 (2007-05-17)
branch trunk
changeset 684 cabd7221c449
parent 682 367d791aeb57
child 686 b29ea6deb6f8
permissions -rw-r--r--
[svn r690] * GMyth-Streamer
- Bug fix regarding transcoders status
     1 #!/usr/bin/env python
     2 
# Module metadata.  __version__ is the string reported to clients by the
# "/version.do" page of RequestHandler.
__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
__license__ = "GPL"
__version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
# Public names exported by "from <this module> import *".
__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
           "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s( params=%s )' % \
    67                (self.__class__.__name__,
    68                 self.params)
    69     # __str__()
    70 # Transcoder
    71 
    72 
    73 
    74 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    75     log = log.getLogger("gms.request")
    76     def_transcoder = None
    77     transcoders = utils.PluginSet(Transcoder)
    78 
    79     menu = {
    80         "Stop": "/stop-transcoder.do",
    81         "Status": "/status.do",
    82         "Version": "/version.do",
    83         "Shutdown": "/shutdown.do"
    84         }
    85 
    86     @classmethod
    87     def load_plugins_transcoders(cls, directory):
    88         cls.transcoders.load_from_directory(directory)
    89 
    90         if cls.def_transcoder is None and cls.transcoders:
    91             cls.def_transcoder = cls.transcoders[0].name
    92     # load_plugins_transcoders()
    93 
    94 
    95     def do_dispatch(self, body):
    96         self.url = self.path
    97 
    98         pieces = urlparse.urlparse(self.path)
    99         self.path = pieces[2]
   100         self.query = cgi.parse_qs(pieces[4])
   101 
   102         if self.path == "/":
   103             self.serve_main(body)
   104         elif self.path == "/shutdown.do":
   105             self.serve_shutdown(body)
   106         elif self.path == "/stop-transcoder.do":
   107             self.serve_stop_transcoder(body)
   108         elif self.path == "/status.do":
   109             self.serve_status(body)
   110         elif self.path == "/version.do":
   111             self.serve_version(body)
   112         elif self.path == "/stream.do":
   113             self.serve_stream(body)
   114         else:
   115             action = self.query.get("action", None)
   116             if action is not None:
   117                 self.serve_stream(body)
   118             else:
   119                 self.send_error(404, "File not found")
   120     # do_dispatch()
   121 
   122 
   123     def do_GET(self):
   124         self.do_dispatch(True)
   125     # do_GET()
   126 
   127 
   128     def do_HEAD(self):
   129         self.do_dispatch(False)
   130     # do_HEAD()
   131 
   132 
   133     def _nav_items(self):
   134         ret = ""
   135         for name, url in self.menu.items():
   136             ret += utils.getHTML("menu", {"name": name, "url": url})
   137 
   138         return ret
   139     # _nav_items()
   140 
   141     def serve_main(self, body):
   142         self.send_response(200)
   143         self.send_header("Content-Type", "text/html")
   144         self.send_header('Connection', 'close')
   145         self.end_headers()
   146         if body:
   147             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   148     # serve_main()
   149 
   150     def serve_version(self, body):
   151         self.send_response(200)
   152         self.send_header("Content-Type", "text/html")
   153         self.send_header('Connection', 'close')
   154         self.end_headers()
   155         if body:
   156             self.wfile.write("Version: %s" %  __version__)
   157 
   158 
   159     def serve_shutdown(self, body):
   160         self.send_response(200)
   161         self.send_header("Content-Type", "text/html")
   162         self.send_header('Connection', 'close')
   163         self.end_headers()
   164         if body:
   165             self.wfile.write(utils.getHTML("shutdown"))
   166         self.server.server_close()
   167     # serve_shutdown()
   168 
   169 
   170     def serve_stop_all_transcoders(self, body):
   171         self.send_response(200)
   172         self.send_header("Content-Type", "text/html")
   173         self.send_header('Connection', 'close')
   174         self.end_headers()
   175         if body:
   176             self.server.stop_transcoders()
   177             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   178     # serve_stop_all_transcoders()
   179 
   180 
   181     def serve_stop_selected_transcoders(self, body, requests):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         opts = ""
   187         if body:
   188             transcoders = self.server.get_transcoders()
   189 
   190             for req in requests:
   191                 try:
   192                     host, port = req.split(":")
   193                 except IndexError:
   194                     continue
   195 
   196                 port = int(port)
   197                 addr = (host, port)
   198 
   199                 for t, r in transcoders:
   200                     if r.client_address == addr:
   201                         try:
   202                             t.stop()
   203                         except Exception, e:
   204                             self.log.info("Plugin already stopped")
   205 
   206                         opts += self._create_html_item("%s: %s:%s" % (
   207                             t, addr[0], addr[1]))
   208 
   209                         break
   210 
   211                 self.wfile.write(utils.getHTML("stop_selected",
   212                                                {"menu": self._nav_items(),
   213                                                 "opts": opts}))
   214     # serve_stop_selected_transcoders()
   215 
   216 
   217     def serve_stop_transcoder(self, body):
   218         req = self.query.get("request", None)
   219         if req and "all" in req:
   220             self.serve_stop_all_transcoders(body)
   221         elif req:
   222             self.serve_stop_selected_transcoders(body, req)
   223         else:
   224             self.serve_status(body)
   225     # serve_stop_transcoder()
   226 
   227 
   228     def serve_status(self, body):
   229         self.send_response(200)
   230         self.send_header("Content-Type", "text/html")
   231         self.send_header('Connection', 'close')
   232         self.end_headers()
   233         stopone = ""
   234 
   235         if body:
   236             tl = self.server.get_transcoders()
   237             if not tl:
   238                 running = "<p>No running transcoder.</p>\n"
   239                 stopall = ""
   240 
   241             elif self.query.get("ip") and self.query.get("file"):
   242                 for transcoder, request in tl:
   243                     filename = "%s" % self.query.get("file")[0]
   244                     tfilename = "%s" % transcoder.params_first("uri")
   245 
   246                     self.log.debug("FILENAME: %s" % filename)
   247                     self.log.debug("TFILENAME: %s" % tfilename)
   248 
   249                     if tfilename.find(filename) >= 0 and \
   250                            request.client_address[0] == self.query.get("ip")[0]:
   251                         self.wfile.write("Status: %s %%" % transcoder.status)
   252                         return True
   253 
   254                 return False
   255 
   256             else:
   257                 running = "<p>Running transcoders:</p>\n"
   258                 stopall = utils._create_html_item("<a href='%s?request=all'>"
   259                                                  "[STOP ALL]</a>" %
   260                                                  self.menu["Stop"])
   261 
   262                 for transcoder, request in tl:
   263                     stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
   264                                                        "request=%s:%s'>"
   265                                                        "[STOP]</a> - "
   266                                                        "Status: %s%%"\
   267                                                      % (
   268                         transcoder, request.client_address[0],
   269                         request.client_address[1],
   270                         self.menu["Stop"], request.client_address[0],
   271                         request.client_address[1],
   272                         transcoder.status) )
   273 
   274                 self.wfile.write(utils.getHTML("status",
   275                                                {"menu": self._nav_items(),
   276                                                 "running": running,
   277                                                 "stopall": stopall,
   278                                                 "stopone": stopone}))
   279     # serve_status()
   280 
   281 
   282     def _get_transcoder(self):
   283         # get transcoder option: mencoder is the default
   284         request_transcoders = self.query.get("transcoder", ["mencoder"])
   285 
   286         for t in request_transcoders:
   287             transcoder = self.transcoders.get(t)
   288             if transcoder:
   289                 return transcoder
   290 
   291         if not transcoder:
   292             return self.transcoders[self.def_transcoder]
   293     # _get_transcoder()
   294 
   295 
   296     def serve_stream(self, body):
   297         transcoder = self._get_transcoder()
   298         try:
   299             obj = transcoder(self.query)
   300         except Exception, e:
   301             self.send_error(500, str(e))
   302             return
   303 
   304         self.send_response(200)
   305         self.send_header("Content-Type", obj.get_mimetype())
   306         self.send_header('Connection', 'close')
   307         self.end_headers()
   308 
   309         if body:
   310             self.server.add_transcoders(self, obj)
   311             obj.start(self.wfile)
   312             self.server.del_transcoders(self, obj)
   313     # serve_stream()
   314 
   315 
   316     def log_request(self, code='-', size='-'):
   317         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   318     # log_request()
   319 
   320 
   321     def log_error(self, format, *args):
   322         self.log.error("%s: %s" % (self.address_string(), format % args))
   323     # log_error()
   324 
   325 
   326     def log_message(self, format, *args):
   327         self.log.info("%s: %s" % (self.address_string(), format % args))
   328     # log_message()
   329 # RequestHandler
   330 
   331 
   332 
   333 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   334     log = log.getLogger("gms.server")
   335     run = True
   336     _transcoders = {}
   337     _lock = threading.RLock()
   338 
   339     def serve_forever(self):
   340         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   341                       self.socket.getsockname())
   342         try:
   343             while self.run:
   344                 self.handle_request()
   345         except KeyboardInterrupt, e:
   346             pass
   347 
   348         self.log.debug("Stopping all remaining transcoders...")
   349         self.stop_transcoders()
   350         self.log.debug("Transcoders stopped!")
   351     # serve_forever()
   352 
   353 
   354     def get_request(self):
   355         skt = self.socket
   356         old = skt.gettimeout()
   357         skt.settimeout(0.5)
   358         while self.run:
   359             try:
   360                 r = skt.accept()
   361                 skt.settimeout(old)
   362                 return r
   363             except socket.timeout, e:
   364                 pass
   365         raise socket.error("Not running")
   366     # get_request()
   367 
   368 
   369     def server_close(self):
   370         self.run = False
   371         self.stop_transcoders()
   372 
   373         BaseHTTPServer.HTTPServer.server_close(self)
   374     # server_close()
   375 
   376 
   377     def stop_transcoders(self):
   378         self._lock.acquire()
   379         for transcoder, request in self._transcoders.iteritems():
   380             self.log.info("Stop transcoder: %s, client=%s" %
   381                           (transcoder, request.client_address))
   382             transcoder.stop()
   383         self._lock.release()
   384     # stop_transcoders()
   385 
   386 
   387     def get_transcoders(self):
   388         self._lock.acquire()
   389         try:
   390             return self._transcoders.items()
   391         finally:
   392             self._lock.release()
   393     # get_transcoders()
   394 
   395 
   396     def add_transcoders(self, request, transcoder):
   397         self._lock.acquire()
   398         try:
   399             self._transcoders[transcoder] = request
   400         finally:
   401             self._lock.release()
   402     # add_transcoders()
   403 
   404 
   405     def del_transcoders(self, request, transcoder):
   406         self._lock.acquire()
   407         try:
   408             del self._transcoders[transcoder]
   409         finally:
   410             self._lock.release()
   411     # del_transcoders()
   412 # Server
   413 
   414 
   415 
   416 def serve_forever(host="0.0.0.0", port=40000):
   417     addr = (host, port)
   418     RequestHandler.protocol_version = "HTTP/1.0"
   419     httpd = Server(addr, RequestHandler)
   420     httpd.serve_forever()
   421 # serve_forever()
   422 
   423 
   424 def load_plugins_transcoders(directory):
   425     RequestHandler.load_plugins_transcoders(directory)
   426 # load_plugins_transcoders()