# HG changeset patch # User morphbr # Date 1188378025 -3600 # Node ID 0e159c5e2d320ce3210b0da6bc6630b924a32194 # Parent 0a4e6b811acc686eb32fb7f1411088ef396f1893 [svn r836] - Cleanup of svn (deleted old versions of gms as they are on repository) diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/lib.py --- a/gmyth-stream/server/0.1/lib.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -import time -import logging -import os -import stat - -ext = ['mpg', 'avi', 'mp4', 'nuv', 'mpeg', 'mov'] - -def now(): - return time.strftime("%Y-%m-%d %H:%M:%S"); - -def log(msg): - logging.log(logging.DEBUG, msg) - new_msg = "[%s] %s" % (now(), msg) - return new_msg - - -bin_path_list = os.environ["PATH"].split(os.pathsep) -def which(prg): - for d in bin_path_list: - path = os.path.join(d, prg) - if os.path.exists(path): - st = os.stat(path) - if st[stat.ST_MODE] & 0111: - return path - return "" - -def list_media_files(directory, file_list): - for root, dirs, files in os.walk(directory): - for name in files: - if os.path.splitext(name)[1].strip(".") in ext: - media = os.path.join(root,name) - if media not in file_list: - file_list.append(os.path.join(root,name)) - - return True - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/main.py --- a/gmyth-stream/server/0.1/main.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,185 +0,0 @@ -#!/usr/bin/python - -import os -import lib -import sys -import imp -import ConfigParser -import logging as log - -log.basicConfig(level=log.DEBUG, - format="%(asctime)s %(levelname)-8s %(message)s", - datefmt='%Y-%m-%d %H:%M:%S') - -config = ConfigParser.ConfigParser() -config.read("stream.conf") - -def load_module(pathlist, name): - fp, path, desc = imp.find_module(name, pathlist) - try: - module = imp.load_module(name, fp, path, desc) - return module - finally: - if fp: - fp.close() - - -media_plugin = config.get("Media", "engine") -media_plugin_module = load_module(["./plugins/media"], media_plugin) -media = media_plugin_module.Media(config) - -comm_plugin = config.get("Comm", "engine") -comm_plugin_module = load_module(["./plugins/comm"], comm_plugin) -server = comm_plugin_module.Server(config) - -log.info("Starting GMyth-Stream server") - - -''' -PROTOCOL DESCRIPTION -===================== - -COMMAND OPTIONS - --> SETUP DESCRIPTION -|-> used to setup transcoding and streaming parameters -|-> must be used before any "PLAY" command -|-> e.g: - -file://file_name mux vcodec vbitrate fps acodec abitrate width height options -dvd://title_number mux vcodec vbitrate fps acodec abitrate width height options - --> PLAY DESCRIPTION - |-> used to start transcoding and streaming of file - |-> must be used just if SETUP was used before - |-> after it, _must_ send STOP - --> STOP DESCRIPTION - |-> used to stop transcoding and streaming process - |-> must be used just if PLAY was used before - |-> must be used after PLAY - --> QUIT DESCRIPTION - |-> used to quit the main loop (quit program) - -''' -nextport = 0 -setup = (False, "STOPPED") - -def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate, - width, height, *options): - global nextport - global setup - - if setup[1] != "PLAYING": - nextport += 1 - ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec, - abitrate, width, height, nextport, options) - if ret[0]: - server.sendOk() - else: - server.sendNotOk(ret[1]) - - setup = (True, setup[1]) - - else: server.sendNotOk("You must STOP before SETingUP again") - - return True - -def do_play(server): - global setup - - if setup[0] and setup[1] == "STOPPED": - setup = (setup[0], "PLAYING") - ret = media.play() - if ret[0]: - server.sendOk("%d" % nextport) - else: - server.sendNotOk(ret[1]) - - else: - if setup[1] == "STOPPED": - server.sendNotOk("You must SETUP before PLAYing") - else: - server.sendNotOk("You must STOP before PLAYing again") - - return True - -def do_stop(server): - global setup - - media.stop() - setup = (False, "STOPPED") - server.sendOk() - return True - -def do_list(server, *directory): - file_list = [] - for j in directory: - lib.list_media_files(j, file_list) - - server.sendOk(file_list) - return True - -def do_quit(server): - server.finish = 1 - media.stop() - server.sendOk() - return True - - -mapping = { - "SETUP": do_setup, - "PLAY": do_play, - "STOP": do_stop, - "LIST": do_list, - "QUIT": do_quit, - } - - -def dispatch(server, msg): - pieces = msg.split() - if len(pieces) < 1: - log.error("Invalid client command format: %r" % msg) - server.sendNotOk("Invalid Format") - return False - - cmd = pieces[0] - f = mapping.get(cmd, None) - if not f: - log.error("Unknow command: %r" % msg) - server.sendNotOk("Unknow Command") - return False - - try: - return f(server, *pieces[1:]) - except Exception, e: - log.error("Could not execute %r: %s" % (msg, e)) - server.sendNotOk(str(e)) - return False - - - -while not server.finish: - conn, client, port = server.getRequest() - if nextport == 0: - nextport = port - - while not server.finish: - msg = server.getMsg() - if not msg: - break - - log.info("Client %s sent command: %r" % (client, msg)) - dispatch(server, msg) - - log.info("Closing connection with %s" % (client,)) - server.disconnect_client(conn) - try: - os.wait() - except Exception, e: - log.error(e) - -server.stop() -log.info("Server stopped. Closing...") - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/comm/tcp.py --- a/gmyth-stream/server/0.1/plugins/comm/tcp.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -import lib -import time -import socket -import logging as log - -class Server(object): - - def __init__(self, config): - self.host = '0.0.0.0' - self.port = int(config.get("Comm", "port")) - self.finish = 0 - - addr = (self.host, self.port) - log.debug("Setup TCP server at %s:%s" % addr) - self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.tcp.bind(addr) - self.tcp.listen(1) - log.info("TCP server listening at %s:%s (sock=%d)" % - (self.host, self.port, self.tcp.fileno())) - - def getMsg(self): - bytes = [] - try: - while 1: - c = self.con.recv(1) - bytes.append(c) - if not c or c == "\n": - break - except Exception, e: - log.error("Error reading message from client: %s" % e) - return None - - if not bytes or bytes[-1] != "\n": - msg = "".join(bytes) - log.error("Invalid message from client: %r" % msg) - return None - - # remove \n and \r - bytes.pop() - if bytes[-1] == "\r": - bytes.pop() - - msg = "".join(bytes) - log.debug("RECV: %r" % msg) - return msg - - def sendMsg(self, msg): - log.debug("SEND: %r" % msg) - self.con.send(msg + "\n") - - def sendOk(self, payload=None): - self.sendMsg("OK %d" % bool(payload is not None)) - if payload is not None: - if not isinstance(payload, (tuple, list)): - payload = (payload,) - for e in payload: - self.sendMsg("+%s" % e) - self.sendMsg(".") - - def sendNotOk(self, reason=""): - self.sendMsg("NOTOK %r" % reason) - - def getRequest(self): - log.debug("Wait for client request at %s:%s (sock=%d)" % - (self.host, self.port, self.tcp.fileno())) - self.con, self.client = self.tcp.accept() - log.info("Incoming request from %s (con=%s)" % - (self.client, self.con.fileno())) - return (self.con, self.client, self.port) - - def disconnect_client(self, connection): - log.info("Closed request from %s (con=%s)" % - (self.client, self.con.fileno())) - connection.close() - - def stop(self): - log.debug("Stop") - self.tcp.close() diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/comm/xmlrpc.py --- a/gmyth-stream/server/0.1/plugins/comm/xmlrpc.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -import lib -import SimpleXMLRPCServer - - -class Handler: - - def __init__(self, recv_pool, send_pool): - self.recv_pool = recv_pool - self.send_pool = send_pool - self.getMsg = self.sendMsg - - def _listMethods(self): - return ['setup', 'play', 'stop', 'close', 'getMsg'] - - def _methodHelp(self, method): - - if method == 'setup': - return "Setup the Media: setup( filename, mux, vcodec, vbitrate,"\ - " fps, acodec, abitrate, width, height, port, options" - elif method == 'play': - return "Play the Media: play()" - elif method == 'stop': - return "Stop the Media: stop()" - elif method == 'close': - return "Close the connection: close()" - elif method == 'getMsg': - return "Return the first message in the pool: getMsg()" - else: - # By convention, return empty - # string if no help is available - return "" - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port, options): - - msg = "%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port) - - if len(options) > 0: - for opt in options: - msg += " %s" % opt - - self.recv_pool.append("SETUP") - self.recv_pool.append(msg) - return self.sendMsg() - - def play(self): - self.recv_pool.append("PLAY") - return self.sendMsg() - - def stop(self): - self.recv_pool.append("STOP") - return self.sendMsg() - - def close(self): - self.recv_pool.append("CLOSE") - return self.sendMsg() - - def sendMsg(self): - if self.send_pool != []: - return self.send_pool.pop(0) - else: - return "" - - -class Server: - - def __init__(self, config): - self.host = 'localhost' - self.port = int(config.get("Comm", "port")) - self.finish = 0 - self.recv_pool = [] - self.send_pool = [] - - self.handler = Handler(self.recv_pool, self.send_pool) - - self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port)) - self.xmlrpc.register_instance(self.handler) - - - def getMsg(self, size): - if self.recv_pool != []: - return self.recv_pool.pop(0) - else: - return "" - - def sendMsg(self, msg): - self.send_pool.append(msg) - - def Ack(self, command): - msg = "[%s] Command %s received" % (lib.now(), command) - self.sendMsg(msg + "\n") - - def getRequest(self): - self.xmlrpc.handle_request() - return (0, "RPC Client") - - def disconnect_client(self, connection): - connection = 0 - - def stop(self): - self.xmlrpc.server_close() diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/media/ffmpeg.py --- a/gmyth-stream/server/0.1/plugins/media/ffmpeg.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,91 +0,0 @@ -import os -import sys -import lib -import time -import socket -import ConfigParser - -class Media: - - def __init__(self, config): - - self.config = config - self.socket = None - self.child_pid = None - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port): - - self.filename = filename - self.mux = mux - self.vcodec = vcodec - self.vbitrate = int(vbitrate) - self.fps = int(fps) - self.acodec = acodec - self.abitrate = int(abitrate) - self.width = int(width) - self.height = int(height) - - self.port = int(port) - - # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000 - self.path = self.config.get("FFmpeg", "path") - self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % ( - self.filename, self.mux, self.vcodec, self.vbitrate,\ - self.fps, self.acodec, self.abitrate, self.width, self.height) - - if (self.socket != None): - del(self.socket) - - self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind( ('', self.port) ) - self.socket.settimeout(10) - self.socket.listen(1) - - def play(self): - - lib.log("Starting FFmpeg: %s" % self.path) - - # exec FFmpeg and get stdout - child_stdin, child_stdout = os.popen2(self.path) - child_stdin.close() - - self.child_pid = os.fork() - if (self.child_pid == 0): - #child - - conn,addr= self.socket.accept() - lib.log("Sending Data to client: %s" % addr[0]) - data = child_stdout.read(1024) - conn.settimeout(5) - retry = 0 - - while( data != "" and retry < 5): - try: - conn.send(data) - except socket.error: - lib.log("Socket error (maybe timeout ?)") - retry = retry + 1 - - data = child_stdout.read(1024) - - if (retry < 5): - lib.log("Finished sending Data to client: %s" % addr[0]) - else: - lib.log("Client timed out") - - child_stdout.close() - #conn.close() - #sys.exit() - - - def stop(self): - - if (self.socket != None): - lib.log("Closing socket") - self.socket.close() - - lib.log("Trying to stop FFmpeg process") - if (self.child_pid != None): - os.kill(self.child_pid, 9) diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py --- a/gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,218 +0,0 @@ -import pygst -pygst.require("0.10") -import gst -import gobject - -class Media: - class StreamData: - stream_count = 0 - - def __init__ (self, pipe, abin, vbin): - - self.stream_count += 1 - self.Id = self.stream_count - self.Pipe = pipe - self.Abin = abin - self.Vbin = vbin - self.Loop = gobject.MainLoop() - self.ACaps = "" - self.VCaps = "" - self.Ready = False - - - def __init__(self, config): - # set gstreamer basic options - self.config = config - self.pipe = None - self.streams = [] - - - def setup(self, filename, mux, vcodec, vbitrate, - fps, acodec, abitrate, width, height, port, options): - - ## Pipelines - self.pipe = gst.Pipeline () - uri = "file://" + filename - print "Opening Uri:" + uri - src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") - if (src is None): - return None - - decode = gst.element_factory_make ("decodebin", "decode") - if (decode is None): - return None - - - #video encode - #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 - vbin = gst.Bin () - vqueue = gst.element_factory_make ("queue", "vqueue") - vscale = gst.element_factory_make ("videoscale", "vscale") - vrate = gst.element_factory_make ("videorate", "vrate") - vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") - vpay = gst.element_factory_make ("rtpmp4vpay", "vpay") - vsink = gst.element_factory_make ("udpsink", "vsink") - - if (None in [vbin, vqueue, vscale, vrate, vencode, vpay, vsink]): - print "Fail to create video encode elements." - return None - - vscale_pad = vscale.get_pad("sink") - if (vscale_pad is None): - print "Fail to get vscale sink pad." - return None - - vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height)) - if (vscale_caps is None): - print "Fail to create video caps" - return None - - if (not vscale_pad.set_caps (vscale_caps)): - print "Fail to set video output caps" - return None - - vencode.set_property ("bitrate", 256000) - vencode.set_property ("me-method", 2) - - vsink.set_property ("host", "224.0.0.1") - vsink.set_property ("port", 5000) - - vbin.add (vqueue, vscale, vrate, vencode, vpay, vsink) - if (not gst.element_link_many (vqueue, vscale, vrate, vencode, vpay, vsink)): - print "Fail to link video elements" - return None - - vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) - - #audio encode - #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 - abin = gst.Bin () - aqueue = gst.element_factory_make ("queue", "vqueue") - aconvert = gst.element_factory_make ("audioconvert", "aconvert") - aencode = gst.element_factory_make ("faac", "aencode") - apay = gst.element_factory_make ("rtpmp4gpay", "apay") - asink = gst.element_factory_make ("udpsink", "asink") - - if (None in [abin, aqueue, aconvert, aencode, apay, asink]): - print "Fail to create video encode elements." - return None - - asink.set_property ("host", "224.0.0.1") - asink.set_property ("port", 5002) - - abin.add (aqueue, aconvert, aencode, apay, asink) - if (not gst.element_link_many (aqueue, aconvert, aencode, apay, asink)): - print "Fail to link video elements" - return None - - abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) - - self.pipe.add (src, decode, abin, vbin) - gst.element_link_many (src, decode) - - stream_data = self.StreamData (self.pipe, abin, vbin) - - bus = self.pipe.get_bus() - bus.add_signal_watch() - bus.connect("message", self.__on_bus_message, stream_data) - - decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) - decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) - - - self.pipe.set_state (gst.STATE_PAUSED) - print "Running Pipe" - stream_data.Loop.run () - print "End run" - - a_caps = stream_data.ACaps - v_caps = stream_data.VCaps - stream_id = stream_data.Id - - self.streams.append (stream_data) - - def play(self): - - print "Trying to play pipeline: %s" % self.pipe - try: - if (self.pipe): - self.pipe.set_state(gst.STATE_PLAYING) - except gobject.GError, e: - print "Error: " + str(e) - - - def stop(self): - - print "Trying to stop pipeline: %s" % self.pipe - try: - if (self.pipeline): - self.pipeline.set_state(gst.STATE_NULL) - except gobject.GError, e: - print "Error: " + str(e) - - def __on_bus_message (self, bus, message, stream_data): - - t = message.type - if (t == gst.MESSAGE_STATE_CHANGED): - oldstate = -1 - newstate = -1 - pending = -1 - oldstate, newstate, pending = message.parse_state_changed () - if ((oldstate == gst.STATE_READY) and \ - (newstate == gst.STATE_PAUSED) and \ - (pending == gst.STATE_VOID_PENDING) and \ - (stream_data.Ready == False)): - state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () - if ((current_state == gst.STATE_PAUSED) and \ - (pending_state == gst.STATE_VOID_PENDING)): - print "Pipe paused" - self.__fill_sink_pads (stream_data) - stream_data.Loop.quit () - stream_data.Ready = True - elif (t == gst.MESSAGE_ERROR): - err, debug = message.parse_error() - print "Error: %s" % err, debug - stream_data.Loop.quit () - stream_data.Ready = False - - return True - - - def __fill_sink_pads (self, stream_data): - - asink = stream_data.Abin.get_by_name ("asink") - vsink = stream_data.Vbin.get_by_name ("vsink") - - asink_pad = asink.get_pad ("sink") - stream_data.ACaps = asink_pad.get_negotiated_caps().to_string() - print "ACAPS " + stream_data.ACaps - - vsink_pad = vsink.get_pad ("sink") - stream_data.VCaps = vsink_pad.get_negotiated_caps().to_string() - print "ACAPS " + stream_data.VCaps - - - - def __on_decode_unknown_type (self, decode, pad, caps, stream_data): - - print "Unknown Type" - return None - - def __on_decode_new_pad (self, decode, pad, arg1, stream_data): - - caps = pad.get_caps().to_string() - print "New pad " + caps - if (caps.rfind ("audio") != -1): - apad = stream_data.Abin.get_pad ("sink") - if (pad.link (apad) != gst.PAD_LINK_OK): - print "Error on link audio pad" - return None - elif (caps.rfind ("video") != -1): - vpad = stream_data.Vbin.get_pad ("sink") - if (pad.link (vpad) != gst.PAD_LINK_OK): - print "Error on link video pad" - return None - else: - print "Invalid caps" - - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/media/gstreamer.py --- a/gmyth-stream/server/0.1/plugins/media/gstreamer.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,290 +0,0 @@ -#vim:ts=4:sw=4:et -import pygst -pygst.require("0.10") -import gst -import gobject -import socket -import time -from threading import Thread - -class Media: - class StreamListener(Thread): - def __init__ (self, stream_data): - Thread.__init__(self) - self.stream = stream_data - print "Thread Created" - - def run (self): - #Create socket - print "Waiting connection" - self.stream.Socket.listen(1) - self.stream.Connection, self.stream.Addr = self.stream.Socket.accept () - print "Connection requested" - self.stream.Sink.set_property ("fd", self.stream.Connection.fileno()) - self.stream.Pipe.set_state(gst.STATE_PLAYING) - print "PLAYING" - - - class StreamData: - stream_count = 0 - - def __init__ (self, pipe, abin, vbin, sink): - self.stream_count += 1 - self.Id = self.stream_count - self.Pipe = pipe - self.Abin = abin - self.Vbin = vbin - self.Sink = sink - self.Loop = gobject.MainLoop() - self.ACaps = "" - self.VCaps = "" - self.Ready = False - self.Socket = None - self.Connection = None - self.Addr = None - - def __init__(self, config): - # set gstreamer basic options - self.config = config - self.streams = [] - self.socket = None - self.connection = None - self.addr = None - self.ready = False - self.current = None - - - def setup(self, uri, mux, vcodec, vbitrate, - fps, acodec, abitrate, width, height, port, options): - - ## Pipelines - pipe = gst.Pipeline () - print "Opening Uri:" + uri - src = gst.element_make_from_uri (gst.URI_SRC, uri, "src") - #src = gst.element_factory_make ("gnomevfssrc", "src") - src.set_property ("location", uri) - if (src is None): - print "Fail to create src element" - return None - - print ("Create source") - decode = gst.element_factory_make ("decodebin", "decode") - if (decode is None): - print "Fail to create decodebin" - return None - - print ("Create source") - mux = gst.element_factory_make ("avimux", "mux") - if (mux is None): - print "Fail to create mux" - return None - - sink = gst.element_factory_make ("fdsink", "sink") - if (sink is None): - print "Fail to create fdsink" - return None - - print ("Create source") - - #video encode - #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000 - vbin = gst.Bin () - vqueue = gst.element_factory_make ("queue", "vqueue") - colorspace = gst.element_factory_make ("ffmpegcolorspace", "") - vrate = gst.element_factory_make ("videorate", "vrate") - vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode") - #vencode = gst.element_factory_make ("ffenc_msmpeg4v1", "vencode") - vqueue_src = gst.element_factory_make ("queue", "vqueue_src") - - #if (int(vbitrate) > 0): - vencode.set_property ("bitrate", 200) - #vencode.set_property ("quant-type", 1) - vencode.set_property ("pass", 2) - vencode.set_property ("quantizer", 5) - #vencode.set_property ("me-method", 1) - - - if (None in [vbin, vqueue, vrate, vencode, vqueue_src]): - print "Fail to create video encode elements." - return None - - vbin.add (vqueue) - if ((int(width) > 0) and (int(height) > 0)): - print ("formating output to %d / %d" % (int(width), int(height))) - - vscale = gst.element_factory_make ("ffvideoscale", "vscale") - - vbin.add (vscale); - if (not vqueue.link (vscale)): - print "Fail to link video elements" - return None - - vbin.add (colorspace) - - if (not vscale.link (colorspace, \ - gst.caps_from_string ("video/x-raw-yuv,width=(int)%d,height=(int)%d" % (int(width), int(height))))): - print "Fail to link video elements" - return None - else: - vbin.add (colorspace) - vqueue.link (colorspace) - - vbin.add (vrate, vencode, vqueue_src) - if (not colorspace.link (vrate)): - print "Fail to colorspace with vrate" - return None - - - if (not vrate.link (vencode, \ - gst.caps_from_string ("video/x-raw-yuv,framerate=(fraction)10/1"))): - print "Fail to link vrate element" - return None - - if (not vencode.link (vqueue_src)): - print "Fail to link video encode with queue" - return None - - vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink"))) - vbin.add_pad (gst.GhostPad ("src", vqueue_src.get_pad ("src"))) - - #audio encode - #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002 - abin = gst.Bin () - aqueue = gst.element_factory_make ("queue", "aqueue") - aconvert = gst.element_factory_make ("audioconvert", "aconvert") - arate = gst.element_factory_make ("audioresample", "arate") - #aencode = gst.element_factory_make ("ffenc_ac3", "aencode") - aencode = gst.element_factory_make ("queue", "aencode") - #aencode = gst.element_factory_make ("lame", "aencode") - #aencode = gst.element_factory_make ("ffenc_mp2", "aencode") - aqueue_src = gst.element_factory_make ("queue", "aqueue_src") - - if (None in [abin, aqueue, arate, aencode, aqueue_src]): - print "Fail to create video encode elements." - return None - - abin.add (aqueue, aconvert, arate, aencode, aqueue_src) - if (not gst.element_link_many (aqueue, aconvert, arate, aencode, aqueue_src)): - print "Fail to link video elements" - return None - - abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink"))) - abin.add_pad (gst.GhostPad ("src", aqueue_src.get_pad ("src"))) - - #Finish Pipeline - pipe.add (src, decode, abin, vbin, mux, sink) - gst.element_link_many (src, decode) - gst.element_link_many (mux, sink) - - #Linking decode with mux - mux_audio = mux.get_pad ("audio_0") - mux_video = mux.get_pad ("video_0") - - audio_pad = abin.get_pad ("src") - video_pad = vbin.get_pad ("src") - - if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK): - print "Fail to link audio with mux" - return None - - if (video_pad.link (mux_video) != gst.PAD_LINK_OK): - print "Fail to link audio with mux" - return None - - stream_data = self.StreamData (pipe, abin, vbin, sink) - bus = pipe.get_bus() - bus.add_signal_watch() - bus.connect ("message", self.__on_bus_message, stream_data) - - decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data) - decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data) - - print ("Create source") - pipe.set_state (gst.STATE_PAUSED) - print "Running Pipe" - stream_data.Loop.run () - print "End run" - - - #Create socket - stream_data.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - print "Bind on port %d" % port - stream_data.Socket.bind(('', int (port))) - self.streams.append (stream_data) - return (True, "") - - def play(self): - print "Play" - stream = self.streams[0] - self.current = self.StreamListener(stream) - self.current.start () - time.sleep (1) - return (True, "") - - def stop(self): - self.current.join () - self.current = None - stream = self.streams[0] - stream.Pipe.set_state(gst.STATE_NULL) - del (stream.Pipe) - stream.Pipe = None - stream.Abin = None - stream.Vbin = None - stream.Sink = None - if (stream.Connection != None): - stream.Connection.close () - - self.streams = [] - time.sleep (5) - return (True, "") - - - def __on_bus_message (self, bus, message, stream_data): - - t = message.type - if (t == gst.MESSAGE_STATE_CHANGED): - oldstate = -1 - newstate = -1 - pending = -1 - oldstate, newstate, pending = message.parse_state_changed () - if ((oldstate == gst.STATE_READY) and \ - (newstate == gst.STATE_PAUSED) and \ - (pending == gst.STATE_VOID_PENDING) and \ - (stream_data.Ready == False)): - state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () - if ((current_state == gst.STATE_PAUSED) and \ - (pending_state == gst.STATE_VOID_PENDING)): - print "Pipe paused" - stream_data.Loop.quit () - stream_data.Ready = True - elif (t == gst.MESSAGE_ERROR): - err, debug = message.parse_error() - print "Error: %s" % err, debug - stream_data.Loop.quit () - stream_data.Ready = False - - return True - - def __on_decode_unknown_type (self, decode, pad, caps, stream_data): - - print "Unknown Type" - return None - - def __on_decode_new_pad (self, decode, pad, arg1, stream_data): - - caps = pad.get_caps().to_string() - print "New pad " + caps - if (caps.rfind ("audio") != -1): - apad = stream_data.Abin.get_pad ("sink") - if (pad.link (apad) != gst.PAD_LINK_OK): - print "Error on link audio pad" - return None - elif (caps.rfind ("video") != -1): - vpad = stream_data.Vbin.get_pad ("sink") - if (pad.link (vpad) != gst.PAD_LINK_OK): - print "Error on link video pad" - return None - else: - print "Invalid caps" - print "Linked" - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/media/mencoder.py --- a/gmyth-stream/server/0.1/plugins/media/mencoder.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,411 +0,0 @@ -from __future__ import division - -import os -import sys -import lib -import time -import shlex -import signal -import socket -import ConfigParser -import logging as log - -from select import * -from subprocess import * - -class Media(object): - - def __init__(self, config): - - self.config = config - self.do_cleanup() - - # __init__() - - - def do_cleanup(self): - self.path = "" - self.args = [] - self.language = None - self.subtitle = None - self.mpegopts = None - self.socket = None - self.child_pid = None - self.mplayer = None - self.mencoder_pid = None - self.mplayer_pid = None - self.audio_opts = None - self.video_opts = None - self.gst_pipe = None - self.gst_pid = None - self.transcode_local = None - - # do_cleanup() - - - def setup_opts(self, options): - - for opt in options: - - if opt == "local": - self.mplayer = lib.which("mplayer") - - elif opt.find("language=") >= 0: - try: - lan = opt.split("=")[1] - if len(lan) < 2: - self.language = lan - except Exception, e: - log.error("Bad language option: %s" % opt) - - elif opt.find("subtitle=") >= 0: - try: - sub = opt.split("=")[1] - if len(sub) < 2: - self.language = sub - except Exception, e: - log.error("Bad subtitle option: %s" % opt) - - elif opt.find("format=") >= 0: - try: - self.mpegopts = opt.split("=")[1] - except Exception, e: - log.error("Bad format option: %s" % opt) - - elif opt.find("outfile=") >= 0: - try: - self.transcode_local = opt.split("=")[1] - except Exception, e: - log.error("Bad outfile option: %s" % opt) - - # setup_opts() - - - def run_mplayer(self): - msg = self.filename - - if self.kind == "dvd": - msg = "dvd://" + msg - - self.mplayer_pid = Popen([self.mplayer, self.filename, "1> %s" % os.devnull,\ - "2> %s" % os.devnull], stdout=PIPE, close_fds=True) - - # run_mplayer() - - - def setup_mencoder(self): - self.path = self.config.get("Mencoder", "path") - mp = Popen([self.path], stdout=PIPE, close_fds=True) - - version = mp.stdout.read().split("MEncoder ")[1].split(" (C)")[0].split("-")[-1] - - if version > "4.1.1": self.mencoder_old = False - else: self.mencoder_old = True - - os.kill(mp.pid, signal.SIGKILL) - log.info("Mencoder version: %s" % version) - - if self.mencoder_old: - try: - self.fifo = self.config.get("Mencoder", "fifo_path") - os.mkfifo(self.fifo) - except Exception, e: - log.info("Fifo: %s" % e) - else: - self.fifo = "-" - - # setup_mencoder() - - - def setup_audio(self): - - if self.acodec == "mp3lame": - return "-oac mp3lame -lameopts cbr:br=%s vol=5" % self.abitrate - else: - return "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (\ - self.acodec, self.abitrate) - - # setup_audio() - - - def setup_video(self): - - video = "" - - video += " -of %s" % self.mux - video += " -ofps %s" % self.fps - - if self.vcodec == "nuv" or self.vcodec == "xvid"\ - or self.vcodec == "qtvideo" or self.vcodec == "copy": - video += " -ovc %s" % self.vcodec - else: - video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % ( - self.vcodec, self.vbitrate) - - if self.mux == "mpeg" and self.mpegopts is not None: - video += " -mpegopts format=%s" % self.mpegopts - - video += " -vf scale=%s:%s" % (self.width, self.height) - - return video - - # setup_video() - - - def arg_append(self, args, options): - l = shlex.split(options) - for i in l: - args.append(i) - - # arg_append() - - - def setup_args(self, args): - - args.append(self.path) - - #args.append(self.filename) - args.append("-") - - if self.language != None: - self.arg_append(args, "-alang %s" % self.language) - - if self.subtitle != None: - self.arg_append(args, "-slang %s" % self.subtitle) - self.arg_append(args, "-subfps %s" % self.fps) - - self.arg_append(args, "-idx") - self.arg_append(args, self.audio_opts) - self.arg_append(args, self.video_opts) - - self.arg_append(args, "-really-quiet") - self.arg_append(args, "-o %s" % self.fifo) - self.arg_append(args, "2> %s" % os.devnull) - - # setup_args() - - - def setup_filename(self, filename): - try: - self.kind, self.filename = filename.split("://") - except: - return (False, "Wrong filename protocol") - - if self.kind == "file": - if not os.path.exists(self.filename): - msg = "File requested does not exist. SETUP failed." - log.error(msg) - return (False, msg) - - elif self.kind == "dvd": - self.filename = "dvd://" + filename - - elif self.kind == "myth": - self.filename = filename - self.gst_pipe = os.pipe() - print self.gst_pipe[0] - print self.gst_pipe[1] - - return (True, "") - - # setup_filename() - - - def setup_socket(self): - if self.socket != None: - self.socket = None - - self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - self.socket.bind( ('', self.port) ) - self.socket.listen(1) - except Exception, e: - log.error("Could not create socket: %s" % e) - return (False, e) - - return (True, "") - - # setup_socket() - - - ''' - MENCODER SETUP DESCRIPTION - =========================== - - -> mux, vcodecs and acodecs - |-> mencoder (-of | -ovc | -oac) help - - -> if used mpeg as mux: - |-> to setup format: format=%s as an option at the end - - ''' - - - # good one: /tmp/dvb.mpg avi mpeg4 400 25 mp3lame 192 320 240 - # file:///tmp/dvb.mpg mpeg mpeg1video 400 25 mp2 192 320 240 format=mpeg1 - # dvd://4 mpeg mpeg1video 400 25 mp3lame 192 400 240 language=en local - # file:///tmp/mpg/bad_day.mpg avi mpeg4 400 25 mp3 192 320 240 - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port, options): - - if self.args != []: - self.do_cleanup() - - self.mux = mux - self.vcodec = vcodec - self.vbitrate = vbitrate - self.fps = fps - self.acodec = acodec - self.abitrate = abitrate - self.width = width - self.height = height - self.port = int(port) - - self.setup_mencoder() - - ret_val = self.setup_filename(filename) - - if not ret_val[0]: - return ret_val - - self.setup_opts(options) - self.audio_opts = self.setup_audio() - self.video_opts = self.setup_video() - self.setup_args(self.args) - - ret_val = self.setup_socket() - return ret_val - - # setup() - - def play_loop(self, conn): - data = self.pout.read(4096) - - conn.settimeout(5) - retry = 0 - - if not self.transcode_local: - while data != "" and retry < 5: - try: - conn.send(data) - r, w, x = select([conn], [], [], 0) - if conn in r: - back = conn.recv(1024) - if back == "OK" and self.mplayer and not self.mplayer_pid: - self.run_mplayer() - - except socket.error, e: - log.error("Socket error: %s" % e) - retry += 1 - - data = self.pout.read(4096) - - else: - local = open(self.transcode_local, "w") - total = os.path.getsize(self.filename) - partial = 4096 - - while data != "": - try: - local.write(data) - except Exception, e: - log.error("Write error: %s" % e) - - data = self.pout.read(4096) - partial += len(data) - conn.send("%.2f\n" % (partial * 100 / total) ) - - local.close() - conn.send("DONE\n") - - return retry - - # play_loop() - - - def play(self): - - if self.gst_pipe: - try: - gst = [ lib.which("gst-launch-0.10"), "--gst-debug-level=0" ] - self.arg_append(gst, "mythtvsrc location=%s" % self.filename) - self.arg_append(gst, "! fdsink fd=2") - self.gst_pid = Popen(gst, close_fds=True) - log.info("Running Gstreamer: %s" % gst); - except Exception, e: - msg = "Could not init Gstreamer: %s" % e - log.error(msg) - return (False, msg) - - - log.info("Starting Mencoder: %s" % self.args ) - try: - if not self.gst_pipe: - self.stdin = open(self.filename) - else: - self.stdin = self.gst_pid.stdout - - self.mencoder_pid = Popen(self.args, stdin=self.stdin, stdout=PIPE, close_fds=True) - except Exception, e: - msg = "Could not init Mencoder: %s" % e - log.error(msg) - return (False, msg) - - if self.mencoder_old: self.pout = open(self.fifo) - else: self.pout = self.mencoder_pid.stdout - - self.child_pid = os.fork() - - if self.child_pid == 0: - conn, addr = self.socket.accept() - - log.info("Sending Data to client: %s" % addr[0]) - retry = self.play_loop(conn) - - if retry < 5: - log.info("Finished sending Data to client: %s" % addr[0]) - else: - log.error("Client timed out, retried more than %s times" % retry) - - os.kill(self.mencoder_pid.pid, signal.SIGKILL) - sys.exit(0) - - return (True, "") - - # play() - - - def stop(self): - try: - - if self.mencoder_pid: - os.kill(self.mencoder_pid.pid, signal.SIGTERM) - self.mencoder_pid = None - - if self.mplayer_pid: - os.kill(self.mplayer_pid.pid, signal.SIGTERM) - self.mplayer_pid = None - - if self.socket: - self.socket.close() - self.socket = None - - if self.child_pid: - os.kill(self.child_pid, signal.SIGTERM) - self.child_pid = None - - if self.gst_pid: - os.kill(self.gst_pid.pid, signal.SIGTERM) - self.gst_pid = None - - self.do_cleanup() - - os.wait() - - except Exception, e: - log.error("Stop error: %s" % e) - - # stop() diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/plugins/media/vlc.py --- a/gmyth-stream/server/0.1/plugins/media/vlc.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -import os -import sys -import time -import socket -import ConfigParser - -class Media: - - def __init__(self, config): - - self.config = config - self.pipe = "" - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - self.path = config.get("Vlc", "path") - self.host = config.get("Vlc", "host") - self.port = int(config.get("Vlc", "port")) - self.pwd = config.get("Vlc", "pwd") - - # exec VLC - pid = os.fork() - if (pid == 0): - #child - print "ESTOU EM CHILD" - self.path += " -I telnet -d 1> /dev/null 2> /dev/null &" - os.system(self.path) - sys.exit(0) - else: - print "ESTOU EM PARENT 1" - time.sleep(3) - print "ESTOU EM PARENT 2" - self.sock.connect( (self.host, self.port) ) - self.sock.send("%s\n" % self.pwd) - - - def insert_file(self, filename): - - self.sock.send("setup output0 input %s\n" % filename) - - - - def setup(self, filename, mux, vcodec, vbitrate,\ - fps, acodec, abitrate, width, height, port): - - self.filename = filename - self.mux = mux - self.vcodec = vcodec - self.vbitrate = int(vbitrate) - self.fps = int(fps) - self.acodec = acodec - self.abitrate = int(abitrate) - self.width = int(width) - self.height = int(height) - - self.port = int(port) - - - self.pipe = "#transcode{vcodec=%s,vb=%d,"\ - "fps=25.0,scale=1,acodec=mpga,"\ - "ab=64,channels=1,width=%d,height=%d}"\ - ":duplicate{dst=std{access=http,"\ - "mux=mpeg1,dst=:%d}}" % (self.vcodec, self.vbitrate,\ - self.widht, self.height,\ - self.port) - - self.sock.send("setup output0 broadcast %s\n" % self.pipe) - self.insert_file(self.filename) - - def play(self): - - print "Trying to play: %s" % self.pipe - self.sock.send("control output0 play\n") - - - def stop(self): - - print "Trying to stop: %s" % self.pipe - self.sock.send("control output0 stop\n") - - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/stream.conf --- a/gmyth-stream/server/0.1/stream.conf Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,23 +0,0 @@ -[Comm] -engine = tcp -port = 50000 - - -[Media] -engine = mencoder - - -[Vlc] -path = /usr/local/bin/vlc -host = 127.0.0.1 -port = 4212 -pwd = admin - - -[FFmpeg] -path = /usr/local/bin/ffmpeg - - -[Mencoder] -path = /usr/local/bin/mencoder -fifo_path = /tmp/teste diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.1/tests/client.py --- a/gmyth-stream/server/0.1/tests/client.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -import os -import sys -import time -import socket - - -if len(sys.argv) < 2: - HOST = 'localhost' - PORT = 50000 -elif len(sys.argv) == 2: - HOST = sys.argv[1] - PORT = 50000 -else: - HOST = sys.argv[1] - PORT = int(sys.argv[2]) - -socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) -socket.settimeout(10) - -try: - socket.connect( (HOST,PORT) ) -except: - print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT) - sys.exit(-1) - - -mplayer = os.popen("which mplayer").read().strip() -mplayer += " -idx - -vo x11 1> /dev/null" -pin, pout = os.popen2(mplayer) - -#teste = open("teste.avi", "w") - -data = socket.recv(4096) -i = 0 - -while (data != ""): - pin.write(data) - #teste.write(data) - data = socket.recv(4096) - #if (i == 500): - # socket.send("OK") - i += 1 - -pin.close() -socket.close() -#teste.close() - -# from select import select -# r, w, x = select([pout], []. [], 0) -# if pout in r: -# pout.read(32) diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/gms.py --- a/gmyth-stream/server/0.2/gms.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,20 +0,0 @@ -#!/usr/bin/env python - -import sys -import os -import logging as log -from lib.server import serve_forever, load_plugins_transcoders - -log_level = log.INFO -for p in sys.argv[1:]: - if p == "-v" or p == "--verbose": - log_level -= 10 - -log.basicConfig(level=log_level, - format=("### %(asctime)s %(name)-18s \t%(levelname)-8s " - "\t%(message)s"), - datefmt="%Y-%m-%d %H:%M:%S") - -pd = os.path.join("plugins", "transcoders") -load_plugins_transcoders(pd) -serve_forever() diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/index.html --- a/gmyth-stream/server/0.2/html/index.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,9 +0,0 @@ - - GMyth-Streamer Server - -

