gmyth-stream/server/0.2/lib/server.py
author morphbr
Tue May 15 18:25:19 2007 +0100 (2007-05-15)
branchtrunk
changeset 653 3433df0d6ae3
parent 638 e38953623405
child 682 367d791aeb57
permissions -rw-r--r--
[svn r659] * GMyth-Stream
- created mencoder_lib to cleanup code
- fixed progress bar function
- get status by task
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s( params=%s )' % \
    67                (self.__class__.__name__,
    68                 self.params)
    69     # __str__()
    70 # Transcoder
    71 
    72 
    73 
    74 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    75     log = log.getLogger("gms.request")
    76     def_transcoder = None
    77     transcoders = utils.PluginSet(Transcoder)
    78 
    79     menu = {
    80         "Stop": "/stop-transcoder.do",
    81         "Status": "/status.do",
    82         "Version": "/version.do",
    83         "Shutdown": "/shutdown.do"
    84         }
    85 
    86     @classmethod
    87     def load_plugins_transcoders(cls, directory):
    88         cls.transcoders.load_from_directory(directory)
    89 
    90         if cls.def_transcoder is None and cls.transcoders:
    91             cls.def_transcoder = cls.transcoders[0].name
    92     # load_plugins_transcoders()
    93 
    94 
    95     def do_dispatch(self, body):
    96         self.url = self.path
    97 
    98         pieces = urlparse.urlparse(self.path)
    99         self.path = pieces[2]
   100         self.query = cgi.parse_qs(pieces[4])
   101 
   102         if self.path == "/":
   103             self.serve_main(body)
   104         elif self.path == "/shutdown.do":
   105             self.serve_shutdown(body)
   106         elif self.path == "/stop-transcoder.do":
   107             self.serve_stop_transcoder(body)
   108         elif self.path == "/status.do":
   109             self.serve_status(body)
   110         elif self.path == "/version.do":
   111             self.serve_version(body)
   112         elif self.path == "/stream.do":
   113             self.serve_stream(body)
   114         else:
   115             action = self.query.get("action", None)
   116             if action is not None:
   117                 self.serve_stream(body)
   118             else:
   119                 self.send_error(404, "File not found")
   120     # do_dispatch()
   121 
   122 
   123     def do_GET(self):
   124         self.do_dispatch(True)
   125     # do_GET()
   126 
   127 
   128     def do_HEAD(self):
   129         self.do_dispatch(False)
   130     # do_HEAD()
   131 
   132 
   133     def _nav_items(self):
   134         ret = ""
   135         for name, url in self.menu.items():
   136             ret += utils.getHTML("menu", {"name": name, "url": url})
   137 
   138         return ret
   139     # _nav_items()
   140 
   141     def _create_html_item(self, opt):
   142         return "<li>%s</li>\n" % opt
   143     # _create_html_item
   144 
   145     def serve_main(self, body):
   146         self.send_response(200)
   147         self.send_header("Content-Type", "text/html")
   148         self.send_header('Connection', 'close')
   149         self.end_headers()
   150         if body:
   151             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   152     # serve_main()
   153 
   154     def serve_version(self, body):
   155         self.send_response(200)
   156         self.send_header("Content-Type", "text/html")
   157         self.send_header('Connection', 'close')
   158         self.end_headers()
   159         if body:
   160             self.wfile.write("Version: %s" %  __version__)
   161 
   162 
   163     def serve_shutdown(self, body):
   164         self.send_response(200)
   165         self.send_header("Content-Type", "text/html")
   166         self.send_header('Connection', 'close')
   167         self.end_headers()
   168         if body:
   169             self.wfile.write(utils.getHTML("shutdown"))
   170         self.server.server_close()
   171     # serve_shutdown()
   172 
   173 
   174     def serve_stop_all_transcoders(self, body):
   175         self.send_response(200)
   176         self.send_header("Content-Type", "text/html")
   177         self.send_header('Connection', 'close')
   178         self.end_headers()
   179         if body:
   180             self.server.stop_transcoders()
   181             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   182     # serve_stop_all_transcoders()
   183 
   184 
   185     def serve_stop_selected_transcoders(self, body, requests):
   186         self.send_response(200)
   187         self.send_header("Content-Type", "text/html")
   188         self.send_header('Connection', 'close')
   189         self.end_headers()
   190         opts = ""
   191         if body:
   192             transcoders = self.server.get_transcoders()
   193 
   194             for req in requests:
   195                 try:
   196                     host, port = req.split(":")
   197                 except IndexError:
   198                     continue
   199 
   200                 port = int(port)
   201                 addr = (host, port)
   202 
   203                 for t, r in transcoders:
   204                     if r.client_address == addr:
   205                         try:
   206                             t.stop()
   207                         except Exception, e:
   208                             self.log.info("Plugin already stopped")
   209 
   210                         opts += self._create_html_item("%s: %s:%s" % (
   211                             t, addr[0], addr[1]))
   212 
   213                         break
   214 
   215                 self.wfile.write(utils.getHTML("stop_selected",
   216                                                {"menu": self._nav_items(),
   217                                                 "opts": opts}))
   218     # serve_stop_selected_transcoders()
   219 
   220 
   221     def serve_stop_transcoder(self, body):
   222         req = self.query.get("request", None)
   223         if req and "all" in req:
   224             self.serve_stop_all_transcoders(body)
   225         elif req:
   226             self.serve_stop_selected_transcoders(body, req)
   227         else:
   228             self.serve_status(body)
   229     # serve_stop_transcoder()
   230 
   231 
   232     def serve_status(self, body):
   233         self.send_response(200)
   234         self.send_header("Content-Type", "text/html")
   235         self.send_header('Connection', 'close')
   236         self.end_headers()
   237 
   238         if body:
   239             tl = self.server.get_transcoders()
   240             if not tl:
   241                 running = "<p>No running transcoder.</p>\n"
   242                 stopall = ""
   243                 stopone = ""
   244 
   245             elif self.query.get("ip") and self.query.get("file"):
   246                 for transcoder, request in tl:
   247                     filename = "%s" % self.query.get("file")[0]
   248                     tfilename = "%s" % transcoder.params_first("uri")
   249 
   250                     if tfilename.find(filename) >= 0 and \
   251                            request.client_address[0] == self.query.get("ip")[0]:
   252                         self.wfile.write("Status: %s %%" % transcoder.status)
   253                     return
   254 
   255             else:
   256                 running = "<p>Running transcoders:</p>\n"
   257                 stopall = self._create_html_item("<a href='%s?request=all'>"
   258                                                  "[STOP ALL]</a>" %
   259                                                  self.menu["Stop"])
   260 
   261                 for transcoder, request in tl:
   262                     stopone = self._create_html_item("%s: %s:%s<a href='%s?"
   263                                                      "request=%s:%s'>"
   264                                                      "[STOP]</a> - Status: %s%%"\
   265                                                      % (
   266                         transcoder, request.client_address[0],
   267                         request.client_address[1],
   268                         self.menu["Stop"], request.client_address[0],
   269                         request.client_address[1],
   270                         transcoder.status) )
   271 
   272             self.wfile.write(utils.getHTML("status",
   273                                            {"menu": self._nav_items(),
   274                                             "running": running,
   275                                             "stopall": stopall,
   276                                             "stopone": stopone}))
   277     # serve_status()
   278 
   279 
   280     def _get_transcoder(self):
   281         # get transcoder option: mencoder is the default
   282         request_transcoders = self.query.get("transcoder", ["mencoder"])
   283 
   284         for t in request_transcoders:
   285             transcoder = self.transcoders.get(t)
   286             if transcoder:
   287                 return transcoder
   288 
   289         if not transcoder:
   290             return self.transcoders[self.def_transcoder]
   291     # _get_transcoder()
   292 
   293 
   294     def serve_stream(self, body):
   295         transcoder = self._get_transcoder()
   296         try:
   297             obj = transcoder(self.query)
   298         except Exception, e:
   299             self.send_error(500, str(e))
   300             return
   301 
   302         self.send_response(200)
   303         self.send_header("Content-Type", obj.get_mimetype())
   304         self.send_header('Connection', 'close')
   305         self.end_headers()
   306 
   307         if body:
   308             self.server.add_transcoders(self, obj)
   309             obj.start(self.wfile)
   310             self.server.del_transcoders(self, obj)
   311     # serve_stream()
   312 
   313 
   314     def log_request(self, code='-', size='-'):
   315         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   316     # log_request()
   317 
   318 
   319     def log_error(self, format, *args):
   320         self.log.error("%s: %s" % (self.address_string(), format % args))
   321     # log_error()
   322 
   323 
   324     def log_message(self, format, *args):
   325         self.log.info("%s: %s" % (self.address_string(), format % args))
   326     # log_message()
   327 # RequestHandler
   328 
   329 
   330 
   331 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   332     log = log.getLogger("gms.server")
   333     run = True
   334     _transcoders = {}
   335     _lock = threading.RLock()
   336 
   337     def serve_forever(self):
   338         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   339                       self.socket.getsockname())
   340         try:
   341             while self.run:
   342                 self.handle_request()
   343         except KeyboardInterrupt, e:
   344             pass
   345 
   346         self.log.debug("Stopping all remaining transcoders...")
   347         self.stop_transcoders()
   348         self.log.debug("Transcoders stopped!")
   349     # serve_forever()
   350 
   351 
   352     def get_request(self):
   353         skt = self.socket
   354         old = skt.gettimeout()
   355         skt.settimeout(0.5)
   356         while self.run:
   357             try:
   358                 r = skt.accept()
   359                 skt.settimeout(old)
   360                 return r
   361             except socket.timeout, e:
   362                 pass
   363         raise socket.error("Not running")
   364     # get_request()
   365 
   366 
   367     def server_close(self):
   368         self.run = False
   369         self.stop_transcoders()
   370 
   371         BaseHTTPServer.HTTPServer.server_close(self)
   372     # server_close()
   373 
   374 
   375     def stop_transcoders(self):
   376         self._lock.acquire()
   377         for transcoder, request in self._transcoders.iteritems():
   378             self.log.info("Stop transcoder: %s, client=%s" %
   379                           (transcoder, request.client_address))
   380             transcoder.stop()
   381         self._lock.release()
   382     # stop_transcoders()
   383 
   384 
   385     def get_transcoders(self):
   386         self._lock.acquire()
   387         try:
   388             return self._transcoders.items()
   389         finally:
   390             self._lock.release()
   391     # get_transcoders()
   392 
   393 
   394     def add_transcoders(self, request, transcoder):
   395         self._lock.acquire()
   396         try:
   397             self._transcoders[transcoder] = request
   398         finally:
   399             self._lock.release()
   400     # add_transcoders()
   401 
   402 
   403     def del_transcoders(self, request, transcoder):
   404         self._lock.acquire()
   405         try:
   406             del self._transcoders[transcoder]
   407         finally:
   408             self._lock.release()
   409     # del_transcoders()
   410 # Server
   411 
   412 
   413 
   414 def serve_forever(host="0.0.0.0", port=40000):
   415     addr = (host, port)
   416     RequestHandler.protocol_version = "HTTP/1.0"
   417     httpd = Server(addr, RequestHandler)
   418     httpd.serve_forever()
   419 # serve_forever()
   420 
   421 
   422 def load_plugins_transcoders(directory):
   423     RequestHandler.load_plugins_transcoders(directory)
   424 # load_plugins_transcoders()