gmyth-stream/server/0.2/lib/server.py
author renatofilho
Thu Apr 19 16:14:41 2007 +0100 (2007-04-19)
branchtrunk
changeset 573 f4927ca53da7
parent 571 f33b61b9d8a5
child 574 2dfea125c76c
permissions -rw-r--r--
[svn r578] * GMyth-Streamer:
- Fixed problems with gstreamer plugin
     1 #!/usr/bin/env python
     2 
     3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
     4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
     5 __license__ = "GPL"
     6 __version__ = "0.3"
     7 
     8 import os
     9 import threading
    10 import SocketServer
    11 import BaseHTTPServer
    12 import socket
    13 import urlparse
    14 import cgi
    15 import lib.utils as utils
    16 import logging as log
    17 
    18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
    19            "load_plugins_transcoders")
    20 
    21 class Transcoder(object):
    22     log = log.getLogger("gmyth-stream.transcoder")
    23     priority = 0   # negative values have higher priorities
    24     name = None # to be used in requests
    25 
    26     def __init__(self, params):
    27         self.params = params
    28     # __init__()
    29 
    30 
    31     def params_first(self, key, default=None):
    32         if default is None:
    33             return self.params[key][0]
    34         else:
    35             try:
    36                 return self.params[key][0]
    37             except:
    38                 return default
    39     # params_first()
    40 
    41 
    42     def get_mimetype(self):
    43         mux = self.params_first("mux", "mpg")
    44 
    45         if mux == "mpeg":
    46             return "video/mpeg"
    47         elif mux == "avi":
    48             return "video/x-msvideo"
    49         else:
    50             return "application/octet-stream"
    51     # get_mimetype()
    52 
    53 
    54     def start(self, outfile):
    55         return True
    56     # start()
    57 
    58 
    59     def stop(self):
    60         return Tru
    61     # stop()
    62 
    63 
    64     def __str__(self):
    65         return '%s("%s", mux="%s", params=%s)' % \
    66                (self.__class__.__name__,
    67                 self.params_first("uri", "None"),
    68                 self.params_first("mux", "mpg"),
    69                 self.params)
    70     # __str__()
    71 # Transcoder
    72 
    73 
    74 
    75 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    76     log = log.getLogger("gmyth-stream.request")
    77     def_transcoder = None
    78     transcoders = utils.PluginSet(Transcoder)
    79 
    80     @classmethod
    81     def load_plugins_transcoders(cls, directory):
    82         cls.transcoders.load_from_directory(directory)
    83 
    84         if cls.def_transcoder is None and cls.transcoders:
    85             cls.def_transcoder = cls.transcoders[0].name
    86     # load_plugins_transcoders()
    87 
    88 
    89     def do_dispatch(self, body):
    90         self.url = self.path
    91 
    92         pieces = urlparse.urlparse(self.path)
    93         self.path = pieces[2]
    94         self.query = cgi.parse_qs(pieces[4])
    95 
    96         if self.path == "/":
    97             self.serve_main(body)
    98         elif self.path == "/shutdown.do":
    99             self.serve_shutdown(body)
   100         elif self.path == "/stop-transcoder.do":
   101             self.serve_stop_transcoder(body)
   102         elif self.path == "/status.do":
   103             self.serve_status(body)
   104         elif self.path == "/play.do":
   105             self.serve_play(body)
   106         elif self.path == "/stream.do":
   107             self.serve_stream(body)
   108         else:
   109             self.send_error(404, "File not found")
   110     # do_dispatch()
   111 
   112 
   113     def do_GET(self):
   114         self.do_dispatch(True)
   115     # do_GET()
   116 
   117 
   118     def do_HEAD(self):
   119         self.do_dispatch(False)
   120     # do_HEAD()
   121 
   122 
   123     def _nav_items(self):
   124         self.wfile.write("""\
   125    <li><a href="/play.do">Play</a></li>
   126    <li><a href="/status.do">Status</a></li>
   127    <li><a href="/stop-transcoder.do">Stop transcoders</a></li>
   128    <li><a href="/shutdown.do">Shutdown Server</a></li>
   129 """)
   130     # _nav_items()
   131 
   132 
   133     def serve_main(self, body):
   134         self.send_response(200)
   135         self.send_header("Content-Type", "text/html")
   136         self.send_header('Connection', 'close')
   137         self.end_headers()
   138         if body:
   139             self.wfile.write("""\
   140 <html>
   141    <head><title>GMyth-Streamer Server</title></head>
   142    <body>
   143 <h1>Welcome to GMyth-Streamer Server</h1>
   144 <ul>
   145 """)
   146             self._nav_items()
   147             self.wfile.write("""\
   148 </ul>
   149    </body>
   150 </html>
   151 """)
   152     # serve_main()
   153 
   154 
   155     def serve_play(self, body):
   156         self.send_response(200)
   157         self.send_header("Content-Type", "text/html")
   158         self.send_header('Connection', 'close')
   159         self.end_headers()
   160         if body:
   161             self.wfile.write("""\
   162 <html>
   163    <head><title>GMyth-Streamer Server</title></head>
   164    <body>
   165    <h1>Play</h1>
   166    <form action="/stream.do" method="GET">
   167       <input type="text" name="type" value="file" />://<input type="text" name="location" value=""/>
   168       <input type="submit" />
   169    </form>
   170    <ul>
   171 """)
   172             self._nav_items()
   173             self.wfile.write("""\
   174    </ul>
   175    </body>
   176 </html>
   177 """)
   178     # serve_play()
   179 
   180 
   181     def serve_shutdown(self, body):
   182         self.send_response(200)
   183         self.send_header("Content-Type", "text/html")
   184         self.send_header('Connection', 'close')
   185         self.end_headers()
   186         if body:
   187             self.wfile.write("""\
   188 <html>
   189    <head><title>GMyth-Streamer Server Exited</title></head>
   190    <body>
   191       <h1>GMyth-Streamer is not running anymore</h1>
   192    </body>
   193 </html>
   194 """)
   195         self.server.server_close()
   196     # serve_shutdown()
   197 
   198 
   199     def serve_stop_all_transcoders(self, body):
   200         self.send_response(200)
   201         self.send_header("Content-Type", "text/html")
   202         self.send_header('Connection', 'close')
   203         self.end_headers()
   204         if body:
   205             self.server.stop_transcoders()
   206             self.wfile.write("""\
   207 <html>
   208    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   209    <body>
   210       <h1>GMyth-Streamer stopped running transcoders</h1>
   211       <ul>
   212 """)
   213             self._nav_items()
   214             self.wfile.write("""\
   215       </ul>
   216    </body>
   217 </html>
   218     """)
   219     # serve_stop_all_transcoders()
   220 
   221 
   222     def serve_stop_selected_transcoders(self, body, requests):
   223         self.send_response(200)
   224         self.send_header("Content-Type", "text/html")
   225         self.send_header('Connection', 'close')
   226         self.end_headers()
   227         if body:
   228             self.wfile.write("""\
   229 <html>
   230    <head><title>GMyth-Streamer Server Stopped Transcoders</title></head>
   231    <body>
   232       <h1>GMyth-Streamer stopped running transcoders:</h1>
   233       <ul>
   234     """)
   235             transcoders = self.server.get_transcoders()
   236 
   237             for req in requests:
   238                 try:
   239                     host, port = req.split(":")
   240                 except IndexError:
   241                     continue
   242 
   243                 port = int(port)
   244                 addr = (host, port)
   245 
   246                 for t, r in transcoders:
   247                     if r.client_address == addr:
   248                         try:
   249                             t.stop()
   250                         except Exception, e:
   251                             self.log.info("Plugin already stopped")
   252 
   253                         self.wfile.write("""\
   254          <li>%s: %s:%s</li>
   255 """ % (t, addr[0], addr[1]))
   256                         break
   257             self.wfile.write("""\
   258       </ul>
   259       <ul>
   260 """)
   261             self._nav_items()
   262             self.wfile.write("""\
   263       </ul>
   264    </body>
   265 </html>
   266 """)
   267     # serve_stop_selected_transcoders()
   268 
   269 
   270     def serve_stop_transcoder(self, body):
   271         req = self.query.get("request", None)
   272         if req and "all" in req:
   273             self.serve_stop_all_transcoders(body)
   274         elif req:
   275             self.serve_stop_selected_transcoders(body, req)
   276         else:
   277             self.serve_status(body)
   278     # serve_stop_transcoder()
   279 
   280 
   281     def serve_status(self, body):
   282         self.send_response(200)
   283         self.send_header("Content-Type", "text/html")
   284         self.send_header('Connection', 'close')
   285         self.end_headers()
   286         if body:
   287             self.wfile.write("""\
   288 <html>
   289    <head><title>GMyth-Streamer Server Status</title></head>
   290    <body>
   291       <h1>GMyth-Streamer Status</h1>
   292 """)
   293             tl = self.server.get_transcoders()
   294             if not tl:
   295                 self.wfile.write("<p>No running transcoder.</p>\n")
   296             else:
   297                 self.wfile.write("<p>Running transcoders:</p>\n")
   298                 self.wfile.write("""\
   299       <ul>
   300          <li><a href="/stop-transcoder.do?request=all">[STOP ALL]</a></li>
   301 """)
   302                 for transcoder, request in tl:
   303                     self.wfile.write("""\
   304       <li>%s: %s:%s <a href="/stop-transcoder.do?request=%s:%s">[STOP]</a></li>
   305 """ % (transcoder, request.client_address[0], request.client_address[1],
   306        request.client_address[0], request.client_address[1]))
   307 
   308                 self.wfile.write("""\
   309       </ul>
   310       <ul>
   311 """)
   312             self._nav_items()
   313             self.wfile.write("""\
   314       </ul>
   315    </body>
   316 </html>
   317 """)
   318     # serve_status()
   319 
   320 
   321     def _get_transcoder(self):
   322         # get transcoder option: mencoder is the default
   323         request_transcoders = self.query.get("transcoder", ["mencoder"])
   324 
   325         for t in request_transcoders:
   326             transcoder = self.transcoders.get(t)
   327             if transcoder:
   328                 return transcoder
   329 
   330         if not transcoder:
   331             return self.transcoders[self.def_transcoder]
   332     # _get_transcoder()
   333 
   334 
   335     def serve_stream(self, body):
   336         transcoder = self._get_transcoder()
   337         try:
   338             obj = transcoder(self.query)
   339         except Exception, e:
   340             self.send_error(500, str(e))
   341             return
   342 
   343         self.send_response(200)
   344         self.send_header("Content-Type", obj.get_mimetype())
   345         self.send_header('Connection', 'close')
   346         self.end_headers()
   347 
   348         if body:
   349             self.server.add_transcoders(self, obj)
   350             obj.start(self.wfile)
   351             self.server.del_transcoders(self, obj)
   352     # serve_stream()
   353 
   354 
   355     def log_request(self, code='-', size='-'):
   356         self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
   357     # log_request()
   358 
   359 
   360     def log_error(self, format, *args):
   361         self.log.error("%s: %s" % (self.address_string(), format % args))
   362     # log_error()
   363 
   364 
   365     def log_message(self, format, *args):
   366         self.log.info("%s: %s" % (self.address_string(), format % args))
   367     # log_message()
   368 # RequestHandler
   369 
   370 
   371 
   372 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
   373     log = log.getLogger("gmyth-streamer.server")
   374     run = True
   375     _transcoders = {}
   376     _lock = threading.RLock()
   377 
   378     def serve_forever(self):
   379         self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
   380                       self.socket.getsockname())
   381         try:
   382             while self.run:
   383                 self.handle_request()
   384         except KeyboardInterrupt, e:
   385             pass
   386 
   387         self.log.debug("Stopping all remaining transcoders...")
   388         self.stop_transcoders()
   389         self.log.debug("Transcoders stopped!")
   390     # serve_forever()
   391 
   392 
   393     def get_request(self):
   394         skt = self.socket
   395         old = skt.gettimeout()
   396         skt.settimeout(0.5)
   397         while self.run:
   398             try:
   399                 r = skt.accept()
   400                 skt.settimeout(old)
   401                 return r
   402             except socket.timeout, e:
   403                 pass
   404         raise socket.error("Not running")
   405     # get_request()
   406 
   407 
   408     def server_close(self):
   409         self.run = False
   410         self.stop_transcoders()
   411 
   412         BaseHTTPServer.HTTPServer.server_close(self)
   413     # server_close()
   414 
   415 
   416     def stop_transcoders(self):
   417         self._lock.acquire()
   418         for transcoder, request in self._transcoders.iteritems():
   419             self.log.info("Stop transcoder: %s, client=%s" %
   420                           (transcoder, request.client_address))
   421             transcoder.stop()
   422         self._lock.release()
   423     # stop_transcoders()
   424 
   425 
   426     def get_transcoders(self):
   427         self._lock.acquire()
   428         try:
   429             return self._transcoders.items()
   430         finally:
   431             self._lock.release()
   432     # get_transcoders()
   433 
   434 
   435     def add_transcoders(self, request, transcoder):
   436         self._lock.acquire()
   437         try:
   438             self._transcoders[transcoder] = request
   439         finally:
   440             self._lock.release()
   441     # add_transcoders()
   442 
   443 
   444     def del_transcoders(self, request, transcoder):
   445         self._lock.acquire()
   446         try:
   447             del self._transcoders[transcoder]
   448         finally:
   449             self._lock.release()
   450     # del_transcoders()
   451 # Server
   452 
   453 
   454 
   455 def serve_forever(host="0.0.0.0", port=40000):
   456     addr = (host, port)
   457 
   458     RequestHandler.protocol_version = "HTTP/1.0"
   459     httpd = Server(addr, RequestHandler)
   460     httpd.serve_forever()
   461 # serve_forever()
   462 
   463 
   464 def load_plugins_transcoders(directory):
   465     RequestHandler.load_plugins_transcoders(directory)
   466 # load_plugins_transcoders()