1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/gmyth-stream/server/0.2/lib/server.py Wed Apr 18 23:13:26 2007 +0100
1.3 @@ -0,0 +1,464 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
1.7 +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
1.8 +__license__ = "GPL"
1.9 +__version__ = "0.3"
1.10 +
1.11 +import os
1.12 +import threading
1.13 +import SocketServer
1.14 +import BaseHTTPServer
1.15 +import socket
1.16 +import urlparse
1.17 +import cgi
1.18 +import lib.utils as utils
1.19 +import logging as log
1.20 +
1.21 +__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
1.22 + "load_plugins_transcoders")
1.23 +
1.24 +class Transcoder(object):
1.25 + log = log.getLogger("gmyth-stream.transcoder")
1.26 + priority = 0 # negative values have higher priorities
1.27 + name = None # to be used in requests
1.28 +
1.29 + def __init__(self, params):
1.30 + self.params = params
1.31 + # __init__()
1.32 +
1.33 +
1.34 + def params_first(self, key, default=None):
1.35 + if default is None:
1.36 + return self.params[key][0]
1.37 + else:
1.38 + try:
1.39 + return self.params[key][0]
1.40 + except:
1.41 + return default
1.42 + # params_first()
1.43 +
1.44 +
1.45 + def get_mimetype(self):
1.46 + mux = self.params_first("mux", "mpg")
1.47 +
1.48 + if mux == "mpeg":
1.49 + return "video/mpeg"
1.50 + elif mux == "avi":
1.51 + return "video/x-msvideo"
1.52 + else:
1.53 + return "application/octet-stream"
1.54 + # get_mimetype()
1.55 +
1.56 +
1.57 + def start(self, outfile):
1.58 + return True
1.59 + # start()
1.60 +
1.61 +
1.62 + def stop(self):
1.63 + return Tru
1.64 + # stop()
1.65 +
1.66 +
1.67 + def __str__(self):
1.68 + return '%s("%s", mux="%s", params=%s)' % \
1.69 + (self.__class__.__name__,
1.70 + self.params_first("uri", "None"),
1.71 + self.params_first("mux", "mpg"),
1.72 + self.params)
1.73 + # __str__()
1.74 +# Transcoder
1.75 +
1.76 +
1.77 +
1.78 +class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
1.79 + log = log.getLogger("gmyth-stream.request")
1.80 + def_transcoder = None
1.81 + transcoders = utils.PluginSet(Transcoder)
1.82 +
1.83 + @classmethod
1.84 + def load_plugins_transcoders(cls, directory):
1.85 + cls.transcoders.load_from_directory(directory)
1.86 +
1.87 + if cls.def_transcoder is None and cls.transcoders:
1.88 + cls.def_transcoder = cls.transcoders[0].name
1.89 + # load_plugins_transcoders()
1.90 +
1.91 +
1.92 + def do_dispatch(self, body):
1.93 + self.url = self.path
1.94 +
1.95 + pieces = urlparse.urlparse(self.path)
1.96 + self.path = pieces[2]
1.97 + self.query = cgi.parse_qs(pieces[4])
1.98 +
1.99 + if self.path == "/":
1.100 + self.serve_main(body)
1.101 + elif self.path == "/shutdown.do":
1.102 + self.serve_shutdown(body)
1.103 + elif self.path == "/stop-transcoder.do":
1.104 + self.serve_stop_transcoder(body)
1.105 + elif self.path == "/status.do":
1.106 + self.serve_status(body)
1.107 + elif self.path == "/play.do":
1.108 + self.serve_play(body)
1.109 + elif self.path == "/stream.do":
1.110 + self.serve_stream(body)
1.111 + else:
1.112 + self.send_error(404, "File not found")
1.113 + # do_dispatch()
1.114 +
1.115 +
1.116 + def do_GET(self):
1.117 + self.do_dispatch(True)
1.118 + # do_GET()
1.119 +
1.120 +
1.121 + def do_HEAD(self):
1.122 + self.do_dispatch(False)
1.123 + # do_HEAD()
1.124 +
1.125 +
1.126 + def _nav_items(self):
1.127 + self.wfile.write("""\
1.128 + <li><a href="/play.do">Play</a></li>
1.129 + <li><a href="/status.do">Status</a></li>
1.130 + <li><a href="/stop-transcoder.do">Stop transcoders</a></li>
1.131 + <li><a href="/shutdown.do">Shutdown Server</a></li>
1.132 +""")
1.133 + # _nav_items()
1.134 +
1.135 +
1.136 + def serve_main(self, body):
1.137 + self.send_response(200)
1.138 + self.send_header("Content-Type", "text/html")
1.139 + self.send_header('Connection', 'close')
1.140 + self.end_headers()
1.141 + if body:
1.142 + self.wfile.write("""\
1.143 +<html>
1.144 + <head><title>Catota Server</title></head>
1.145 + <body>
1.146 +<h1>Welcome to Catota Server</h1>
1.147 +<ul>
1.148 +""")
1.149 + self._nav_items()
1.150 + self.wfile.write("""\
1.151 +</ul>
1.152 + </body>
1.153 +</html>
1.154 +""")
1.155 + # serve_main()
1.156 +
1.157 +
1.158 + def serve_play(self, body):
1.159 + self.send_response(200)
1.160 + self.send_header("Content-Type", "text/html")
1.161 + self.send_header('Connection', 'close')
1.162 + self.end_headers()
1.163 + if body:
1.164 + self.wfile.write("""\
1.165 +<html>
1.166 + <head><title>Catota Server</title></head>
1.167 + <body>
1.168 + <h1>Play</h1>
1.169 + <form action="/stream.do" method="GET">
1.170 + <input type="text" name="type" value="file" />://<input type="text" name="location" value=""/>
1.171 + <input type="submit" />
1.172 + </form>
1.173 + <ul>
1.174 +""")
1.175 + self._nav_items()
1.176 + self.wfile.write("""\
1.177 + </ul>
1.178 + </body>
1.179 +</html>
1.180 +""")
1.181 + # serve_play()
1.182 +
1.183 +
1.184 + def serve_shutdown(self, body):
1.185 + self.send_response(200)
1.186 + self.send_header("Content-Type", "text/html")
1.187 + self.send_header('Connection', 'close')
1.188 + self.end_headers()
1.189 + if body:
1.190 + self.wfile.write("""\
1.191 +<html>
1.192 + <head><title>Catota Server Exited</title></head>
1.193 + <body>
1.194 + <h1>Catota is not running anymore</h1>
1.195 + </body>
1.196 +</html>
1.197 +""")
1.198 + self.server.server_close()
1.199 + # serve_shutdown()
1.200 +
1.201 +
1.202 + def serve_stop_all_transcoders(self, body):
1.203 + self.send_response(200)
1.204 + self.send_header("Content-Type", "text/html")
1.205 + self.send_header('Connection', 'close')
1.206 + self.end_headers()
1.207 + if body:
1.208 + self.server.stop_transcoders()
1.209 + self.wfile.write("""\
1.210 +<html>
1.211 + <head><title>Catota Server Stopped Transcoders</title></head>
1.212 + <body>
1.213 + <h1>Catota stopped running transcoders</h1>
1.214 + <ul>
1.215 +""")
1.216 + self._nav_items()
1.217 + self.wfile.write("""\
1.218 + </ul>
1.219 + </body>
1.220 +</html>
1.221 + """)
1.222 + # serve_stop_all_transcoders()
1.223 +
1.224 +
1.225 + def serve_stop_selected_transcoders(self, body, requests):
1.226 + self.send_response(200)
1.227 + self.send_header("Content-Type", "text/html")
1.228 + self.send_header('Connection', 'close')
1.229 + self.end_headers()
1.230 + if body:
1.231 + self.wfile.write("""\
1.232 +<html>
1.233 + <head><title>Catota Server Stopped Transcoders</title></head>
1.234 + <body>
1.235 + <h1>Catota stopped running transcoders:</h1>
1.236 + <ul>
1.237 + """)
1.238 + transcoders = self.server.get_transcoders()
1.239 +
1.240 + for req in requests:
1.241 + try:
1.242 + host, port = req.split(":")
1.243 + except IndexError:
1.244 + continue
1.245 +
1.246 + port = int(port)
1.247 + addr = (host, port)
1.248 +
1.249 + for t, r in transcoders:
1.250 + if r.client_address == addr:
1.251 + t.stop()
1.252 + self.server.del_transcoders(self, t)
1.253 + self.wfile.write("""\
1.254 + <li>%s: %s:%s</li>
1.255 +""" % (t, addr[0], addr[1]))
1.256 + t.stop()
1.257 + break
1.258 + self.wfile.write("""\
1.259 + </ul>
1.260 + <ul>
1.261 +""")
1.262 + self._nav_items()
1.263 + self.wfile.write("""\
1.264 + </ul>
1.265 + </body>
1.266 +</html>
1.267 +""")
1.268 + # serve_stop_selected_transcoders()
1.269 +
1.270 +
1.271 + def serve_stop_transcoder(self, body):
1.272 + req = self.query.get("request", None)
1.273 + if req and "all" in req:
1.274 + self.serve_stop_all_transcoders(body)
1.275 + elif req:
1.276 + self.serve_stop_selected_transcoders(body, req)
1.277 + else:
1.278 + self.serve_status(body)
1.279 + # serve_stop_transcoder()
1.280 +
1.281 +
1.282 + def serve_status(self, body):
1.283 + self.send_response(200)
1.284 + self.send_header("Content-Type", "text/html")
1.285 + self.send_header('Connection', 'close')
1.286 + self.end_headers()
1.287 + if body:
1.288 + self.wfile.write("""\
1.289 +<html>
1.290 + <head><title>Catota Server Status</title></head>
1.291 + <body>
1.292 + <h1>Catota Status</h1>
1.293 +""")
1.294 + tl = self.server.get_transcoders()
1.295 + if not tl:
1.296 + self.wfile.write("<p>No running transcoder.</p>\n")
1.297 + else:
1.298 + self.wfile.write("<p>Running transcoders:</p>\n")
1.299 + self.wfile.write("""\
1.300 + <ul>
1.301 + <li><a href="/stop-transcoder.do?request=all">[STOP ALL]</a></li>
1.302 +""")
1.303 + for transcoder, request in tl:
1.304 + self.wfile.write("""\
1.305 + <li>%s: %s:%s <a href="/stop-transcoder.do?request=%s:%s">[STOP]</a></li>
1.306 +""" % (transcoder, request.client_address[0], request.client_address[1],
1.307 + request.client_address[0], request.client_address[1]))
1.308 +
1.309 + self.wfile.write("""\
1.310 + </ul>
1.311 + <ul>
1.312 +""")
1.313 + self._nav_items()
1.314 + self.wfile.write("""\
1.315 + </ul>
1.316 + </body>
1.317 +</html>
1.318 +""")
1.319 + # serve_status()
1.320 +
1.321 +
1.322 + def _get_transcoder(self):
1.323 + # get transcoder option: mencoder is the default
1.324 + request_transcoders = self.query.get("transcoder", ["mencoder"])
1.325 +
1.326 + for t in request_transcoders:
1.327 + transcoder = self.transcoders.get(t)
1.328 + if transcoder:
1.329 + return transcoder
1.330 +
1.331 + if not transcoder:
1.332 + return self.transcoders[self.def_transcoder]
1.333 + # _get_transcoder()
1.334 +
1.335 +
1.336 + def serve_stream(self, body):
1.337 + transcoder = self._get_transcoder()
1.338 + try:
1.339 + obj = transcoder(self.query)
1.340 + except Exception, e:
1.341 + self.send_error(500, str(e))
1.342 + return
1.343 +
1.344 + self.send_response(200)
1.345 + self.send_header("Content-Type", obj.get_mimetype())
1.346 + self.send_header('Connection', 'close')
1.347 + self.end_headers()
1.348 +
1.349 + if body:
1.350 + self.server.add_transcoders(self, obj)
1.351 + obj.start(self.wfile)
1.352 + self.server.del_transcoders(self, obj)
1.353 + # serve_stream()
1.354 +
1.355 +
1.356 + def log_request(self, code='-', size='-'):
1.357 + self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
1.358 + # log_request()
1.359 +
1.360 +
1.361 + def log_error(self, format, *args):
1.362 + self.log.error("%s: %s" % (self.address_string(), format % args))
1.363 + # log_error()
1.364 +
1.365 +
1.366 + def log_message(self, format, *args):
1.367 + self.log.info("%s: %s" % (self.address_string(), format % args))
1.368 + # log_message()
1.369 +# RequestHandler
1.370 +
1.371 +
1.372 +
1.373 +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
1.374 + log = log.getLogger("gmyth-streamer.server")
1.375 + run = True
1.376 + _transcoders = {}
1.377 + _lock = threading.RLock()
1.378 +
1.379 + def serve_forever(self):
1.380 + self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
1.381 + self.socket.getsockname())
1.382 + try:
1.383 + while self.run:
1.384 + self.handle_request()
1.385 + except KeyboardInterrupt, e:
1.386 + pass
1.387 +
1.388 + self.log.debug("Stopping all remaining transcoders...")
1.389 + self.stop_transcoders()
1.390 + self.log.debug("Transcoders stopped!")
1.391 + # serve_forever()
1.392 +
1.393 +
1.394 + def get_request(self):
1.395 + skt = self.socket
1.396 + old = skt.gettimeout()
1.397 + skt.settimeout(0.5)
1.398 + while self.run:
1.399 + try:
1.400 + r = skt.accept()
1.401 + skt.settimeout(old)
1.402 + return r
1.403 + except socket.timeout, e:
1.404 + pass
1.405 + raise socket.error("Not running")
1.406 + # get_request()
1.407 +
1.408 +
1.409 + def server_close(self):
1.410 + self.run = False
1.411 + self.stop_transcoders()
1.412 +
1.413 + BaseHTTPServer.HTTPServer.server_close(self)
1.414 + # server_close()
1.415 +
1.416 +
1.417 + def stop_transcoders(self):
1.418 + self._lock.acquire()
1.419 + for transcoder, request in self._transcoders.iteritems():
1.420 + self.log.info("Stop transcoder: %s, client=%s" %
1.421 + (transcoder, request.client_address))
1.422 + transcoder.stop()
1.423 + self._lock.release()
1.424 + # stop_transcoders()
1.425 +
1.426 +
1.427 + def get_transcoders(self):
1.428 + self._lock.acquire()
1.429 + try:
1.430 + return self._transcoders.items()
1.431 + finally:
1.432 + self._lock.release()
1.433 + # get_transcoders()
1.434 +
1.435 +
1.436 + def add_transcoders(self, request, transcoder):
1.437 + self._lock.acquire()
1.438 + try:
1.439 + self._transcoders[transcoder] = request
1.440 + finally:
1.441 + self._lock.release()
1.442 + # add_transcoders()
1.443 +
1.444 +
1.445 + def del_transcoders(self, request, transcoder):
1.446 + self._lock.acquire()
1.447 + try:
1.448 + del self._transcoders[transcoder]
1.449 + finally:
1.450 + self._lock.release()
1.451 + # del_transcoders()
1.452 +# Server
1.453 +
1.454 +
1.455 +
1.456 +def serve_forever(host="0.0.0.0", port=40000):
1.457 + addr = (host, port)
1.458 +
1.459 + RequestHandler.protocol_version = "HTTP/1.0"
1.460 + httpd = Server(addr, RequestHandler)
1.461 + httpd.serve_forever()
1.462 +# serve_forever()
1.463 +
1.464 +
1.465 +def load_plugins_transcoders(directory):
1.466 + RequestHandler.load_plugins_transcoders(directory)
1.467 +# load_plugins_transcoders()