gmyth-stream/server/0.2/lib/server.py
author rosfran
Tue May 15 15:45:34 2007 +0100 (2007-05-15)
branchtrunk
changeset 652 c98ddd4a9513
parent 628 c6af9d7a88b5
child 653 3433df0d6ae3
permissions -rw-r--r--
[svn r658] Added g_main_context_iteration to while loop on reading from FileTransfer.
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s("%s", mux="%s", params=%s, addr=%s)' % \
    67                (self.__class__.__name__,
    68                 self.params_first("uri", "None"),
    69                 self.params_first("mux", "mpg"),
    70                 self.params,
    71                 repr(self))
    72     # __str__()
    73 # Transcoder
    74 
    75 
    76 
    77 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    78     log = log.getLogger("gms.request")
    79     def_transcoder = None
    80     transcoders = utils.PluginSet(Transcoder)
    81 
    82     menu = {
    83         "Stop": "/stop-transcoder.do",
    84         "Status": "/status.do",
    85         "Version": "/version.do",
    86         "Shutdown": "/shutdown.do"
    87         }
    88 
    89     @classmethod
    90     def load_plugins_transcoders(cls, directory):
    91         cls.transcoders.load_from_directory(directory)
    92 
    93         if cls.def_transcoder is None and cls.transcoders:
    94             cls.def_transcoder = cls.transcoders[0].name
    95     # load_plugins_transcoders()
    96 
    97 
    98     def do_dispatch(self, body):
    99         self.url = self.path
   100 
   101         pieces = urlparse.urlparse(self.path)
   102         self.path = pieces[2]
   103         self.query = cgi.parse_qs(pieces[4])
   104 
   105         if self.path == "/":
   106             self.serve_main(body)
   107         elif self.path == "/shutdown.do":
   108             self.serve_shutdown(body)
   109         elif self.path == "/stop-transcoder.do":
   110             self.serve_stop_transcoder(body)
   111         elif self.path == "/status.do":
   112             self.serve_status(body)
   113         elif self.path == "/version.do":
   114             self.serve_version(body)
   115         elif self.path == "/stream.do":
   116             self.serve_stream(body)
   117         else:
   118             action = self.query.get("action", None)
   119             if action is not None:
   120                 self.serve_stream(body)
   121             else:
   122                 self.send_error(404, "File not found")
   123     # do_dispatch()
   124 
   125 
   126     def do_GET(self):
   127         self.do_dispatch(True)
   128     # do_GET()
   129 
   130 
   131     def do_HEAD(self):
   132         self.do_dispatch(False)
   133     # do_HEAD()
   134 
   135 
   136     def _nav_items(self):
   137         ret = ""
   138         for name, url in self.menu.items():
   139             ret += utils.getHTML("menu", {"name": name, "url": url})
   140 
   141         return ret
   142     # _nav_items()
   143 
   144     def _create_html_item(self, opt):
   145         return "<li>%s</li>\n" % opt
   146     # _create_html_item
   147 
   148     def serve_main(self, body):
   149         self.send_response(200)
   150         self.send_header("Content-Type", "text/html")
   151         self.send_header('Connection', 'close')
   152         self.end_headers()
   153         if body:
   154             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   155     # serve_main()
   156 
   157     def serve_version(self, body):
   158         self.send_response(200)
   159         self.send_header("Content-Type", "text/html")
   160         self.send_header('Connection', 'close')
   161         self.end_headers()
   162         if body:
   163             self.wfile.write("Version: %s" %  __version__)
   164 
   165 
   166     def serve_shutdown(self, body):
   167         self.send_response(200)
   168         self.send_header("Content-Type", "text/html")
   169         self.send_header('Connection', 'close')
   170         self.end_headers()
   171         if body:
   172             self.wfile.write(utils.getHTML("shutdown"))
   173         self.server.server_close()
   174     # serve_shutdown()
   175 
   176 
   177     def serve_stop_all_transcoders(self, body):
   178         self.send_response(200)
   179         self.send_header("Content-Type", "text/html")
   180         self.send_header('Connection', 'close')
   181         self.end_headers()
   182         if body:
   183             self.server.stop_transcoders()
   184             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   185     # serve_stop_all_transcoders()
   186 
   187 
   188     def serve_stop_selected_transcoders(self, body, requests):
   189         self.send_response(200)
   190         self.send_header("Content-Type", "text/html")
   191         self.send_header('Connection', 'close')
   192         self.end_headers()
   193         opts = ""
   194         if body:
   195             transcoders = self.server.get_transcoders()
   196 
   197             for req in requests:
   198                 try:
   199                     host, port = req.split(":")
   200                 except IndexError:
   201                     continue
   202 
   203                 port = int(port)
   204                 addr = (host, port)
   205 
   206                 for t, r in transcoders:
   207                     if r.client_address == addr:
   208                         try:
   209                             t.stop()
   210                         except Exception, e:
   211                             self.log.info("Plugin already stopped")
   212 
   213                         opts += self._create_html_item("%s: %s:%s" % (
   214                             t, addr[0], addr[1]))
   215 
   216                         break
   217 
   218                 self.wfile.write(utils.getHTML("stop_selected",
   219                                                {"menu": self._nav_items(),
   220                                                 "opts": opts}))
   221     # serve_stop_selected_transcoders()
   222 
   223 
   224     def serve_stop_transcoder(self, body):
   225         req = self.query.get("request", None)
   226         if req and "all" in req:
   227             self.serve_stop_all_transcoders(body)
   228         elif req:
   229             self.serve_stop_selected_transcoders(body, req)
   230         else:
   231             self.serve_status(body)
   232     # serve_stop_transcoder()
   233 
   234 
   235     def serve_status(self, body):
   236         self.send_response(200)
   237         self.send_header("Content-Type", "text/html")
   238         self.send_header('Connection', 'close')
   239         self.end_headers()
   240 
   241         if body:
   242             tl = self.server.get_transcoders()
   243             if not tl:
   244                 running = "<p>No running transcoder.</p>\n"
   245                 stopall = ""
   246                 stopone = ""
   247             else:
   248                 running = "<p>Running transcoders:</p>\n"
   249                 stopall = self._create_html_item("<a href='%s?request=all'>[STOP ALL]</a>" %
   250                                                  self.menu["Stop"])
   251 
   252                 for transcoder, request in tl:
   253                     stopone = self._create_html_item("%s: %s:%s<a href='%s?request=%s:%s'>"
   254                                                      "[STOP]</a> - Status: %s%%" % (
   255                         transcoder, request.client_address[0], request.client_address[1],
   256                         self.menu["Stop"], request.client_address[0], request.client_address[1],
   257                         transcoder.status) )
   258 
   259             self.wfile.write(utils.getHTML("status",
   260                                            {"menu": self._nav_items(),
   261                                             "running": running,
   262                                             "stopall": stopall,
   263                                             "stopone": stopone}))
   264     # serve_status()
   265 
   266 
   267     def _get_transcoder(self):
   268         # get transcoder option: mencoder is the default
   269         request_transcoders = self.query.get("transcoder", ["mencoder"])
   270 
   271         for t in request_transcoders:
   272             transcoder = self.transcoders.get(t)
   273             if transcoder:
   274                 return transcoder
   275 
   276         if not transcoder:
   277             return self.transcoders[self.def_transcoder]
   278     # _get_transcoder()
   279 
   280 
   281     def serve_stream(self, body):
   282         transcoder = self._get_transcoder()
   283         try:
   284             obj = transcoder(self.query)
   285         except Exception, e:
   286             self.send_error(500, str(e))
   287             return
   288 
   289         self.send_response(200)
   290         self.send_header("Content-Type", obj.get_mimetype())
   291         self.send_header('Connection', 'close')
   292         self.end_headers()
   293 
   294         if body:
   295             self.server.add_transcoders(self, obj)
   296             obj.start(self.wfile)
   297             self.server.del_transcoders(self, obj)
   298     # serve_stream()
   299 
   300 
   301     def log_request(self, code='-', size='-'):
   302         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   303     # log_request()
   304 
   305 
   306     def log_error(self, format, *args):
   307         self.log.error("%s: %s" % (self.address_string(), format % args))
   308     # log_error()
   309 
   310 
   311     def log_message(self, format, *args):
   312         self.log.info("%s: %s" % (self.address_string(), format % args))
   313     # log_message()
   314 # RequestHandler
   315 
   316 
   317 
   318 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   319     log = log.getLogger("gms.server")
   320     run = True
   321     _transcoders = {}
   322     _lock = threading.RLock()
   323 
   324     def serve_forever(self):
   325         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   326                       self.socket.getsockname())
   327         try:
   328             while self.run:
   329                 self.handle_request()
   330         except KeyboardInterrupt, e:
   331             pass
   332 
   333         self.log.debug("Stopping all remaining transcoders...")
   334         self.stop_transcoders()
   335         self.log.debug("Transcoders stopped!")
   336     # serve_forever()
   337 
   338 
   339     def get_request(self):
   340         skt = self.socket
   341         old = skt.gettimeout()
   342         skt.settimeout(0.5)
   343         while self.run:
   344             try:
   345                 r = skt.accept()
   346                 skt.settimeout(old)
   347                 return r
   348             except socket.timeout, e:
   349                 pass
   350         raise socket.error("Not running")
   351     # get_request()
   352 
   353 
   354     def server_close(self):
   355         self.run = False
   356         self.stop_transcoders()
   357 
   358         BaseHTTPServer.HTTPServer.server_close(self)
   359     # server_close()
   360 
   361 
   362     def stop_transcoders(self):
   363         self._lock.acquire()
   364         for transcoder, request in self._transcoders.iteritems():
   365             self.log.info("Stop transcoder: %s, client=%s" %
   366                           (transcoder, request.client_address))
   367             transcoder.stop()
   368         self._lock.release()
   369     # stop_transcoders()
   370 
   371 
   372     def get_transcoders(self):
   373         self._lock.acquire()
   374         try:
   375             return self._transcoders.items()
   376         finally:
   377             self._lock.release()
   378     # get_transcoders()
   379 
   380 
   381     def add_transcoders(self, request, transcoder):
   382         self._lock.acquire()
   383         try:
   384             self._transcoders[transcoder] = request
   385         finally:
   386             self._lock.release()
   387     # add_transcoders()
   388 
   389 
   390     def del_transcoders(self, request, transcoder):
   391         self._lock.acquire()
   392         try:
   393             del self._transcoders[transcoder]
   394         finally:
   395             self._lock.release()
   396     # del_transcoders()
   397 # Server
   398 
   399 
   400 
   401 def serve_forever(host="0.0.0.0", port=40000):
   402     addr = (host, port)
   403     RequestHandler.protocol_version = "HTTP/1.0"
   404     httpd = Server(addr, RequestHandler)
   405     httpd.serve_forever()
   406 # serve_forever()
   407 
   408 
   409 def load_plugins_transcoders(directory):
   410     RequestHandler.load_plugins_transcoders(directory)
   411 # load_plugins_transcoders()