# HG changeset patch # User morphbr # Date 1176908350 -3600 # Node ID ed34b1dab10357caadf0cf0e419bf14e70959d6e # Parent 1b897f699097eb297928ea2847ac8e9fdcc92e42 [svn r570] - Included new core for GMyth-Streamer (0.2) - New core based on code by Gustavo Barbieri diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/lib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/lib.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,36 @@ +import time +import logging +import os +import stat + +ext = ['mpg', 'avi', 'mp4', 'nuv', 'mpeg', 'mov'] + +def now(): + return time.strftime("%Y-%m-%d %H:%M:%S"); + +def log(msg): + logging.log(logging.DEBUG, msg) + new_msg = "[%s] %s" % (now(), msg) + return new_msg + + +bin_path_list = os.environ["PATH"].split(os.pathsep) +def which(prg): + for d in bin_path_list: + path = os.path.join(d, prg) + if os.path.exists(path): + st = os.stat(path) + if st[stat.ST_MODE] & 0111: + return path + return "" + +def list_media_files(directory, file_list): + for root, dirs, files in os.walk(directory): + for name in files: + if os.path.splitext(name)[1].strip(".") in ext: + media = os.path.join(root,name) + if media not in file_list: + file_list.append(os.path.join(root,name)) + + return True + diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/main.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/main.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,185 @@ +#!/usr/bin/python + +import os +import lib +import sys +import imp +import ConfigParser +import logging as log + +log.basicConfig(level=log.DEBUG, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt='%Y-%m-%d %H:%M:%S') + +config = ConfigParser.ConfigParser() +config.read("stream.conf") + +def load_module(pathlist, name): + fp, path, desc = imp.find_module(name, pathlist) + try: + module = imp.load_module(name, fp, path, desc) + return module + finally: + if fp: + fp.close() + + +media_plugin = config.get("Media", "engine") +media_plugin_module = load_module(["./plugins/media"], media_plugin) +media = media_plugin_module.Media(config) + +comm_plugin = config.get("Comm", "engine") +comm_plugin_module = load_module(["./plugins/comm"], comm_plugin) +server = comm_plugin_module.Server(config) + +log.info("Starting GMyth-Stream server") + + +''' +PROTOCOL DESCRIPTION +===================== + +COMMAND OPTIONS + +-> SETUP DESCRIPTION +|-> used to setup transcoding and streaming parameters +|-> must be used before any "PLAY" command +|-> e.g: + +file://file_name mux vcodec vbitrate fps acodec abitrate width height options +dvd://title_number mux vcodec vbitrate fps acodec abitrate width height options + +-> PLAY DESCRIPTION + |-> used to start transcoding and streaming of file + |-> must be used just if SETUP was used before + |-> after it, _must_ send STOP + +-> STOP DESCRIPTION + |-> used to stop transcoding and streaming process + |-> must be used just if PLAY was used before + |-> must be used after PLAY + +-> QUIT DESCRIPTION + |-> used to quit the main loop (quit program) + +''' +nextport = 0 +setup = (False, "STOPPED") + +def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate, + width, height, *options): + global nextport + global setup + + if setup[1] != "PLAYING": + nextport += 1 + ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec, + abitrate, width, height, nextport, options) + if ret[0]: + server.sendOk() + else: + server.sendNotOk(ret[1]) + + setup = (True, setup[1]) + + else: server.sendNotOk("You must STOP before SETingUP again") + + return True + +def do_play(server): + global setup + + if setup[0] and setup[1] == "STOPPED": + setup = (setup[0], "PLAYING") + ret = media.play() + if ret[0]: + server.sendOk("%d" % nextport) + else: + server.sendNotOk(ret[1]) + + else: + if setup[1] == "STOPPED": + server.sendNotOk("You must SETUP before PLAYing") + else: + server.sendNotOk("You must STOP before PLAYing again") + + return True + +def do_stop(server): + global setup + + media.stop() + setup = (False, "STOPPED") + server.sendOk() + return True + +def do_list(server, *directory): + file_list = [] + for j in directory: + lib.list_media_files(j, file_list) + + server.sendOk(file_list) + return True + +def do_quit(server): + server.finish = 1 + media.stop() + server.sendOk() + return True + + +mapping = { + "SETUP": do_setup, + "PLAY": do_play, + "STOP": do_stop, + "LIST": do_list, + "QUIT": do_quit, + } + + +def dispatch(server, msg): + pieces = msg.split() + if len(pieces) < 1: + log.error("Invalid client command format: %r" % msg) + server.sendNotOk("Invalid Format") + return False + + cmd = pieces[0] + f = mapping.get(cmd, None) + if not f: + log.error("Unknow command: %r" % msg) + server.sendNotOk("Unknow Command") + return False + + try: + return f(server, *pieces[1:]) + except Exception, e: + log.error("Could not execute %r: %s" % (msg, e)) + server.sendNotOk(str(e)) + return False + + + +while not server.finish: + conn, client, port = server.getRequest() + if nextport == 0: + nextport = port + + while not server.finish: + msg = server.getMsg() + if not msg: + break + + log.info("Client %s sent command: %r" % (client, msg)) + dispatch(server, msg) + + log.info("Closing connection with %s" % (client,)) + server.disconnect_client(conn) + try: + os.wait() + except Exception, e: + log.error(e) + +server.stop() +log.info("Server stopped. Closing...") + diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/comm/tcp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/comm/tcp.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,79 @@ +import lib +import time +import socket +import logging as log + +class Server(object): + + def __init__(self, config): + self.host = '0.0.0.0' + self.port = int(config.get("Comm", "port")) + self.finish = 0 + + addr = (self.host, self.port) + log.debug("Setup TCP server at %s:%s" % addr) + self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.tcp.bind(addr) + self.tcp.listen(1) + log.info("TCP server listening at %s:%s (sock=%d)" % + (self.host, self.port, self.tcp.fileno())) + + def getMsg(self): + bytes = [] + try: + while 1: + c = self.con.recv(1) + bytes.append(c) + if not c or c == "\n": + break + except Exception, e: + log.error("Error reading message from client: %s" % e) + return None + + if not bytes or bytes[-1] != "\n": + msg = "".join(bytes) + log.error("Invalid message from client: %r" % msg) + return None + + # remove \n and \r + bytes.pop() + if bytes[-1] == "\r": + bytes.pop() + + msg = "".join(bytes) + log.debug("RECV: %r" % msg) + return msg + + def sendMsg(self, msg): + log.debug("SEND: %r" % msg) + self.con.send(msg + "\n") + + def sendOk(self, payload=None): + self.sendMsg("OK %d" % bool(payload is not None)) + if payload is not None: + if not isinstance(payload, (tuple, list)): + payload = (payload,) + for e in payload: + self.sendMsg("+%s" % e) + self.sendMsg(".") + + def sendNotOk(self, reason=""): + self.sendMsg("NOTOK %r" % reason) + + def getRequest(self): + log.debug("Wait for client request at %s:%s (sock=%d)" % + (self.host, self.port, self.tcp.fileno())) + self.con, self.client = self.tcp.accept() + log.info("Incoming request from %s (con=%s)" % + (self.client, self.con.fileno())) + return (self.con, self.client, self.port) + + def disconnect_client(self, connection): + log.info("Closed request from %s (con=%s)" % + (self.client, self.con.fileno())) + connection.close() + + def stop(self): + log.debug("Stop") + self.tcp.close() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/comm/xmlrpc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/comm/xmlrpc.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,102 @@ +import lib +import SimpleXMLRPCServer + + +class Handler: + + def __init__(self, recv_pool, send_pool): + self.recv_pool = recv_pool + self.send_pool = send_pool + self.getMsg = self.sendMsg + + def _listMethods(self): + return ['setup', 'play', 'stop', 'close', 'getMsg'] + + def _methodHelp(self, method): + + if method == 'setup': + return "Setup the Media: setup( filename, mux, vcodec, vbitrate,"\ + " fps, acodec, abitrate, width, height, port, options" + elif method == 'play': + return "Play the Media: play()" + elif method == 'stop': + return "Stop the Media: stop()" + elif method == 'close': + return "Close the connection: close()" + elif method == 'getMsg': + return "Return the first message in the pool: getMsg()" + else: + # By convention, return empty + # string if no help is available + return "" + + def setup(self, filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port, options): + + msg = "%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port) + + if len(options) > 0: + for opt in options: + msg += " %s" % opt + + self.recv_pool.append("SETUP") + self.recv_pool.append(msg) + return self.sendMsg() + + def play(self): + self.recv_pool.append("PLAY") + return self.sendMsg() + + def stop(self): + self.recv_pool.append("STOP") + return self.sendMsg() + + def close(self): + self.recv_pool.append("CLOSE") + return self.sendMsg() + + def sendMsg(self): + if self.send_pool != []: + return self.send_pool.pop(0) + else: + return "" + + +class Server: + + def __init__(self, config): + self.host = 'localhost' + self.port = int(config.get("Comm", "port")) + self.finish = 0 + self.recv_pool = [] + self.send_pool = [] + + self.handler = Handler(self.recv_pool, self.send_pool) + + self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port)) + self.xmlrpc.register_instance(self.handler) + + + def getMsg(self, size): + if self.recv_pool != []: + return self.recv_pool.pop(0) + else: + return "" + + def sendMsg(self, msg): + self.send_pool.append(msg) + + def Ack(self, command): + msg = "[%s] Command %s received" % (lib.now(), command) + self.sendMsg(msg + "\n") + + def getRequest(self): + self.xmlrpc.handle_request() + return (0, "RPC Client") + + def disconnect_client(self, connection): + connection = 0 + + def stop(self): + self.xmlrpc.server_close() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/ffmpeg.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/media/ffmpeg.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,91 @@ +import os +import sys +import lib +import time +import socket +import ConfigParser + +class Media: + + def __init__(self, config): + + self.config = config + self.socket = None + self.child_pid = None + + def setup(self, filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port): + + self.filename = filename + self.mux = mux + self.vcodec = vcodec + self.vbitrate = int(vbitrate) + self.fps = int(fps) + self.acodec = acodec + self.abitrate = int(abitrate) + self.width = int(width) + self.height = int(height) + + self.port = int(port) + + # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000 + self.path = self.config.get("FFmpeg", "path") + self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % ( + self.filename, self.mux, self.vcodec, self.vbitrate,\ + self.fps, self.acodec, self.abitrate, self.width, self.height) + + if (self.socket != None): + del(self.socket) + + self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind( ('', self.port) ) + self.socket.settimeout(10) + self.socket.listen(1) + + def play(self): + + lib.log("Starting FFmpeg: %s" % self.path) + + # exec FFmpeg and get stdout + child_stdin, child_stdout = os.popen2(self.path) + child_stdin.close() + + self.child_pid = os.fork() + if (self.child_pid == 0): + #child + + conn,addr= self.socket.accept() + lib.log("Sending Data to client: %s" % addr[0]) + data = child_stdout.read(1024) + conn.settimeout(5) + retry = 0 + + while( data != "" and retry < 5): + try: + conn.send(data) + except socket.error: + lib.log("Socket error (maybe timeout ?)") + retry = retry + 1 + + data = child_stdout.read(1024) + + if (retry < 5): + lib.log("Finished sending Data to client: %s" % addr[0]) + else: + lib.log("Client timed out") + + child_stdout.close() + #conn.close() + #sys.exit() + + + def stop(self): + + if (self.socket != None): + lib.log("Closing socket") + self.socket.close() + + lib.log("Trying to stop FFmpeg process") + if (self.child_pid != None): + os.kill(self.child_pid, 9) diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,218 @@ +import pygst +pygst.require("0.10") +import gst +import gobject + +class Media: + class StreamData: + stream_count = 0 + + def __init__ (self, pipe, abin, vbin): + + self.stream_count += 1 + self.Id = self.stream_count + self.Pipe = pipe + self.Abin = abin + self.Vbin = vbin + self.Loop = gobject.MainLoop() + self.ACaps = "" + self.VCaps = "" + self.Ready = False + + + def __init__(self, config): + # set gstreamer basic options + self.config = config + self.pipe = None + self.streams = [] + + + def setup(self, filename, mux, vcodec, vbitrate, + fps, acodec, abitrate, width, height, port, options): + + ## Pipelines + self.pipe = gst.Pipeline () + uri = "file://" + filename + print "Opening Uri:" + uri + src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") + if (src is None): + return None + + decode = gst.element_factory_make ("decodebin", "decode") + if (decode is None): + return None + + + #video encode + #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 + vbin = gst.Bin () + vqueue = gst.element_factory_make ("queue", "vqueue") + vscale = gst.element_factory_make ("videoscale", "vscale") + vrate = gst.element_factory_make ("videorate", "vrate") + vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") + vpay = gst.element_factory_make ("rtpmp4vpay", "vpay") + vsink = gst.element_factory_make ("udpsink", "vsink") + + if (None in [vbin, vqueue, vscale, vrate, vencode, vpay, vsink]): + print "Fail to create video encode elements." + return None + + vscale_pad = vscale.get_pad("sink") + if (vscale_pad is None): + print "Fail to get vscale sink pad." + return None + + vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height)) + if (vscale_caps is None): + print "Fail to create video caps" + return None + + if (not vscale_pad.set_caps (vscale_caps)): + print "Fail to set video output caps" + return None + + vencode.set_property ("bitrate", 256000) + vencode.set_property ("me-method", 2) + + vsink.set_property ("host", "224.0.0.1") + vsink.set_property ("port", 5000) + + vbin.add (vqueue, vscale, vrate, vencode, vpay, vsink) + if (not gst.element_link_many (vqueue, vscale, vrate, vencode, vpay, vsink)): + print "Fail to link video elements" + return None + + vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) + + #audio encode + #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 + abin = gst.Bin () + aqueue = gst.element_factory_make ("queue", "vqueue") + aconvert = gst.element_factory_make ("audioconvert", "aconvert") + aencode = gst.element_factory_make ("faac", "aencode") + apay = gst.element_factory_make ("rtpmp4gpay", "apay") + asink = gst.element_factory_make ("udpsink", "asink") + + if (None in [abin, aqueue, aconvert, aencode, apay, asink]): + print "Fail to create video encode elements." + return None + + asink.set_property ("host", "224.0.0.1") + asink.set_property ("port", 5002) + + abin.add (aqueue, aconvert, aencode, apay, asink) + if (not gst.element_link_many (aqueue, aconvert, aencode, apay, asink)): + print "Fail to link video elements" + return None + + abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) + + self.pipe.add (src, decode, abin, vbin) + gst.element_link_many (src, decode) + + stream_data = self.StreamData (self.pipe, abin, vbin) + + bus = self.pipe.get_bus() + bus.add_signal_watch() + bus.connect("message", self.__on_bus_message, stream_data) + + decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) + decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) + + + self.pipe.set_state (gst.STATE_PAUSED) + print "Running Pipe" + stream_data.Loop.run () + print "End run" + + a_caps = stream_data.ACaps + v_caps = stream_data.VCaps + stream_id = stream_data.Id + + self.streams.append (stream_data) + + def play(self): + + print "Trying to play pipeline: %s" % self.pipe + try: + if (self.pipe): + self.pipe.set_state(gst.STATE_PLAYING) + except gobject.GError, e: + print "Error: " + str(e) + + + def stop(self): + + print "Trying to stop pipeline: %s" % self.pipe + try: + if (self.pipeline): + self.pipeline.set_state(gst.STATE_NULL) + except gobject.GError, e: + print "Error: " + str(e) + + def __on_bus_message (self, bus, message, stream_data): + + t = message.type + if (t == gst.MESSAGE_STATE_CHANGED): + oldstate = -1 + newstate = -1 + pending = -1 + oldstate, newstate, pending = message.parse_state_changed () + if ((oldstate == gst.STATE_READY) and \ + (newstate == gst.STATE_PAUSED) and \ + (pending == gst.STATE_VOID_PENDING) and \ + (stream_data.Ready == False)): + state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () + if ((current_state == gst.STATE_PAUSED) and \ + (pending_state == gst.STATE_VOID_PENDING)): + print "Pipe paused" + self.__fill_sink_pads (stream_data) + stream_data.Loop.quit () + stream_data.Ready = True + elif (t == gst.MESSAGE_ERROR): + err, debug = message.parse_error() + print "Error: %s" % err, debug + stream_data.Loop.quit () + stream_data.Ready = False + + return True + + + def __fill_sink_pads (self, stream_data): + + asink = stream_data.Abin.get_by_name ("asink") + vsink = stream_data.Vbin.get_by_name ("vsink") + + asink_pad = asink.get_pad ("sink") + stream_data.ACaps = asink_pad.get_negotiated_caps().to_string() + print "ACAPS " + stream_data.ACaps + + vsink_pad = vsink.get_pad ("sink") + stream_data.VCaps = vsink_pad.get_negotiated_caps().to_string() + print "ACAPS " + stream_data.VCaps + + + + def __on_decode_unknown_type (self, decode, pad, caps, stream_data): + + print "Unknown Type" + return None + + def __on_decode_new_pad (self, decode, pad, arg1, stream_data): + + caps = pad.get_caps().to_string() + print "New pad " + caps + if (caps.rfind ("audio") != -1): + apad = stream_data.Abin.get_pad ("sink") + if (pad.link (apad) != gst.PAD_LINK_OK): + print "Error on link audio pad" + return None + elif (caps.rfind ("video") != -1): + vpad = stream_data.Vbin.get_pad ("sink") + if (pad.link (vpad) != gst.PAD_LINK_OK): + print "Error on link video pad" + return None + else: + print "Invalid caps" + + diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/gstreamer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/media/gstreamer.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,290 @@ +#vim:ts=4:sw=4:et +import pygst +pygst.require("0.10") +import gst +import gobject +import socket +import time +from threading import Thread + +class Media: + class StreamListener(Thread): + def __init__ (self, stream_data): + Thread.__init__(self) + self.stream = stream_data + print "Thread Created" + + def run (self): + #Create socket + print "Waiting connection" + self.stream.Socket.listen(1) + self.stream.Connection, self.stream.Addr = self.stream.Socket.accept () + print "Connection requested" + self.stream.Sink.set_property ("fd", self.stream.Connection.fileno()) + self.stream.Pipe.set_state(gst.STATE_PLAYING) + print "PLAYING" + + + class StreamData: + stream_count = 0 + + def __init__ (self, pipe, abin, vbin, sink): + self.stream_count += 1 + self.Id = self.stream_count + self.Pipe = pipe + self.Abin = abin + self.Vbin = vbin + self.Sink = sink + self.Loop = gobject.MainLoop() + self.ACaps = "" + self.VCaps = "" + self.Ready = False + self.Socket = None + self.Connection = None + self.Addr = None + + def __init__(self, config): + # set gstreamer basic options + self.config = config + self.streams = [] + self.socket = None + self.connection = None + self.addr = None + self.ready = False + self.current = None + + + def setup(self, uri, mux, vcodec, vbitrate, + fps, acodec, abitrate, width, height, port, options): + + ## Pipelines + pipe = gst.Pipeline () + print "Opening Uri:" + uri + src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") + #src = gst.element_factory_make ("gnomevfssrc", "src") + src.set_property ("location", uri) + if (src is None): + print "Fail to create src element" + return None + + print ("Create source") + decode = gst.element_factory_make ("decodebin", "decode") + if (decode is None): + print "Fail to create decodebin" + return None + + print ("Create source") + mux = gst.element_factory_make ("avimux", "mux") + if (mux is None): + print "Fail to create mux" + return None + + sink = gst.element_factory_make ("fdsink", "sink") + if (sink is None): + print "Fail to create fdsink" + return None + + print ("Create source") + + #video encode + #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 + vbin = gst.Bin () + vqueue = gst.element_factory_make ("queue", "vqueue") + colorspace = gst.element_factory_make ("ffmpegcolorspace", "") + vrate = gst.element_factory_make ("videorate", "vrate") + vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") + #vencode = gst.element_factory_make ("ffenc_msmpeg4v1", "vencode") + vqueue_src = gst.element_factory_make ("queue", "vqueue_src") + + #if (int(vbitrate) > 0): + vencode.set_property ("bitrate", 200) + #vencode.set_property ("quant-type", 1) + vencode.set_property ("pass", 2) + vencode.set_property ("quantizer", 5) + #vencode.set_property ("me-method", 1) + + + if (None in [vbin, vqueue, vrate, vencode, vqueue_src]): + print "Fail to create video encode elements." + return None + + vbin.add (vqueue) + if ((int(width) > 0) and (int(height) > 0)): + print ("formating output to %d / %d" % (int(width), int(height))) + + vscale = gst.element_factory_make ("ffvideoscale", "vscale") + + vbin.add (vscale); + if (not vqueue.link (vscale)): + print "Fail to link video elements" + return None + + vbin.add (colorspace) + + if (not vscale.link (colorspace, \ + gst.caps_from_string ("video/x-raw-yuv,width=(int)%d,height=(int)%d" % (int(width), int(height))))): + print "Fail to link video elements" + return None + else: + vbin.add (colorspace) + vqueue.link (colorspace) + + vbin.add (vrate, vencode, vqueue_src) + if (not colorspace.link (vrate)): + print "Fail to colorspace with vrate" + return None + + + if (not vrate.link (vencode, \ + gst.caps_from_string ("video/x-raw-yuv,framerate=(fraction)10/1"))): + print "Fail to link vrate element" + return None + + if (not vencode.link (vqueue_src)): + print "Fail to link video encode with queue" + return None + + vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) + vbin.add_pad (gst.GhostPad ("src", vqueue_src.get_pad ("src"))) + + #audio encode + #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 + abin = gst.Bin () + aqueue = gst.element_factory_make ("queue", "aqueue") + aconvert = gst.element_factory_make ("audioconvert", "aconvert") + arate = gst.element_factory_make ("audioresample", "arate") + #aencode = gst.element_factory_make ("ffenc_ac3", "aencode") + aencode = gst.element_factory_make ("queue", "aencode") + #aencode = gst.element_factory_make ("lame", "aencode") + #aencode = gst.element_factory_make ("ffenc_mp2", "aencode") + aqueue_src = gst.element_factory_make ("queue", "aqueue_src") + + if (None in [abin, aqueue, arate, aencode, aqueue_src]): + print "Fail to create video encode elements." + return None + + abin.add (aqueue, aconvert, arate, aencode, aqueue_src) + if (not gst.element_link_many (aqueue, aconvert, arate, aencode, aqueue_src)): + print "Fail to link video elements" + return None + + abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) + abin.add_pad (gst.GhostPad ("src", aqueue_src.get_pad ("src"))) + + #Finish Pipeline + pipe.add (src, decode, abin, vbin, mux, sink) + gst.element_link_many (src, decode) + gst.element_link_many (mux, sink) + + #Linking decode with mux + mux_audio = mux.get_pad ("audio_0") + mux_video = mux.get_pad ("video_0") + + audio_pad = abin.get_pad ("src") + video_pad = vbin.get_pad ("src") + + if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK): + print "Fail to link audio with mux" + return None + + if (video_pad.link (mux_video) != gst.PAD_LINK_OK): + print "Fail to link audio with mux" + return None + + stream_data = self.StreamData (pipe, abin, vbin, sink) + bus = pipe.get_bus() + bus.add_signal_watch() + bus.connect ("message", self.__on_bus_message, stream_data) + + decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) + decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) + + print ("Create source") + pipe.set_state (gst.STATE_PAUSED) + print "Running Pipe" + stream_data.Loop.run () + print "End run" + + + #Create socket + stream_data.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + print "Bind on port %d" % port + stream_data.Socket.bind(('', int (port))) + self.streams.append (stream_data) + return (True, "") + + def play(self): + print "Play" + stream = self.streams[0] + self.current = self.StreamListener(stream) + self.current.start () + time.sleep (1) + return (True, "") + + def stop(self): + self.current.join () + self.current = None + stream = self.streams[0] + stream.Pipe.set_state(gst.STATE_NULL) + del (stream.Pipe) + stream.Pipe = None + stream.Abin = None + stream.Vbin = None + stream.Sink = None + if (stream.Connection != None): + stream.Connection.close () + + self.streams = [] + time.sleep (5) + return (True, "") + + + def __on_bus_message (self, bus, message, stream_data): + + t = message.type + if (t == gst.MESSAGE_STATE_CHANGED): + oldstate = -1 + newstate = -1 + pending = -1 + oldstate, newstate, pending = message.parse_state_changed () + if ((oldstate == gst.STATE_READY) and \ + (newstate == gst.STATE_PAUSED) and \ + (pending == gst.STATE_VOID_PENDING) and \ + (stream_data.Ready == False)): + state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () + if ((current_state == gst.STATE_PAUSED) and \ + (pending_state == gst.STATE_VOID_PENDING)): + print "Pipe paused" + stream_data.Loop.quit () + stream_data.Ready = True + elif (t == gst.MESSAGE_ERROR): + err, debug = message.parse_error() + print "Error: %s" % err, debug + stream_data.Loop.quit () + stream_data.Ready = False + + return True + + def __on_decode_unknown_type (self, decode, pad, caps, stream_data): + + print "Unknown Type" + return None + + def __on_decode_new_pad (self, decode, pad, arg1, stream_data): + + caps = pad.get_caps().to_string() + print "New pad " + caps + if (caps.rfind ("audio") != -1): + apad = stream_data.Abin.get_pad ("sink") + if (pad.link (apad) != gst.PAD_LINK_OK): + print "Error on link audio pad" + return None + elif (caps.rfind ("video") != -1): + vpad = stream_data.Vbin.get_pad ("sink") + if (pad.link (vpad) != gst.PAD_LINK_OK): + print "Error on link video pad" + return None + else: + print "Invalid caps" + print "Linked" + diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/mencoder.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/media/mencoder.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,411 @@ +from __future__ import division + +import os +import sys +import lib +import time +import shlex +import signal +import socket +import ConfigParser +import logging as log + +from select import * +from subprocess import * + +class Media(object): + + def __init__(self, config): + + self.config = config + self.do_cleanup() + + # __init__() + + + def do_cleanup(self): + self.path = "" + self.args = [] + self.language = None + self.subtitle = None + self.mpegopts = None + self.socket = None + self.child_pid = None + self.mplayer = None + self.mencoder_pid = None + self.mplayer_pid = None + self.audio_opts = None + self.video_opts = None + self.gst_pipe = None + self.gst_pid = None + self.transcode_local = None + + # do_cleanup() + + + def setup_opts(self, options): + + for opt in options: + + if opt == "local": + self.mplayer = lib.which("mplayer") + + elif opt.find("language=") >= 0: + try: + lan = opt.split("=")[1] + if len(lan) < 2: + self.language = lan + except Exception, e: + log.error("Bad language option: %s" % opt) + + elif opt.find("subtitle=") >= 0: + try: + sub = opt.split("=")[1] + if len(sub) < 2: + self.language = sub + except Exception, e: + log.error("Bad subtitle option: %s" % opt) + + elif opt.find("format=") >= 0: + try: + self.mpegopts = opt.split("=")[1] + except Exception, e: + log.error("Bad format option: %s" % opt) + + elif opt.find("outfile=") >= 0: + try: + self.transcode_local = opt.split("=")[1] + except Exception, e: + log.error("Bad outfile option: %s" % opt) + + # setup_opts() + + + def run_mplayer(self): + msg = self.filename + + if self.kind == "dvd": + msg = "dvd://" + msg + + self.mplayer_pid = Popen([self.mplayer, self.filename, "1> %s" % os.devnull,\ + "2> %s" % os.devnull], stdout=PIPE, close_fds=True) + + # run_mplayer() + + + def setup_mencoder(self): + self.path = self.config.get("Mencoder", "path") + mp = Popen([self.path], stdout=PIPE, close_fds=True) + + version = mp.stdout.read().split("MEncoder ")[1].split(" (C)")[0].split("-")[-1] + + if version > "4.1.1": self.mencoder_old = False + else: self.mencoder_old = True + + os.kill(mp.pid, signal.SIGKILL) + log.info("Mencoder version: %s" % version) + + if self.mencoder_old: + try: + self.fifo = self.config.get("Mencoder", "fifo_path") + os.mkfifo(self.fifo) + except Exception, e: + log.info("Fifo: %s" % e) + else: + self.fifo = "-" + + # setup_mencoder() + + + def setup_audio(self): + + if self.acodec == "mp3lame": + return "-oac mp3lame -lameopts cbr:br=%s vol=5" % self.abitrate + else: + return "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (\ + self.acodec, self.abitrate) + + # setup_audio() + + + def setup_video(self): + + video = "" + + video += " -of %s" % self.mux + video += " -ofps %s" % self.fps + + if self.vcodec == "nuv" or self.vcodec == "xvid"\ + or self.vcodec == "qtvideo" or self.vcodec == "copy": + video += " -ovc %s" % self.vcodec + else: + video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % ( + self.vcodec, self.vbitrate) + + if self.mux == "mpeg" and self.mpegopts is not None: + video += " -mpegopts format=%s" % self.mpegopts + + video += " -vf scale=%s:%s" % (self.width, self.height) + + return video + + # setup_video() + + + def arg_append(self, args, options): + l = shlex.split(options) + for i in l: + args.append(i) + + # arg_append() + + + def setup_args(self, args): + + args.append(self.path) + + #args.append(self.filename) + args.append("-") + + if self.language != None: + self.arg_append(args, "-alang %s" % self.language) + + if self.subtitle != None: + self.arg_append(args, "-slang %s" % self.subtitle) + self.arg_append(args, "-subfps %s" % self.fps) + + self.arg_append(args, "-idx") + self.arg_append(args, self.audio_opts) + self.arg_append(args, self.video_opts) + + self.arg_append(args, "-really-quiet") + self.arg_append(args, "-o %s" % self.fifo) + self.arg_append(args, "2> %s" % os.devnull) + + # setup_args() + + + def setup_filename(self, filename): + try: + self.kind, self.filename = filename.split("://") + except: + return (False, "Wrong filename protocol") + + if self.kind == "file": + if not os.path.exists(self.filename): + msg = "File requested does not exist. SETUP failed." + log.error(msg) + return (False, msg) + + elif self.kind == "dvd": + self.filename = "dvd://" + filename + + elif self.kind == "myth": + self.filename = filename + self.gst_pipe = os.pipe() + print self.gst_pipe[0] + print self.gst_pipe[1] + + return (True, "") + + # setup_filename() + + + def setup_socket(self): + if self.socket != None: + self.socket = None + + self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + try: + self.socket.bind( ('', self.port) ) + self.socket.listen(1) + except Exception, e: + log.error("Could not create socket: %s" % e) + return (False, e) + + return (True, "") + + # setup_socket() + + + ''' + MENCODER SETUP DESCRIPTION + =========================== + + -> mux, vcodecs and acodecs + |-> mencoder (-of | -ovc | -oac) help + + -> if used mpeg as mux: + |-> to setup format: format=%s as an option at the end + + ''' + + + # good one: /tmp/dvb.mpg avi mpeg4 400 25 mp3lame 192 320 240 + # file:///tmp/dvb.mpg mpeg mpeg1video 400 25 mp2 192 320 240 format=mpeg1 + # dvd://4 mpeg mpeg1video 400 25 mp3lame 192 400 240 language=en local + # file:///tmp/mpg/bad_day.mpg avi mpeg4 400 25 mp3 192 320 240 + + def setup(self, filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port, options): + + if self.args != []: + self.do_cleanup() + + self.mux = mux + self.vcodec = vcodec + self.vbitrate = vbitrate + self.fps = fps + self.acodec = acodec + self.abitrate = abitrate + self.width = width + self.height = height + self.port = int(port) + + self.setup_mencoder() + + ret_val = self.setup_filename(filename) + + if not ret_val[0]: + return ret_val + + self.setup_opts(options) + self.audio_opts = self.setup_audio() + self.video_opts = self.setup_video() + self.setup_args(self.args) + + ret_val = self.setup_socket() + return ret_val + + # setup() + + def play_loop(self, conn): + data = self.pout.read(4096) + + conn.settimeout(5) + retry = 0 + + if not self.transcode_local: + while data != "" and retry < 5: + try: + conn.send(data) + r, w, x = select([conn], [], [], 0) + if conn in r: + back = conn.recv(1024) + if back == "OK" and self.mplayer and not self.mplayer_pid: + self.run_mplayer() + + except socket.error, e: + log.error("Socket error: %s" % e) + retry += 1 + + data = self.pout.read(4096) + + else: + local = open(self.transcode_local, "w") + total = os.path.getsize(self.filename) + partial = 4096 + + while data != "": + try: + local.write(data) + except Exception, e: + log.error("Write error: %s" % e) + + data = self.pout.read(4096) + partial += len(data) + conn.send("%.2f\n" % (partial * 100 / total) ) + + local.close() + conn.send("DONE\n") + + return retry + + # play_loop() + + + def play(self): + + if self.gst_pipe: + try: + gst = [ lib.which("gst-launch-0.10"), "--gst-debug-level=0" ] + self.arg_append(gst, "mythtvsrc location=%s" % self.filename) + self.arg_append(gst, "! fdsink fd=2") + self.gst_pid = Popen(gst, close_fds=True) + log.info("Running Gstreamer: %s" % gst); + except Exception, e: + msg = "Could not init Gstreamer: %s" % e + log.error(msg) + return (False, msg) + + + log.info("Starting Mencoder: %s" % self.args ) + try: + if not self.gst_pipe: + self.stdin = open(self.filename) + else: + self.stdin = self.gst_pid.stdout + + self.mencoder_pid = Popen(self.args, stdin=self.stdin, stdout=PIPE, close_fds=True) + except Exception, e: + msg = "Could not init Mencoder: %s" % e + log.error(msg) + return (False, msg) + + if self.mencoder_old: self.pout = open(self.fifo) + else: self.pout = self.mencoder_pid.stdout + + self.child_pid = os.fork() + + if self.child_pid == 0: + conn, addr = self.socket.accept() + + log.info("Sending Data to client: %s" % addr[0]) + retry = self.play_loop(conn) + + if retry < 5: + log.info("Finished sending Data to client: %s" % addr[0]) + else: + log.error("Client timed out, retried more than %s times" % retry) + + os.kill(self.mencoder_pid.pid, signal.SIGKILL) + sys.exit(0) + + return (True, "") + + # play() + + + def stop(self): + try: + + if self.mencoder_pid: + os.kill(self.mencoder_pid.pid, signal.SIGTERM) + self.mencoder_pid = None + + if self.mplayer_pid: + os.kill(self.mplayer_pid.pid, signal.SIGTERM) + self.mplayer_pid = None + + if self.socket: + self.socket.close() + self.socket = None + + if self.child_pid: + os.kill(self.child_pid, signal.SIGTERM) + self.child_pid = None + + if self.gst_pid: + os.kill(self.gst_pid.pid, signal.SIGTERM) + self.gst_pid = None + + self.do_cleanup() + + os.wait() + + except Exception, e: + log.error("Stop error: %s" % e) + + # stop() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/vlc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/plugins/media/vlc.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,80 @@ +import os +import sys +import time +import socket +import ConfigParser + +class Media: + + def __init__(self, config): + + self.config = config + self.pipe = "" + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + self.path = config.get("Vlc", "path") + self.host = config.get("Vlc", "host") + self.port = int(config.get("Vlc", "port")) + self.pwd = config.get("Vlc", "pwd") + + # exec VLC + pid = os.fork() + if (pid == 0): + #child + print "ESTOU EM CHILD" + self.path += " -I telnet -d 1> /dev/null 2> /dev/null &" + os.system(self.path) + sys.exit(0) + else: + print "ESTOU EM PARENT 1" + time.sleep(3) + print "ESTOU EM PARENT 2" + self.sock.connect( (self.host, self.port) ) + self.sock.send("%s\n" % self.pwd) + + + def insert_file(self, filename): + + self.sock.send("setup output0 input %s\n" % filename) + + + + def setup(self, filename, mux, vcodec, vbitrate,\ + fps, acodec, abitrate, width, height, port): + + self.filename = filename + self.mux = mux + self.vcodec = vcodec + self.vbitrate = int(vbitrate) + self.fps = int(fps) + self.acodec = acodec + self.abitrate = int(abitrate) + self.width = int(width) + self.height = int(height) + + self.port = int(port) + + + self.pipe = "#transcode{vcodec=%s,vb=%d,"\ + "fps=25.0,scale=1,acodec=mpga,"\ + "ab=64,channels=1,width=%d,height=%d}"\ + ":duplicate{dst=std{access=http,"\ + "mux=mpeg1,dst=:%d}}" % (self.vcodec, self.vbitrate,\ + self.widht, self.height,\ + self.port) + + self.sock.send("setup output0 broadcast %s\n" % self.pipe) + self.insert_file(self.filename) + + def play(self): + + print "Trying to play: %s" % self.pipe + self.sock.send("control output0 play\n") + + + def stop(self): + + print "Trying to stop: %s" % self.pipe + self.sock.send("control output0 stop\n") + + diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/stream.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/stream.conf Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,23 @@ +[Comm] +engine = tcp +port = 50000 + + +[Media] +engine = mencoder + + +[Vlc] +path = /usr/local/bin/vlc +host = 127.0.0.1 +port = 4212 +pwd = admin + + +[FFmpeg] +path = /usr/local/bin/ffmpeg + + +[Mencoder] +path = /usr/local/bin/mencoder +fifo_path = /tmp/teste diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/tests/client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.1/tests/client.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,51 @@ +import os +import sys +import time +import socket + + +if len(sys.argv) < 2: + HOST = 'localhost' + PORT = 50000 +elif len(sys.argv) == 2: + HOST = sys.argv[1] + PORT = 50000 +else: + HOST = sys.argv[1] + PORT = int(sys.argv[2]) + +socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) +socket.settimeout(10) + +try: + socket.connect( (HOST,PORT) ) +except: + print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT) + sys.exit(-1) + + +mplayer = os.popen("which mplayer").read().strip() +mplayer += " -idx - -vo x11 1> /dev/null" +pin, pout = os.popen2(mplayer) + +#teste = open("teste.avi", "w") + +data = socket.recv(4096) +i = 0 + +while (data != ""): + pin.write(data) + #teste.write(data) + data = socket.recv(4096) + #if (i == 500): + # socket.send("OK") + i += 1 + +pin.close() +socket.close() +#teste.close() + +# from select import select +# r, w, x = select([pout], []. [], 0) +# if pout in r: +# pout.read(32) diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/gms.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/gms.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +import sys +import os +import logging as log +from lib.server import serve_forever, load_plugins_transcoders + +log_level = log.INFO +for p in sys.argv[1:]: + if p == "-v" or p == "--verbose": + log_level -= 10 + +log.basicConfig(level=log_level, + format=("### %(asctime)s %(name)-18s %(levelname)-8s " + "\t%(message)s"), + datefmt="%Y-%m-%d %H:%M:%S") + +pd = os.path.join("plugins", "transcoders") +load_plugins_transcoders(pd) +serve_forever() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/lib/server.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/lib/server.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,464 @@ +#!/usr/bin/env python + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.3" + +import os +import threading +import SocketServer +import BaseHTTPServer +import socket +import urlparse +import cgi +import lib.utils as utils +import logging as log + +__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever", + "load_plugins_transcoders") + +class Transcoder(object): + log = log.getLogger("gmyth-stream.transcoder") + priority = 0 # negative values have higher priorities + name = None # to be used in requests + + def __init__(self, params): + self.params = params + # __init__() + + + def params_first(self, key, default=None): + if default is None: + return self.params[key][0] + else: + try: + return self.params[key][0] + except: + return default + # params_first() + + + def get_mimetype(self): + mux = self.params_first("mux", "mpg") + + if mux == "mpeg": + return "video/mpeg" + elif mux == "avi": + return "video/x-msvideo" + else: + return "application/octet-stream" + # get_mimetype() + + + def start(self, outfile): + return True + # start() + + + def stop(self): + return Tru + # stop() + + + def __str__(self): + return '%s("%s", mux="%s", params=%s)' % \ + (self.__class__.__name__, + self.params_first("uri", "None"), + self.params_first("mux", "mpg"), + self.params) + # __str__() +# Transcoder + + + +class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + log = log.getLogger("gmyth-stream.request") + def_transcoder = None + transcoders = utils.PluginSet(Transcoder) + + @classmethod + def load_plugins_transcoders(cls, directory): + cls.transcoders.load_from_directory(directory) + + if cls.def_transcoder is None and cls.transcoders: + cls.def_transcoder = cls.transcoders[0].name + # load_plugins_transcoders() + + + def do_dispatch(self, body): + self.url = self.path + + pieces = urlparse.urlparse(self.path) + self.path = pieces[2] + self.query = cgi.parse_qs(pieces[4]) + + if self.path == "/": + self.serve_main(body) + elif self.path == "/shutdown.do": + self.serve_shutdown(body) + elif self.path == "/stop-transcoder.do": + self.serve_stop_transcoder(body) + elif self.path == "/status.do": + self.serve_status(body) + elif self.path == "/play.do": + self.serve_play(body) + elif self.path == "/stream.do": + self.serve_stream(body) + else: + self.send_error(404, "File not found") + # do_dispatch() + + + def do_GET(self): + self.do_dispatch(True) + # do_GET() + + + def do_HEAD(self): + self.do_dispatch(False) + # do_HEAD() + + + def _nav_items(self): + self.wfile.write("""\ +
  • Play
  • +
  • Status
  • +
  • Stop transcoders
  • +
  • Shutdown Server
  • +""") + # _nav_items() + + + def serve_main(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server + +

    Welcome to Catota Server

    + + + +""") + # serve_main() + + + def serve_play(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server + +

    Play

    +
    + :// + +
    + + + +""") + # serve_play() + + + def serve_shutdown(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Exited + +

    Catota is not running anymore

    + + +""") + self.server.server_close() + # serve_shutdown() + + + def serve_stop_all_transcoders(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.server.stop_transcoders() + self.wfile.write("""\ + + Catota Server Stopped Transcoders + +

    Catota stopped running transcoders

    + + + + """) + # serve_stop_all_transcoders() + + + def serve_stop_selected_transcoders(self, body, requests): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Stopped Transcoders + +

    Catota stopped running transcoders:

    + + + + +""") + # serve_stop_selected_transcoders() + + + def serve_stop_transcoder(self, body): + req = self.query.get("request", None) + if req and "all" in req: + self.serve_stop_all_transcoders(body) + elif req: + self.serve_stop_selected_transcoders(body, req) + else: + self.serve_status(body) + # serve_stop_transcoder() + + + def serve_status(self, body): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header('Connection', 'close') + self.end_headers() + if body: + self.wfile.write("""\ + + Catota Server Status + +

    Catota Status

    +""") + tl = self.server.get_transcoders() + if not tl: + self.wfile.write("

    No running transcoder.

    \n") + else: + self.wfile.write("

    Running transcoders:

    \n") + self.wfile.write("""\ + + + + +""") + # serve_status() + + + def _get_transcoder(self): + # get transcoder option: mencoder is the default + request_transcoders = self.query.get("transcoder", ["mencoder"]) + + for t in request_transcoders: + transcoder = self.transcoders.get(t) + if transcoder: + return transcoder + + if not transcoder: + return self.transcoders[self.def_transcoder] + # _get_transcoder() + + + def serve_stream(self, body): + transcoder = self._get_transcoder() + try: + obj = transcoder(self.query) + except Exception, e: + self.send_error(500, str(e)) + return + + self.send_response(200) + self.send_header("Content-Type", obj.get_mimetype()) + self.send_header('Connection', 'close') + self.end_headers() + + if body: + self.server.add_transcoders(self, obj) + obj.start(self.wfile) + self.server.del_transcoders(self, obj) + # serve_stream() + + + def log_request(self, code='-', size='-'): + self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) + # log_request() + + + def log_error(self, format, *args): + self.log.error("%s: %s" % (self.address_string(), format % args)) + # log_error() + + + def log_message(self, format, *args): + self.log.info("%s: %s" % (self.address_string(), format % args)) + # log_message() +# RequestHandler + + + +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): + log = log.getLogger("gmyth-streamer.server") + run = True + _transcoders = {} + _lock = threading.RLock() + + def serve_forever(self): + self.log.info("GMyth-Streamer serving HTTP on %s:%s" % + self.socket.getsockname()) + try: + while self.run: + self.handle_request() + except KeyboardInterrupt, e: + pass + + self.log.debug("Stopping all remaining transcoders...") + self.stop_transcoders() + self.log.debug("Transcoders stopped!") + # serve_forever() + + + def get_request(self): + skt = self.socket + old = skt.gettimeout() + skt.settimeout(0.5) + while self.run: + try: + r = skt.accept() + skt.settimeout(old) + return r + except socket.timeout, e: + pass + raise socket.error("Not running") + # get_request() + + + def server_close(self): + self.run = False + self.stop_transcoders() + + BaseHTTPServer.HTTPServer.server_close(self) + # server_close() + + + def stop_transcoders(self): + self._lock.acquire() + for transcoder, request in self._transcoders.iteritems(): + self.log.info("Stop transcoder: %s, client=%s" % + (transcoder, request.client_address)) + transcoder.stop() + self._lock.release() + # stop_transcoders() + + + def get_transcoders(self): + self._lock.acquire() + try: + return self._transcoders.items() + finally: + self._lock.release() + # get_transcoders() + + + def add_transcoders(self, request, transcoder): + self._lock.acquire() + try: + self._transcoders[transcoder] = request + finally: + self._lock.release() + # add_transcoders() + + + def del_transcoders(self, request, transcoder): + self._lock.acquire() + try: + del self._transcoders[transcoder] + finally: + self._lock.release() + # del_transcoders() +# Server + + + +def serve_forever(host="0.0.0.0", port=40000): + addr = (host, port) + + RequestHandler.protocol_version = "HTTP/1.0" + httpd = Server(addr, RequestHandler) + httpd.serve_forever() +# serve_forever() + + +def load_plugins_transcoders(directory): + RequestHandler.load_plugins_transcoders(directory) +# load_plugins_transcoders() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/lib/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/lib/utils.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,148 @@ +#!/usr/bin/env + +__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" +__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" +__license__ = "GPL" +__version__ = "0.3" + +import os +import stat +import sys +import logging +import urllib +import gobject +import imp + +log = logging.getLogger("gmyth-stream.utils") + +__all__ = ("which", "load_plugins", "PluginSet") + +def which(app): + """function to implement which(1) unix command""" + pl = os.environ["PATH"].split(os.pathsep) + for p in pl: + path = os.path.join(p, app) + if os.path.isfile(path): + st = os.stat(path) + if st[stat.ST_MODE] & 0111: + return path + return "" +# which() + + +def _load_module(pathlist, name): + fp, path, desc = imp.find_module(name, pathlist) + try: + module = imp.load_module(name, fp, path, desc) + return module + finally: + if fp: + fp.close() +# _load_module() + + +class PluginSet(object): + def __init__(self, basetype, *items): + self.basetype = basetype + self.map = {} + self.list = [] + + for i in items: + self._add(i) + self._sort() + # __init__() + + + def _add(self, item): + self.map[item.name] = item + self.list.append(item) + # _add() + + + def add(self, item): + self._add() + self._sort() + # add() + + + def __getitem__(self, spec): + if isinstance(spec, basestring): + return self.map[spec] + else: + return self.list[spec] + # __getitem__() + + + def get(self, name, default=None): + self.map.get(name, default) + # get() + + + def __iter__(self): + return self.list.__iter__() + # __iter__() + + + def __len__(self): + return len(self.list) + # __len__() + + + def _sort(self): + self.list.sort(lambda a, b: cmp(a.priority, b.priority)) + # _sort() + + + def update(self, pluginset): + self.map.update(pluginset.map) + self.list.extend(pluginset.list) + self._sort() + # update() + + + def load_from_directory(self, directory): + for i in load_plugins(directory, self.basetype): + self._add(i) + self._sort() + # load_from_directory() + + + def __str__(self): + lst = [] + for o in self.list: + lst.append('"%s" (%s)' % (o.name, o.__name__)) + + return "%s(basetype=%s, items=[%s])" % \ + (self.__class__.__name__, + self.basetype.__name__, + ", ".join(lst)) + # __str__() +# PluginSet + + +def load_plugins(directory, basetype): + tn = basetype.__name__ + log.debug("Loading plugins from %s, type=%s" % (directory, tn)) + + + plugins = [] + for d in os.listdir(directory): + if not d.endswith(".py"): + continue + + name = d[0: -3] + if name == "__init__": + continue + + directory.replace(os.path.sep, ".") + mod = _load_module([directory], name) + for sym in dir(mod): + cls = getattr(mod, sym) + if isinstance(cls, type) and issubclass(cls, basetype) and \ + cls != basetype: + plugins.append(cls) + log.info("Loaded %s (%s) from %s" % \ + (cls.__name__, tn, os.path.join(directory, d))) + + return plugins +# load_plugins() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,305 @@ +#vim:ts=4:sw=4:et +import pygst +pygst.require("0.10") +import gst +import gobject +import time +import lib.utils as utils +import lib.server as server + +from threading import Thread + +__all__ = ("TranscoderGstreamer",) + +class TranscoderGstreamer(server.Transcoder): + gstreamer_path = utils.which("gst-launch-0.10") + name = "gstreamer" + priority = -2 + + # StreamListener() + + class StreamData: + stream_count = 0 + + def __init__(self, log, pipe, abin, vbin, sink): + self.log = log + self.stream_count += 1 + self.Id = self.stream_count + self.Pipe = pipe + self.Abin = abin + self.Vbin = vbin + self.Sink = sink + self.Loop = gobject.MainLoop() + self.ACaps = "" + self.VCaps = "" + self.Ready = False + self.Connection = None + self.Addr = None + # __init__() + + # StreamData() + + + def __init__(self, params): + server.Transcoder.__init__(self, params) + # set gstreamer basic options + self.connection = None + self.addr = None + self.ready = False + self.quit = False + + self.log.info("Params for Gstreamer: %s" % self.params) + # __init__() + + + def _create_start_elements(self, uri): + self.log.info("Opening Uri:" + uri) + src = gst.element_make_from_uri(gst.URI_SRC, uri, "src") + decode = gst.element_factory_make("decodebin", "decode") + mux = gst.element_factory_make("avimux", "mux") + sink = gst.element_factory_make("fdsink", "sink") + + return [src, decode, mux, sink] + # _create_start_elements() + + + def _setup_video_encode(self, vbin, width, height): + vqueue = gst.element_factory_make("queue", "vqueue") + colorspace = gst.element_factory_make("ffmpegcolorspace", "") + vrate = gst.element_factory_make("videorate", "vrate") + vencode = gst.element_factory_make("ffenc_mpeg4", "vencode") + vqueue_src = gst.element_factory_make("queue", "vqueue_src") + + vencode.set_property("bitrate", 200) + + if None in [vbin, vqueue, vrate, vencode, vqueue_src]: + self.log.info("Fail to create video encode elements.") + return False + + vbin.add(vqueue) + if int(width) > 0 and int(height) > 0: + self.log.info(("Formating output to %d / %d" %(int(width), int(height)))) + + vscale = gst.element_factory_make("ffvideoscale", "vscale") + + vbin.add(vscale); + if not vqueue.link(vscale): + self.log.info("Fail to link video elements") + return False + + vbin.add(colorspace) + + if not vscale.link(colorspace, \ + gst.caps_from_string("video/x-raw-yuv,width=(int)%d,height=(int)%d" %(\ + int(width), int(height)))): + self.log.info("Fail to link video elements") + return False + else: + vbin.add(colorspace) + vqueue.link(colorspace) + + vbin.add(vrate, vencode, vqueue_src) + if not colorspace.link(vrate): + self.log.info("Fail to colorspace with vrate") + return False + + if not vrate.link(vencode, \ + gst.caps_from_string("video/x-raw-yuv,framerate=(fraction)10/1")): + self.log.info("Fail to link vrate element") + return False + + if not vencode.link(vqueue_src): + self.log.info("Fail to link video encode with queue") + return False + + vbin.add_pad(gst.GhostPad("sink", vqueue.get_pad("sink"))) + vbin.add_pad(gst.GhostPad("src", vqueue_src.get_pad("src"))) + + return True + # _setup_video_encode() + + + def _setup_audio_encode(self, abin): + aqueue = gst.element_factory_make("queue", "aqueue") + aconvert = gst.element_factory_make("audioconvert", "aconvert") + arate = gst.element_factory_make("audioresample", "arate") + aencode = gst.element_factory_make("queue", "aencode") + aqueue_src = gst.element_factory_make("queue", "aqueue_src") + + if None in [abin, aqueue, arate, aencode, aqueue_src]: + self.log.info("Fail to create video encode elements.") + return False + + abin.add(aqueue, aconvert, arate, aencode, aqueue_src) + + if not gst.element_link_many(aqueue, aconvert, arate, aencode, aqueue_src): + self.log.info("Fail to link video elements") + return False + + abin.add_pad(gst.GhostPad("sink", aqueue.get_pad("sink"))) + abin.add_pad(gst.GhostPad("src", aqueue_src.get_pad("src"))) + + return True + # _setup_audio_encode() + + + def setup(self, uri, mux, vcodec, vbitrate, + fps, acodec, abitrate, width, height, options): + + ## Pipelines + pipe = gst.Pipeline() + src, decode, mux, sink = self._create_start_elements(uri) + + if None in [src, decode, mux, sink]: + self.log.info("Problems with while starting basic elements"); + return False + + #video encode + #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! + #ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 + #port=5000 + + vbin = gst.Bin() + if not self._setup_video_encode(vbin, width, height): + return False + + #audio encode + #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! + #udpsink name=upd_audio host=224.0.0.1 port=5002 + + abin = gst.Bin() + if not self._setup_audio_encode(abin): + return False + + #Finish Pipeline + pipe.add(src, decode, abin, vbin, mux, sink) + gst.element_link_many(src, decode) + gst.element_link_many(mux, sink) + + #Linking decode with mux + mux_audio = mux.get_pad("audio_0") + mux_video = mux.get_pad("video_0") + + audio_pad = abin.get_pad("src") + video_pad = vbin.get_pad("src") + + if audio_pad.link(mux_audio) != gst.PAD_LINK_OK: + self.log.info("Fail to link audio with mux") + return False + + if video_pad.link(mux_video) != gst.PAD_LINK_OK: + self.log.info("Fail to link audio with mux") + return False + + self.stream_data = self.StreamData(self.log, pipe, abin, vbin, sink) + bus = pipe.get_bus() + bus.add_signal_watch() + bus.connect("message", self.__on_bus_message, self.stream_data) + + decode.connect("new-decoded-pad", self.__on_decode_new_pad, self.stream_data) + decode.connect("unknown-type", self.__on_decode_unknown_type, self.stream_data) + + self.log.info("Setting PIPELINE state to PAUSED") + pipe.set_state(gst.STATE_PAUSED) + self.log.info("Running Loop") + self.stream_data.Loop.run() + # setup() + + def __on_bus_message(self, bus, message, stream_data): + + t = message.type + + if t == gst.MESSAGE_STATE_CHANGED: + oldstate = -1 + newstate = -1 + pending = -1 + + oldstate, newstate, pending = message.parse_state_changed() + + if oldstate == gst.STATE_READY and \ + newstate == gst.STATE_PAUSED and \ + pending == gst.STATE_VOID_PENDING and \ + stream_data.Ready == False: + + state_changed_status, current_state, pending_state = stream_data.Pipe.get_state() + if current_state == gst.STATE_PAUSED and \ + pending_state == gst.STATE_VOID_PENDING: + self.log.info("Pipe paused") + stream_data.Loop.quit() + stream_data.Ready = True + + elif t == gst.MESSAGE_EOS: + self.log.info("Pipe finished") + stream_data.Loop.quit() + self.quit = True + + elif t == gst.MESSAGE_ERROR: + err, debug = message.parse_error() + self.log.error("Error: %s %s" %(err, debug)) + stream_data.Loop.quit() + stream_data.Ready = False + + return True + # __on_bus_message() + + def __on_decode_unknown_type(self, decode, pad, caps, stream_data): + self.log.info("Unknown Type") + return None + # __on_decode_unknown_type + + def __on_decode_new_pad(self, decode, pad, arg1, stream_data): + + caps = pad.get_caps().to_string() + self.log.info("New pad " + caps) + if caps.rfind("audio") != -1: + apad = stream_data.Abin.get_pad("sink") + if pad.link(apad) != gst.PAD_LINK_OK: + self.log.info("Error on link audio pad") + return None + elif caps.rfind("video") != -1: + vpad = stream_data.Vbin.get_pad("sink") + if pad.link(vpad) != gst.PAD_LINK_OK: + self.log.info("Error on link video pad") + return None + else: + self.log.info("Invalid caps") + self.log.info("Linked") + # __on_decode_new_pad + + + def start(self, outfd): + params_first = self.params_first + + self.setup(params_first("uri", ""), params_first("mux", "avi"), + params_first("vcodec", "ffenc_h263p"), params_first("vbitrate", 256000), + params_first("fps", 25), params_first("acodec", "faac"), + params_first("abitrate", 192000), params_first("width", 320), + params_first("height", 240), params_first("options", "")) + + self.log.info("Play %s", outfd.fileno()) + self.stream_data.Sink.set_property("fd", outfd.fileno()) + self.log.info("Setting Pipeline state to PLAYING") + self.stream_data.Pipe.set_state(gst.STATE_PLAYING) + + # keep playing until EOS + self.log.info("QUIT: %s" % self.quit) + + i = 0 + loop = gobject.MainLoop() + loop.run() + + self.log.info("quit loop") + + return True + # start() + + def stop(self): + self.log.info("Stop stream_data: %s" % self.stream_data) + + if self.stream_data: + self.stream_data.Pipe.set_state(gst.STATE_NULL) + self.quit = True + + del self.stream_data + self.stream_data = None + # stop diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/plugins/transcoders/mencoder.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gmyth-stream/server/0.2/plugins/transcoders/mencoder.py Wed Apr 18 15:59:10 2007 +0100 @@ -0,0 +1,123 @@ +import lib.utils as utils +import lib.server as server +import os +import signal +import subprocess + +__all__ = ("TranscoderMencoder",) + +class TranscoderMencoder(server.Transcoder): + mencoder_path = utils.which("mencoder") + def_mencoder_outfile = os.path.join(os.path.sep, "tmp", + "mencoder-fifo-%(uid)s-%(pid)s") + name = "mencoder" + priority = -1 + + def __init__(self, params): + server.Transcoder.__init__(self, params) + self.proc = None + self.args = None + + vars = {"uid": os.getuid(), "pid": os.getpid()} + mencoder_outfile_base = self.def_mencoder_outfile % vars + mencoder_outfile = mencoder_outfile_base + i = 0 + while os.path.exists(mencoder_outfile): + i += 1 + mencoder_outfile = mencoder_outfile_base + ".%s" % i + + self.mencoder_outfile = mencoder_outfile + os.mkfifo(self.mencoder_outfile) + + args = [self.mencoder_path, "-really-quiet", + "-o", self.mencoder_outfile] + + params_first = self.params_first + + type = params_first("type") + location = params_first("location") + args.append("%s://%s" % (type, location)) + + mux = params_first("mux", "avi") + args.extend(["-of", mux]) + + acodec = params_first("acodec", "mp3") + abitrate = params_first("abitrate", "128") + if acodec == "mp3lame": + args.extend(["-oac", "mp3lame", "-lameopts", + "cbr:br=%s" % abitrate]) + else: + args.extend(["-oac", "lavc", "-lavcopts", + "acodec=%s:abitrate=%s" % (acodec, abitrate)]) + + vcodec = params_first("vcodec", "mpeg4") + vbitrate = params_first("vbitrate", "400") + args.extend(["-ovc", "lavc", "-lavcopts", + "vcodec=%s:vbitrate=%s" % (vcodec, vbitrate)]) + + fps = params_first("fps", "24") + args.extend(["-ofps", fps]) + + width = params_first("width", "320") + height = params_first("height", "240") + args.extend(["-vf", "scale=%s:%s" % (width, height)]) + + self.args = args + # __init__() + + + def _unlink_fifo(self): + try: + os.unlink(self.mencoder_outfile) + except Exception, e: + pass + # _unlink_fifo() + + + def start(self, outfd): + cmd = " ".join(self.args) + self.log.info("Mencoder: %s" % cmd) + + try: + self.proc = subprocess.Popen(self.args, close_fds=True) + except Exception, e: + self.log.error("Error executing mencoder: %s" % cmd) + return False + + try: + fifo_read = open(self.mencoder_outfile) + except Exception, e: + self.log.error("Error opening fifo: %s" % cmd) + return False + + try: + while self.proc and self.proc.poll() == None: + d = fifo_read.read(1024) + outfd.write(d) + except Exception, e: + self.log.error("Problems handling data: %s" % e) + self._unlink_fifo() + return False + + self._unlink_fifo() + return True + # start() + + + def stop(self): + if self.proc: + try: + os.kill(self.proc.pid, signal.SIGTERM) + except OSError, e: + pass + + try: + self.proc.wait() + except Exception, e: + pass + + self.proc = None + + self._unlink_fifo() + # stop() +# TranscoderMencoder diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/lib.py --- a/gmyth-stream/server/lib.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -import time -import logging -import os -import stat - -ext = ['mpg', 'avi', 'mp4', 'nuv', 'mpeg', 'mov'] - -def now(): - return time.strftime("%Y-%m-%d %H:%M:%S"); - -def log(msg): - logging.log(logging.DEBUG, msg) - new_msg = "[%s] %s" % (now(), msg) - return new_msg - - -bin_path_list = os.environ["PATH"].split(os.pathsep) -def which(prg): - for d in bin_path_list: - path = os.path.join(d, prg) - if os.path.exists(path): - st = os.stat(path) - if st[stat.ST_MODE] & 0111: - return path - return "" - -def list_media_files(directory, file_list): - for root, dirs, files in os.walk(directory): - for name in files: - if os.path.splitext(name)[1].strip(".") in ext: - media = os.path.join(root,name) - if media not in file_list: - file_list.append(os.path.join(root,name)) - - return True - diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/main.py --- a/gmyth-stream/server/main.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,185 +0,0 @@ -#!/usr/bin/python - -import os -import lib -import sys -import imp -import ConfigParser -import logging as log - -log.basicConfig(level=log.DEBUG, - format="%(asctime)s %(levelname)-8s %(message)s", - datefmt='%Y-%m-%d %H:%M:%S') - -config = ConfigParser.ConfigParser() -config.read("stream.conf") - -def load_module(pathlist, name): - fp, path, desc = imp.find_module(name, pathlist) - try: - module = imp.load_module(name, fp, path, desc) - return module - finally: - if fp: - fp.close() - - -media_plugin = config.get("Media", "engine") -media_plugin_module = load_module(["./plugins/media"], media_plugin) -media = media_plugin_module.Media(config) - -comm_plugin = config.get("Comm", "engine") -comm_plugin_module = load_module(["./plugins/comm"], comm_plugin) -server = comm_plugin_module.Server(config) - -log.info("Starting GMyth-Stream server") - - -''' -PROTOCOL DESCRIPTION -===================== - -COMMAND OPTIONS - --> SETUP DESCRIPTION -|-> used to setup transcoding and streaming parameters -|-> must be used before any "PLAY" command -|-> e.g: - -file://file_name mux vcodec vbitrate fps acodec abitrate width height options -dvd://title_number mux vcodec vbitrate fps acodec abitrate width height options - --> PLAY DESCRIPTION - |-> used to start transcoding and streaming of file - |-> must be used just if SETUP was used before - |-> after it, _must_ send STOP - --> STOP DESCRIPTION - |-> used to stop transcoding and streaming process - |-> must be used just if PLAY was used before - |-> must be used after PLAY - --> QUIT DESCRIPTION - |-> used to quit the main loop (quit program) - -''' -nextport = 0 -setup = (False, "STOPPED") - -def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate, - width, height, *options): - global nextport - global setup - - if setup[1] != "PLAYING": - nextport += 1 - ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec, - abitrate, width, height, nextport, options) - if ret[0]: - server.sendOk() - else: - server.sendNotOk(ret[1]) - - setup = (True, setup[1]) - - else: server.sendNotOk("You must STOP before SETingUP again") - - return True - -def do_play(server): - global setup - - if setup[0] and setup[1] == "STOPPED": - setup = (setup[0], "PLAYING") - ret = media.play() - if ret[0]: - server.sendOk("%d" % nextport) - else: - server.sendNotOk(ret[1]) - - else: - if setup[1] == "STOPPED": - server.sendNotOk("You must SETUP before PLAYing") - else: - server.sendNotOk("You must STOP before PLAYing again") - - return True - -def do_stop(server): - global setup - - media.stop() - setup = (False, "STOPPED") - server.sendOk() - return True - -def do_list(server, *directory): - file_list = [] - for j in directory: - lib.list_media_files(j, file_list) - - server.sendOk(file_list) - return True - -def do_quit(server): - server.finish = 1 - media.stop() - server.sendOk() - return True - - -mapping = { - "SETUP": do_setup, - "PLAY": do_play, - "STOP": do_stop, - "LIST": do_list, - "QUIT": do_quit, - } - - -def dispatch(server, msg): - pieces = msg.split() - if len(pieces) < 1: - log.error("Invalid client command format: %r" % msg) - server.sendNotOk("Invalid Format") - return False - - cmd = pieces[0] - f = mapping.get(cmd, None) - if not f: - log.error("Unknow command: %r" % msg) - server.sendNotOk("Unknow Command") - return False - - try: - return f(server, *pieces[1:]) - except Exception, e: - log.error("Could not execute %r: %s" % (msg, e)) - server.sendNotOk(str(e)) - return False - - - -while not server.finish: - conn, client, port = server.getRequest() - if nextport == 0: - nextport = port - - while not server.finish: - msg = server.getMsg() - if not msg: - break - - log.info("Client %s sent command: %r" % (client, msg)) - dispatch(server, msg) - - log.info("Closing connection with %s" % (client,)) - server.disconnect_client(conn) - try: - os.wait() - except Exception, e: - log.error(e) - -server.stop() -log.info("Server stopped. Closing...") - diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/comm/tcp.py --- a/gmyth-stream/server/plugins/comm/tcp.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -import lib -import time -import socket -import logging as log - -class Server(object): - - def __init__(self, config): - self.host = '0.0.0.0' - self.port = int(config.get("Comm", "port")) - self.finish = 0 - - addr = (self.host, self.port) - log.debug("Setup TCP server at %s:%s" % addr) - self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.tcp.bind(addr) - self.tcp.listen(1) - log.info("TCP server listening at %s:%s (sock=%d)" % - (self.host, self.port, self.tcp.fileno())) - - def getMsg(self): - bytes = [] - try: - while 1: - c = self.con.recv(1) - bytes.append(c) - if not c or c == "\n": - break - except Exception, e: - log.error("Error reading message from client: %s" % e) - return None - - if not bytes or bytes[-1] != "\n": - msg = "".join(bytes) - log.error("Invalid message from client: %r" % msg) - return None - - # remove \n and \r - bytes.pop() - if bytes[-1] == "\r": - bytes.pop() - - msg = "".join(bytes) - log.debug("RECV: %r" % msg) - return msg - - def sendMsg(self, msg): - log.debug("SEND: %r" % msg) - self.con.send(msg + "\n") - - def sendOk(self, payload=None): - self.sendMsg("OK %d" % bool(payload is not None)) - if payload is not None: - if not isinstance(payload, (tuple, list)): - payload = (payload,) - for e in payload: - self.sendMsg("+%s" % e) - self.sendMsg(".") - - def sendNotOk(self, reason=""): - self.sendMsg("NOTOK %r" % reason) - - def getRequest(self): - log.debug("Wait for client request at %s:%s (sock=%d)" % - (self.host, self.port, self.tcp.fileno())) - self.con, self.client = self.tcp.accept() - log.info("Incoming request from %s (con=%s)" % - (self.client, self.con.fileno())) - return (self.con, self.client, self.port) - - def disconnect_client(self, connection): - log.info("Closed request from %s (con=%s)" % - (self.client, self.con.fileno())) - connection.close() - - def stop(self): - log.debug("Stop") - self.tcp.close() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/comm/xmlrpc.py --- a/gmyth-stream/server/plugins/comm/xmlrpc.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -import lib -import SimpleXMLRPCServer - - -class Handler: - - def __init__(self, recv_pool, send_pool): - self.recv_pool = recv_pool - self.send_pool = send_pool - self.getMsg = self.sendMsg - - def _listMethods(self): - return ['setup', 'play', 'stop', 'close', 'getMsg'] - - def _methodHelp(self, method): - - if method == 'setup': - return "Setup the Media: setup( filename, mux, vcodec, vbitrate,"\ - " fps, acodec, abitrate, width, height, port, options" - elif method == 'play': - return "Play the Media: play()" - elif method == 'stop': - return "Stop the Media: stop()" - elif method == 'close': - return "Close the connection: close()" - elif method == 'getMsg': - return "Return the first message in the pool: getMsg()" - else: - # By convention, return empty - # string if no help is available - return "" - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port, options): - - msg = "%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port) - - if len(options) > 0: - for opt in options: - msg += " %s" % opt - - self.recv_pool.append("SETUP") - self.recv_pool.append(msg) - return self.sendMsg() - - def play(self): - self.recv_pool.append("PLAY") - return self.sendMsg() - - def stop(self): - self.recv_pool.append("STOP") - return self.sendMsg() - - def close(self): - self.recv_pool.append("CLOSE") - return self.sendMsg() - - def sendMsg(self): - if self.send_pool != []: - return self.send_pool.pop(0) - else: - return "" - - -class Server: - - def __init__(self, config): - self.host = 'localhost' - self.port = int(config.get("Comm", "port")) - self.finish = 0 - self.recv_pool = [] - self.send_pool = [] - - self.handler = Handler(self.recv_pool, self.send_pool) - - self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port)) - self.xmlrpc.register_instance(self.handler) - - - def getMsg(self, size): - if self.recv_pool != []: - return self.recv_pool.pop(0) - else: - return "" - - def sendMsg(self, msg): - self.send_pool.append(msg) - - def Ack(self, command): - msg = "[%s] Command %s received" % (lib.now(), command) - self.sendMsg(msg + "\n") - - def getRequest(self): - self.xmlrpc.handle_request() - return (0, "RPC Client") - - def disconnect_client(self, connection): - connection = 0 - - def stop(self): - self.xmlrpc.server_close() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/ffmpeg.py --- a/gmyth-stream/server/plugins/media/ffmpeg.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,91 +0,0 @@ -import os -import sys -import lib -import time -import socket -import ConfigParser - -class Media: - - def __init__(self, config): - - self.config = config - self.socket = None - self.child_pid = None - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port): - - self.filename = filename - self.mux = mux - self.vcodec = vcodec - self.vbitrate = int(vbitrate) - self.fps = int(fps) - self.acodec = acodec - self.abitrate = int(abitrate) - self.width = int(width) - self.height = int(height) - - self.port = int(port) - - # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000 - self.path = self.config.get("FFmpeg", "path") - self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % ( - self.filename, self.mux, self.vcodec, self.vbitrate,\ - self.fps, self.acodec, self.abitrate, self.width, self.height) - - if (self.socket != None): - del(self.socket) - - self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind( ('', self.port) ) - self.socket.settimeout(10) - self.socket.listen(1) - - def play(self): - - lib.log("Starting FFmpeg: %s" % self.path) - - # exec FFmpeg and get stdout - child_stdin, child_stdout = os.popen2(self.path) - child_stdin.close() - - self.child_pid = os.fork() - if (self.child_pid == 0): - #child - - conn,addr= self.socket.accept() - lib.log("Sending Data to client: %s" % addr[0]) - data = child_stdout.read(1024) - conn.settimeout(5) - retry = 0 - - while( data != "" and retry < 5): - try: - conn.send(data) - except socket.error: - lib.log("Socket error (maybe timeout ?)") - retry = retry + 1 - - data = child_stdout.read(1024) - - if (retry < 5): - lib.log("Finished sending Data to client: %s" % addr[0]) - else: - lib.log("Client timed out") - - child_stdout.close() - #conn.close() - #sys.exit() - - - def stop(self): - - if (self.socket != None): - lib.log("Closing socket") - self.socket.close() - - lib.log("Trying to stop FFmpeg process") - if (self.child_pid != None): - os.kill(self.child_pid, 9) diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/gstreamer-rtp.py --- a/gmyth-stream/server/plugins/media/gstreamer-rtp.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,218 +0,0 @@ -import pygst -pygst.require("0.10") -import gst -import gobject - -class Media: - class StreamData: - stream_count = 0 - - def __init__ (self, pipe, abin, vbin): - - self.stream_count += 1 - self.Id = self.stream_count - self.Pipe = pipe - self.Abin = abin - self.Vbin = vbin - self.Loop = gobject.MainLoop() - self.ACaps = "" - self.VCaps = "" - self.Ready = False - - - def __init__(self, config): - # set gstreamer basic options - self.config = config - self.pipe = None - self.streams = [] - - - def setup(self, filename, mux, vcodec, vbitrate, - fps, acodec, abitrate, width, height, port, options): - - ## Pipelines - self.pipe = gst.Pipeline () - uri = "file://" + filename - print "Opening Uri:" + uri - src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") - if (src is None): - return None - - decode = gst.element_factory_make ("decodebin", "decode") - if (decode is None): - return None - - - #video encode - #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 - vbin = gst.Bin () - vqueue = gst.element_factory_make ("queue", "vqueue") - vscale = gst.element_factory_make ("videoscale", "vscale") - vrate = gst.element_factory_make ("videorate", "vrate") - vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") - vpay = gst.element_factory_make ("rtpmp4vpay", "vpay") - vsink = gst.element_factory_make ("udpsink", "vsink") - - if (None in [vbin, vqueue, vscale, vrate, vencode, vpay, vsink]): - print "Fail to create video encode elements." - return None - - vscale_pad = vscale.get_pad("sink") - if (vscale_pad is None): - print "Fail to get vscale sink pad." - return None - - vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height)) - if (vscale_caps is None): - print "Fail to create video caps" - return None - - if (not vscale_pad.set_caps (vscale_caps)): - print "Fail to set video output caps" - return None - - vencode.set_property ("bitrate", 256000) - vencode.set_property ("me-method", 2) - - vsink.set_property ("host", "224.0.0.1") - vsink.set_property ("port", 5000) - - vbin.add (vqueue, vscale, vrate, vencode, vpay, vsink) - if (not gst.element_link_many (vqueue, vscale, vrate, vencode, vpay, vsink)): - print "Fail to link video elements" - return None - - vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) - - #audio encode - #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 - abin = gst.Bin () - aqueue = gst.element_factory_make ("queue", "vqueue") - aconvert = gst.element_factory_make ("audioconvert", "aconvert") - aencode = gst.element_factory_make ("faac", "aencode") - apay = gst.element_factory_make ("rtpmp4gpay", "apay") - asink = gst.element_factory_make ("udpsink", "asink") - - if (None in [abin, aqueue, aconvert, aencode, apay, asink]): - print "Fail to create video encode elements." - return None - - asink.set_property ("host", "224.0.0.1") - asink.set_property ("port", 5002) - - abin.add (aqueue, aconvert, aencode, apay, asink) - if (not gst.element_link_many (aqueue, aconvert, aencode, apay, asink)): - print "Fail to link video elements" - return None - - abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) - - self.pipe.add (src, decode, abin, vbin) - gst.element_link_many (src, decode) - - stream_data = self.StreamData (self.pipe, abin, vbin) - - bus = self.pipe.get_bus() - bus.add_signal_watch() - bus.connect("message", self.__on_bus_message, stream_data) - - decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) - decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) - - - self.pipe.set_state (gst.STATE_PAUSED) - print "Running Pipe" - stream_data.Loop.run () - print "End run" - - a_caps = stream_data.ACaps - v_caps = stream_data.VCaps - stream_id = stream_data.Id - - self.streams.append (stream_data) - - def play(self): - - print "Trying to play pipeline: %s" % self.pipe - try: - if (self.pipe): - self.pipe.set_state(gst.STATE_PLAYING) - except gobject.GError, e: - print "Error: " + str(e) - - - def stop(self): - - print "Trying to stop pipeline: %s" % self.pipe - try: - if (self.pipeline): - self.pipeline.set_state(gst.STATE_NULL) - except gobject.GError, e: - print "Error: " + str(e) - - def __on_bus_message (self, bus, message, stream_data): - - t = message.type - if (t == gst.MESSAGE_STATE_CHANGED): - oldstate = -1 - newstate = -1 - pending = -1 - oldstate, newstate, pending = message.parse_state_changed () - if ((oldstate == gst.STATE_READY) and \ - (newstate == gst.STATE_PAUSED) and \ - (pending == gst.STATE_VOID_PENDING) and \ - (stream_data.Ready == False)): - state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () - if ((current_state == gst.STATE_PAUSED) and \ - (pending_state == gst.STATE_VOID_PENDING)): - print "Pipe paused" - self.__fill_sink_pads (stream_data) - stream_data.Loop.quit () - stream_data.Ready = True - elif (t == gst.MESSAGE_ERROR): - err, debug = message.parse_error() - print "Error: %s" % err, debug - stream_data.Loop.quit () - stream_data.Ready = False - - return True - - - def __fill_sink_pads (self, stream_data): - - asink = stream_data.Abin.get_by_name ("asink") - vsink = stream_data.Vbin.get_by_name ("vsink") - - asink_pad = asink.get_pad ("sink") - stream_data.ACaps = asink_pad.get_negotiated_caps().to_string() - print "ACAPS " + stream_data.ACaps - - vsink_pad = vsink.get_pad ("sink") - stream_data.VCaps = vsink_pad.get_negotiated_caps().to_string() - print "ACAPS " + stream_data.VCaps - - - - def __on_decode_unknown_type (self, decode, pad, caps, stream_data): - - print "Unknown Type" - return None - - def __on_decode_new_pad (self, decode, pad, arg1, stream_data): - - caps = pad.get_caps().to_string() - print "New pad " + caps - if (caps.rfind ("audio") != -1): - apad = stream_data.Abin.get_pad ("sink") - if (pad.link (apad) != gst.PAD_LINK_OK): - print "Error on link audio pad" - return None - elif (caps.rfind ("video") != -1): - vpad = stream_data.Vbin.get_pad ("sink") - if (pad.link (vpad) != gst.PAD_LINK_OK): - print "Error on link video pad" - return None - else: - print "Invalid caps" - - diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/gstreamer.py --- a/gmyth-stream/server/plugins/media/gstreamer.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,290 +0,0 @@ -#vim:ts=4:sw=4:et -import pygst -pygst.require("0.10") -import gst -import gobject -import socket -import time -from threading import Thread - -class Media: - class StreamListener(Thread): - def __init__ (self, stream_data): - Thread.__init__(self) - self.stream = stream_data - print "Thread Created" - - def run (self): - #Create socket - print "Waiting connection" - self.stream.Socket.listen(1) - self.stream.Connection, self.stream.Addr = self.stream.Socket.accept () - print "Connection requested" - self.stream.Sink.set_property ("fd", self.stream.Connection.fileno()) - self.stream.Pipe.set_state(gst.STATE_PLAYING) - print "PLAYING" - - - class StreamData: - stream_count = 0 - - def __init__ (self, pipe, abin, vbin, sink): - self.stream_count += 1 - self.Id = self.stream_count - self.Pipe = pipe - self.Abin = abin - self.Vbin = vbin - self.Sink = sink - self.Loop = gobject.MainLoop() - self.ACaps = "" - self.VCaps = "" - self.Ready = False - self.Socket = None - self.Connection = None - self.Addr = None - - def __init__(self, config): - # set gstreamer basic options - self.config = config - self.streams = [] - self.socket = None - self.connection = None - self.addr = None - self.ready = False - self.current = None - - - def setup(self, uri, mux, vcodec, vbitrate, - fps, acodec, abitrate, width, height, port, options): - - ## Pipelines - pipe = gst.Pipeline () - print "Opening Uri:" + uri - src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") - #src = gst.element_factory_make ("gnomevfssrc", "src") - src.set_property ("location", uri) - if (src is None): - print "Fail to create src element" - return None - - print ("Create source") - decode = gst.element_factory_make ("decodebin", "decode") - if (decode is None): - print "Fail to create decodebin" - return None - - print ("Create source") - mux = gst.element_factory_make ("avimux", "mux") - if (mux is None): - print "Fail to create mux" - return None - - sink = gst.element_factory_make ("fdsink", "sink") - if (sink is None): - print "Fail to create fdsink" - return None - - print ("Create source") - - #video encode - #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 - vbin = gst.Bin () - vqueue = gst.element_factory_make ("queue", "vqueue") - colorspace = gst.element_factory_make ("ffmpegcolorspace", "") - vrate = gst.element_factory_make ("videorate", "vrate") - vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") - #vencode = gst.element_factory_make ("ffenc_msmpeg4v1", "vencode") - vqueue_src = gst.element_factory_make ("queue", "vqueue_src") - - #if (int(vbitrate) > 0): - vencode.set_property ("bitrate", 200) - #vencode.set_property ("quant-type", 1) - vencode.set_property ("pass", 2) - vencode.set_property ("quantizer", 5) - #vencode.set_property ("me-method", 1) - - - if (None in [vbin, vqueue, vrate, vencode, vqueue_src]): - print "Fail to create video encode elements." - return None - - vbin.add (vqueue) - if ((int(width) > 0) and (int(height) > 0)): - print ("formating output to %d / %d" % (int(width), int(height))) - - vscale = gst.element_factory_make ("ffvideoscale", "vscale") - - vbin.add (vscale); - if (not vqueue.link (vscale)): - print "Fail to link video elements" - return None - - vbin.add (colorspace) - - if (not vscale.link (colorspace, \ - gst.caps_from_string ("video/x-raw-yuv,width=(int)%d,height=(int)%d" % (int(width), int(height))))): - print "Fail to link video elements" - return None - else: - vbin.add (colorspace) - vqueue.link (colorspace) - - vbin.add (vrate, vencode, vqueue_src) - if (not colorspace.link (vrate)): - print "Fail to colorspace with vrate" - return None - - - if (not vrate.link (vencode, \ - gst.caps_from_string ("video/x-raw-yuv,framerate=(fraction)10/1"))): - print "Fail to link vrate element" - return None - - if (not vencode.link (vqueue_src)): - print "Fail to link video encode with queue" - return None - - vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) - vbin.add_pad (gst.GhostPad ("src", vqueue_src.get_pad ("src"))) - - #audio encode - #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 - abin = gst.Bin () - aqueue = gst.element_factory_make ("queue", "aqueue") - aconvert = gst.element_factory_make ("audioconvert", "aconvert") - arate = gst.element_factory_make ("audioresample", "arate") - #aencode = gst.element_factory_make ("ffenc_ac3", "aencode") - aencode = gst.element_factory_make ("queue", "aencode") - #aencode = gst.element_factory_make ("lame", "aencode") - #aencode = gst.element_factory_make ("ffenc_mp2", "aencode") - aqueue_src = gst.element_factory_make ("queue", "aqueue_src") - - if (None in [abin, aqueue, arate, aencode, aqueue_src]): - print "Fail to create video encode elements." - return None - - abin.add (aqueue, aconvert, arate, aencode, aqueue_src) - if (not gst.element_link_many (aqueue, aconvert, arate, aencode, aqueue_src)): - print "Fail to link video elements" - return None - - abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) - abin.add_pad (gst.GhostPad ("src", aqueue_src.get_pad ("src"))) - - #Finish Pipeline - pipe.add (src, decode, abin, vbin, mux, sink) - gst.element_link_many (src, decode) - gst.element_link_many (mux, sink) - - #Linking decode with mux - mux_audio = mux.get_pad ("audio_0") - mux_video = mux.get_pad ("video_0") - - audio_pad = abin.get_pad ("src") - video_pad = vbin.get_pad ("src") - - if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK): - print "Fail to link audio with mux" - return None - - if (video_pad.link (mux_video) != gst.PAD_LINK_OK): - print "Fail to link audio with mux" - return None - - stream_data = self.StreamData (pipe, abin, vbin, sink) - bus = pipe.get_bus() - bus.add_signal_watch() - bus.connect ("message", self.__on_bus_message, stream_data) - - decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) - decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) - - print ("Create source") - pipe.set_state (gst.STATE_PAUSED) - print "Running Pipe" - stream_data.Loop.run () - print "End run" - - - #Create socket - stream_data.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - print "Bind on port %d" % port - stream_data.Socket.bind(('', int (port))) - self.streams.append (stream_data) - return (True, "") - - def play(self): - print "Play" - stream = self.streams[0] - self.current = self.StreamListener(stream) - self.current.start () - time.sleep (1) - return (True, "") - - def stop(self): - self.current.join () - self.current = None - stream = self.streams[0] - stream.Pipe.set_state(gst.STATE_NULL) - del (stream.Pipe) - stream.Pipe = None - stream.Abin = None - stream.Vbin = None - stream.Sink = None - if (stream.Connection != None): - stream.Connection.close () - - self.streams = [] - time.sleep (5) - return (True, "") - - - def __on_bus_message (self, bus, message, stream_data): - - t = message.type - if (t == gst.MESSAGE_STATE_CHANGED): - oldstate = -1 - newstate = -1 - pending = -1 - oldstate, newstate, pending = message.parse_state_changed () - if ((oldstate == gst.STATE_READY) and \ - (newstate == gst.STATE_PAUSED) and \ - (pending == gst.STATE_VOID_PENDING) and \ - (stream_data.Ready == False)): - state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () - if ((current_state == gst.STATE_PAUSED) and \ - (pending_state == gst.STATE_VOID_PENDING)): - print "Pipe paused" - stream_data.Loop.quit () - stream_data.Ready = True - elif (t == gst.MESSAGE_ERROR): - err, debug = message.parse_error() - print "Error: %s" % err, debug - stream_data.Loop.quit () - stream_data.Ready = False - - return True - - def __on_decode_unknown_type (self, decode, pad, caps, stream_data): - - print "Unknown Type" - return None - - def __on_decode_new_pad (self, decode, pad, arg1, stream_data): - - caps = pad.get_caps().to_string() - print "New pad " + caps - if (caps.rfind ("audio") != -1): - apad = stream_data.Abin.get_pad ("sink") - if (pad.link (apad) != gst.PAD_LINK_OK): - print "Error on link audio pad" - return None - elif (caps.rfind ("video") != -1): - vpad = stream_data.Vbin.get_pad ("sink") - if (pad.link (vpad) != gst.PAD_LINK_OK): - print "Error on link video pad" - return None - else: - print "Invalid caps" - print "Linked" - diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/mencoder.py --- a/gmyth-stream/server/plugins/media/mencoder.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,411 +0,0 @@ -from __future__ import division - -import os -import sys -import lib -import time -import shlex -import signal -import socket -import ConfigParser -import logging as log - -from select import * -from subprocess import * - -class Media(object): - - def __init__(self, config): - - self.config = config - self.do_cleanup() - - # __init__() - - - def do_cleanup(self): - self.path = "" - self.args = [] - self.language = None - self.subtitle = None - self.mpegopts = None - self.socket = None - self.child_pid = None - self.mplayer = None - self.mencoder_pid = None - self.mplayer_pid = None - self.audio_opts = None - self.video_opts = None - self.gst_pipe = None - self.gst_pid = None - self.transcode_local = None - - # do_cleanup() - - - def setup_opts(self, options): - - for opt in options: - - if opt == "local": - self.mplayer = lib.which("mplayer") - - elif opt.find("language=") >= 0: - try: - lan = opt.split("=")[1] - if len(lan) < 2: - self.language = lan - except Exception, e: - log.error("Bad language option: %s" % opt) - - elif opt.find("subtitle=") >= 0: - try: - sub = opt.split("=")[1] - if len(sub) < 2: - self.language = sub - except Exception, e: - log.error("Bad subtitle option: %s" % opt) - - elif opt.find("format=") >= 0: - try: - self.mpegopts = opt.split("=")[1] - except Exception, e: - log.error("Bad format option: %s" % opt) - - elif opt.find("outfile=") >= 0: - try: - self.transcode_local = opt.split("=")[1] - except Exception, e: - log.error("Bad outfile option: %s" % opt) - - # setup_opts() - - - def run_mplayer(self): - msg = self.filename - - if self.kind == "dvd": - msg = "dvd://" + msg - - self.mplayer_pid = Popen([self.mplayer, self.filename, "1> %s" % os.devnull,\ - "2> %s" % os.devnull], stdout=PIPE, close_fds=True) - - # run_mplayer() - - - def setup_mencoder(self): - self.path = self.config.get("Mencoder", "path") - mp = Popen([self.path], stdout=PIPE, close_fds=True) - - version = mp.stdout.read().split("MEncoder ")[1].split(" (C)")[0].split("-")[-1] - - if version > "4.1.1": self.mencoder_old = False - else: self.mencoder_old = True - - os.kill(mp.pid, signal.SIGKILL) - log.info("Mencoder version: %s" % version) - - if self.mencoder_old: - try: - self.fifo = self.config.get("Mencoder", "fifo_path") - os.mkfifo(self.fifo) - except Exception, e: - log.info("Fifo: %s" % e) - else: - self.fifo = "-" - - # setup_mencoder() - - - def setup_audio(self): - - if self.acodec == "mp3lame": - return "-oac mp3lame -lameopts cbr:br=%s vol=5" % self.abitrate - else: - return "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (\ - self.acodec, self.abitrate) - - # setup_audio() - - - def setup_video(self): - - video = "" - - video += " -of %s" % self.mux - video += " -ofps %s" % self.fps - - if self.vcodec == "nuv" or self.vcodec == "xvid"\ - or self.vcodec == "qtvideo" or self.vcodec == "copy": - video += " -ovc %s" % self.vcodec - else: - video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % ( - self.vcodec, self.vbitrate) - - if self.mux == "mpeg" and self.mpegopts is not None: - video += " -mpegopts format=%s" % self.mpegopts - - video += " -vf scale=%s:%s" % (self.width, self.height) - - return video - - # setup_video() - - - def arg_append(self, args, options): - l = shlex.split(options) - for i in l: - args.append(i) - - # arg_append() - - - def setup_args(self, args): - - args.append(self.path) - - #args.append(self.filename) - args.append("-") - - if self.language != None: - self.arg_append(args, "-alang %s" % self.language) - - if self.subtitle != None: - self.arg_append(args, "-slang %s" % self.subtitle) - self.arg_append(args, "-subfps %s" % self.fps) - - self.arg_append(args, "-idx") - self.arg_append(args, self.audio_opts) - self.arg_append(args, self.video_opts) - - self.arg_append(args, "-really-quiet") - self.arg_append(args, "-o %s" % self.fifo) - self.arg_append(args, "2> %s" % os.devnull) - - # setup_args() - - - def setup_filename(self, filename): - try: - self.kind, self.filename = filename.split("://") - except: - return (False, "Wrong filename protocol") - - if self.kind == "file": - if not os.path.exists(self.filename): - msg = "File requested does not exist. SETUP failed." - log.error(msg) - return (False, msg) - - elif self.kind == "dvd": - self.filename = "dvd://" + filename - - elif self.kind == "myth": - self.filename = filename - self.gst_pipe = os.pipe() - print self.gst_pipe[0] - print self.gst_pipe[1] - - return (True, "") - - # setup_filename() - - - def setup_socket(self): - if self.socket != None: - self.socket = None - - self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - self.socket.bind( ('', self.port) ) - self.socket.listen(1) - except Exception, e: - log.error("Could not create socket: %s" % e) - return (False, e) - - return (True, "") - - # setup_socket() - - - ''' - MENCODER SETUP DESCRIPTION - =========================== - - -> mux, vcodecs and acodecs - |-> mencoder (-of | -ovc | -oac) help - - -> if used mpeg as mux: - |-> to setup format: format=%s as an option at the end - - ''' - - - # good one: /tmp/dvb.mpg avi mpeg4 400 25 mp3lame 192 320 240 - # file:///tmp/dvb.mpg mpeg mpeg1video 400 25 mp2 192 320 240 format=mpeg1 - # dvd://4 mpeg mpeg1video 400 25 mp3lame 192 400 240 language=en local - # file:///tmp/mpg/bad_day.mpg avi mpeg4 400 25 mp3 192 320 240 - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port, options): - - if self.args != []: - self.do_cleanup() - - self.mux = mux - self.vcodec = vcodec - self.vbitrate = vbitrate - self.fps = fps - self.acodec = acodec - self.abitrate = abitrate - self.width = width - self.height = height - self.port = int(port) - - self.setup_mencoder() - - ret_val = self.setup_filename(filename) - - if not ret_val[0]: - return ret_val - - self.setup_opts(options) - self.audio_opts = self.setup_audio() - self.video_opts = self.setup_video() - self.setup_args(self.args) - - ret_val = self.setup_socket() - return ret_val - - # setup() - - def play_loop(self, conn): - data = self.pout.read(4096) - - conn.settimeout(5) - retry = 0 - - if not self.transcode_local: - while data != "" and retry < 5: - try: - conn.send(data) - r, w, x = select([conn], [], [], 0) - if conn in r: - back = conn.recv(1024) - if back == "OK" and self.mplayer and not self.mplayer_pid: - self.run_mplayer() - - except socket.error, e: - log.error("Socket error: %s" % e) - retry += 1 - - data = self.pout.read(4096) - - else: - local = open(self.transcode_local, "w") - total = os.path.getsize(self.filename) - partial = 4096 - - while data != "": - try: - local.write(data) - except Exception, e: - log.error("Write error: %s" % e) - - data = self.pout.read(4096) - partial += len(data) - conn.send("%.2f\n" % (partial * 100 / total) ) - - local.close() - conn.send("DONE\n") - - return retry - - # play_loop() - - - def play(self): - - if self.gst_pipe: - try: - gst = [ lib.which("gst-launch-0.10"), "--gst-debug-level=0" ] - self.arg_append(gst, "mythtvsrc location=%s" % self.filename) - self.arg_append(gst, "! fdsink fd=2") - self.gst_pid = Popen(gst, close_fds=True) - log.info("Running Gstreamer: %s" % gst); - except Exception, e: - msg = "Could not init Gstreamer: %s" % e - log.error(msg) - return (False, msg) - - - log.info("Starting Mencoder: %s" % self.args ) - try: - if not self.gst_pipe: - self.stdin = open(self.filename) - else: - self.stdin = self.gst_pid.stdout - - self.mencoder_pid = Popen(self.args, stdin=self.stdin, stdout=PIPE, close_fds=True) - except Exception, e: - msg = "Could not init Mencoder: %s" % e - log.error(msg) - return (False, msg) - - if self.mencoder_old: self.pout = open(self.fifo) - else: self.pout = self.mencoder_pid.stdout - - self.child_pid = os.fork() - - if self.child_pid == 0: - conn, addr = self.socket.accept() - - log.info("Sending Data to client: %s" % addr[0]) - retry = self.play_loop(conn) - - if retry < 5: - log.info("Finished sending Data to client: %s" % addr[0]) - else: - log.error("Client timed out, retried more than %s times" % retry) - - os.kill(self.mencoder_pid.pid, signal.SIGKILL) - sys.exit(0) - - return (True, "") - - # play() - - - def stop(self): - try: - - if self.mencoder_pid: - os.kill(self.mencoder_pid.pid, signal.SIGTERM) - self.mencoder_pid = None - - if self.mplayer_pid: - os.kill(self.mplayer_pid.pid, signal.SIGTERM) - self.mplayer_pid = None - - if self.socket: - self.socket.close() - self.socket = None - - if self.child_pid: - os.kill(self.child_pid, signal.SIGTERM) - self.child_pid = None - - if self.gst_pid: - os.kill(self.gst_pid.pid, signal.SIGTERM) - self.gst_pid = None - - self.do_cleanup() - - os.wait() - - except Exception, e: - log.error("Stop error: %s" % e) - - # stop() diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/vlc.py --- a/gmyth-stream/server/plugins/media/vlc.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -import os -import sys -import time -import socket -import ConfigParser - -class Media: - - def __init__(self, config): - - self.config = config - self.pipe = "" - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - self.path = config.get("Vlc", "path") - self.host = config.get("Vlc", "host") - self.port = int(config.get("Vlc", "port")) - self.pwd = config.get("Vlc", "pwd") - - # exec VLC - pid = os.fork() - if (pid == 0): - #child - print "ESTOU EM CHILD" - self.path += " -I telnet -d 1> /dev/null 2> /dev/null &" - os.system(self.path) - sys.exit(0) - else: - print "ESTOU EM PARENT 1" - time.sleep(3) - print "ESTOU EM PARENT 2" - self.sock.connect( (self.host, self.port) ) - self.sock.send("%s\n" % self.pwd) - - - def insert_file(self, filename): - - self.sock.send("setup output0 input %s\n" % filename) - - - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port): - - self.filename = filename - self.mux = mux - self.vcodec = vcodec - self.vbitrate = int(vbitrate) - self.fps = int(fps) - self.acodec = acodec - self.abitrate = int(abitrate) - self.width = int(width) - self.height = int(height) - - self.port = int(port) - - - self.pipe = "#transcode{vcodec=%s,vb=%d,"\ - "fps=25.0,scale=1,acodec=mpga,"\ - "ab=64,channels=1,width=%d,height=%d}"\ - ":duplicate{dst=std{access=http,"\ - "mux=mpeg1,dst=:%d}}" % (self.vcodec, self.vbitrate,\ - self.widht, self.height,\ - self.port) - - self.sock.send("setup output0 broadcast %s\n" % self.pipe) - self.insert_file(self.filename) - - def play(self): - - print "Trying to play: %s" % self.pipe - self.sock.send("control output0 play\n") - - - def stop(self): - - print "Trying to stop: %s" % self.pipe - self.sock.send("control output0 stop\n") - - diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/stream.conf --- a/gmyth-stream/server/stream.conf Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,23 +0,0 @@ -[Comm] -engine = tcp -port = 50000 - - -[Media] -engine = mencoder - - -[Vlc] -path = /usr/local/bin/vlc -host = 127.0.0.1 -port = 4212 -pwd = admin - - -[FFmpeg] -path = /usr/local/bin/ffmpeg - - -[Mencoder] -path = /usr/local/bin/mencoder -fifo_path = /tmp/teste diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/tests/client.py --- a/gmyth-stream/server/tests/client.py Wed Apr 18 15:47:40 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -import os -import sys -import time -import socket - - -if len(sys.argv) < 2: - HOST = 'localhost' - PORT = 50000 -elif len(sys.argv) == 2: - HOST = sys.argv[1] - PORT = 50000 -else: - HOST = sys.argv[1] - PORT = int(sys.argv[2]) - -socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) -socket.settimeout(10) - -try: - socket.connect( (HOST,PORT) ) -except: - print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT) - sys.exit(-1) - - -mplayer = os.popen("which mplayer").read().strip() -mplayer += " -idx - -vo x11 1> /dev/null" -pin, pout = os.popen2(mplayer) - -#teste = open("teste.avi", "w") - -data = socket.recv(4096) -i = 0 - -while (data != ""): - pin.write(data) - #teste.write(data) - data = socket.recv(4096) - #if (i == 500): - # socket.send("OK") - i += 1 - -pin.close() -socket.close() -#teste.close() - -# from select import select -# r, w, x = select([pout], []. [], 0) -# if pout in r: -# pout.read(32)