gmyth-stream/server/0.2/lib/server.py
author morphbr
Mon Apr 30 18:16:36 2007 +0100 (2007-04-30)
branch trunk
changeset 605 4dd9bf602c18
parent 585 a1783dab9ba6
child 628 c6af9d7a88b5
permissions -rw-r--r--
[svn r611] * GMyth-Streamer
- updated mencoder plugin

* GMyth
- samples: gmyth_cat code update
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25     status = None
    26 
    27     def __init__(self, params):
    28         self.params = params
    29     # __init__()
    30 
    31 
    32     def params_first(self, key, default=None):
    33         if default is None:
    34             return self.params[key][0]
    35         else:
    36             try:
    37                 return self.params[key][0]
    38             except:
    39                 return default
    40     # params_first()
    41 
    42 
    43     def get_mimetype(self):
    44         mux = self.params_first("mux", "mpg")
    45 
    46         if mux == "mpeg":
    47             return "video/mpeg"
    48         elif mux == "avi":
    49             return "video/x-msvideo"
    50         else:
    51             return "application/octet-stream"
    52     # get_mimetype()
    53 
    54 
    55     def start(self, outfile):
    56         return True
    57     # start()
    58 
    59 
    60     def stop(self):
    61         return Tru
    62     # stop()
    63 
    64 
    65     def __str__(self):
    66         return '%s("%s", mux="%s", params=%s, addr=%s)' % \
    67                (self.__class__.__name__,
    68                 self.params_first("uri", "None"),
    69                 self.params_first("mux", "mpg"),
    70                 self.params,
    71                 repr(self))
    72     # __str__()
    73 # Transcoder
    74 
    75 
    76 
    77 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    78     log = log.getLogger("gms.request")
    79     def_transcoder = None
    80     transcoders = utils.PluginSet(Transcoder)
    81 
    82     menu = {
    83         "Status": "/status.do",
    84         "Stop": "/stop-transcoder.do",
    85         "Shutdown": "/shutdown.do"
    86         }
    87 
    88     @classmethod
    89     def load_plugins_transcoders(cls, directory):
    90         cls.transcoders.load_from_directory(directory)
    91 
    92         if cls.def_transcoder is None and cls.transcoders:
    93             cls.def_transcoder = cls.transcoders[0].name
    94     # load_plugins_transcoders()
    95 
    96 
    97     def do_dispatch(self, body):
    98         self.url = self.path
    99 
   100         pieces = urlparse.urlparse(self.path)
   101         self.path = pieces[2]
   102         self.query = cgi.parse_qs(pieces[4])
   103 
   104         if self.path == "/":
   105             self.serve_main(body)
   106         elif self.path == "/shutdown.do":
   107             self.serve_shutdown(body)
   108         elif self.path == "/stop-transcoder.do":
   109             self.serve_stop_transcoder(body)
   110         elif self.path == "/status.do":
   111             self.serve_status(body)
   112         elif self.path == "/play.do":
   113             self.serve_play(body)
   114         elif self.path == "/stream.do":
   115             self.serve_stream(body)
   116         else:
   117             self.send_error(404, "File not found")
   118     # do_dispatch()
   119 
   120 
   121     def do_GET(self):
   122         self.do_dispatch(True)
   123     # do_GET()
   124 
   125 
   126     def do_HEAD(self):
   127         self.do_dispatch(False)
   128     # do_HEAD()
   129 
   130 
   131     def _nav_items(self):
   132         ret = ""
   133         for name, url in self.menu.items():
   134             ret += utils.getHTML("menu", {"name": name, "url": url})
   135 
   136         return ret
   137     # _nav_items()
   138 
   139     def _create_html_item(self, opt):
   140         return "<li>%s</li>\n" % opt
   141     # _create_html_item
   142 
   143     def serve_main(self, body):
   144         self.send_response(200)
   145         self.send_header("Content-Type", "text/html")
   146         self.send_header('Connection', 'close')
   147         self.end_headers()
   148         if body:
   149             self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
   150     # serve_main()
   151 
   152 
   153     def serve_shutdown(self, body):
   154         self.send_response(200)
   155         self.send_header("Content-Type", "text/html")
   156         self.send_header('Connection', 'close')
   157         self.end_headers()
   158         if body:
   159             self.wfile.write(utils.getHTML("shutdown"))
   160         self.server.server_close()
   161     # serve_shutdown()
   162 
   163 
   164     def serve_stop_all_transcoders(self, body):
   165         self.send_response(200)
   166         self.send_header("Content-Type", "text/html")
   167         self.send_header('Connection', 'close')
   168         self.end_headers()
   169         if body:
   170             self.server.stop_transcoders()
   171             self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
   172     # serve_stop_all_transcoders()
   173 
   174 
   175     def serve_stop_selected_transcoders(self, body, requests):
   176         self.send_response(200)
   177         self.send_header("Content-Type", "text/html")
   178         self.send_header('Connection', 'close')
   179         self.end_headers()
   180         opts = ""
   181         if body:
   182             transcoders = self.server.get_transcoders()
   183 
   184             for req in requests:
   185                 try:
   186                     host, port = req.split(":")
   187                 except IndexError:
   188                     continue
   189 
   190                 port = int(port)
   191                 addr = (host, port)
   192 
   193                 for t, r in transcoders:
   194                     if r.client_address == addr:
   195                         try:
   196                             t.stop()
   197                         except Exception, e:
   198                             self.log.info("Plugin already stopped")
   199 
   200                         opts += self._create_html_item("%s: %s:%s" % (
   201                             t, addr[0], addr[1]))
   202 
   203                         break
   204 
   205                 self.wfile.write(utils.getHTML("stop_selected",
   206                                                {"menu": self._nav_items(),
   207                                                 "opts": opts}))
   208     # serve_stop_selected_transcoders()
   209 
   210 
   211     def serve_stop_transcoder(self, body):
   212         req = self.query.get("request", None)
   213         if req and "all" in req:
   214             self.serve_stop_all_transcoders(body)
   215         elif req:
   216             self.serve_stop_selected_transcoders(body, req)
   217         else:
   218             self.serve_status(body)
   219     # serve_stop_transcoder()
   220 
   221 
   222     def serve_status(self, body):
   223         self.send_response(200)
   224         self.send_header("Content-Type", "text/html")
   225         self.send_header('Connection', 'close')
   226         self.end_headers()
   227 
   228         if body:
   229             tl = self.server.get_transcoders()
   230             if not tl:
   231                 running = "<p>No running transcoder.</p>\n"
   232                 stopall = ""
   233                 stopone = ""
   234             else:
   235                 running = "<p>Running transcoders:</p>\n"
   236                 stopall = self._create_html_item("<a href='%s?request=all'>[STOP ALL]</a>" %
   237                                                  self.menu["Stop"])
   238 
   239                 for transcoder, request in tl:
   240                     stopone = self._create_html_item("%s: %s:%s<a href='%s?request=%s:%s'>"
   241                                                      "[STOP]</a> - Status: %s%%" % (
   242                         transcoder, request.client_address[0], request.client_address[1],
   243                         self.menu["Stop"], request.client_address[0], request.client_address[1],
   244                         transcoder.status) )
   245 
   246             self.wfile.write(utils.getHTML("status",
   247                                            {"menu": self._nav_items(),
   248                                             "running": running,
   249                                             "stopall": stopall,
   250                                             "stopone": stopone}))
   251     # serve_status()
   252 
   253 
   254     def _get_transcoder(self):
   255         # get transcoder option: mencoder is the default
   256         request_transcoders = self.query.get("transcoder", ["mencoder"])
   257 
   258         for t in request_transcoders:
   259             transcoder = self.transcoders.get(t)
   260             if transcoder:
   261                 return transcoder
   262 
   263         if not transcoder:
   264             return self.transcoders[self.def_transcoder]
   265     # _get_transcoder()
   266 
   267 
   268     def serve_stream(self, body):
   269         transcoder = self._get_transcoder()
   270         try:
   271             obj = transcoder(self.query)
   272         except Exception, e:
   273             self.send_error(500, str(e))
   274             return
   275 
   276         self.send_response(200)
   277         self.send_header("Content-Type", obj.get_mimetype())
   278         self.send_header('Connection', 'close')
   279         self.end_headers()
   280 
   281         if body:
   282             self.server.add_transcoders(self, obj)
   283             obj.start(self.wfile)
   284             self.server.del_transcoders(self, obj)
   285     # serve_stream()
   286 
   287 
   288     def log_request(self, code='-', size='-'):
   289         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   290     # log_request()
   291 
   292 
   293     def log_error(self, format, *args):
   294         self.log.error("%s: %s" % (self.address_string(), format % args))
   295     # log_error()
   296 
   297 
   298     def log_message(self, format, *args):
   299         self.log.info("%s: %s" % (self.address_string(), format % args))
   300     # log_message()
   301 # RequestHandler
   302 
   303 
   304 
   305 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   306     log = log.getLogger("gms.server")
   307     run = True
   308     _transcoders = {}
   309     _lock = threading.RLock()
   310 
   311     def serve_forever(self):
   312         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   313                       self.socket.getsockname())
   314         try:
   315             while self.run:
   316                 self.handle_request()
   317         except KeyboardInterrupt, e:
   318             pass
   319 
   320         self.log.debug("Stopping all remaining transcoders...")
   321         self.stop_transcoders()
   322         self.log.debug("Transcoders stopped!")
   323     # serve_forever()
   324 
   325 
   326     def get_request(self):
   327         skt = self.socket
   328         old = skt.gettimeout()
   329         skt.settimeout(0.5)
   330         while self.run:
   331             try:
   332                 r = skt.accept()
   333                 skt.settimeout(old)
   334                 return r
   335             except socket.timeout, e:
   336                 pass
   337         raise socket.error("Not running")
   338     # get_request()
   339 
   340 
   341     def server_close(self):
   342         self.run = False
   343         self.stop_transcoders()
   344 
   345         BaseHTTPServer.HTTPServer.server_close(self)
   346     # server_close()
   347 
   348 
   349     def stop_transcoders(self):
   350         self._lock.acquire()
   351         for transcoder, request in self._transcoders.iteritems():
   352             self.log.info("Stop transcoder: %s, client=%s" %
   353                           (transcoder, request.client_address))
   354             transcoder.stop()
   355         self._lock.release()
   356     # stop_transcoders()
   357 
   358 
   359     def get_transcoders(self):
   360         self._lock.acquire()
   361         try:
   362             return self._transcoders.items()
   363         finally:
   364             self._lock.release()
   365     # get_transcoders()
   366 
   367 
   368     def add_transcoders(self, request, transcoder):
   369         self._lock.acquire()
   370         try:
   371             self._transcoders[transcoder] = request
   372         finally:
   373             self._lock.release()
   374     # add_transcoders()
   375 
   376 
   377     def del_transcoders(self, request, transcoder):
   378         self._lock.acquire()
   379         try:
   380             del self._transcoders[transcoder]
   381         finally:
   382             self._lock.release()
   383     # del_transcoders()
   384 # Server
   385 
   386 
   387 
   388 def serve_forever(host="0.0.0.0", port=40000):
   389     addr = (host, port)
   390     RequestHandler.protocol_version = "HTTP/1.0"
   391     httpd = Server(addr, RequestHandler)
   392     httpd.serve_forever()
   393 # serve_forever()
   394 
   395 
   396 def load_plugins_transcoders(directory):
   397     RequestHandler.load_plugins_transcoders(directory)
   398 # load_plugins_transcoders()