gmyth-stream/server/0.2/lib/server.py
author morphbr
Thu Apr 19 15:13:48 2007 +0100 (2007-04-19)
branchtrunk
changeset 569 644a526d4086
parent 565 ed34b1dab103
child 571 f33b61b9d8a5
permissions -rw-r--r--
[svn r574] * GMyth-Streamer
- Updated mencoder plugin (0.2)
- Updated core engine (0.2)
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gmyth-stream.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25 
    26     def __init__(self, params):
    27         self.params = params
    28     # __init__()
    29 
    30 
    31     def params_first(self, key, default=None):
    32         if default is None:
    33             return self.params[key][0]
    34         else:
    35             try:
    36                 return self.params[key][0]
    37             except:
    38                 return default
    39     # params_first()
    40 
    41 
    42     def get_mimetype(self):
    43         mux = self.params_first("mux", "mpg")
    44 
    45         if mux == "mpeg":
    46             return "video/mpeg"
    47         elif mux == "avi":
    48             return "video/x-msvideo"
    49         else:
    50             return "application/octet-stream"
    51     # get_mimetype()
    52 
    53 
    54     def start(self, outfile):
    55         return True
    56     # start()
    57 
    58 
    59     def stop(self):
    60         return Tru
    61     # stop()
    62 
    63 
    64     def __str__(self):
    65         return '%s("%s", mux="%s", params=%s)' % \
    66                (self.__class__.__name__,
    67                 self.params_first("uri", "None"),
    68                 self.params_first("mux", "mpg"),
    69                 self.params)
    70     # __str__()
    71 # Transcoder
    72 
    73 
    74 
    75 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    76     log = log.getLogger("gmyth-stream.request")
    77     def_transcoder = None
    78     transcoders = utils.PluginSet(Transcoder)
    79 
    80     @classmethod
    81     def load_plugins_transcoders(cls, directory):
    82         cls.transcoders.load_from_directory(directory)
    83 
    84         if cls.def_transcoder is None and cls.transcoders:
    85             cls.def_transcoder = cls.transcoders[0].name
    86     # load_plugins_transcoders()
    87 
    88 
    89     def do_dispatch(self, body):
    90         self.url = self.path
    91 
    92         pieces = urlparse.urlparse(self.path)
    93         self.path = pieces[2]
    94         self.query = cgi.parse_qs(pieces[4])
    95 
    96         if self.path == "/":
    97             self.serve_main(body)
    98         elif self.path == "/shutdown.do":
    99             self.serve_shutdown(body)
   100         elif self.path == "/stop-transcoder.do":
   101             self.serve_stop_transcoder(body)
   102         elif self.path == "/status.do":
   103             self.serve_status(body)
   104         elif self.path == "/play.do":
   105             self.serve_play(body)
   106         elif self.path == "/stream.do":
   107             self.serve_stream(body)
   108         else:
   109             self.send_error(404, "File not found")
   110     # do_dispatch()
   111 
   112 
   113     def do_GET(self):
   114         self.do_dispatch(True)
   115     # do_GET()
   116 
   117 
   118     def do_HEAD(self):
   119         self.do_dispatch(False)
   120     # do_HEAD()
   121 
   122 
   123     def _nav_items(self):
   124         self.wfile.write("""\
   125    <li><a href="/play.do">Play</a></li>
   126    <li><a href="/status.do">Status</a></li>
   127    <li><a href="/stop-transcoder.do">Stop transcoders</a></li>
   128    <li><a href="/shutdown.do">Shutdown Server</a></li>
   129 """)
   130     # _nav_items()
   131 
   132 
   133     def serve_main(self, body):
   134         self.send_response(200)
   135         self.send_header("Content-Type", "text/html")
   136         self.send_header('Connection', 'close')
   137         self.end_headers()
   138         if body:
   139             self.wfile.write("""\
   140 <html>
   141    <head><title>GMyth-Streamer Server</title></head>
   142    <body>
   143 <h1>Welcome to GMyth-Streamer Server</h1>
   144 <ul>
   145 """)
   146             self._nav_items()
   147             self.wfile.write("""\
   148 </ul>
   149    </body>
   150 </html>
   151 """)
   152     # serve_main()
   153 
   154 
   155     def serve_play(self, body):
   156         self.send_response(200)
   157         self.send_header("Content-Type", "text/html")
   158         self.send_header('Connection', 'close')
   159         self.end_headers()
   160         if body:
   161             self.wfile.write("""\
   162 <html>
   163    <head><title>GMyth-Streamer Server</title></head>
   164    <body>
   165    <h1>Play</h1>
   166    <form action="/stream.do" method="GET">
   167       <input type="text" name="type" value="file" />://<input type="text" name="location" value=""/>
   168       <input type="submit" />
   169    </form>
   170    <ul>
   171 """)
   172             self._nav_items()
   173             self.wfile.write("""\
   174    </ul>
   175    </body>
   176 </html>
   177 """)
   178     # serve_play()
   179 
   180 
   181     def serve_shutdown(self, body):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         if body:
   187             self.wfile.write("""\
   188 <html>
   189    <head><title>GMyth-Streamer Server Exited</title></head>
   190    <body>
   191       <h1>GMyth-Streamer is not running anymore</h1>
   192    </body>
   193 </html>
   194 """)
   195         self.server.server_close()
   196     # serve_shutdown()
   197 
   198 
   199     def serve_stop_all_transcoders(self, body):
   200         self.send_response(200)
   201         self.send_header("Content-Type", "text/html")
   202         self.send_header('Connection', 'close')
   203         self.end_headers()
   204         if body:
   205             self.server.stop_transcoders()
   206             self.wfile.write("""\
   207 <html>
   208    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   209    <body>
   210       <h1>GMyth-Streamer stopped running transcoders</h1>
   211       <ul>
   212 """)
   213             self._nav_items()
   214             self.wfile.write("""\
   215       </ul>
   216    </body>
   217 </html>
   218     """)
   219     # serve_stop_all_transcoders()
   220 
   221 
   222     def serve_stop_selected_transcoders(self, body, requests):
   223         self.send_response(200)
   224         self.send_header("Content-Type", "text/html")
   225         self.send_header('Connection', 'close')
   226         self.end_headers()
   227         if body:
   228             self.wfile.write("""\
   229 <html>
   230    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   231    <body>
   232       <h1>GMyth-Streamer stopped running transcoders:</h1>
   233       <ul>
   234     """)
   235             transcoders = self.server.get_transcoders()
   236 
   237             for req in requests:
   238                 try:
   239                     host, port = req.split(":")
   240                 except IndexError:
   241                     continue
   242 
   243                 port = int(port)
   244                 addr = (host, port)
   245 
   246                 for t, r in transcoders:
   247                     if r.client_address == addr:
   248                         t.stop()
   249                         self.wfile.write("""\
   250          <li>%s: %s:%s</li>
   251 """ % (t, addr[0], addr[1]))
   252                         t.stop()
   253                         break
   254             self.wfile.write("""\
   255       </ul>
   256       <ul>
   257 """)
   258             self._nav_items()
   259             self.wfile.write("""\
   260       </ul>
   261    </body>
   262 </html>
   263 """)
   264     # serve_stop_selected_transcoders()
   265 
   266 
   267     def serve_stop_transcoder(self, body):
   268         req = self.query.get("request", None)
   269         if req and "all" in req:
   270             self.serve_stop_all_transcoders(body)
   271         elif req:
   272             self.serve_stop_selected_transcoders(body, req)
   273         else:
   274             self.serve_status(body)
   275     # serve_stop_transcoder()
   276 
   277 
   278     def serve_status(self, body):
   279         self.send_response(200)
   280         self.send_header("Content-Type", "text/html")
   281         self.send_header('Connection', 'close')
   282         self.end_headers()
   283         if body:
   284             self.wfile.write("""\
   285 <html>
   286    <head><title>GMyth-Streamer Server Status</title></head>
   287    <body>
   288       <h1>GMyth-Streamer Status</h1>
   289 """)
   290             tl = self.server.get_transcoders()
   291             if not tl:
   292                 self.wfile.write("<p>No running transcoder.</p>\n")
   293             else:
   294                 self.wfile.write("<p>Running transcoders:</p>\n")
   295                 self.wfile.write("""\
   296       <ul>
   297          <li><a href="/stop-transcoder.do?request=all">[STOP ALL]</a></li>
   298 """)
   299                 for transcoder, request in tl:
   300                     self.wfile.write("""\
   301       <li>%s: %s:%s <a href="/stop-transcoder.do?request=%s:%s">[STOP]</a></li>
   302 """ % (transcoder, request.client_address[0], request.client_address[1],
   303        request.client_address[0], request.client_address[1]))
   304 
   305                 self.wfile.write("""\
   306       </ul>
   307       <ul>
   308 """)
   309             self._nav_items()
   310             self.wfile.write("""\
   311       </ul>
   312    </body>
   313 </html>
   314 """)
   315     # serve_status()
   316 
   317 
   318     def _get_transcoder(self):
   319         # get transcoder option: mencoder is the default
   320         request_transcoders = self.query.get("transcoder", ["mencoder"])
   321 
   322         for t in request_transcoders:
   323             transcoder = self.transcoders.get(t)
   324             if transcoder:
   325                 return transcoder
   326 
   327         if not transcoder:
   328             return self.transcoders[self.def_transcoder]
   329     # _get_transcoder()
   330 
   331 
   332     def serve_stream(self, body):
   333         transcoder = self._get_transcoder()
   334         try:
   335             obj = transcoder(self.query)
   336         except Exception, e:
   337             self.send_error(500, str(e))
   338             return
   339 
   340         self.send_response(200)
   341         self.send_header("Content-Type", obj.get_mimetype())
   342         self.send_header('Connection', 'close')
   343         self.end_headers()
   344 
   345         if body:
   346             self.server.add_transcoders(self, obj)
   347             obj.start(self.wfile)
   348             self.server.del_transcoders(self, obj)
   349     # serve_stream()
   350 
   351 
   352     def log_request(self, code='-', size='-'):
   353         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   354     # log_request()
   355 
   356 
   357     def log_error(self, format, *args):
   358         self.log.error("%s: %s" % (self.address_string(), format % args))
   359     # log_error()
   360 
   361 
   362     def log_message(self, format, *args):
   363         self.log.info("%s: %s" % (self.address_string(), format % args))
   364     # log_message()
   365 # RequestHandler
   366 
   367 
   368 
   369 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   370     log = log.getLogger("gmyth-streamer.server")
   371     run = True
   372     _transcoders = {}
   373     _lock = threading.RLock()
   374 
   375     def serve_forever(self):
   376         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   377                       self.socket.getsockname())
   378         try:
   379             while self.run:
   380                 self.handle_request()
   381         except KeyboardInterrupt, e:
   382             pass
   383 
   384         self.log.debug("Stopping all remaining transcoders...")
   385         self.stop_transcoders()
   386         self.log.debug("Transcoders stopped!")
   387     # serve_forever()
   388 
   389 
   390     def get_request(self):
   391         skt = self.socket
   392         old = skt.gettimeout()
   393         skt.settimeout(0.5)
   394         while self.run:
   395             try:
   396                 r = skt.accept()
   397                 skt.settimeout(old)
   398                 return r
   399             except socket.timeout, e:
   400                 pass
   401         raise socket.error("Not running")
   402     # get_request()
   403 
   404 
   405     def server_close(self):
   406         self.run = False
   407         self.stop_transcoders()
   408 
   409         BaseHTTPServer.HTTPServer.server_close(self)
   410     # server_close()
   411 
   412 
   413     def stop_transcoders(self):
   414         self._lock.acquire()
   415         for transcoder, request in self._transcoders.iteritems():
   416             self.log.info("Stop transcoder: %s, client=%s" %
   417                           (transcoder, request.client_address))
   418             transcoder.stop()
   419         self._lock.release()
   420     # stop_transcoders()
   421 
   422 
   423     def get_transcoders(self):
   424         self._lock.acquire()
   425         try:
   426             return self._transcoders.items()
   427         finally:
   428             self._lock.release()
   429     # get_transcoders()
   430 
   431 
   432     def add_transcoders(self, request, transcoder):
   433         self._lock.acquire()
   434         try:
   435             self._transcoders[transcoder] = request
   436         finally:
   437             self._lock.release()
   438     # add_transcoders()
   439 
   440 
   441     def del_transcoders(self, request, transcoder):
   442         self._lock.acquire()
   443         try:
   444             del self._transcoders[transcoder]
   445         finally:
   446             self._lock.release()
   447     # del_transcoders()
   448 # Server
   449 
   450 
   451 
   452 def serve_forever(host="0.0.0.0", port=40000):
   453     addr = (host, port)
   454 
   455     RequestHandler.protocol_version = "HTTP/1.0"
   456     httpd = Server(addr, RequestHandler)
   457     httpd.serve_forever()
   458 # serve_forever()
   459 
   460 
   461 def load_plugins_transcoders(directory):
   462     RequestHandler.load_plugins_transcoders(directory)
   463 # load_plugins_transcoders()