gmyth-stream/server/0.2/lib/server.py
author renatofilho
Thu Aug 16 14:46:11 2007 +0100 (2007-08-16)
branch trunk
changeset 807 add4025ca678
parent 717 24db16480456
permissions -rw-r--r--
[svn r813] created a timeout function for invalid channels
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.2"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26     tid = -1
    27 
    28     def __init__(self, params):
    29         self.params = params
    30     # __init__()
    31 
    32 
    33     def params_first(self, key, default=None):
    34         if default is None:
    35             return self.params[key][0]
    36         else:
    37             try:
    38                 return self.params[key][0]
    39             except:
    40                 return default
    41     # params_first()
    42 
    43 
    44     def get_mimetype(self):
    45         mux = self.params_first("mux", "mpg")
    46 
    47         if mux == "mpeg":
    48             return "video/mpeg"
    49         elif mux == "avi":
    50             return "video/x-msvideo"
    51         else:
    52             return "application/octet-stream"
    53     # get_mimetype()
    54 
    55 
    56     def start(self, outfile):
    57         return True
    58     # start()
    59 
    60 
    61     def stop(self):
    62         return True
    63     # stop()
    64 
    65 
    66     def __str__(self):
    67         return '%s( params=%s )' % \
    68                (self.__class__.__name__,
    69                 self.params)
    70     # __str__()
    71 # Transcoder
    72 
    73 
    74 
    75 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    76     log = log.getLogger("gms.request")
    77     def_transcoder = None
    78     transcoders = utils.PluginSet(Transcoder)
    79 
    80     menu = {
    81         "Stop": "/stop-transcoder.do",
    82         "Status": "/status.do",
    83         "Version": "/version.do",
    84         "Shutdown": "/shutdown.do"
    85         }
    86 
    87     @classmethod
    88     def load_plugins_transcoders(cls, directory):
    89         cls.transcoders.load_from_directory(directory)
    90 
    91         if cls.def_transcoder is None and cls.transcoders:
    92             cls.def_transcoder = cls.transcoders[0].name
    93     # load_plugins_transcoders()
    94 
    95 
    96     def do_dispatch(self, body):
    97         self.url = self.path
    98 
    99         pieces = urlparse.urlparse(self.path)
   100         self.path = pieces[2]
   101         self.query = cgi.parse_qs(pieces[4])
   102 
   103         if self.path == "/":
   104             self.serve_main(body)
   105         elif self.path == "/shutdown.do":
   106             self.serve_shutdown(body)
   107         elif self.path == "/stop-transcoder.do":
   108             self.serve_stop_transcoder(body)
   109         elif self.path == "/status.do":
   110             self.serve_status(body)
   111         elif self.path == "/version.do":
   112             self.serve_version(body)
   113         elif self.path == "/stream.do":
   114             self.serve_stream(body)
   115         else:
   116             action = self.query.get("action", None)
   117             if "stream.do" in action:
   118                 self.serve_stream(body)
   119             else:
   120                 self.send_error(404, "File not found")
   121     # do_dispatch()
   122 
   123 
   124     def do_GET(self):
   125         self.do_dispatch(True)
   126     # do_GET()
   127 
   128 
   129     def do_HEAD(self):
   130         self.do_dispatch(False)
   131     # do_HEAD()
   132 
   133 
   134     def _nav_items(self):
   135         ret = ""
   136         for name, url in self.menu.items():
   137             ret += utils.getHTML("menu", {"name": name, "url": url})
   138 
   139         return ret
   140     # _nav_items()
   141 
   142     def serve_main(self, body):
   143         self.send_response(200)
   144         self.send_header("Content-Type", "text/html")
   145         self.send_header('Connection', 'close')
   146         self.end_headers()
   147         if body:
   148             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   149     # serve_main()
   150 
   151     def serve_version(self, body):
   152         self.send_response(200)
   153         self.send_header("Content-Type", "text/html")
   154         self.send_header('Connection', 'close')
   155         self.end_headers()
   156         if body:
   157             self.wfile.write("Version: %s" %  __version__)
   158 
   159 
   160     def serve_shutdown(self, body):
   161         self.send_response(200)
   162         self.send_header("Content-Type", "text/html")
   163         self.send_header('Connection', 'close')
   164         self.end_headers()
   165         if body:
   166             self.wfile.write(utils.getHTML("shutdown"))
   167         self.server.server_close()
   168     # serve_shutdown()
   169 
   170 
   171     def serve_stop_all_transcoders(self, body):
   172         self.send_response(200)
   173         self.send_header("Content-Type", "text/html")
   174         self.send_header('Connection', 'close')
   175         self.end_headers()
   176         if body:
   177             self.server.stop_transcoders()
   178             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   179     # serve_stop_all_transcoders()
   180 
   181 
   182     def serve_stop_selected_transcoders(self, body, requests):
   183         self.send_response(200)
   184         self.send_header("Content-Type", "text/html")
   185         self.send_header('Connection', 'close')
   186         self.end_headers()
   187         opts = ""
   188         if body:
   189             transcoders = self.server.get_transcoders()
   190 
   191             for req in requests:
   192                 try:
   193                     host, port = req.split(":")
   194                 except IndexError:
   195                     continue
   196 
   197                 port = int(port)
   198                 addr = (host, port)
   199 
   200                 for t, r in transcoders:
   201                     if r.client_address == addr:
   202                         try:
   203                             t.stop()
   204                         except Exception, e:
   205                             self.log.info("Plugin already stopped")
   206 
   207                         opts += self._create_html_item("%s: %s:%s" % (
   208                             t, addr[0], addr[1]))
   209 
   210                         break
   211 
   212                 self.wfile.write(utils.getHTML("stop_selected",
   213                                                {"menu": self._nav_items(),
   214                                                 "opts": opts}))
   215     # serve_stop_selected_transcoders()
   216 
   217 
   218     def serve_stop_transcoder(self, body):
   219         req = self.query.get("request", None)
   220         if req and "all" in req:
   221             self.serve_stop_all_transcoders(body)
   222         elif req:
   223             self.serve_stop_selected_transcoders(body, req)
   224         else:
   225             self.serve_status(body)
   226     # serve_stop_transcoder()
   227 
   228 
   229     def serve_status(self, body):
   230         self.send_response(200)
   231         self.send_header("Content-Type", "text/html")
   232         self.send_header('Connection', 'close')
   233         self.end_headers()
   234         stopone = ""
   235 
   236         if body:
   237             tl = self.server.get_transcoders()
   238             if not tl:
   239                 running = "<p>No running transcoder.</p>\n"
   240                 stopall = ""
   241 
   242             elif self.query.get("ip") and self.query.get("file"):
   243                 for transcoder, request in tl:
   244                     filename = "%s" % self.query.get("file")[0]
   245                     tfilename = "%s" % transcoder.params_first("uri")
   246 
   247                     if tfilename.find(filename) >= 0 and \
   248                            request.client_address[0] == self.query.get("ip")[0]:
   249                         self.wfile.write("Status: %s %%" % transcoder.status)
   250                         return True
   251 
   252                 return False
   253 
   254             else:
   255                 running = "<p>Running transcoders:</p>\n"
   256                 stopall = utils._create_html_item("<a href='%s?request=all'>"
   257                                                  "[STOP ALL]</a>" %
   258                                                  self.menu["Stop"])
   259 
   260                 for transcoder, request in tl:
   261                     stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
   262                                                        "request=%s:%s'>"
   263                                                        "[STOP]</a> - "
   264                                                        "Status: %s%%"\
   265                                                      % (
   266                         transcoder, request.client_address[0],
   267                         request.client_address[1],
   268                         self.menu["Stop"], request.client_address[0],
   269                         request.client_address[1],
   270                         transcoder.status) )
   271 
   272                 self.wfile.write(utils.getHTML("status",
   273                                                {"menu": self._nav_items(),
   274                                                 "running": running,
   275                                                 "stopall": stopall,
   276                                                 "stopone": stopone}))
   277     # serve_status()
   278 
   279 
   280     def _get_transcoder(self):
   281         # get transcoder option: mencoder is the default
   282         request_transcoders = self.query.get("transcoder", ["mencoder"])
   283 
   284         for t in request_transcoders:
   285             transcoder = self.transcoders.get(t)
   286             if transcoder:
   287                 return transcoder
   288 
   289         if not transcoder:
   290             return self.transcoders[self.def_transcoder]
   291     # _get_transcoder()
   292 
   293 
   294     def serve_stream(self, body):
   295         transcoder = self._get_transcoder()
   296         try:
   297             obj = transcoder(self.query)
   298         except Exception, e:
   299             self.send_error(500, str(e))
   300             return
   301 
   302         self.send_response(200)
   303         self.send_header("Content-Type", obj.get_mimetype())
   304         self.send_header('Connection', 'close')
   305         self.end_headers()
   306 
   307         if body:
   308             self.server.add_transcoders(self, obj)
   309             obj.start(self.wfile)
   310             self.server.del_transcoders(self, obj)
   311     # serve_stream()
   312 
   313 
   314     def log_request(self, code='-', size='-'):
   315         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   316     # log_request()
   317 
   318 
   319     def log_error(self, format, *args):
   320         self.log.error("%s: %s" % (self.address_string(), format % args))
   321     # log_error()
   322 
   323 
   324     def log_message(self, format, *args):
   325         self.log.info("%s: %s" % (self.address_string(), format % args))
   326     # log_message()
   327 # RequestHandler
   328 
   329 
   330 
   331 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   332     log = log.getLogger("gms.server")
   333     run = True
   334     _transcoders = {}
   335     _lock = threading.RLock()
   336 
   337     def serve_forever(self):
   338         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   339                       self.socket.getsockname())
   340         try:
   341             while self.run:
   342                 self.handle_request()
   343         except KeyboardInterrupt, e:
   344             pass
   345 
   346         self.log.debug("Stopping all remaining transcoders...")
   347         self.stop_transcoders()
   348         self.log.debug("Transcoders stopped!")
   349     # serve_forever()
   350 
   351 
   352     def get_request(self):
   353         skt = self.socket
   354         old = skt.gettimeout()
   355         skt.settimeout(0.5)
   356         while self.run:
   357             try:
   358                 r = skt.accept()
   359                 skt.settimeout(old)
   360                 return r
   361             except socket.timeout, e:
   362                 pass
   363         raise socket.error("Not running")
   364     # get_request()
   365 
   366 
   367     def server_close(self):
   368         self.run = False
   369         self.stop_transcoders()
   370 
   371         BaseHTTPServer.HTTPServer.server_close(self)
   372     # server_close()
   373 
   374 
   375     def stop_transcoders(self):
   376         self._lock.acquire()
   377         for transcoder, request in self._transcoders.iteritems():
   378             self.log.info("Stop transcoder: %s, client=%s" %
   379                           (transcoder, request.client_address))
   380             transcoder.stop()
   381         self._lock.release()
   382     # stop_transcoders()
   383 
   384 
   385     def get_transcoders(self):
   386         self._lock.acquire()
   387         try:
   388             return self._transcoders.items()
   389         finally:
   390             self._lock.release()
   391     # get_transcoders()
   392 
   393 
   394     def add_transcoders(self, request, transcoder):
   395         self._lock.acquire()
   396         try:
   397             self._transcoders[transcoder] = request
   398         finally:
   399             self._lock.release()
   400     # add_transcoders()
   401 
   402 
   403     def del_transcoders(self, request, transcoder):
   404         self._lock.acquire()
   405         try:
   406             del self._transcoders[transcoder]
   407         finally:
   408             self._lock.release()
   409     # del_transcoders()
   410 # Server
   411 
   412 
   413 
   414 def serve_forever(host="0.0.0.0", port=40000):
   415     addr = (host, port)
   416     RequestHandler.protocol_version = "HTTP/1.0"
   417     httpd = Server(addr, RequestHandler)
   418     httpd.serve_forever()
   419 # serve_forever()
   420 
   421 
   422 def load_plugins_transcoders(directory):
   423     RequestHandler.load_plugins_transcoders(directory)
   424 # load_plugins_transcoders()