gmyth-stream/server/0.2/lib/server.py
author rosfran
Wed Apr 25 15:50:41 2007 +0100 (2007-04-25)
branch trunk
changeset 594 f36075170a4e
parent 577 7dc357cbaa40
child 595 5c5cff842d57
permissions -rw-r--r--
[svn r600] Added clean-up code to the GMyth test cases.
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25 
    26     def __init__(self, params):
    27         self.params = params
    28     # __init__()
    29 
    30 
    31     def params_first(self, key, default=None):
    32         if default is None:
    33             return self.params[key][0]
    34         else:
    35             try:
    36                 return self.params[key][0]
    37             except:
    38                 return default
    39     # params_first()
    40 
    41 
    42     def get_mimetype(self):
    43         mux = self.params_first("mux", "mpg")
    44 
    45         if mux == "mpeg":
    46             return "video/mpeg"
    47         elif mux == "avi":
    48             return "video/x-msvideo"
    49         else:
    50             return "application/octet-stream"
    51     # get_mimetype()
    52 
    53 
    54     def start(self, outfile):
    55         return True
    56     # start()
    57 
    58 
    59     def stop(self):
    60         return Tru
    61     # stop()
    62 
    63 
    64     def __str__(self):
    65         return '%s("%s", mux="%s", params=%s, addr=%s)' % \
    66                (self.__class__.__name__,
    67                 self.params_first("uri", "None"),
    68                 self.params_first("mux", "mpg"),
    69                 self.params,
    70                 repr(self))
    71     # __str__()
    72 # Transcoder
    73 
    74 
    75 
    76 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    77     log = log.getLogger("gms.request")
    78     def_transcoder = None
    79     transcoders = utils.PluginSet(Transcoder)
    80 
    81     menu = {
    82         "Status": "/status.do",
    83         "Stop": "/stop-transcoder.do",
    84         "Shutdown": "/shutdown.do"
    85         }
    86 
    87     @classmethod
    88     def load_plugins_transcoders(cls, directory):
    89         cls.transcoders.load_from_directory(directory)
    90 
    91         if cls.def_transcoder is None and cls.transcoders:
    92             cls.def_transcoder = cls.transcoders[0].name
    93     # load_plugins_transcoders()
    94 
    95 
    96     def do_dispatch(self, body):
    97         self.url = self.path
    98 
    99         pieces = urlparse.urlparse(self.path)
   100         self.path = pieces[2]
   101         self.query = cgi.parse_qs(pieces[4])
   102 
   103         if self.path == "/":
   104             self.serve_main(body)
   105         elif self.path == "/shutdown.do":
   106             self.serve_shutdown(body)
   107         elif self.path == "/stop-transcoder.do":
   108             self.serve_stop_transcoder(body)
   109         elif self.path == "/status.do":
   110             self.serve_status(body)
   111         elif self.path == "/play.do":
   112             self.serve_play(body)
   113         elif self.path == "/stream.do":
   114             self.serve_stream(body)
   115         else:
   116             self.send_error(404, "File not found")
   117     # do_dispatch()
   118 
   119 
   120     def do_GET(self):
   121         self.do_dispatch(True)
   122     # do_GET()
   123 
   124 
   125     def do_HEAD(self):
   126         self.do_dispatch(False)
   127     # do_HEAD()
   128 
   129 
   130     def _nav_items(self):
   131         ret = ""
   132         for name, url in self.menu.items():
   133             ret += utils.getHTML("menu", {"name": name, "url": url})
   134 
   135         return ret
   136     # _nav_items()
   137 
   138     def _create_html_item(self, opt):
   139         return "<li>%s</li>\n" % opt
   140     # _create_html_item
   141 
   142     def serve_main(self, body):
   143         self.send_response(200)
   144         self.send_header("Content-Type", "text/html")
   145         self.send_header('Connection', 'close')
   146         self.end_headers()
   147         if body:
   148             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   149     # serve_main()
   150 
   151 
   152     def serve_shutdown(self, body):
   153         self.send_response(200)
   154         self.send_header("Content-Type", "text/html")
   155         self.send_header('Connection', 'close')
   156         self.end_headers()
   157         if body:
   158             self.wfile.write(utils.getHTML("shutdown"))
   159         self.server.server_close()
   160     # serve_shutdown()
   161 
   162 
   163     def serve_stop_all_transcoders(self, body):
   164         self.send_response(200)
   165         self.send_header("Content-Type", "text/html")
   166         self.send_header('Connection', 'close')
   167         self.end_headers()
   168         if body:
   169             self.server.stop_transcoders()
   170             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   171     # serve_stop_all_transcoders()
   172 
   173 
   174     def serve_stop_selected_transcoders(self, body, requests):
   175         self.send_response(200)
   176         self.send_header("Content-Type", "text/html")
   177         self.send_header('Connection', 'close')
   178         self.end_headers()
   179         opts = ""
   180         if body:
   181             transcoders = self.server.get_transcoders()
   182 
   183             for req in requests:
   184                 try:
   185                     host, port = req.split(":")
   186                 except IndexError:
   187                     continue
   188 
   189                 port = int(port)
   190                 addr = (host, port)
   191 
   192                 for t, r in transcoders:
   193                     if r.client_address == addr:
   194                         try:
   195                             t.stop()
   196                         except Exception, e:
   197                             self.log.info("Plugin already stopped")
   198 
   199                         opts += self._create_html_item("%s: %s:%s" % (
   200                             t, addr[0], addr[1]))
   201 
   202                         break
   203 
   204                 self.wfile.write(utils.getHTML("stop_selected",
   205                                                {"menu": self._nav_items(),
   206                                                 "opts": opts}))
   207     # serve_stop_selected_transcoders()
   208 
   209 
   210     def serve_stop_transcoder(self, body):
   211         req = self.query.get("request", None)
   212         if req and "all" in req:
   213             self.serve_stop_all_transcoders(body)
   214         elif req:
   215             self.serve_stop_selected_transcoders(body, req)
   216         else:
   217             self.serve_status(body)
   218     # serve_stop_transcoder()
   219 
   220 
   221     def serve_status(self, body):
   222         self.send_response(200)
   223         self.send_header("Content-Type", "text/html")
   224         self.send_header('Connection', 'close')
   225         self.end_headers()
   226 
   227         if body:
   228             tl = self.server.get_transcoders()
   229             if not tl:
   230                 running = "<p>No running transcoder.</p>\n"
   231                 stopall = ""
   232                 stopone = ""
   233             else:
   234                 running = "<p>Running transcoders:</p>\n"
   235                 stopall = self._create_html_item("<a href='%s?request=all'>[STOP ALL]</a>" %
   236                                                  self.menu["Stop"])
   237 
   238                 for transcoder, request in tl:
   239                     stopone = self._create_html_item("%s: %s:%s<a href='%s?request=%s:%s'>"
   240                                                      "[STOP]</a>" % (
   241                         transcoder, request.client_address[0], request.client_address[1],
   242                         self.menu["Stop"], request.client_address[0], request.client_address[1]) )
   243 
   244             self.wfile.write(utils.getHTML("status",
   245                                            {"menu": self._nav_items(),
   246                                             "running": running,
   247                                             "stopall": stopall,
   248                                             "stopone": stopone}))
   249     # serve_status()
   250 
   251 
   252     def _get_transcoder(self):
   253         # get transcoder option: mencoder is the default
   254         request_transcoders = self.query.get("transcoder", ["mencoder"])
   255 
   256         for t in request_transcoders:
   257             transcoder = self.transcoders.get(t)
   258             if transcoder:
   259                 return transcoder
   260 
   261         if not transcoder:
   262             return self.transcoders[self.def_transcoder]
   263     # _get_transcoder()
   264 
   265 
   266     def serve_stream(self, body):
   267         transcoder = self._get_transcoder()
   268         try:
   269             obj = transcoder(self.query)
   270         except Exception, e:
   271             self.send_error(500, str(e))
   272             return
   273 
   274         self.send_response(200)
   275         self.send_header("Content-Type", obj.get_mimetype())
   276         self.send_header('Connection', 'close')
   277         self.end_headers()
   278 
   279         if body:
   280             self.server.add_transcoders(self, obj)
   281             obj.start(self.wfile)
   282             self.server.del_transcoders(self, obj)
   283     # serve_stream()
   284 
   285 
   286     def log_request(self, code='-', size='-'):
   287         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   288     # log_request()
   289 
   290 
   291     def log_error(self, format, *args):
   292         self.log.error("%s: %s" % (self.address_string(), format % args))
   293     # log_error()
   294 
   295 
   296     def log_message(self, format, *args):
   297         self.log.info("%s: %s" % (self.address_string(), format % args))
   298     # log_message()
   299 # RequestHandler
   300 
   301 
   302 
   303 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   304     log = log.getLogger("gms.server")
   305     run = True
   306     _transcoders = {}
   307     _lock = threading.RLock()
   308 
   309     def serve_forever(self):
   310         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   311                       self.socket.getsockname())
   312         try:
   313             while self.run:
   314                 self.handle_request()
   315         except KeyboardInterrupt, e:
   316             pass
   317 
   318         self.log.debug("Stopping all remaining transcoders...")
   319         self.stop_transcoders()
   320         self.log.debug("Transcoders stopped!")
   321     # serve_forever()
   322 
   323 
   324     def get_request(self):
   325         skt = self.socket
   326         old = skt.gettimeout()
   327         skt.settimeout(0.5)
   328         while self.run:
   329             try:
   330                 r = skt.accept()
   331                 skt.settimeout(old)
   332                 return r
   333             except socket.timeout, e:
   334                 pass
   335         raise socket.error("Not running")
   336     # get_request()
   337 
   338 
   339     def server_close(self):
   340         self.run = False
   341         self.stop_transcoders()
   342 
   343         BaseHTTPServer.HTTPServer.server_close(self)
   344     # server_close()
   345 
   346 
   347     def stop_transcoders(self):
   348         self._lock.acquire()
   349         for transcoder, request in self._transcoders.iteritems():
   350             self.log.info("Stop transcoder: %s, client=%s" %
   351                           (transcoder, request.client_address))
   352             transcoder.stop()
   353         self._lock.release()
   354     # stop_transcoders()
   355 
   356 
   357     def get_transcoders(self):
   358         self._lock.acquire()
   359         try:
   360             return self._transcoders.items()
   361         finally:
   362             self._lock.release()
   363     # get_transcoders()
   364 
   365 
   366     def add_transcoders(self, request, transcoder):
   367         self._lock.acquire()
   368         try:
   369             self._transcoders[transcoder] = request
   370         finally:
   371             self._lock.release()
   372     # add_transcoders()
   373 
   374 
   375     def del_transcoders(self, request, transcoder):
   376         self._lock.acquire()
   377         try:
   378             del self._transcoders[transcoder]
   379         finally:
   380             self._lock.release()
   381     # del_transcoders()
   382 # Server
   383 
   384 
   385 
   386 def serve_forever(host="0.0.0.0", port=40000):
   387     addr = (host, port)
   388     RequestHandler.protocol_version = "HTTP/1.0"
   389     httpd = Server(addr, RequestHandler)
   390     httpd.serve_forever()
   391 # serve_forever()
   392 
   393 
   394 def load_plugins_transcoders(directory):
   395     RequestHandler.load_plugins_transcoders(directory)
   396 # load_plugins_transcoders()