1.1 --- a/gmyth-stream/server/0.2/lib/server.py Tue Aug 28 15:41:35 2007 +0100
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,424 +0,0 @@
1.4 -#!/usr/bin/env python
1.5 -
1.6 -__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
1.7 -__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
1.8 -__license__ = "GPL"
1.9 -__version__ = "0.2"
1.10 -
1.11 -import os
1.12 -import threading
1.13 -import SocketServer
1.14 -import BaseHTTPServer
1.15 -import socket
1.16 -import urlparse
1.17 -import cgi
1.18 -import lib.utils as utils
1.19 -import logging as log
1.20 -
1.21 -__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
1.22 - "load_plugins_transcoders")
1.23 -
1.24 -class Transcoder(object):
1.25 - log = log.getLogger("gms.transcoder")
1.26 - priority = 0 # negative values have higher priorities
1.27 - name = None # to be used in requests
1.28 - status = None
1.29 - tid = -1
1.30 -
1.31 - def __init__(self, params):
1.32 - self.params = params
1.33 - # __init__()
1.34 -
1.35 -
1.36 - def params_first(self, key, default=None):
1.37 - if default is None:
1.38 - return self.params[key][0]
1.39 - else:
1.40 - try:
1.41 - return self.params[key][0]
1.42 - except:
1.43 - return default
1.44 - # params_first()
1.45 -
1.46 -
1.47 - def get_mimetype(self):
1.48 - mux = self.params_first("mux", "mpg")
1.49 -
1.50 - if mux == "mpeg":
1.51 - return "video/mpeg"
1.52 - elif mux == "avi":
1.53 - return "video/x-msvideo"
1.54 - else:
1.55 - return "application/octet-stream"
1.56 - # get_mimetype()
1.57 -
1.58 -
1.59 - def start(self, outfile):
1.60 - return True
1.61 - # start()
1.62 -
1.63 -
1.64 - def stop(self):
1.65 - return True
1.66 - # stop()
1.67 -
1.68 -
1.69 - def __str__(self):
1.70 - return '%s( params=%s )' % \
1.71 - (self.__class__.__name__,
1.72 - self.params)
1.73 - # __str__()
1.74 -# Transcoder
1.75 -
1.76 -
1.77 -
1.78 -class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
1.79 - log = log.getLogger("gms.request")
1.80 - def_transcoder = None
1.81 - transcoders = utils.PluginSet(Transcoder)
1.82 -
1.83 - menu = {
1.84 - "Stop": "/stop-transcoder.do",
1.85 - "Status": "/status.do",
1.86 - "Version": "/version.do",
1.87 - "Shutdown": "/shutdown.do"
1.88 - }
1.89 -
1.90 - @classmethod
1.91 - def load_plugins_transcoders(cls, directory):
1.92 - cls.transcoders.load_from_directory(directory)
1.93 -
1.94 - if cls.def_transcoder is None and cls.transcoders:
1.95 - cls.def_transcoder = cls.transcoders[0].name
1.96 - # load_plugins_transcoders()
1.97 -
1.98 -
1.99 - def do_dispatch(self, body):
1.100 - self.url = self.path
1.101 -
1.102 - pieces = urlparse.urlparse(self.path)
1.103 - self.path = pieces[2]
1.104 - self.query = cgi.parse_qs(pieces[4])
1.105 -
1.106 - if self.path == "/":
1.107 - self.serve_main(body)
1.108 - elif self.path == "/shutdown.do":
1.109 - self.serve_shutdown(body)
1.110 - elif self.path == "/stop-transcoder.do":
1.111 - self.serve_stop_transcoder(body)
1.112 - elif self.path == "/status.do":
1.113 - self.serve_status(body)
1.114 - elif self.path == "/version.do":
1.115 - self.serve_version(body)
1.116 - elif self.path == "/stream.do":
1.117 - self.serve_stream(body)
1.118 - else:
1.119 - action = self.query.get("action", None)
1.120 - if "stream.do" in action:
1.121 - self.serve_stream(body)
1.122 - else:
1.123 - self.send_error(404, "File not found")
1.124 - # do_dispatch()
1.125 -
1.126 -
1.127 - def do_GET(self):
1.128 - self.do_dispatch(True)
1.129 - # do_GET()
1.130 -
1.131 -
1.132 - def do_HEAD(self):
1.133 - self.do_dispatch(False)
1.134 - # do_HEAD()
1.135 -
1.136 -
1.137 - def _nav_items(self):
1.138 - ret = ""
1.139 - for name, url in self.menu.items():
1.140 - ret += utils.getHTML("menu", {"name": name, "url": url})
1.141 -
1.142 - return ret
1.143 - # _nav_items()
1.144 -
1.145 - def serve_main(self, body):
1.146 - self.send_response(200)
1.147 - self.send_header("Content-Type", "text/html")
1.148 - self.send_header('Connection', 'close')
1.149 - self.end_headers()
1.150 - if body:
1.151 - self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
1.152 - # serve_main()
1.153 -
1.154 - def serve_version(self, body):
1.155 - self.send_response(200)
1.156 - self.send_header("Content-Type", "text/html")
1.157 - self.send_header('Connection', 'close')
1.158 - self.end_headers()
1.159 - if body:
1.160 - self.wfile.write("Version: %s" % __version__)
1.161 -
1.162 -
1.163 - def serve_shutdown(self, body):
1.164 - self.send_response(200)
1.165 - self.send_header("Content-Type", "text/html")
1.166 - self.send_header('Connection', 'close')
1.167 - self.end_headers()
1.168 - if body:
1.169 - self.wfile.write(utils.getHTML("shutdown"))
1.170 - self.server.server_close()
1.171 - # serve_shutdown()
1.172 -
1.173 -
1.174 - def serve_stop_all_transcoders(self, body):
1.175 - self.send_response(200)
1.176 - self.send_header("Content-Type", "text/html")
1.177 - self.send_header('Connection', 'close')
1.178 - self.end_headers()
1.179 - if body:
1.180 - self.server.stop_transcoders()
1.181 - self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
1.182 - # serve_stop_all_transcoders()
1.183 -
1.184 -
1.185 - def serve_stop_selected_transcoders(self, body, requests):
1.186 - self.send_response(200)
1.187 - self.send_header("Content-Type", "text/html")
1.188 - self.send_header('Connection', 'close')
1.189 - self.end_headers()
1.190 - opts = ""
1.191 - if body:
1.192 - transcoders = self.server.get_transcoders()
1.193 -
1.194 - for req in requests:
1.195 - try:
1.196 - host, port = req.split(":")
1.197 - except IndexError:
1.198 - continue
1.199 -
1.200 - port = int(port)
1.201 - addr = (host, port)
1.202 -
1.203 - for t, r in transcoders:
1.204 - if r.client_address == addr:
1.205 - try:
1.206 - t.stop()
1.207 - except Exception, e:
1.208 - self.log.info("Plugin already stopped")
1.209 -
1.210 - opts += self._create_html_item("%s: %s:%s" % (
1.211 - t, addr[0], addr[1]))
1.212 -
1.213 - break
1.214 -
1.215 - self.wfile.write(utils.getHTML("stop_selected",
1.216 - {"menu": self._nav_items(),
1.217 - "opts": opts}))
1.218 - # serve_stop_selected_transcoders()
1.219 -
1.220 -
1.221 - def serve_stop_transcoder(self, body):
1.222 - req = self.query.get("request", None)
1.223 - if req and "all" in req:
1.224 - self.serve_stop_all_transcoders(body)
1.225 - elif req:
1.226 - self.serve_stop_selected_transcoders(body, req)
1.227 - else:
1.228 - self.serve_status(body)
1.229 - # serve_stop_transcoder()
1.230 -
1.231 -
1.232 - def serve_status(self, body):
1.233 - self.send_response(200)
1.234 - self.send_header("Content-Type", "text/html")
1.235 - self.send_header('Connection', 'close')
1.236 - self.end_headers()
1.237 - stopone = ""
1.238 -
1.239 - if body:
1.240 - tl = self.server.get_transcoders()
1.241 - if not tl:
1.242 - running = "<p>No running transcoder.</p>\n"
1.243 - stopall = ""
1.244 -
1.245 - elif self.query.get("ip") and self.query.get("file"):
1.246 - for transcoder, request in tl:
1.247 - filename = "%s" % self.query.get("file")[0]
1.248 - tfilename = "%s" % transcoder.params_first("uri")
1.249 -
1.250 - if tfilename.find(filename) >= 0 and \
1.251 - request.client_address[0] == self.query.get("ip")[0]:
1.252 - self.wfile.write("Status: %s %%" % transcoder.status)
1.253 - return True
1.254 -
1.255 - return False
1.256 -
1.257 - else:
1.258 - running = "<p>Running transcoders:</p>\n"
1.259 - stopall = utils._create_html_item("<a href='%s?request=all'>"
1.260 - "[STOP ALL]</a>" %
1.261 - self.menu["Stop"])
1.262 -
1.263 - for transcoder, request in tl:
1.264 - stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
1.265 - "request=%s:%s'>"
1.266 - "[STOP]</a> - "
1.267 - "Status: %s%%"\
1.268 - % (
1.269 - transcoder, request.client_address[0],
1.270 - request.client_address[1],
1.271 - self.menu["Stop"], request.client_address[0],
1.272 - request.client_address[1],
1.273 - transcoder.status) )
1.274 -
1.275 - self.wfile.write(utils.getHTML("status",
1.276 - {"menu": self._nav_items(),
1.277 - "running": running,
1.278 - "stopall": stopall,
1.279 - "stopone": stopone}))
1.280 - # serve_status()
1.281 -
1.282 -
1.283 - def _get_transcoder(self):
1.284 - # get transcoder option: mencoder is the default
1.285 - request_transcoders = self.query.get("transcoder", ["mencoder"])
1.286 -
1.287 - for t in request_transcoders:
1.288 - transcoder = self.transcoders.get(t)
1.289 - if transcoder:
1.290 - return transcoder
1.291 -
1.292 - if not transcoder:
1.293 - return self.transcoders[self.def_transcoder]
1.294 - # _get_transcoder()
1.295 -
1.296 -
1.297 - def serve_stream(self, body):
1.298 - transcoder = self._get_transcoder()
1.299 - try:
1.300 - obj = transcoder(self.query)
1.301 - except Exception, e:
1.302 - self.send_error(500, str(e))
1.303 - return
1.304 -
1.305 - self.send_response(200)
1.306 - self.send_header("Content-Type", obj.get_mimetype())
1.307 - self.send_header('Connection', 'close')
1.308 - self.end_headers()
1.309 -
1.310 - if body:
1.311 - self.server.add_transcoders(self, obj)
1.312 - obj.start(self.wfile)
1.313 - self.server.del_transcoders(self, obj)
1.314 - # serve_stream()
1.315 -
1.316 -
1.317 - def log_request(self, code='-', size='-'):
1.318 - self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
1.319 - # log_request()
1.320 -
1.321 -
1.322 - def log_error(self, format, *args):
1.323 - self.log.error("%s: %s" % (self.address_string(), format % args))
1.324 - # log_error()
1.325 -
1.326 -
1.327 - def log_message(self, format, *args):
1.328 - self.log.info("%s: %s" % (self.address_string(), format % args))
1.329 - # log_message()
1.330 -# RequestHandler
1.331 -
1.332 -
1.333 -
1.334 -class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
1.335 - log = log.getLogger("gms.server")
1.336 - run = True
1.337 - _transcoders = {}
1.338 - _lock = threading.RLock()
1.339 -
1.340 - def serve_forever(self):
1.341 - self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
1.342 - self.socket.getsockname())
1.343 - try:
1.344 - while self.run:
1.345 - self.handle_request()
1.346 - except KeyboardInterrupt, e:
1.347 - pass
1.348 -
1.349 - self.log.debug("Stopping all remaining transcoders...")
1.350 - self.stop_transcoders()
1.351 - self.log.debug("Transcoders stopped!")
1.352 - # serve_forever()
1.353 -
1.354 -
1.355 - def get_request(self):
1.356 - skt = self.socket
1.357 - old = skt.gettimeout()
1.358 - skt.settimeout(0.5)
1.359 - while self.run:
1.360 - try:
1.361 - r = skt.accept()
1.362 - skt.settimeout(old)
1.363 - return r
1.364 - except socket.timeout, e:
1.365 - pass
1.366 - raise socket.error("Not running")
1.367 - # get_request()
1.368 -
1.369 -
1.370 - def server_close(self):
1.371 - self.run = False
1.372 - self.stop_transcoders()
1.373 -
1.374 - BaseHTTPServer.HTTPServer.server_close(self)
1.375 - # server_close()
1.376 -
1.377 -
1.378 - def stop_transcoders(self):
1.379 - self._lock.acquire()
1.380 - for transcoder, request in self._transcoders.iteritems():
1.381 - self.log.info("Stop transcoder: %s, client=%s" %
1.382 - (transcoder, request.client_address))
1.383 - transcoder.stop()
1.384 - self._lock.release()
1.385 - # stop_transcoders()
1.386 -
1.387 -
1.388 - def get_transcoders(self):
1.389 - self._lock.acquire()
1.390 - try:
1.391 - return self._transcoders.items()
1.392 - finally:
1.393 - self._lock.release()
1.394 - # get_transcoders()
1.395 -
1.396 -
1.397 - def add_transcoders(self, request, transcoder):
1.398 - self._lock.acquire()
1.399 - try:
1.400 - self._transcoders[transcoder] = request
1.401 - finally:
1.402 - self._lock.release()
1.403 - # add_transcoders()
1.404 -
1.405 -
1.406 - def del_transcoders(self, request, transcoder):
1.407 - self._lock.acquire()
1.408 - try:
1.409 - del self._transcoders[transcoder]
1.410 - finally:
1.411 - self._lock.release()
1.412 - # del_transcoders()
1.413 -# Server
1.414 -
1.415 -
1.416 -
1.417 -def serve_forever(host="0.0.0.0", port=40000):
1.418 - addr = (host, port)
1.419 - RequestHandler.protocol_version = "HTTP/1.0"
1.420 - httpd = Server(addr, RequestHandler)
1.421 - httpd.serve_forever()
1.422 -# serve_forever()
1.423 -
1.424 -
1.425 -def load_plugins_transcoders(directory):
1.426 - RequestHandler.load_plugins_transcoders(directory)
1.427 -# load_plugins_transcoders()