gmyth-stream/server/0.2/lib/server.py
author melunko
Tue May 08 14:54:37 2007 +0100 (2007-05-08)
branch trunk
changeset 633 65eaca328e22
parent 595 5c5cff842d57
child 638 e38953623405
permissions -rw-r--r--
[svn r639] Added creation of package gmyth-utils containing gmyth-cat application
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s("%s", mux="%s", params=%s, addr=%s)' % \
    67                (self.__class__.__name__,
    68                 self.params_first("uri", "None"),
    69                 self.params_first("mux", "mpg"),
    70                 self.params,
    71                 repr(self))
    72     # __str__()
    73 # Transcoder
    74 
    75 
    76 
    77 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    78     log = log.getLogger("gms.request")
    79     def_transcoder = None
    80     transcoders = utils.PluginSet(Transcoder)
    81 
    82     menu = {
    83         "Status": "/status.do",
    84         "Stop": "/stop-transcoder.do",
    85         "Shutdown": "/shutdown.do"
    86         }
    87 
    88     @classmethod
    89     def load_plugins_transcoders(cls, directory):
    90         cls.transcoders.load_from_directory(directory)
    91 
    92         if cls.def_transcoder is None and cls.transcoders:
    93             cls.def_transcoder = cls.transcoders[0].name
    94     # load_plugins_transcoders()
    95 
    96 
    97     def do_dispatch(self, body):
    98         self.url = self.path
    99 
   100         pieces = urlparse.urlparse(self.path)
   101         self.path = pieces[2]
   102         self.query = cgi.parse_qs(pieces[4])
   103 
   104         if self.path == "/":
   105             self.serve_main(body)
   106         elif self.path == "/shutdown.do":
   107             self.serve_shutdown(body)
   108         elif self.path == "/stop-transcoder.do":
   109             self.serve_stop_transcoder(body)
   110         elif self.path == "/status.do":
   111             self.serve_status(body)
   112         elif self.path == "/play.do":
   113             self.serve_play(body)
   114         elif self.path == "/stream.do":
   115             self.serve_stream(body)
   116         else:
   117             action = self.query.get("action", None)
   118             if action is not None:
   119                 self.serve_stream(body)
   120             else:
   121                 self.send_error(404, "File not found")
   122     # do_dispatch()
   123 
   124 
   125     def do_GET(self):
   126         self.do_dispatch(True)
   127     # do_GET()
   128 
   129 
   130     def do_HEAD(self):
   131         self.do_dispatch(False)
   132     # do_HEAD()
   133 
   134 
   135     def _nav_items(self):
   136         ret = ""
   137         for name, url in self.menu.items():
   138             ret += utils.getHTML("menu", {"name": name, "url": url})
   139 
   140         return ret
   141     # _nav_items()
   142 
   143     def _create_html_item(self, opt):
   144         return "<li>%s</li>\n" % opt
   145     # _create_html_item
   146 
   147     def serve_main(self, body):
   148         self.send_response(200)
   149         self.send_header("Content-Type", "text/html")
   150         self.send_header('Connection', 'close')
   151         self.end_headers()
   152         if body:
   153             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   154     # serve_main()
   155 
   156 
   157     def serve_shutdown(self, body):
   158         self.send_response(200)
   159         self.send_header("Content-Type", "text/html")
   160         self.send_header('Connection', 'close')
   161         self.end_headers()
   162         if body:
   163             self.wfile.write(utils.getHTML("shutdown"))
   164         self.server.server_close()
   165     # serve_shutdown()
   166 
   167 
   168     def serve_stop_all_transcoders(self, body):
   169         self.send_response(200)
   170         self.send_header("Content-Type", "text/html")
   171         self.send_header('Connection', 'close')
   172         self.end_headers()
   173         if body:
   174             self.server.stop_transcoders()
   175             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   176     # serve_stop_all_transcoders()
   177 
   178 
   179     def serve_stop_selected_transcoders(self, body, requests):
   180         self.send_response(200)
   181         self.send_header("Content-Type", "text/html")
   182         self.send_header('Connection', 'close')
   183         self.end_headers()
   184         opts = ""
   185         if body:
   186             transcoders = self.server.get_transcoders()
   187 
   188             for req in requests:
   189                 try:
   190                     host, port = req.split(":")
   191                 except IndexError:
   192                     continue
   193 
   194                 port = int(port)
   195                 addr = (host, port)
   196 
   197                 for t, r in transcoders:
   198                     if r.client_address == addr:
   199                         try:
   200                             t.stop()
   201                         except Exception, e:
   202                             self.log.info("Plugin already stopped")
   203 
   204                         opts += self._create_html_item("%s: %s:%s" % (
   205                             t, addr[0], addr[1]))
   206 
   207                         break
   208 
   209                 self.wfile.write(utils.getHTML("stop_selected",
   210                                                {"menu": self._nav_items(),
   211                                                 "opts": opts}))
   212     # serve_stop_selected_transcoders()
   213 
   214 
   215     def serve_stop_transcoder(self, body):
   216         req = self.query.get("request", None)
   217         if req and "all" in req:
   218             self.serve_stop_all_transcoders(body)
   219         elif req:
   220             self.serve_stop_selected_transcoders(body, req)
   221         else:
   222             self.serve_status(body)
   223     # serve_stop_transcoder()
   224 
   225 
   226     def serve_status(self, body):
   227         self.send_response(200)
   228         self.send_header("Content-Type", "text/html")
   229         self.send_header('Connection', 'close')
   230         self.end_headers()
   231 
   232         if body:
   233             tl = self.server.get_transcoders()
   234             if not tl:
   235                 running = "<p>No running transcoder.</p>\n"
   236                 stopall = ""
   237                 stopone = ""
   238             else:
   239                 running = "<p>Running transcoders:</p>\n"
   240                 stopall = self._create_html_item("<a href='%s?request=all'>[STOP ALL]</a>" %
   241                                                  self.menu["Stop"])
   242 
   243                 for transcoder, request in tl:
   244                     stopone = self._create_html_item("%s: %s:%s<a href='%s?request=%s:%s'>"
   245                                                      "[STOP]</a> - Status: %s%%" % (
   246                         transcoder, request.client_address[0], request.client_address[1],
   247                         self.menu["Stop"], request.client_address[0], request.client_address[1],
   248                         transcoder.status) )
   249 
   250             self.wfile.write(utils.getHTML("status",
   251                                            {"menu": self._nav_items(),
   252                                             "running": running,
   253                                             "stopall": stopall,
   254                                             "stopone": stopone}))
   255     # serve_status()
   256 
   257 
   258     def _get_transcoder(self):
   259         # get transcoder option: mencoder is the default
   260         request_transcoders = self.query.get("transcoder", ["mencoder"])
   261 
   262         for t in request_transcoders:
   263             transcoder = self.transcoders.get(t)
   264             if transcoder:
   265                 return transcoder
   266 
   267         if not transcoder:
   268             return self.transcoders[self.def_transcoder]
   269     # _get_transcoder()
   270 
   271 
   272     def serve_stream(self, body):
   273         transcoder = self._get_transcoder()
   274         try:
   275             obj = transcoder(self.query)
   276         except Exception, e:
   277             self.send_error(500, str(e))
   278             return
   279 
   280         self.send_response(200)
   281         self.send_header("Content-Type", obj.get_mimetype())
   282         self.send_header('Connection', 'close')
   283         self.end_headers()
   284 
   285         if body:
   286             self.server.add_transcoders(self, obj)
   287             obj.start(self.wfile)
   288             self.server.del_transcoders(self, obj)
   289     # serve_stream()
   290 
   291 
   292     def log_request(self, code='-', size='-'):
   293         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   294     # log_request()
   295 
   296 
   297     def log_error(self, format, *args):
   298         self.log.error("%s: %s" % (self.address_string(), format % args))
   299     # log_error()
   300 
   301 
   302     def log_message(self, format, *args):
   303         self.log.info("%s: %s" % (self.address_string(), format % args))
   304     # log_message()
   305 # RequestHandler
   306 
   307 
   308 
   309 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   310     log = log.getLogger("gms.server")
   311     run = True
   312     _transcoders = {}
   313     _lock = threading.RLock()
   314 
   315     def serve_forever(self):
   316         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   317                       self.socket.getsockname())
   318         try:
   319             while self.run:
   320                 self.handle_request()
   321         except KeyboardInterrupt, e:
   322             pass
   323 
   324         self.log.debug("Stopping all remaining transcoders...")
   325         self.stop_transcoders()
   326         self.log.debug("Transcoders stopped!")
   327     # serve_forever()
   328 
   329 
   330     def get_request(self):
   331         skt = self.socket
   332         old = skt.gettimeout()
   333         skt.settimeout(0.5)
   334         while self.run:
   335             try:
   336                 r = skt.accept()
   337                 skt.settimeout(old)
   338                 return r
   339             except socket.timeout, e:
   340                 pass
   341         raise socket.error("Not running")
   342     # get_request()
   343 
   344 
   345     def server_close(self):
   346         self.run = False
   347         self.stop_transcoders()
   348 
   349         BaseHTTPServer.HTTPServer.server_close(self)
   350     # server_close()
   351 
   352 
   353     def stop_transcoders(self):
   354         self._lock.acquire()
   355         for transcoder, request in self._transcoders.iteritems():
   356             self.log.info("Stop transcoder: %s, client=%s" %
   357                           (transcoder, request.client_address))
   358             transcoder.stop()
   359         self._lock.release()
   360     # stop_transcoders()
   361 
   362 
   363     def get_transcoders(self):
   364         self._lock.acquire()
   365         try:
   366             return self._transcoders.items()
   367         finally:
   368             self._lock.release()
   369     # get_transcoders()
   370 
   371 
   372     def add_transcoders(self, request, transcoder):
   373         self._lock.acquire()
   374         try:
   375             self._transcoders[transcoder] = request
   376         finally:
   377             self._lock.release()
   378     # add_transcoders()
   379 
   380 
   381     def del_transcoders(self, request, transcoder):
   382         self._lock.acquire()
   383         try:
   384             del self._transcoders[transcoder]
   385         finally:
   386             self._lock.release()
   387     # del_transcoders()
   388 # Server
   389 
   390 
   391 
   392 def serve_forever(host="0.0.0.0", port=40000):
   393     addr = (host, port)
   394     RequestHandler.protocol_version = "HTTP/1.0"
   395     httpd = Server(addr, RequestHandler)
   396     httpd.serve_forever()
   397 # serve_forever()
   398 
   399 
   400 def load_plugins_transcoders(directory):
   401     RequestHandler.load_plugins_transcoders(directory)
   402 # load_plugins_transcoders()