gmyth-stream/server/0.2/lib/server.py
author rosfran
Fri Apr 20 16:01:39 2007 +0100 (2007-04-20)
branch trunk
changeset 580 7efc73147f5e
parent 574 2dfea125c76c
child 585 a1783dab9ba6
permissions -rw-r--r--
[svn r585] New function to get the channel list from database.
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gms.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25 
    26     def __init__(self, params):
    27         self.params = params
    28     # __init__()
    29 
    30 
    31     def params_first(self, key, default=None):
    32         if default is None:
    33             return self.params[key][0]
    34         else:
    35             try:
    36                 return self.params[key][0]
    37             except:
    38                 return default
    39     # params_first()
    40 
    41 
    42     def get_mimetype(self):
    43         mux = self.params_first("mux", "mpg")
    44 
    45         if mux == "mpeg":
    46             return "video/mpeg"
    47         elif mux == "avi":
    48             return "video/x-msvideo"
    49         else:
    50             return "application/octet-stream"
    51     # get_mimetype()
    52 
    53 
    54     def start(self, outfile):
    55         return True
    56     # start()
    57 
    58 
    59     def stop(self):
    60         return Tru
    61     # stop()
    62 
    63 
    64     def __str__(self):
    65         return '%s("%s", mux="%s", params=%s, addr=%s)' % \
    66                (self.__class__.__name__,
    67                 self.params_first("uri", "None"),
    68                 self.params_first("mux", "mpg"),
    69                 self.params,
    70                 repr(self))
    71     # __str__()
    72 # Transcoder
    73 
    74 
    75 
    76 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    77     log = log.getLogger("gms.request")
    78     def_transcoder = None
    79     transcoders = utils.PluginSet(Transcoder)
    80 
    81     @classmethod
    82     def load_plugins_transcoders(cls, directory):
    83         cls.transcoders.load_from_directory(directory)
    84 
    85         if cls.def_transcoder is None and cls.transcoders:
    86             cls.def_transcoder = cls.transcoders[0].name
    87     # load_plugins_transcoders()
    88 
    89 
    90     def do_dispatch(self, body):
    91         self.url = self.path
    92 
    93         pieces = urlparse.urlparse(self.path)
    94         self.path = pieces[2]
    95         self.query = cgi.parse_qs(pieces[4])
    96 
    97         if self.path == "/":
    98             self.serve_main(body)
    99         elif self.path == "/shutdown.do":
   100             self.serve_shutdown(body)
   101         elif self.path == "/stop-transcoder.do":
   102             self.serve_stop_transcoder(body)
   103         elif self.path == "/status.do":
   104             self.serve_status(body)
   105         elif self.path == "/play.do":
   106             self.serve_play(body)
   107         elif self.path == "/stream.do":
   108             self.serve_stream(body)
   109         else:
   110             self.send_error(404, "File not found")
   111     # do_dispatch()
   112 
   113 
   114     def do_GET(self):
   115         self.do_dispatch(True)
   116     # do_GET()
   117 
   118 
   119     def do_HEAD(self):
   120         self.do_dispatch(False)
   121     # do_HEAD()
   122 
   123 
   124     def _nav_items(self):
   125         self.wfile.write("""\
   126    <li><a href="/play.do">Play</a></li>
   127    <li><a href="/status.do">Status</a></li>
   128    <li><a href="/stop-transcoder.do">Stop transcoders</a></li>
   129    <li><a href="/shutdown.do">Shutdown Server</a></li>
   130 """)
   131     # _nav_items()
   132 
   133 
   134     def serve_main(self, body):
   135         self.send_response(200)
   136         self.send_header("Content-Type", "text/html")
   137         self.send_header('Connection', 'close')
   138         self.end_headers()
   139         if body:
   140             self.wfile.write("""\
   141 <html>
   142    <head><title>GMyth-Streamer Server</title></head>
   143    <body>
   144 <h1>Welcome to GMyth-Streamer Server</h1>
   145 <ul>
   146 """)
   147             self._nav_items()
   148             self.wfile.write("""\
   149 </ul>
   150    </body>
   151 </html>
   152 """)
   153     # serve_main()
   154 
   155 
   156     def serve_play(self, body):
   157         self.send_response(200)
   158         self.send_header("Content-Type", "text/html")
   159         self.send_header('Connection', 'close')
   160         self.end_headers()
   161         if body:
   162             self.wfile.write("""\
   163 <html>
   164    <head><title>GMyth-Streamer Server</title></head>
   165    <body>
   166    <h1>Play</h1>
   167    <form action="/stream.do" method="GET">
   168       <input type="text" name="type" value="file" />://<input type="text" name="location" value=""/>
   169       <input type="submit" />
   170    </form>
   171    <ul>
   172 """)
   173             self._nav_items()
   174             self.wfile.write("""\
   175    </ul>
   176    </body>
   177 </html>
   178 """)
   179     # serve_play()
   180 
   181 
   182     def serve_shutdown(self, body):
   183         self.send_response(200)
   184         self.send_header("Content-Type", "text/html")
   185         self.send_header('Connection', 'close')
   186         self.end_headers()
   187         if body:
   188             self.wfile.write("""\
   189 <html>
   190    <head><title>GMyth-Streamer Server Exited</title></head>
   191    <body>
   192       <h1>GMyth-Streamer is not running anymore</h1>
   193    </body>
   194 </html>
   195 """)
   196         self.server.server_close()
   197     # serve_shutdown()
   198 
   199 
   200     def serve_stop_all_transcoders(self, body):
   201         self.send_response(200)
   202         self.send_header("Content-Type", "text/html")
   203         self.send_header('Connection', 'close')
   204         self.end_headers()
   205         if body:
   206             self.server.stop_transcoders()
   207             self.wfile.write("""\
   208 <html>
   209    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   210    <body>
   211       <h1>GMyth-Streamer stopped running transcoders</h1>
   212       <ul>
   213 """)
   214             self._nav_items()
   215             self.wfile.write("""\
   216       </ul>
   217    </body>
   218 </html>
   219     """)
   220     # serve_stop_all_transcoders()
   221 
   222 
   223     def serve_stop_selected_transcoders(self, body, requests):
   224         self.send_response(200)
   225         self.send_header("Content-Type", "text/html")
   226         self.send_header('Connection', 'close')
   227         self.end_headers()
   228         if body:
   229             self.wfile.write("""\
   230 <html>
   231    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   232    <body>
   233       <h1>GMyth-Streamer stopped running transcoders:</h1>
   234       <ul>
   235     """)
   236             transcoders = self.server.get_transcoders()
   237 
   238             for req in requests:
   239                 try:
   240                     host, port = req.split(":")
   241                 except IndexError:
   242                     continue
   243 
   244                 port = int(port)
   245                 addr = (host, port)
   246 
   247                 for t, r in transcoders:
   248                     if r.client_address == addr:
   249                         try:
   250                             t.stop()
   251                         except Exception, e:
   252                             self.log.info("Plugin already stopped")
   253 
   254                         self.wfile.write("""\
   255          <li>%s: %s:%s</li>
   256 """ % (t, addr[0], addr[1]))
   257                         break
   258             self.wfile.write("""\
   259       </ul>
   260       <ul>
   261 """)
   262             self._nav_items()
   263             self.wfile.write("""\
   264       </ul>
   265    </body>
   266 </html>
   267 """)
   268     # serve_stop_selected_transcoders()
   269 
   270 
   271     def serve_stop_transcoder(self, body):
   272         req = self.query.get("request", None)
   273         if req and "all" in req:
   274             self.serve_stop_all_transcoders(body)
   275         elif req:
   276             self.serve_stop_selected_transcoders(body, req)
   277         else:
   278             self.serve_status(body)
   279     # serve_stop_transcoder()
   280 
   281 
   282     def serve_status(self, body):
   283         self.send_response(200)
   284         self.send_header("Content-Type", "text/html")
   285         self.send_header('Connection', 'close')
   286         self.end_headers()
   287         if body:
   288             self.wfile.write("""\
   289 <html>
   290    <head><title>GMyth-Streamer Server Status</title></head>
   291    <body>
   292       <h1>GMyth-Streamer Status</h1>
   293 """)
   294             tl = self.server.get_transcoders()
   295             if not tl:
   296                 self.wfile.write("<p>No running transcoder.</p>\n")
   297             else:
   298                 self.wfile.write("<p>Running transcoders:</p>\n")
   299                 self.wfile.write("""\
   300       <ul>
   301          <li><a href="/stop-transcoder.do?request=all">[STOP ALL]</a></li>
   302 """)
   303                 for transcoder, request in tl:
   304                     self.wfile.write("""\
   305       <li>%s: %s:%s <a href="/stop-transcoder.do?request=%s:%s">[STOP]</a></li>
   306 """ % (transcoder, request.client_address[0], request.client_address[1],
   307        request.client_address[0], request.client_address[1]))
   308 
   309                 self.wfile.write("""\
   310       </ul>
   311       <ul>
   312 """)
   313             self._nav_items()
   314             self.wfile.write("""\
   315       </ul>
   316    </body>
   317 </html>
   318 """)
   319     # serve_status()
   320 
   321 
   322     def _get_transcoder(self):
   323         # get transcoder option: mencoder is the default
   324         request_transcoders = self.query.get("transcoder", ["mencoder"])
   325 
   326         for t in request_transcoders:
   327             transcoder = self.transcoders.get(t)
   328             if transcoder:
   329                 return transcoder
   330 
   331         if not transcoder:
   332             return self.transcoders[self.def_transcoder]
   333     # _get_transcoder()
   334 
   335 
   336     def serve_stream(self, body):
   337         transcoder = self._get_transcoder()
   338         try:
   339             obj = transcoder(self.query)
   340         except Exception, e:
   341             self.send_error(500, str(e))
   342             return
   343 
   344         self.send_response(200)
   345         self.send_header("Content-Type", obj.get_mimetype())
   346         self.send_header('Connection', 'close')
   347         self.end_headers()
   348 
   349         if body:
   350             self.server.add_transcoders(self, obj)
   351             obj.start(self.wfile)
   352             self.server.del_transcoders(self, obj)
   353     # serve_stream()
   354 
   355 
   356     def log_request(self, code='-', size='-'):
   357         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   358     # log_request()
   359 
   360 
   361     def log_error(self, format, *args):
   362         self.log.error("%s: %s" % (self.address_string(), format % args))
   363     # log_error()
   364 
   365 
   366     def log_message(self, format, *args):
   367         self.log.info("%s: %s" % (self.address_string(), format % args))
   368     # log_message()
   369 # RequestHandler
   370 
   371 
   372 
   373 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   374     log = log.getLogger("gms.server")
   375     run = True
   376     _transcoders = {}
   377     _lock = threading.RLock()
   378 
   379     def serve_forever(self):
   380         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   381                       self.socket.getsockname())
   382         try:
   383             while self.run:
   384                 self.handle_request()
   385         except KeyboardInterrupt, e:
   386             pass
   387 
   388         self.log.debug("Stopping all remaining transcoders...")
   389         self.stop_transcoders()
   390         self.log.debug("Transcoders stopped!")
   391     # serve_forever()
   392 
   393 
   394     def get_request(self):
   395         skt = self.socket
   396         old = skt.gettimeout()
   397         skt.settimeout(0.5)
   398         while self.run:
   399             try:
   400                 r = skt.accept()
   401                 skt.settimeout(old)
   402                 return r
   403             except socket.timeout, e:
   404                 pass
   405         raise socket.error("Not running")
   406     # get_request()
   407 
   408 
   409     def server_close(self):
   410         self.run = False
   411         self.stop_transcoders()
   412 
   413         BaseHTTPServer.HTTPServer.server_close(self)
   414     # server_close()
   415 
   416 
   417     def stop_transcoders(self):
   418         self._lock.acquire()
   419         for transcoder, request in self._transcoders.iteritems():
   420             self.log.info("Stop transcoder: %s, client=%s" %
   421                           (transcoder, request.client_address))
   422             transcoder.stop()
   423         self._lock.release()
   424     # stop_transcoders()
   425 
   426 
   427     def get_transcoders(self):
   428         self._lock.acquire()
   429         try:
   430             return self._transcoders.items()
   431         finally:
   432             self._lock.release()
   433     # get_transcoders()
   434 
   435 
   436     def add_transcoders(self, request, transcoder):
   437         self._lock.acquire()
   438         try:
   439             self._transcoders[transcoder] = request
   440         finally:
   441             self._lock.release()
   442     # add_transcoders()
   443 
   444 
   445     def del_transcoders(self, request, transcoder):
   446         self._lock.acquire()
   447         try:
   448             del self._transcoders[transcoder]
   449         finally:
   450             self._lock.release()
   451     # del_transcoders()
   452 # Server
   453 
   454 
   455 
   456 def serve_forever(host="0.0.0.0", port=40000):
   457     addr = (host, port)
   458 
   459     RequestHandler.protocol_version = "HTTP/1.0"
   460     httpd = Server(addr, RequestHandler)
   461     httpd.serve_forever()
   462 # serve_forever()
   463 
   464 
   465 def load_plugins_transcoders(directory):
   466     RequestHandler.load_plugins_transcoders(directory)
   467 # load_plugins_transcoders()