Welcome to GMyth-Streamer Server

- - - \ No newline at end of file diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/menu.html --- a/gmyth-stream/server/0.2/html/menu.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -
  • %(name)s
  • diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/shutdown.html --- a/gmyth-stream/server/0.2/html/shutdown.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,6 +0,0 @@ - - GMyth-Streamer Server Exited - -

    GMyth-Streamer is not running anymore

    - - \ No newline at end of file diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/status.html --- a/gmyth-stream/server/0.2/html/status.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ - - GMyth-Streamer Server Status - -

    GMyth-Streamer Status

    - - - - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/stop_all.html --- a/gmyth-stream/server/0.2/html/stop_all.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,9 +0,0 @@ - - GMyth-Streamer Server Stopped Transcoders - -

    GMyth-Streamer stopped running transcoders

    - - - \ No newline at end of file diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/html/stop_selected.html --- a/gmyth-stream/server/0.2/html/stop_selected.html Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,12 +0,0 @@ - - GMyth-Streamer Server Stopped Transcoders - -

    GMyth-Streamer stopped running transcoders:

    - - - - diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/lib/server.py --- a/gmyth-stream/server/0.2/lib/server.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,424 +0,0 @@ -#!/usr/bin/env python - -__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" -__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" -__license__ = "GPL" -__version__ = "0.2" - -import os -import threading -import SocketServer -import BaseHTTPServer -import socket -import urlparse -import cgi -import lib.utils as utils -import logging as log - -__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever", - "load_plugins_transcoders") - -class Transcoder(object): - log = log.getLogger("gms.transcoder") - priority = 0 # negative values have higher priorities - name = None # to be used in requests - status = None - tid = -1 - - def __init__(self, params): - self.params = params - # __init__() - - - def params_first(self, key, default=None): - if default is None: - return self.params[key][0] - else: - try: - return self.params[key][0] - except: - return default - # params_first() - - - def get_mimetype(self): - mux = self.params_first("mux", "mpg") - - if mux == "mpeg": - return "video/mpeg" - elif mux == "avi": - return "video/x-msvideo" - else: - return "application/octet-stream" - # get_mimetype() - - - def start(self, outfile): - return True - # start() - - - def stop(self): - return True - # stop() - - - def __str__(self): - return '%s( params=%s )' % \ - (self.__class__.__name__, - self.params) - # __str__() -# Transcoder - - - -class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): - log = log.getLogger("gms.request") - def_transcoder = None - transcoders = utils.PluginSet(Transcoder) - - menu = { - "Stop": "/stop-transcoder.do", - "Status": "/status.do", - "Version": "/version.do", - "Shutdown": "/shutdown.do" - } - - @classmethod - def load_plugins_transcoders(cls, directory): - cls.transcoders.load_from_directory(directory) - - if cls.def_transcoder is None and cls.transcoders: - cls.def_transcoder = cls.transcoders[0].name - # load_plugins_transcoders() - - - def do_dispatch(self, body): - self.url = self.path - - pieces = urlparse.urlparse(self.path) - self.path = pieces[2] - self.query = cgi.parse_qs(pieces[4]) - - if self.path == "/": - self.serve_main(body) - elif self.path == "/shutdown.do": - self.serve_shutdown(body) - elif self.path == "/stop-transcoder.do": - self.serve_stop_transcoder(body) - elif self.path == "/status.do": - self.serve_status(body) - elif self.path == "/version.do": - self.serve_version(body) - elif self.path == "/stream.do": - self.serve_stream(body) - else: - action = self.query.get("action", None) - if "stream.do" in action: - self.serve_stream(body) - else: - self.send_error(404, "File not found") - # do_dispatch() - - - def do_GET(self): - self.do_dispatch(True) - # do_GET() - - - def do_HEAD(self): - self.do_dispatch(False) - # do_HEAD() - - - def _nav_items(self): - ret = "" - for name, url in self.menu.items(): - ret += utils.getHTML("menu", {"name": name, "url": url}) - - return ret - # _nav_items() - - def serve_main(self, body): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - if body: - self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()})) - # serve_main() - - def serve_version(self, body): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - if body: - self.wfile.write("Version: %s" % __version__) - - - def serve_shutdown(self, body): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - if body: - self.wfile.write(utils.getHTML("shutdown")) - self.server.server_close() - # serve_shutdown() - - - def serve_stop_all_transcoders(self, body): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - if body: - self.server.stop_transcoders() - self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()})) - # serve_stop_all_transcoders() - - - def serve_stop_selected_transcoders(self, body, requests): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - opts = "" - if body: - transcoders = self.server.get_transcoders() - - for req in requests: - try: - host, port = req.split(":") - except IndexError: - continue - - port = int(port) - addr = (host, port) - - for t, r in transcoders: - if r.client_address == addr: - try: - t.stop() - except Exception, e: - self.log.info("Plugin already stopped") - - opts += self._create_html_item("%s: %s:%s" % ( - t, addr[0], addr[1])) - - break - - self.wfile.write(utils.getHTML("stop_selected", - {"menu": self._nav_items(), - "opts": opts})) - # serve_stop_selected_transcoders() - - - def serve_stop_transcoder(self, body): - req = self.query.get("request", None) - if req and "all" in req: - self.serve_stop_all_transcoders(body) - elif req: - self.serve_stop_selected_transcoders(body, req) - else: - self.serve_status(body) - # serve_stop_transcoder() - - - def serve_status(self, body): - self.send_response(200) - self.send_header("Content-Type", "text/html") - self.send_header('Connection', 'close') - self.end_headers() - stopone = "" - - if body: - tl = self.server.get_transcoders() - if not tl: - running = "

    No running transcoder.

    \n" - stopall = "" - - elif self.query.get("ip") and self.query.get("file"): - for transcoder, request in tl: - filename = "%s" % self.query.get("file")[0] - tfilename = "%s" % transcoder.params_first("uri") - - if tfilename.find(filename) >= 0 and \ - request.client_address[0] == self.query.get("ip")[0]: - self.wfile.write("Status: %s %%" % transcoder.status) - return True - - return False - - else: - running = "

    Running transcoders:

    \n" - stopall = utils._create_html_item("" - "[STOP ALL]" % - self.menu["Stop"]) - - for transcoder, request in tl: - stopone += utils._create_html_item("%s: %s:%s" - "[STOP] - " - "Status: %s%%"\ - % ( - transcoder, request.client_address[0], - request.client_address[1], - self.menu["Stop"], request.client_address[0], - request.client_address[1], - transcoder.status) ) - - self.wfile.write(utils.getHTML("status", - {"menu": self._nav_items(), - "running": running, - "stopall": stopall, - "stopone": stopone})) - # serve_status() - - - def _get_transcoder(self): - # get transcoder option: mencoder is the default - request_transcoders = self.query.get("transcoder", ["mencoder"]) - - for t in request_transcoders: - transcoder = self.transcoders.get(t) - if transcoder: - return transcoder - - if not transcoder: - return self.transcoders[self.def_transcoder] - # _get_transcoder() - - - def serve_stream(self, body): - transcoder = self._get_transcoder() - try: - obj = transcoder(self.query) - except Exception, e: - self.send_error(500, str(e)) - return - - self.send_response(200) - self.send_header("Content-Type", obj.get_mimetype()) - self.send_header('Connection', 'close') - self.end_headers() - - if body: - self.server.add_transcoders(self, obj) - obj.start(self.wfile) - self.server.del_transcoders(self, obj) - # serve_stream() - - - def log_request(self, code='-', size='-'): - self.log.info('"%s" %s %s', self.requestline, str(code), str(size)) - # log_request() - - - def log_error(self, format, *args): - self.log.error("%s: %s" % (self.address_string(), format % args)) - # log_error() - - - def log_message(self, format, *args): - self.log.info("%s: %s" % (self.address_string(), format % args)) - # log_message() -# RequestHandler - - - -class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): - log = log.getLogger("gms.server") - run = True - _transcoders = {} - _lock = threading.RLock() - - def serve_forever(self): - self.log.info("GMyth-Streamer serving HTTP on %s:%s" % - self.socket.getsockname()) - try: - while self.run: - self.handle_request() - except KeyboardInterrupt, e: - pass - - self.log.debug("Stopping all remaining transcoders...") - self.stop_transcoders() - self.log.debug("Transcoders stopped!") - # serve_forever() - - - def get_request(self): - skt = self.socket - old = skt.gettimeout() - skt.settimeout(0.5) - while self.run: - try: - r = skt.accept() - skt.settimeout(old) - return r - except socket.timeout, e: - pass - raise socket.error("Not running") - # get_request() - - - def server_close(self): - self.run = False - self.stop_transcoders() - - BaseHTTPServer.HTTPServer.server_close(self) - # server_close() - - - def stop_transcoders(self): - self._lock.acquire() - for transcoder, request in self._transcoders.iteritems(): - self.log.info("Stop transcoder: %s, client=%s" % - (transcoder, request.client_address)) - transcoder.stop() - self._lock.release() - # stop_transcoders() - - - def get_transcoders(self): - self._lock.acquire() - try: - return self._transcoders.items() - finally: - self._lock.release() - # get_transcoders() - - - def add_transcoders(self, request, transcoder): - self._lock.acquire() - try: - self._transcoders[transcoder] = request - finally: - self._lock.release() - # add_transcoders() - - - def del_transcoders(self, request, transcoder): - self._lock.acquire() - try: - del self._transcoders[transcoder] - finally: - self._lock.release() - # del_transcoders() -# Server - - - -def serve_forever(host="0.0.0.0", port=40000): - addr = (host, port) - RequestHandler.protocol_version = "HTTP/1.0" - httpd = Server(addr, RequestHandler) - httpd.serve_forever() -# serve_forever() - - -def load_plugins_transcoders(directory): - RequestHandler.load_plugins_transcoders(directory) -# load_plugins_transcoders() diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/lib/utils.py --- a/gmyth-stream/server/0.2/lib/utils.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,178 +0,0 @@ -#!/usr/bin/env - -__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza" -__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br" -__license__ = "GPL" -__version__ = "0.3" - -import os -import stat -import sys -import logging -import urllib -import gobject -import imp - -log = logging.getLogger("gms.utils") - -__all__ = ("which", "load_plugins", "PluginSet", "getHTML") - -def which(app): - """Function to implement which(1) unix command""" - pl = os.environ["PATH"].split(os.pathsep) - for p in pl: - path = os.path.join(p, app) - if os.path.isfile(path): - st = os.stat(path) - if st[stat.ST_MODE] & 0111: - return path - return "" -# which() - - -def _load_module(pathlist, name): - fp, path, desc = imp.find_module(name, pathlist) - try: - module = imp.load_module(name, fp, path, desc) - return module - finally: - if fp: - fp.close() -# _load_module() - - -class PluginSet(object): - def __init__(self, basetype, *items): - self.basetype = basetype - self.map = {} - self.list = [] - - for i in items: - self._add(i) - self._sort() - # __init__() - - - def _add(self, item): - self.map[item.name] = item - self.list.append(item) - # _add() - - - def add(self, item): - self._add() - self._sort() - # add() - - - def __getitem__(self, spec): - if isinstance(spec, basestring): - return self.map[spec] - else: - return self.list[spec] - # __getitem__() - - - def get(self, name, default=None): - return self.map.get(name, default) - # get() - - - def __iter__(self): - return self.list.__iter__() - # __iter__() - - - def __len__(self): - return len(self.list) - # __len__() - - - def _sort(self): - self.list.sort(lambda a, b: cmp(a.priority, b.priority)) - # _sort() - - - def update(self, pluginset): - self.map.update(pluginset.map) - self.list.extend(pluginset.list) - self._sort() - # update() - - - def load_from_directory(self, directory): - for i in load_plugins(directory, self.basetype): - self._add(i) - self._sort() - # load_from_directory() - - - def __str__(self): - lst = [] - for o in self.list: - lst.append('"%s" (%s)' % (o.name, o.__name__)) - - return "%s(basetype=%s, items=[%s])" % \ - (self.__class__.__name__, - self.basetype.__name__, - ", ".join(lst)) - # __str__() -# PluginSet - - -def load_plugins(directory, basetype): - """Function to load plugins from a given directory""" - tn = basetype.__name__ - log.debug("Loading plugins from %s, type=%s" % (directory, tn)) - - - plugins = [] - for d in os.listdir(directory): - if not d.endswith(".py"): - continue - - name = d[0: -3] - if name == "__init__": - continue - - directory.replace(os.path.sep, ".") - mod = _load_module([directory], name) - for sym in dir(mod): - cls = getattr(mod, sym) - if isinstance(cls, type) and issubclass(cls, basetype) and \ - cls != basetype: - plugins.append(cls) - log.info("Loaded %s (%s) from %s" % \ - (cls.__name__, tn, os.path.join(directory, d))) - - return plugins -# load_plugins() - -def getHTML(html_file, params={}): - """This function parses a file 'html_file.html' with the given - parameters and returns a formated web-page""" - try: - filename = os.path.join(sys.path[0], "html", html_file + ".html") - html = open(filename).read() % params - return html - except Exception, e: - return "HTML format error. Wrong keys: %s" % e - -# getHTML - -def _create_html_item(opt): - return "
  • %s
  • \n" % opt -# _create_html_item - -def progress_bar(log, value, max, barsize): - chars = int(value * barsize / float(max)) - percent = int((value / float(max)) * 100) - sys.stdout.write("#" * chars) - sys.stdout.write(" " * (barsize - chars + 2)) - if value >= max: - sys.stdout.write("done. \n\n") - else: - sys.stdout.write("[%3i%%]\r" % (percent)) - sys.stdout.flush() - return percent -# progress_bar by osantana diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/plugins/transcoders/gmencoder.py --- a/gmyth-stream/server/0.2/plugins/transcoders/gmencoder.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,91 +0,0 @@ -import os -import shlex -import signal -import subprocess -import time - -import select - -import lib.utils as utils -import lib.server as server - -__all__ = ("TranscoderGMencoder",) - -class TranscoderGMencoder(server.Transcoder): - gmencoder_path = utils.which("gmencoder") - name = "gmencoder" - priority = -1 - proc = None - - def __init__(self, params): - server.Transcoder.__init__(self, params) - self.opts = [] - # __init__() - - def _insert_param (self, name, value): - if (value != ""): - self.opts.append(name) - self.opts.append(value) - - def _parser_params (self): - self._insert_param("-i", \ - "%s://%s" % (self.params_first("type", "file"), self.params_first("uri", ""))) - self._insert_param("--video-encode", self.params_first("ve", "")) - self._insert_param("--video-opts", "bitrate=200,pass=2,quantizer=5") - self._insert_param("--video-fps", self.params_first("fps", "")) - self._insert_param("--video-width", self.params_first("width", "")) - self._insert_param("--video-height", self.params_first("height", "")) - self._insert_param("--audio-rate", "32000") - self._insert_param("--audio-encode", self.params_first("ae", "")) - # _parse_params - - def start(self, outfd): - self.opts.append (self.gmencoder_path) - self._parser_params () - self._insert_param ("-o", "fd://%d" % outfd.fileno()) - - cmd = " ".join(self.opts) - self.log.info ("GMencoder: %s", cmd) - - try: - self.proc = subprocess.Popen(self.opts, stdin=subprocess.PIPE, stdout=subprocess.PIPE) - except Exception, e: - self.log.error("Error executing GMencoder: %s" % e) - return False - - try: - while (self.proc and self.proc.poll() == None): - r, w, x = select.select([self.proc.stdout], [], [], 0) - if self.proc.stdout in r: - progress = self.proc.stdout.readline() - self.log.info ("stdout %s" % progress) - if (progress.find ("PROGRESS") >= 0): - self.status = progress.split (":")[1] - #if (progress.find ("DONE") >= 0): - # break - self.log.info ("Process exit") - except Exception, e: - self.log.error("Problems handling data: %s" % e) - return False - - return True - # start() - - - def stop(self): - if self.proc: - self.log.info ("STOPED GMencoder plugin") - try: - self.proc.stdin.write ("QUIT\n") - except Exception, e: - pass - - try: - self.proc.wait() - except Exception, e: - pass - - self.proc = None - # stop() - -# TranscoderGMencoder diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/plugins/transcoders/mencoder.py --- a/gmyth-stream/server/0.2/plugins/transcoders/mencoder.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,293 +0,0 @@ -import os -import shlex -import signal -import subprocess -import time -import fcntl - -import lib.utils as utils -import lib.server as server -import plugins.transcoders.mencoder_lib.mythtv as mythtv - -from select import select - -__all__ = ("TranscoderMencoder",) - -class TranscoderMencoder(server.Transcoder): - """Transcoder class that implements a transcoder using Mencoder""" - mencoder_path = utils.which("mencoder") - name = "mencoder" - priority = -1 - args = {} - proc = None - gmyth = None - - # only works with avi container - status = 0 - - def _setup_params(self): - params_first = self.params_first - - # general_opts - self.args["local"] = params_first("local", False) - self.args["language"] = params_first("language", False) - self.args["subtitle"] = params_first("subtitle", False) - self.args["format"] = params_first("format", "mpeg1") - self.args["outfile"] = params_first("outfile", "-") - - # input_opt - self.args["type"] = params_first("type", "file") - self.args["input"] = params_first("uri", "-") - - # audio_opts - self.args["acodec"] = params_first("acodec", "mp2") - self.args["abitrate"] = params_first("abitrate", 192) - self.args["volume"] = params_first("volume", 5) - - # video_opts - self.args["mux"] = params_first("mux", "mpeg") - self.args["fps"] = params_first("fps", 25) - self.args["vcodec"] = params_first("vcodec", "mpeg1video") - self.args["vbitrate"] = params_first("vbitrate", 400) - self.args["width"] = params_first("width", 320) - self.args["height"] = params_first("height", 240) - # _setup_params() - - - def _setup_audio(self): - if self.args["acodec"] == "mp3lame": - audio = "-oac mp3lame -lameopts cbr:br=%s vol=%s" % ( - self.args["abitrate"], self.args["volume"]) - else: - audio = "-oac lavc -lavcopts acodec=%s:abitrate=%s" % ( - self.args["acodec"], self.args["abitrate"]) - - return audio - # _setup_audio() - - - def _setup_video(self): - video = " -of %s" % self.args["mux"] - video += " -ofps %s" % self.args["fps"] - - vcodec = self.args["vcodec"] - if vcodec == "nuv" or vcodec == "xvid"\ - or vcodec == "qtvideo" or vcodec == "copy": - video += " -ovc %s" % vcodec - else: - video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % ( - vcodec, self.args["vbitrate"]) - - if self.args["mux"] == "mpeg": - video += " -mpegopts format=%s" % self.args["format"] - video += " -vf scale=%s:%s" % (self.args["width"], self.args["height"]) - - return video - # _setup_video() - - - def _arg_append(self, args, options): - for arg in shlex.split(options): - args.append(arg) - # arg_append() - - def _setup_mencoder_opts(self, args): - args.append(self.mencoder_path) - - if self.args["outfile"] == "-" and self.args["type"]: - args.append(self.args["input"]) - else: - args.append("-") - - if self.args["language"]: - self._arg_append(args, "-alang %s" % self.args["language"]) - - if self.args["subtitle"]: - self._arg_append(args, "-slang %s" % self.args["subtitle"]) - self._arg_append(args, "-subfps %s" % self.args["fps"]) - - self._arg_append(args, "-idx") - self._arg_append(args, "-cache 1024") - self._arg_append(args, self._setup_audio()) - self._arg_append(args, self._setup_video()) - - self._arg_append(args, "-really-quiet") - self._arg_append(args, "-o %s" % self.args["outfile"]) - self._arg_append(args, "2>%s" % os.devnull) - # _setup_args() - - def _setup_filename(self): - """This function setups the file to encode parsing the uri. - So, type can be: - * file - * dvd - * myth - - If the last one is detected we have to parse the uri to find args. - Then we store all the args inside a dictionary: self.args['gmyth-cat'] - """ - _type = self.args["type"] - - if _type == "file": - if not os.path.exists(self.args["input"]): - raise IOError,\ - "File requested does not exist: %s." % self.args["input"] - else: - self.args["input"] = "file://%s" % self.args["input"] - - elif _type == "dvd": - self.args["input"] = "dvd://".join(self.args["input"]) - - elif _type == "myth": - self.args["gmyth-cat"] = mythtv._setup_mythfilename(self) - # _setup_filename() - - - def __init__(self, params): - server.Transcoder.__init__(self, params) - self.mencoder_opts = [] - - try: - self._setup_params() - self._setup_filename() - self._setup_mencoder_opts(self.mencoder_opts) - except Exception, e: - self.log.error(e) - # __init__() - - - def _check_opened_file(self, stdw, _stdin): - loop = True - while loop: - try: - return open(self.args["outfile"]) - except: - os.write(stdw, _stdin.read(1024)) - # _check_opened_file - - - def _start_outfile(self, outfd): - finished = False - - # fix this (not necessary) - outfd.write("OK") - - # Configuring stdin - try: - _stdin = open(self.args["input"]) - size = int(os.path.getsize(self.args["input"])) - except Exception, e: - self.log.error("Mencoder stdin setup error: %s" % e) - return False - - self.status = 0 - total_read = 0 - - # Configuring pipes - stdr, stdw = os.pipe() - - if not self._run_mencoder(input=stdr): - return False - - stdout = self._check_opened_file(stdw, _stdin) - - try: - while self.proc and self.proc.poll() == None: - if not finished: - data_in = _stdin.read(4096) - if data_in != "": - os.write(stdw, data_in) - total_read += 4096 - d = stdout.read(4096) - self.status = utils.progress_bar(self.log, - int(total_read), - int(size), 50) - else: - finished = True - os.close(stdw) - - else: - d = stdout.read(4096) - - except Exception, e: - self.log.error("Problems handling data: %s" % e) - self.stop() - return False - - self.log.info("%s: Finished sending data to client" % repr(self)) - return True - # _start_outfile() - - def _start(self, outfd): - # Play a file on disk or DVD - if not self._run_mencoder(output=subprocess.PIPE): - return False - - try: - while self.proc and self.proc.poll() == None: - d = self.proc.stdout.read(1024) - outfd.write(d) - except Exception, e: - self.log.error("Problems handling data: %s" % e) - return False - - self.log.info("%s: Finished sending data to client" % repr(self)) - return True - # _start() - - def _run_mencoder(self, input=None, output=None): - try: - self.proc = subprocess.Popen(self.mencoder_opts, stdin=input, - stdout=output, close_fds=True) - except Exception, e: - self.log.error("Error executing mencoder: %s" % e) - return False - - return True - # _run_mencoder() - - def start(self, outfd): - cmd = " ".join(self.mencoder_opts) - self.log.debug("Mencoder: %s" % cmd) - - ret = False - - if self.args["outfile"] == "-" and \ - self.args["type"] in ["file", "dvd"]: - ret = self._start(outfd) - - elif self.args["type"] == "myth": - ret = mythtv.start_myth(self, outfd) - - else: - ret = self._start_outfile(outfd) - - self.stop() - - if not ret: - self.log.error("Problems while starting streaming.") - - return ret - # start() - - def _aux_stop(self, obj): - if obj: - try: - os.kill(obj.pid, signal.SIGKILL) - except OSError, e: - pass - - try: - obj.wait() - except Exception, e: - pass - - obj = None - # _aux_stop - - def stop(self): - self._aux_stop(self.proc) - self._aux_stop(self.gmyth) - # stop() - -# TranscoderMencoder diff -r 0a4e6b811acc -r 0e159c5e2d32 gmyth-stream/server/0.2/plugins/transcoders/mencoder_lib/mythtv.py --- a/gmyth-stream/server/0.2/plugins/transcoders/mencoder_lib/mythtv.py Tue Aug 28 15:41:35 2007 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,106 +0,0 @@ -import os -import subprocess -import fcntl - -import lib.utils as utils -import lib.server as server - -from select import select - -def _setup_mythfilename(self): - # mythtv:mythtv@192.168.3.110:6543/1002_20070426230000.nuv - try: - _mysql = self.args["input"].split("@")[0].split(":") - except IndexError, e: - _mysql = ["mythtv", "mythtv"] - - try: - _args = self.args["input"].split("@")[1].split(":") - except IndexError, e: - _args = self.args["input"].split(":") - - gmyth_dict = {} - gmyth_dict["mysql"] = _mysql - gmyth_dict["backend"] = _args[0] - gmyth_dict["port"] = _args[1].split("/", 1)[0] - - _tmp_file = _args[1].split("/", 1)[1] - - if _tmp_file.find("channel") >= 0: - gmyth_dict["kind"] = "c" - gmyth_dict["cfile"] = _tmp_file.split("=")[1] - else: - gmyth_dict["kind"] = "f" - gmyth_dict["cfile"] = _tmp_file - - self.args["input"] = "-" - return gmyth_dict -# _setup_mythfilename - -def _setup_mythfile(err): - size = err.readline().split("Size:")[1] - flags = fcntl.fcntl (err, fcntl.F_GETFL, 0) | os.O_NONBLOCK - fcntl.fcntl(err, fcntl.F_SETFL, flags) - return size -# _setup_mythfile - -def _setup_gmythcat(self): - gmyth_cat = utils.which("gmyth-cat") - if self.args.has_key("gmyth-cat"): - return [ utils.which("gmyth-cat"), - "-h", self.args["gmyth-cat"]["backend"], - "-p", self.args["gmyth-cat"]["port"], - "-" + self.args["gmyth-cat"]["kind"], - self.args["gmyth-cat"]["cfile"] - ] - else: - self.log.error("URI error") - return [] -# _setup_gmythcat - -def start_myth(self, outfd): - opts = _setup_gmythcat(self) - try: - self.gmyth = subprocess.Popen(opts, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True) - except Exception, e: - self.log.error("Error executing gmyth-cat: %s" % e) - return False - - if not self._run_mencoder(input=self.gmyth.stdout, - output=subprocess.PIPE): - return False - - if self.args["gmyth-cat"]["kind"] == "f": - try: - size = _setup_mythfile(self.gmyth.stderr) - self.log.debug("Size of file: %s" % size) - except Exception, e: - self.log.error("Problems getting size of file: %s" % e) - return False - - try: - while self.proc and self.proc.poll() == None: - r, w, x = select([self.gmyth.stderr, self.proc.stdout], - [], [], 0) - if self.proc.stdout in r: - d = self.proc.stdout.read(4096) - outfd.write(d) - - if self.gmyth.stderr in r: - partial = self.gmyth.stderr.read(50).split("\n")[-2] - if partial != "": - self.status = utils.progress_bar(self.log, - int(partial), - int(size), 50) - - except IndexError, e: - pass - except Exception, e: - self.log.error("Problems handling data: %s" % e) - return False - - self.log.info("Finished sending data") - return True -# _start_myth()