diff -r 000000000000 -r ed34b1dab103 gmyth-stream/server/0.2/lib/server.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/lib/server.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,464 @@ +#!/usr/bin/env python + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.3" + +import os +import threading +import SocketServer +import BaseHTTPServer +import socket +import urlparse +import cgi +import lib.utils as utils +import logging as log + +__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever", + "load_plugins_transcoders") + +class Transcoder(object): + log = log.getLogger("gmyth-stream.transcoder") + priority = 0 # negative values have higher priorities + name = None # to be used in requests + + def __init__(self, params): + self.params = params + # __init__() + + + def params_first(self, key, default=None): + if default is None: + return self.params[key][0] + else: + try: + return self.params[key][0] + except: + return default + # params_first() + + + def get_mimetype(self): + mux = self.params_first("mux", "mpg") + + if mux == "mpeg": + return "video/mpeg" + elif mux == "avi": + return "video/x-msvideo" + else: + return "application/octet-stream" + # get_mimetype() + + + def start(self, outfile): + return True + # start() + + + def stop(self): + return Tru + # stop() + + + def __str__(self): + return '%s("%s", mux="%s", params=%s)' % \ + (self.__class__.__name__, + self.params_first("uri", "None"), + self.params_first("mux", "mpg"), + self.params) + # __str__() +# Transcoder + + + +class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + log = log.getLogger("gmyth-stream.request") + def_transcoder = None + transcoders = utils.PluginSet(Transcoder) + + @classmethod + def load_plugins_transcoders(cls, directory): + cls.transcoders.load_from_directory(directory) + + if cls.def_transcoder is None and cls.transcoders: + cls.def_transcoder = cls.transcoders[0].name + # load_plugins_transcoders() + + + def do_dispatch(self, body): + self.url = self.path + + pieces = urlparse.urlparse(self.path) + self.path = pieces[2] + self.query = cgi.parse_qs(pieces[4]) + + if self.path == "/": + self.serve_main(body) + elif self.path == "/shutdown.do": + self.serve_shutdown(body) + elif self.path == "/stop-transcoder.do": + self.serve_stop_transcoder(body) + elif self.path == "/status.do": + self.serve_status(body) + elif self.path == "/play.do": + self.serve_play(body) + elif self.path == "/stream.do": + self.serve_stream(body) + else: + self.send_error(404, "File not found") + # do_dispatch() + + + def do_GET(self): + self.do_dispatch(True) + # do_GET() + + + def do_HEAD(self): + self.do_dispatch(False) + # do_HEAD() + + + def _nav_items(self): + self.wfile.write("""\ +
  • Play
  • +
  • Status
  • +
  • Stop transcoders
  • +
  • Shutdown Server
  • +""") + # _nav_items() + + + def serve_main(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server + +

    Welcome to Catota Server

    + + + +""") + # serve_main() + + + def serve_play(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server + +

    Play

    +
    + :// + +
    + + + +""") + # serve_play() + + + def serve_shutdown(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Exited + +

    Catota is not running anymore

    + + +""") + self.server.server_close() + # serve_shutdown() + + + def serve_stop_all_transcoders(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.server.stop_transcoders() + self.wfile.write("""\ + + Catota Server Stopped Transcoders + +

    Catota stopped running transcoders

    + + + + """) + # serve_stop_all_transcoders() + + + def serve_stop_selected_transcoders(self, body, requests): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Stopped Transcoders + +

    Catota stopped running transcoders:

    + + + + +""") + # serve_stop_selected_transcoders() + + + def serve_stop_transcoder(self, body): + req = self.query.get("request", None) + if req and "all" in req: + self.serve_stop_all_transcoders(body) + elif req: + self.serve_stop_selected_transcoders(body, req) + else: + self.serve_status(body) + # serve_stop_transcoder() + + + def serve_status(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Status + +

    Catota Status

    +""") + tl = self.server.get_transcoders() + if not tl: + self.wfile.write("

    No running transcoder.

    \n") + else: + self.wfile.write("

    Running transcoders:

    \n") + self.wfile.write("""\ + + + + +""") + # serve_status() + + + def _get_transcoder(self): + # get transcoder option: mencoder is the default + request_transcoders = self.query.get("transcoder", ["mencoder"]) + + for t in request_transcoders: + transcoder = self.transcoders.get(t) + if transcoder: + return transcoder + + if not transcoder: + return self.transcoders[self.def_transcoder] + # _get_transcoder() + + + def serve_stream(self, body): + transcoder = self._get_transcoder() + try: + obj = transcoder(self.query) + except Exception, e: + self.send_error(500, str(e)) + return + + self.send_response(200) + self.send_header("Content-Type", obj.get_mimetype()) + self.send_header('Connection', 'close') + self.end_headers() + + if body: + self.server.add_transcoders(self, obj) + obj.start(self.wfile) + self.server.del_transcoders(self, obj) + # serve_stream() + + + def log_request(self, code='-', size='-'): + self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) + # log_request() + + + def log_error(self, format, *args): + self.log.error("%s: %s" % (self.address_string(), format % args)) + # log_error() + + + def log_message(self, format, *args): + self.log.info("%s: %s" % (self.address_string(), format % args)) + # log_message() +# RequestHandler + + + +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): + log = log.getLogger("gmyth-streamer.server") + run = True + _transcoders = {} + _lock = threading.RLock() + + def serve_forever(self): + self.log.info("GMyth-Streamer serving HTTP on %s:%s" % + self.socket.getsockname()) + try: + while self.run: + self.handle_request() + except KeyboardInterrupt, e: + pass + + self.log.debug("Stopping all remaining transcoders...") + self.stop_transcoders() + self.log.debug("Transcoders stopped!") + # serve_forever() + + + def get_request(self): + skt = self.socket + old = skt.gettimeout() + skt.settimeout(0.5) + while self.run: + try: + r = skt.accept() + skt.settimeout(old) + return r + except socket.timeout, e: + pass + raise socket.error("Not running") + # get_request() + + + def server_close(self): + self.run = False + self.stop_transcoders() + + BaseHTTPServer.HTTPServer.server_close(self) + # server_close() + + + def stop_transcoders(self): + self._lock.acquire() + for transcoder, request in self._transcoders.iteritems(): + self.log.info("Stop transcoder: %s, client=%s" % + (transcoder, request.client_address)) + transcoder.stop() + self._lock.release() + # stop_transcoders() + + + def get_transcoders(self): + self._lock.acquire() + try: + return self._transcoders.items() + finally: + self._lock.release() + # get_transcoders() + + + def add_transcoders(self, request, transcoder): + self._lock.acquire() + try: + self._transcoders[transcoder] = request + finally: + self._lock.release() + # add_transcoders() + + + def del_transcoders(self, request, transcoder): + self._lock.acquire() + try: + del self._transcoders[transcoder] + finally: + self._lock.release() + # del_transcoders() +# Server + + + +def serve_forever(host="0.0.0.0", port=40000): + addr = (host, port) + + RequestHandler.protocol_version = "HTTP/1.0" + httpd = Server(addr, RequestHandler) + httpd.serve_forever() +# serve_forever() + + +def load_plugins_transcoders(directory): + RequestHandler.load_plugins_transcoders(directory) +# load_plugins_transcoders()