gmyth-stream/server/0.2/lib/server.py
author rosfran
Wed Apr 18 23:13:26 2007 +0100 (2007-04-18)
branchtrunk
changeset 568 f5ef83bbe8b5
child 569 644a526d4086
permissions -rw-r--r--
[svn r573] New function to get the information about the program info path directiory.
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gmyth-stream.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25 
    26     def __init__(self, params):
    27         self.params = params
    28     # __init__()
    29 
    30 
    31     def params_first(self, key, default=None):
    32         if default is None:
    33             return self.params[key][0]
    34         else:
    35             try:
    36                 return self.params[key][0]
    37             except:
    38                 return default
    39     # params_first()
    40 
    41 
    42     def get_mimetype(self):
    43         mux = self.params_first("mux", "mpg")
    44 
    45         if mux == "mpeg":
    46             return "video/mpeg"
    47         elif mux == "avi":
    48             return "video/x-msvideo"
    49         else:
    50             return "application/octet-stream"
    51     # get_mimetype()
    52 
    53 
    54     def start(self, outfile):
    55         return True
    56     # start()
    57 
    58 
    59     def stop(self):
    60         return Tru
    61     # stop()
    62 
    63 
    64     def __str__(self):
    65         return '%s("%s", mux="%s", params=%s)' % \
    66                (self.__class__.__name__,
    67                 self.params_first("uri", "None"),
    68                 self.params_first("mux", "mpg"),
    69                 self.params)
    70     # __str__()
    71 # Transcoder
    72 
    73 
    74 
    75 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    76     log = log.getLogger("gmyth-stream.request")
    77     def_transcoder = None
    78     transcoders = utils.PluginSet(Transcoder)
    79 
    80     @classmethod
    81     def load_plugins_transcoders(cls, directory):
    82         cls.transcoders.load_from_directory(directory)
    83 
    84         if cls.def_transcoder is None and cls.transcoders:
    85             cls.def_transcoder = cls.transcoders[0].name
    86     # load_plugins_transcoders()
    87 
    88 
    89     def do_dispatch(self, body):
    90         self.url = self.path
    91 
    92         pieces = urlparse.urlparse(self.path)
    93         self.path = pieces[2]
    94         self.query = cgi.parse_qs(pieces[4])
    95 
    96         if self.path == "/":
    97             self.serve_main(body)
    98         elif self.path == "/shutdown.do":
    99             self.serve_shutdown(body)
   100         elif self.path == "/stop-transcoder.do":
   101             self.serve_stop_transcoder(body)
   102         elif self.path == "/status.do":
   103             self.serve_status(body)
   104         elif self.path == "/play.do":
   105             self.serve_play(body)
   106         elif self.path == "/stream.do":
   107             self.serve_stream(body)
   108         else:
   109             self.send_error(404, "File not found")
   110     # do_dispatch()
   111 
   112 
   113     def do_GET(self):
   114         self.do_dispatch(True)
   115     # do_GET()
   116 
   117 
   118     def do_HEAD(self):
   119         self.do_dispatch(False)
   120     # do_HEAD()
   121 
   122 
   123     def _nav_items(self):
   124         self.wfile.write("""\
   125    <li><a href="/play.do">Play</a></li>
   126    <li><a href="/status.do">Status</a></li>
   127    <li><a href="/stop-transcoder.do">Stop transcoders</a></li>
   128    <li><a href="/shutdown.do">Shutdown Server</a></li>
   129 """)
   130     # _nav_items()
   131 
   132 
   133     def serve_main(self, body):
   134         self.send_response(200)
   135         self.send_header("Content-Type", "text/html")
   136         self.send_header('Connection', 'close')
   137         self.end_headers()
   138         if body:
   139             self.wfile.write("""\
   140 <html>
   141    <head><title>Catota Server</title></head>
   142    <body>
   143 <h1>Welcome to Catota Server</h1>
   144 <ul>
   145 """)
   146             self._nav_items()
   147             self.wfile.write("""\
   148 </ul>
   149    </body>
   150 </html>
   151 """)
   152     # serve_main()
   153 
   154 
   155     def serve_play(self, body):
   156         self.send_response(200)
   157         self.send_header("Content-Type", "text/html")
   158         self.send_header('Connection', 'close')
   159         self.end_headers()
   160         if body:
   161             self.wfile.write("""\
   162 <html>
   163    <head><title>Catota Server</title></head>
   164    <body>
   165    <h1>Play</h1>
   166    <form action="/stream.do" method="GET">
   167       <input type="text" name="type" value="file" />://<input type="text" name="location" value=""/>
   168       <input type="submit" />
   169    </form>
   170    <ul>
   171 """)
   172             self._nav_items()
   173             self.wfile.write("""\
   174    </ul>
   175    </body>
   176 </html>
   177 """)
   178     # serve_play()
   179 
   180 
   181     def serve_shutdown(self, body):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         if body:
   187             self.wfile.write("""\
   188 <html>
   189    <head><title>Catota Server Exited</title></head>
   190    <body>
   191       <h1>Catota is not running anymore</h1>
   192    </body>
   193 </html>
   194 """)
   195         self.server.server_close()
   196     # serve_shutdown()
   197 
   198 
   199     def serve_stop_all_transcoders(self, body):
   200         self.send_response(200)
   201         self.send_header("Content-Type", "text/html")
   202         self.send_header('Connection', 'close')
   203         self.end_headers()
   204         if body:
   205             self.server.stop_transcoders()
   206             self.wfile.write("""\
   207 <html>
   208    <head><title>Catota Server Stopped Transcoders</title></head>
   209    <body>
   210       <h1>Catota stopped running transcoders</h1>
   211       <ul>
   212 """)
   213             self._nav_items()
   214             self.wfile.write("""\
   215       </ul>
   216    </body>
   217 </html>
   218     """)
   219     # serve_stop_all_transcoders()
   220 
   221 
   222     def serve_stop_selected_transcoders(self, body, requests):
   223         self.send_response(200)
   224         self.send_header("Content-Type", "text/html")
   225         self.send_header('Connection', 'close')
   226         self.end_headers()
   227         if body:
   228             self.wfile.write("""\
   229 <html>
   230    <head><title>Catota Server Stopped Transcoders</title></head>
   231    <body>
   232       <h1>Catota stopped running transcoders:</h1>
   233       <ul>
   234     """)
   235             transcoders = self.server.get_transcoders()
   236 
   237             for req in requests:
   238                 try:
   239                     host, port = req.split(":")
   240                 except IndexError:
   241                     continue
   242 
   243                 port = int(port)
   244                 addr = (host, port)
   245 
   246                 for t, r in transcoders:
   247                     if r.client_address == addr:
   248                         t.stop()
   249                         self.server.del_transcoders(self, t)
   250                         self.wfile.write("""\
   251          <li>%s: %s:%s</li>
   252 """ % (t, addr[0], addr[1]))
   253                         t.stop()
   254                         break
   255             self.wfile.write("""\
   256       </ul>
   257       <ul>
   258 """)
   259             self._nav_items()
   260             self.wfile.write("""\
   261       </ul>
   262    </body>
   263 </html>
   264 """)
   265     # serve_stop_selected_transcoders()
   266 
   267 
   268     def serve_stop_transcoder(self, body):
   269         req = self.query.get("request", None)
   270         if req and "all" in req:
   271             self.serve_stop_all_transcoders(body)
   272         elif req:
   273             self.serve_stop_selected_transcoders(body, req)
   274         else:
   275             self.serve_status(body)
   276     # serve_stop_transcoder()
   277 
   278 
   279     def serve_status(self, body):
   280         self.send_response(200)
   281         self.send_header("Content-Type", "text/html")
   282         self.send_header('Connection', 'close')
   283         self.end_headers()
   284         if body:
   285             self.wfile.write("""\
   286 <html>
   287    <head><title>Catota Server Status</title></head>
   288    <body>
   289       <h1>Catota Status</h1>
   290 """)
   291             tl = self.server.get_transcoders()
   292             if not tl:
   293                 self.wfile.write("<p>No running transcoder.</p>\n")
   294             else:
   295                 self.wfile.write("<p>Running transcoders:</p>\n")
   296                 self.wfile.write("""\
   297       <ul>
   298          <li><a href="/stop-transcoder.do?request=all">[STOP ALL]</a></li>
   299 """)
   300                 for transcoder, request in tl:
   301                     self.wfile.write("""\
   302       <li>%s: %s:%s <a href="/stop-transcoder.do?request=%s:%s">[STOP]</a></li>
   303 """ % (transcoder, request.client_address[0], request.client_address[1],
   304        request.client_address[0], request.client_address[1]))
   305 
   306                 self.wfile.write("""\
   307       </ul>
   308       <ul>
   309 """)
   310             self._nav_items()
   311             self.wfile.write("""\
   312       </ul>
   313    </body>
   314 </html>
   315 """)
   316     # serve_status()
   317 
   318 
   319     def _get_transcoder(self):
   320         # get transcoder option: mencoder is the default
   321         request_transcoders = self.query.get("transcoder", ["mencoder"])
   322 
   323         for t in request_transcoders:
   324             transcoder = self.transcoders.get(t)
   325             if transcoder:
   326                 return transcoder
   327 
   328         if not transcoder:
   329             return self.transcoders[self.def_transcoder]
   330     # _get_transcoder()
   331 
   332 
   333     def serve_stream(self, body):
   334         transcoder = self._get_transcoder()
   335         try:
   336             obj = transcoder(self.query)
   337         except Exception, e:
   338             self.send_error(500, str(e))
   339             return
   340 
   341         self.send_response(200)
   342         self.send_header("Content-Type", obj.get_mimetype())
   343         self.send_header('Connection', 'close')
   344         self.end_headers()
   345 
   346         if body:
   347             self.server.add_transcoders(self, obj)
   348             obj.start(self.wfile)
   349             self.server.del_transcoders(self, obj)
   350     # serve_stream()
   351 
   352 
   353     def log_request(self, code='-', size='-'):
   354         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   355     # log_request()
   356 
   357 
   358     def log_error(self, format, *args):
   359         self.log.error("%s: %s" % (self.address_string(), format % args))
   360     # log_error()
   361 
   362 
   363     def log_message(self, format, *args):
   364         self.log.info("%s: %s" % (self.address_string(), format % args))
   365     # log_message()
   366 # RequestHandler
   367 
   368 
   369 
   370 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   371     log = log.getLogger("gmyth-streamer.server")
   372     run = True
   373     _transcoders = {}
   374     _lock = threading.RLock()
   375 
   376     def serve_forever(self):
   377         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   378                       self.socket.getsockname())
   379         try:
   380             while self.run:
   381                 self.handle_request()
   382         except KeyboardInterrupt, e:
   383             pass
   384 
   385         self.log.debug("Stopping all remaining transcoders...")
   386         self.stop_transcoders()
   387         self.log.debug("Transcoders stopped!")
   388     # serve_forever()
   389 
   390 
   391     def get_request(self):
   392         skt = self.socket
   393         old = skt.gettimeout()
   394         skt.settimeout(0.5)
   395         while self.run:
   396             try:
   397                 r = skt.accept()
   398                 skt.settimeout(old)
   399                 return r
   400             except socket.timeout, e:
   401                 pass
   402         raise socket.error("Not running")
   403     # get_request()
   404 
   405 
   406     def server_close(self):
   407         self.run = False
   408         self.stop_transcoders()
   409 
   410         BaseHTTPServer.HTTPServer.server_close(self)
   411     # server_close()
   412 
   413 
   414     def stop_transcoders(self):
   415         self._lock.acquire()
   416         for transcoder, request in self._transcoders.iteritems():
   417             self.log.info("Stop transcoder: %s, client=%s" %
   418                           (transcoder, request.client_address))
   419             transcoder.stop()
   420         self._lock.release()
   421     # stop_transcoders()
   422 
   423 
   424     def get_transcoders(self):
   425         self._lock.acquire()
   426         try:
   427             return self._transcoders.items()
   428         finally:
   429             self._lock.release()
   430     # get_transcoders()
   431 
   432 
   433     def add_transcoders(self, request, transcoder):
   434         self._lock.acquire()
   435         try:
   436             self._transcoders[transcoder] = request
   437         finally:
   438             self._lock.release()
   439     # add_transcoders()
   440 
   441 
   442     def del_transcoders(self, request, transcoder):
   443         self._lock.acquire()
   444         try:
   445             del self._transcoders[transcoder]
   446         finally:
   447             self._lock.release()
   448     # del_transcoders()
   449 # Server
   450 
   451 
   452 
   453 def serve_forever(host="0.0.0.0", port=40000):
   454     addr = (host, port)
   455 
   456     RequestHandler.protocol_version = "HTTP/1.0"
   457     httpd = Server(addr, RequestHandler)
   458     httpd.serve_forever()
   459 # serve_forever()
   460 
   461 
   462 def load_plugins_transcoders(directory):
   463     RequestHandler.load_plugins_transcoders(directory)
   464 # load_plugins_transcoders()