# HG changeset patch
# User morphbr
# Date 1176908350 -3600
# Node ID ed34b1dab10357caadf0cf0e419bf14e70959d6e
# Parent 1b897f699097eb297928ea2847ac8e9fdcc92e42
[svn r570] - Included new core for GMyth-Streamer (0.2)
- New core based on code by Gustavo Barbieri
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/lib.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/lib.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,36 @@
+import time
+import logging
+import os
+import stat
+
+ext = ['mpg', 'avi', 'mp4', 'nuv', 'mpeg', 'mov']
+
+def now():
+ return time.strftime("%Y-%m-%d %H:%M:%S");
+
+def log(msg):
+ logging.log(logging.DEBUG, msg)
+ new_msg = "[%s] %s" % (now(), msg)
+ return new_msg
+
+
+bin_path_list = os.environ["PATH"].split(os.pathsep)
+def which(prg):
+ for d in bin_path_list:
+ path = os.path.join(d, prg)
+ if os.path.exists(path):
+ st = os.stat(path)
+ if st[stat.ST_MODE] & 0111:
+ return path
+ return ""
+
+def list_media_files(directory, file_list):
+ for root, dirs, files in os.walk(directory):
+ for name in files:
+ if os.path.splitext(name)[1].strip(".") in ext:
+ media = os.path.join(root,name)
+ if media not in file_list:
+ file_list.append(os.path.join(root,name))
+
+ return True
+
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/main.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/main.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,185 @@
+#!/usr/bin/python
+
+import os
+import lib
+import sys
+import imp
+import ConfigParser
+import logging as log
+
+log.basicConfig(level=log.DEBUG,
+ format="%(asctime)s %(levelname)-8s %(message)s",
+ datefmt='%Y-%m-%d %H:%M:%S')
+
+config = ConfigParser.ConfigParser()
+config.read("stream.conf")
+
+def load_module(pathlist, name):
+ fp, path, desc = imp.find_module(name, pathlist)
+ try:
+ module = imp.load_module(name, fp, path, desc)
+ return module
+ finally:
+ if fp:
+ fp.close()
+
+
+media_plugin = config.get("Media", "engine")
+media_plugin_module = load_module(["./plugins/media"], media_plugin)
+media = media_plugin_module.Media(config)
+
+comm_plugin = config.get("Comm", "engine")
+comm_plugin_module = load_module(["./plugins/comm"], comm_plugin)
+server = comm_plugin_module.Server(config)
+
+log.info("Starting GMyth-Stream server")
+
+
+'''
+PROTOCOL DESCRIPTION
+=====================
+
+COMMAND OPTIONS
+
+-> SETUP DESCRIPTION
+|-> used to setup transcoding and streaming parameters
+|-> must be used before any "PLAY" command
+|-> e.g:
+
+file://file_name mux vcodec vbitrate fps acodec abitrate width height options
+dvd://title_number mux vcodec vbitrate fps acodec abitrate width height options
+
+-> PLAY DESCRIPTION
+ |-> used to start transcoding and streaming of file
+ |-> must be used just if SETUP was used before
+ |-> after it, _must_ send STOP
+
+-> STOP DESCRIPTION
+ |-> used to stop transcoding and streaming process
+ |-> must be used just if PLAY was used before
+ |-> must be used after PLAY
+
+-> QUIT DESCRIPTION
+ |-> used to quit the main loop (quit program)
+
+'''
+nextport = 0
+setup = (False, "STOPPED")
+
+def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate,
+ width, height, *options):
+ global nextport
+ global setup
+
+ if setup[1] != "PLAYING":
+ nextport += 1
+ ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec,
+ abitrate, width, height, nextport, options)
+ if ret[0]:
+ server.sendOk()
+ else:
+ server.sendNotOk(ret[1])
+
+ setup = (True, setup[1])
+
+ else: server.sendNotOk("You must STOP before SETingUP again")
+
+ return True
+
+def do_play(server):
+ global setup
+
+ if setup[0] and setup[1] == "STOPPED":
+ setup = (setup[0], "PLAYING")
+ ret = media.play()
+ if ret[0]:
+ server.sendOk("%d" % nextport)
+ else:
+ server.sendNotOk(ret[1])
+
+ else:
+ if setup[1] == "STOPPED":
+ server.sendNotOk("You must SETUP before PLAYing")
+ else:
+ server.sendNotOk("You must STOP before PLAYing again")
+
+ return True
+
+def do_stop(server):
+ global setup
+
+ media.stop()
+ setup = (False, "STOPPED")
+ server.sendOk()
+ return True
+
+def do_list(server, *directory):
+ file_list = []
+ for j in directory:
+ lib.list_media_files(j, file_list)
+
+ server.sendOk(file_list)
+ return True
+
+def do_quit(server):
+ server.finish = 1
+ media.stop()
+ server.sendOk()
+ return True
+
+
+mapping = {
+ "SETUP": do_setup,
+ "PLAY": do_play,
+ "STOP": do_stop,
+ "LIST": do_list,
+ "QUIT": do_quit,
+ }
+
+
+def dispatch(server, msg):
+ pieces = msg.split()
+ if len(pieces) < 1:
+ log.error("Invalid client command format: %r" % msg)
+ server.sendNotOk("Invalid Format")
+ return False
+
+ cmd = pieces[0]
+ f = mapping.get(cmd, None)
+ if not f:
+ log.error("Unknow command: %r" % msg)
+ server.sendNotOk("Unknow Command")
+ return False
+
+ try:
+ return f(server, *pieces[1:])
+ except Exception, e:
+ log.error("Could not execute %r: %s" % (msg, e))
+ server.sendNotOk(str(e))
+ return False
+
+
+
+while not server.finish:
+ conn, client, port = server.getRequest()
+ if nextport == 0:
+ nextport = port
+
+ while not server.finish:
+ msg = server.getMsg()
+ if not msg:
+ break
+
+ log.info("Client %s sent command: %r" % (client, msg))
+ dispatch(server, msg)
+
+ log.info("Closing connection with %s" % (client,))
+ server.disconnect_client(conn)
+ try:
+ os.wait()
+ except Exception, e:
+ log.error(e)
+
+server.stop()
+log.info("Server stopped. Closing...")
+
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/comm/tcp.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/comm/tcp.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,79 @@
+import lib
+import time
+import socket
+import logging as log
+
+class Server(object):
+
+ def __init__(self, config):
+ self.host = '0.0.0.0'
+ self.port = int(config.get("Comm", "port"))
+ self.finish = 0
+
+ addr = (self.host, self.port)
+ log.debug("Setup TCP server at %s:%s" % addr)
+ self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.tcp.bind(addr)
+ self.tcp.listen(1)
+ log.info("TCP server listening at %s:%s (sock=%d)" %
+ (self.host, self.port, self.tcp.fileno()))
+
+ def getMsg(self):
+ bytes = []
+ try:
+ while 1:
+ c = self.con.recv(1)
+ bytes.append(c)
+ if not c or c == "\n":
+ break
+ except Exception, e:
+ log.error("Error reading message from client: %s" % e)
+ return None
+
+ if not bytes or bytes[-1] != "\n":
+ msg = "".join(bytes)
+ log.error("Invalid message from client: %r" % msg)
+ return None
+
+ # remove \n and \r
+ bytes.pop()
+ if bytes[-1] == "\r":
+ bytes.pop()
+
+ msg = "".join(bytes)
+ log.debug("RECV: %r" % msg)
+ return msg
+
+ def sendMsg(self, msg):
+ log.debug("SEND: %r" % msg)
+ self.con.send(msg + "\n")
+
+ def sendOk(self, payload=None):
+ self.sendMsg("OK %d" % bool(payload is not None))
+ if payload is not None:
+ if not isinstance(payload, (tuple, list)):
+ payload = (payload,)
+ for e in payload:
+ self.sendMsg("+%s" % e)
+ self.sendMsg(".")
+
+ def sendNotOk(self, reason=""):
+ self.sendMsg("NOTOK %r" % reason)
+
+ def getRequest(self):
+ log.debug("Wait for client request at %s:%s (sock=%d)" %
+ (self.host, self.port, self.tcp.fileno()))
+ self.con, self.client = self.tcp.accept()
+ log.info("Incoming request from %s (con=%s)" %
+ (self.client, self.con.fileno()))
+ return (self.con, self.client, self.port)
+
+ def disconnect_client(self, connection):
+ log.info("Closed request from %s (con=%s)" %
+ (self.client, self.con.fileno()))
+ connection.close()
+
+ def stop(self):
+ log.debug("Stop")
+ self.tcp.close()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/comm/xmlrpc.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/comm/xmlrpc.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,102 @@
+import lib
+import SimpleXMLRPCServer
+
+
+class Handler:
+
+ def __init__(self, recv_pool, send_pool):
+ self.recv_pool = recv_pool
+ self.send_pool = send_pool
+ self.getMsg = self.sendMsg
+
+ def _listMethods(self):
+ return ['setup', 'play', 'stop', 'close', 'getMsg']
+
+ def _methodHelp(self, method):
+
+ if method == 'setup':
+ return "Setup the Media: setup( filename, mux, vcodec, vbitrate,"\
+ " fps, acodec, abitrate, width, height, port, options"
+ elif method == 'play':
+ return "Play the Media: play()"
+ elif method == 'stop':
+ return "Stop the Media: stop()"
+ elif method == 'close':
+ return "Close the connection: close()"
+ elif method == 'getMsg':
+ return "Return the first message in the pool: getMsg()"
+ else:
+ # By convention, return empty
+ # string if no help is available
+ return ""
+
+ def setup(self, filename, mux, vcodec, vbitrate,\
+ fps, acodec, abitrate, width, height, port, options):
+
+ msg = "%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\
+ fps, acodec, abitrate, width, height, port)
+
+ if len(options) > 0:
+ for opt in options:
+ msg += " %s" % opt
+
+ self.recv_pool.append("SETUP")
+ self.recv_pool.append(msg)
+ return self.sendMsg()
+
+ def play(self):
+ self.recv_pool.append("PLAY")
+ return self.sendMsg()
+
+ def stop(self):
+ self.recv_pool.append("STOP")
+ return self.sendMsg()
+
+ def close(self):
+ self.recv_pool.append("CLOSE")
+ return self.sendMsg()
+
+ def sendMsg(self):
+ if self.send_pool != []:
+ return self.send_pool.pop(0)
+ else:
+ return ""
+
+
+class Server:
+
+ def __init__(self, config):
+ self.host = 'localhost'
+ self.port = int(config.get("Comm", "port"))
+ self.finish = 0
+ self.recv_pool = []
+ self.send_pool = []
+
+ self.handler = Handler(self.recv_pool, self.send_pool)
+
+ self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port))
+ self.xmlrpc.register_instance(self.handler)
+
+
+ def getMsg(self, size):
+ if self.recv_pool != []:
+ return self.recv_pool.pop(0)
+ else:
+ return ""
+
+ def sendMsg(self, msg):
+ self.send_pool.append(msg)
+
+ def Ack(self, command):
+ msg = "[%s] Command %s received" % (lib.now(), command)
+ self.sendMsg(msg + "\n")
+
+ def getRequest(self):
+ self.xmlrpc.handle_request()
+ return (0, "RPC Client")
+
+ def disconnect_client(self, connection):
+ connection = 0
+
+ def stop(self):
+ self.xmlrpc.server_close()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/ffmpeg.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/media/ffmpeg.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,91 @@
+import os
+import sys
+import lib
+import time
+import socket
+import ConfigParser
+
+class Media:
+
+ def __init__(self, config):
+
+ self.config = config
+ self.socket = None
+ self.child_pid = None
+
+ def setup(self, filename, mux, vcodec, vbitrate,\
+ fps, acodec, abitrate, width, height, port):
+
+ self.filename = filename
+ self.mux = mux
+ self.vcodec = vcodec
+ self.vbitrate = int(vbitrate)
+ self.fps = int(fps)
+ self.acodec = acodec
+ self.abitrate = int(abitrate)
+ self.width = int(width)
+ self.height = int(height)
+
+ self.port = int(port)
+
+ # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000
+ self.path = self.config.get("FFmpeg", "path")
+ self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % (
+ self.filename, self.mux, self.vcodec, self.vbitrate,\
+ self.fps, self.acodec, self.abitrate, self.width, self.height)
+
+ if (self.socket != None):
+ del(self.socket)
+
+ self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind( ('', self.port) )
+ self.socket.settimeout(10)
+ self.socket.listen(1)
+
+ def play(self):
+
+ lib.log("Starting FFmpeg: %s" % self.path)
+
+ # exec FFmpeg and get stdout
+ child_stdin, child_stdout = os.popen2(self.path)
+ child_stdin.close()
+
+ self.child_pid = os.fork()
+ if (self.child_pid == 0):
+ #child
+
+ conn,addr= self.socket.accept()
+ lib.log("Sending Data to client: %s" % addr[0])
+ data = child_stdout.read(1024)
+ conn.settimeout(5)
+ retry = 0
+
+ while( data != "" and retry < 5):
+ try:
+ conn.send(data)
+ except socket.error:
+ lib.log("Socket error (maybe timeout ?)")
+ retry = retry + 1
+
+ data = child_stdout.read(1024)
+
+ if (retry < 5):
+ lib.log("Finished sending Data to client: %s" % addr[0])
+ else:
+ lib.log("Client timed out")
+
+ child_stdout.close()
+ #conn.close()
+ #sys.exit()
+
+
+ def stop(self):
+
+ if (self.socket != None):
+ lib.log("Closing socket")
+ self.socket.close()
+
+ lib.log("Trying to stop FFmpeg process")
+ if (self.child_pid != None):
+ os.kill(self.child_pid, 9)
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/media/gstreamer-rtp.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,218 @@
+import pygst
+pygst.require("0.10")
+import gst
+import gobject
+
+class Media:
+ class StreamData:
+ stream_count = 0
+
+ def __init__ (self, pipe, abin, vbin):
+
+ self.stream_count += 1
+ self.Id = self.stream_count
+ self.Pipe = pipe
+ self.Abin = abin
+ self.Vbin = vbin
+ self.Loop = gobject.MainLoop()
+ self.ACaps = ""
+ self.VCaps = ""
+ self.Ready = False
+
+
+ def __init__(self, config):
+ # set gstreamer basic options
+ self.config = config
+ self.pipe = None
+ self.streams = []
+
+
+ def setup(self, filename, mux, vcodec, vbitrate,
+ fps, acodec, abitrate, width, height, port, options):
+
+ ## Pipelines
+ self.pipe = gst.Pipeline ()
+ uri = "file://" + filename
+ print "Opening Uri:" + uri
+ src = gst.element_make_from_uri (gst.URI_SRC, uri, "src")
+ if (src is None):
+ return None
+
+ decode = gst.element_factory_make ("decodebin", "decode")
+ if (decode is None):
+ return None
+
+
+ #video encode
+ #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000
+ vbin = gst.Bin ()
+ vqueue = gst.element_factory_make ("queue", "vqueue")
+ vscale = gst.element_factory_make ("videoscale", "vscale")
+ vrate = gst.element_factory_make ("videorate", "vrate")
+ vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode")
+ vpay = gst.element_factory_make ("rtpmp4vpay", "vpay")
+ vsink = gst.element_factory_make ("udpsink", "vsink")
+
+ if (None in [vbin, vqueue, vscale, vrate, vencode, vpay, vsink]):
+ print "Fail to create video encode elements."
+ return None
+
+ vscale_pad = vscale.get_pad("sink")
+ if (vscale_pad is None):
+ print "Fail to get vscale sink pad."
+ return None
+
+ vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height))
+ if (vscale_caps is None):
+ print "Fail to create video caps"
+ return None
+
+ if (not vscale_pad.set_caps (vscale_caps)):
+ print "Fail to set video output caps"
+ return None
+
+ vencode.set_property ("bitrate", 256000)
+ vencode.set_property ("me-method", 2)
+
+ vsink.set_property ("host", "224.0.0.1")
+ vsink.set_property ("port", 5000)
+
+ vbin.add (vqueue, vscale, vrate, vencode, vpay, vsink)
+ if (not gst.element_link_many (vqueue, vscale, vrate, vencode, vpay, vsink)):
+ print "Fail to link video elements"
+ return None
+
+ vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink")))
+
+ #audio encode
+ #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002
+ abin = gst.Bin ()
+ aqueue = gst.element_factory_make ("queue", "vqueue")
+ aconvert = gst.element_factory_make ("audioconvert", "aconvert")
+ aencode = gst.element_factory_make ("faac", "aencode")
+ apay = gst.element_factory_make ("rtpmp4gpay", "apay")
+ asink = gst.element_factory_make ("udpsink", "asink")
+
+ if (None in [abin, aqueue, aconvert, aencode, apay, asink]):
+ print "Fail to create video encode elements."
+ return None
+
+ asink.set_property ("host", "224.0.0.1")
+ asink.set_property ("port", 5002)
+
+ abin.add (aqueue, aconvert, aencode, apay, asink)
+ if (not gst.element_link_many (aqueue, aconvert, aencode, apay, asink)):
+ print "Fail to link video elements"
+ return None
+
+ abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink")))
+
+ self.pipe.add (src, decode, abin, vbin)
+ gst.element_link_many (src, decode)
+
+ stream_data = self.StreamData (self.pipe, abin, vbin)
+
+ bus = self.pipe.get_bus()
+ bus.add_signal_watch()
+ bus.connect("message", self.__on_bus_message, stream_data)
+
+ decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data)
+ decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data)
+
+
+ self.pipe.set_state (gst.STATE_PAUSED)
+ print "Running Pipe"
+ stream_data.Loop.run ()
+ print "End run"
+
+ a_caps = stream_data.ACaps
+ v_caps = stream_data.VCaps
+ stream_id = stream_data.Id
+
+ self.streams.append (stream_data)
+
+ def play(self):
+
+ print "Trying to play pipeline: %s" % self.pipe
+ try:
+ if (self.pipe):
+ self.pipe.set_state(gst.STATE_PLAYING)
+ except gobject.GError, e:
+ print "Error: " + str(e)
+
+
+ def stop(self):
+
+ print "Trying to stop pipeline: %s" % self.pipe
+ try:
+ if (self.pipeline):
+ self.pipeline.set_state(gst.STATE_NULL)
+ except gobject.GError, e:
+ print "Error: " + str(e)
+
+ def __on_bus_message (self, bus, message, stream_data):
+
+ t = message.type
+ if (t == gst.MESSAGE_STATE_CHANGED):
+ oldstate = -1
+ newstate = -1
+ pending = -1
+ oldstate, newstate, pending = message.parse_state_changed ()
+ if ((oldstate == gst.STATE_READY) and \
+ (newstate == gst.STATE_PAUSED) and \
+ (pending == gst.STATE_VOID_PENDING) and \
+ (stream_data.Ready == False)):
+ state_changed_status, current_state, pending_state = stream_data.Pipe.get_state ()
+ if ((current_state == gst.STATE_PAUSED) and \
+ (pending_state == gst.STATE_VOID_PENDING)):
+ print "Pipe paused"
+ self.__fill_sink_pads (stream_data)
+ stream_data.Loop.quit ()
+ stream_data.Ready = True
+ elif (t == gst.MESSAGE_ERROR):
+ err, debug = message.parse_error()
+ print "Error: %s" % err, debug
+ stream_data.Loop.quit ()
+ stream_data.Ready = False
+
+ return True
+
+
+ def __fill_sink_pads (self, stream_data):
+
+ asink = stream_data.Abin.get_by_name ("asink")
+ vsink = stream_data.Vbin.get_by_name ("vsink")
+
+ asink_pad = asink.get_pad ("sink")
+ stream_data.ACaps = asink_pad.get_negotiated_caps().to_string()
+ print "ACAPS " + stream_data.ACaps
+
+ vsink_pad = vsink.get_pad ("sink")
+ stream_data.VCaps = vsink_pad.get_negotiated_caps().to_string()
+ print "ACAPS " + stream_data.VCaps
+
+
+
+ def __on_decode_unknown_type (self, decode, pad, caps, stream_data):
+
+ print "Unknown Type"
+ return None
+
+ def __on_decode_new_pad (self, decode, pad, arg1, stream_data):
+
+ caps = pad.get_caps().to_string()
+ print "New pad " + caps
+ if (caps.rfind ("audio") != -1):
+ apad = stream_data.Abin.get_pad ("sink")
+ if (pad.link (apad) != gst.PAD_LINK_OK):
+ print "Error on link audio pad"
+ return None
+ elif (caps.rfind ("video") != -1):
+ vpad = stream_data.Vbin.get_pad ("sink")
+ if (pad.link (vpad) != gst.PAD_LINK_OK):
+ print "Error on link video pad"
+ return None
+ else:
+ print "Invalid caps"
+
+
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/gstreamer.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/media/gstreamer.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,290 @@
+#vim:ts=4:sw=4:et
+import pygst
+pygst.require("0.10")
+import gst
+import gobject
+import socket
+import time
+from threading import Thread
+
+class Media:
+ class StreamListener(Thread):
+ def __init__ (self, stream_data):
+ Thread.__init__(self)
+ self.stream = stream_data
+ print "Thread Created"
+
+ def run (self):
+ #Create socket
+ print "Waiting connection"
+ self.stream.Socket.listen(1)
+ self.stream.Connection, self.stream.Addr = self.stream.Socket.accept ()
+ print "Connection requested"
+ self.stream.Sink.set_property ("fd", self.stream.Connection.fileno())
+ self.stream.Pipe.set_state(gst.STATE_PLAYING)
+ print "PLAYING"
+
+
+ class StreamData:
+ stream_count = 0
+
+ def __init__ (self, pipe, abin, vbin, sink):
+ self.stream_count += 1
+ self.Id = self.stream_count
+ self.Pipe = pipe
+ self.Abin = abin
+ self.Vbin = vbin
+ self.Sink = sink
+ self.Loop = gobject.MainLoop()
+ self.ACaps = ""
+ self.VCaps = ""
+ self.Ready = False
+ self.Socket = None
+ self.Connection = None
+ self.Addr = None
+
+ def __init__(self, config):
+ # set gstreamer basic options
+ self.config = config
+ self.streams = []
+ self.socket = None
+ self.connection = None
+ self.addr = None
+ self.ready = False
+ self.current = None
+
+
+ def setup(self, uri, mux, vcodec, vbitrate,
+ fps, acodec, abitrate, width, height, port, options):
+
+ ## Pipelines
+ pipe = gst.Pipeline ()
+ print "Opening Uri:" + uri
+ src = gst.element_make_from_uri (gst.URI_SRC, uri, "src")
+ #src = gst.element_factory_make ("gnomevfssrc", "src")
+ src.set_property ("location", uri)
+ if (src is None):
+ print "Fail to create src element"
+ return None
+
+ print ("Create source")
+ decode = gst.element_factory_make ("decodebin", "decode")
+ if (decode is None):
+ print "Fail to create decodebin"
+ return None
+
+ print ("Create source")
+ mux = gst.element_factory_make ("avimux", "mux")
+ if (mux is None):
+ print "Fail to create mux"
+ return None
+
+ sink = gst.element_factory_make ("fdsink", "sink")
+ if (sink is None):
+ print "Fail to create fdsink"
+ return None
+
+ print ("Create source")
+
+ #video encode
+ #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000
+ vbin = gst.Bin ()
+ vqueue = gst.element_factory_make ("queue", "vqueue")
+ colorspace = gst.element_factory_make ("ffmpegcolorspace", "")
+ vrate = gst.element_factory_make ("videorate", "vrate")
+ vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode")
+ #vencode = gst.element_factory_make ("ffenc_msmpeg4v1", "vencode")
+ vqueue_src = gst.element_factory_make ("queue", "vqueue_src")
+
+ #if (int(vbitrate) > 0):
+ vencode.set_property ("bitrate", 200)
+ #vencode.set_property ("quant-type", 1)
+ vencode.set_property ("pass", 2)
+ vencode.set_property ("quantizer", 5)
+ #vencode.set_property ("me-method", 1)
+
+
+ if (None in [vbin, vqueue, vrate, vencode, vqueue_src]):
+ print "Fail to create video encode elements."
+ return None
+
+ vbin.add (vqueue)
+ if ((int(width) > 0) and (int(height) > 0)):
+ print ("formating output to %d / %d" % (int(width), int(height)))
+
+ vscale = gst.element_factory_make ("ffvideoscale", "vscale")
+
+ vbin.add (vscale);
+ if (not vqueue.link (vscale)):
+ print "Fail to link video elements"
+ return None
+
+ vbin.add (colorspace)
+
+ if (not vscale.link (colorspace, \
+ gst.caps_from_string ("video/x-raw-yuv,width=(int)%d,height=(int)%d" % (int(width), int(height))))):
+ print "Fail to link video elements"
+ return None
+ else:
+ vbin.add (colorspace)
+ vqueue.link (colorspace)
+
+ vbin.add (vrate, vencode, vqueue_src)
+ if (not colorspace.link (vrate)):
+ print "Fail to colorspace with vrate"
+ return None
+
+
+ if (not vrate.link (vencode, \
+ gst.caps_from_string ("video/x-raw-yuv,framerate=(fraction)10/1"))):
+ print "Fail to link vrate element"
+ return None
+
+ if (not vencode.link (vqueue_src)):
+ print "Fail to link video encode with queue"
+ return None
+
+ vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink")))
+ vbin.add_pad (gst.GhostPad ("src", vqueue_src.get_pad ("src")))
+
+ #audio encode
+ #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002
+ abin = gst.Bin ()
+ aqueue = gst.element_factory_make ("queue", "aqueue")
+ aconvert = gst.element_factory_make ("audioconvert", "aconvert")
+ arate = gst.element_factory_make ("audioresample", "arate")
+ #aencode = gst.element_factory_make ("ffenc_ac3", "aencode")
+ aencode = gst.element_factory_make ("queue", "aencode")
+ #aencode = gst.element_factory_make ("lame", "aencode")
+ #aencode = gst.element_factory_make ("ffenc_mp2", "aencode")
+ aqueue_src = gst.element_factory_make ("queue", "aqueue_src")
+
+ if (None in [abin, aqueue, arate, aencode, aqueue_src]):
+ print "Fail to create video encode elements."
+ return None
+
+ abin.add (aqueue, aconvert, arate, aencode, aqueue_src)
+ if (not gst.element_link_many (aqueue, aconvert, arate, aencode, aqueue_src)):
+ print "Fail to link video elements"
+ return None
+
+ abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink")))
+ abin.add_pad (gst.GhostPad ("src", aqueue_src.get_pad ("src")))
+
+ #Finish Pipeline
+ pipe.add (src, decode, abin, vbin, mux, sink)
+ gst.element_link_many (src, decode)
+ gst.element_link_many (mux, sink)
+
+ #Linking decode with mux
+ mux_audio = mux.get_pad ("audio_0")
+ mux_video = mux.get_pad ("video_0")
+
+ audio_pad = abin.get_pad ("src")
+ video_pad = vbin.get_pad ("src")
+
+ if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK):
+ print "Fail to link audio with mux"
+ return None
+
+ if (video_pad.link (mux_video) != gst.PAD_LINK_OK):
+ print "Fail to link audio with mux"
+ return None
+
+ stream_data = self.StreamData (pipe, abin, vbin, sink)
+ bus = pipe.get_bus()
+ bus.add_signal_watch()
+ bus.connect ("message", self.__on_bus_message, stream_data)
+
+ decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data)
+ decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data)
+
+ print ("Create source")
+ pipe.set_state (gst.STATE_PAUSED)
+ print "Running Pipe"
+ stream_data.Loop.run ()
+ print "End run"
+
+
+ #Create socket
+ stream_data.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ print "Bind on port %d" % port
+ stream_data.Socket.bind(('', int (port)))
+ self.streams.append (stream_data)
+ return (True, "")
+
+ def play(self):
+ print "Play"
+ stream = self.streams[0]
+ self.current = self.StreamListener(stream)
+ self.current.start ()
+ time.sleep (1)
+ return (True, "")
+
+ def stop(self):
+ self.current.join ()
+ self.current = None
+ stream = self.streams[0]
+ stream.Pipe.set_state(gst.STATE_NULL)
+ del (stream.Pipe)
+ stream.Pipe = None
+ stream.Abin = None
+ stream.Vbin = None
+ stream.Sink = None
+ if (stream.Connection != None):
+ stream.Connection.close ()
+
+ self.streams = []
+ time.sleep (5)
+ return (True, "")
+
+
+ def __on_bus_message (self, bus, message, stream_data):
+
+ t = message.type
+ if (t == gst.MESSAGE_STATE_CHANGED):
+ oldstate = -1
+ newstate = -1
+ pending = -1
+ oldstate, newstate, pending = message.parse_state_changed ()
+ if ((oldstate == gst.STATE_READY) and \
+ (newstate == gst.STATE_PAUSED) and \
+ (pending == gst.STATE_VOID_PENDING) and \
+ (stream_data.Ready == False)):
+ state_changed_status, current_state, pending_state = stream_data.Pipe.get_state ()
+ if ((current_state == gst.STATE_PAUSED) and \
+ (pending_state == gst.STATE_VOID_PENDING)):
+ print "Pipe paused"
+ stream_data.Loop.quit ()
+ stream_data.Ready = True
+ elif (t == gst.MESSAGE_ERROR):
+ err, debug = message.parse_error()
+ print "Error: %s" % err, debug
+ stream_data.Loop.quit ()
+ stream_data.Ready = False
+
+ return True
+
+ def __on_decode_unknown_type (self, decode, pad, caps, stream_data):
+
+ print "Unknown Type"
+ return None
+
+ def __on_decode_new_pad (self, decode, pad, arg1, stream_data):
+
+ caps = pad.get_caps().to_string()
+ print "New pad " + caps
+ if (caps.rfind ("audio") != -1):
+ apad = stream_data.Abin.get_pad ("sink")
+ if (pad.link (apad) != gst.PAD_LINK_OK):
+ print "Error on link audio pad"
+ return None
+ elif (caps.rfind ("video") != -1):
+ vpad = stream_data.Vbin.get_pad ("sink")
+ if (pad.link (vpad) != gst.PAD_LINK_OK):
+ print "Error on link video pad"
+ return None
+ else:
+ print "Invalid caps"
+ print "Linked"
+
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/mencoder.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/media/mencoder.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,411 @@
+from __future__ import division
+
+import os
+import sys
+import lib
+import time
+import shlex
+import signal
+import socket
+import ConfigParser
+import logging as log
+
+from select import *
+from subprocess import *
+
+class Media(object):
+
+ def __init__(self, config):
+
+ self.config = config
+ self.do_cleanup()
+
+ # __init__()
+
+
+ def do_cleanup(self):
+ self.path = ""
+ self.args = []
+ self.language = None
+ self.subtitle = None
+ self.mpegopts = None
+ self.socket = None
+ self.child_pid = None
+ self.mplayer = None
+ self.mencoder_pid = None
+ self.mplayer_pid = None
+ self.audio_opts = None
+ self.video_opts = None
+ self.gst_pipe = None
+ self.gst_pid = None
+ self.transcode_local = None
+
+ # do_cleanup()
+
+
+ def setup_opts(self, options):
+
+ for opt in options:
+
+ if opt == "local":
+ self.mplayer = lib.which("mplayer")
+
+ elif opt.find("language=") >= 0:
+ try:
+ lan = opt.split("=")[1]
+ if len(lan) < 2:
+ self.language = lan
+ except Exception, e:
+ log.error("Bad language option: %s" % opt)
+
+ elif opt.find("subtitle=") >= 0:
+ try:
+ sub = opt.split("=")[1]
+ if len(sub) < 2:
+ self.language = sub
+ except Exception, e:
+ log.error("Bad subtitle option: %s" % opt)
+
+ elif opt.find("format=") >= 0:
+ try:
+ self.mpegopts = opt.split("=")[1]
+ except Exception, e:
+ log.error("Bad format option: %s" % opt)
+
+ elif opt.find("outfile=") >= 0:
+ try:
+ self.transcode_local = opt.split("=")[1]
+ except Exception, e:
+ log.error("Bad outfile option: %s" % opt)
+
+ # setup_opts()
+
+
+ def run_mplayer(self):
+ msg = self.filename
+
+ if self.kind == "dvd":
+ msg = "dvd://" + msg
+
+ self.mplayer_pid = Popen([self.mplayer, self.filename, "1> %s" % os.devnull,\
+ "2> %s" % os.devnull], stdout=PIPE, close_fds=True)
+
+ # run_mplayer()
+
+
+ def setup_mencoder(self):
+ self.path = self.config.get("Mencoder", "path")
+ mp = Popen([self.path], stdout=PIPE, close_fds=True)
+
+ version = mp.stdout.read().split("MEncoder ")[1].split(" (C)")[0].split("-")[-1]
+
+ if version > "4.1.1": self.mencoder_old = False
+ else: self.mencoder_old = True
+
+ os.kill(mp.pid, signal.SIGKILL)
+ log.info("Mencoder version: %s" % version)
+
+ if self.mencoder_old:
+ try:
+ self.fifo = self.config.get("Mencoder", "fifo_path")
+ os.mkfifo(self.fifo)
+ except Exception, e:
+ log.info("Fifo: %s" % e)
+ else:
+ self.fifo = "-"
+
+ # setup_mencoder()
+
+
+ def setup_audio(self):
+
+ if self.acodec == "mp3lame":
+ return "-oac mp3lame -lameopts cbr:br=%s vol=5" % self.abitrate
+ else:
+ return "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (\
+ self.acodec, self.abitrate)
+
+ # setup_audio()
+
+
+ def setup_video(self):
+
+ video = ""
+
+ video += " -of %s" % self.mux
+ video += " -ofps %s" % self.fps
+
+ if self.vcodec == "nuv" or self.vcodec == "xvid"\
+ or self.vcodec == "qtvideo" or self.vcodec == "copy":
+ video += " -ovc %s" % self.vcodec
+ else:
+ video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % (
+ self.vcodec, self.vbitrate)
+
+ if self.mux == "mpeg" and self.mpegopts is not None:
+ video += " -mpegopts format=%s" % self.mpegopts
+
+ video += " -vf scale=%s:%s" % (self.width, self.height)
+
+ return video
+
+ # setup_video()
+
+
+ def arg_append(self, args, options):
+ l = shlex.split(options)
+ for i in l:
+ args.append(i)
+
+ # arg_append()
+
+
+ def setup_args(self, args):
+
+ args.append(self.path)
+
+ #args.append(self.filename)
+ args.append("-")
+
+ if self.language != None:
+ self.arg_append(args, "-alang %s" % self.language)
+
+ if self.subtitle != None:
+ self.arg_append(args, "-slang %s" % self.subtitle)
+ self.arg_append(args, "-subfps %s" % self.fps)
+
+ self.arg_append(args, "-idx")
+ self.arg_append(args, self.audio_opts)
+ self.arg_append(args, self.video_opts)
+
+ self.arg_append(args, "-really-quiet")
+ self.arg_append(args, "-o %s" % self.fifo)
+ self.arg_append(args, "2> %s" % os.devnull)
+
+ # setup_args()
+
+
+ def setup_filename(self, filename):
+ try:
+ self.kind, self.filename = filename.split("://")
+ except:
+ return (False, "Wrong filename protocol")
+
+ if self.kind == "file":
+ if not os.path.exists(self.filename):
+ msg = "File requested does not exist. SETUP failed."
+ log.error(msg)
+ return (False, msg)
+
+ elif self.kind == "dvd":
+ self.filename = "dvd://" + filename
+
+ elif self.kind == "myth":
+ self.filename = filename
+ self.gst_pipe = os.pipe()
+ print self.gst_pipe[0]
+ print self.gst_pipe[1]
+
+ return (True, "")
+
+ # setup_filename()
+
+
+ def setup_socket(self):
+ if self.socket != None:
+ self.socket = None
+
+ self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ try:
+ self.socket.bind( ('', self.port) )
+ self.socket.listen(1)
+ except Exception, e:
+ log.error("Could not create socket: %s" % e)
+ return (False, e)
+
+ return (True, "")
+
+ # setup_socket()
+
+
+ '''
+ MENCODER SETUP DESCRIPTION
+ ===========================
+
+ -> mux, vcodecs and acodecs
+ |-> mencoder (-of | -ovc | -oac) help
+
+ -> if used mpeg as mux:
+ |-> to setup format: format=%s as an option at the end
+
+ '''
+
+
+ # good one: /tmp/dvb.mpg avi mpeg4 400 25 mp3lame 192 320 240
+ # file:///tmp/dvb.mpg mpeg mpeg1video 400 25 mp2 192 320 240 format=mpeg1
+ # dvd://4 mpeg mpeg1video 400 25 mp3lame 192 400 240 language=en local
+ # file:///tmp/mpg/bad_day.mpg avi mpeg4 400 25 mp3 192 320 240
+
+ def setup(self, filename, mux, vcodec, vbitrate,\
+ fps, acodec, abitrate, width, height, port, options):
+
+ if self.args != []:
+ self.do_cleanup()
+
+ self.mux = mux
+ self.vcodec = vcodec
+ self.vbitrate = vbitrate
+ self.fps = fps
+ self.acodec = acodec
+ self.abitrate = abitrate
+ self.width = width
+ self.height = height
+ self.port = int(port)
+
+ self.setup_mencoder()
+
+ ret_val = self.setup_filename(filename)
+
+ if not ret_val[0]:
+ return ret_val
+
+ self.setup_opts(options)
+ self.audio_opts = self.setup_audio()
+ self.video_opts = self.setup_video()
+ self.setup_args(self.args)
+
+ ret_val = self.setup_socket()
+ return ret_val
+
+ # setup()
+
+ def play_loop(self, conn):
+ data = self.pout.read(4096)
+
+ conn.settimeout(5)
+ retry = 0
+
+ if not self.transcode_local:
+ while data != "" and retry < 5:
+ try:
+ conn.send(data)
+ r, w, x = select([conn], [], [], 0)
+ if conn in r:
+ back = conn.recv(1024)
+ if back == "OK" and self.mplayer and not self.mplayer_pid:
+ self.run_mplayer()
+
+ except socket.error, e:
+ log.error("Socket error: %s" % e)
+ retry += 1
+
+ data = self.pout.read(4096)
+
+ else:
+ local = open(self.transcode_local, "w")
+ total = os.path.getsize(self.filename)
+ partial = 4096
+
+ while data != "":
+ try:
+ local.write(data)
+ except Exception, e:
+ log.error("Write error: %s" % e)
+
+ data = self.pout.read(4096)
+ partial += len(data)
+ conn.send("%.2f\n" % (partial * 100 / total) )
+
+ local.close()
+ conn.send("DONE\n")
+
+ return retry
+
+ # play_loop()
+
+
+ def play(self):
+
+ if self.gst_pipe:
+ try:
+ gst = [ lib.which("gst-launch-0.10"), "--gst-debug-level=0" ]
+ self.arg_append(gst, "mythtvsrc location=%s" % self.filename)
+ self.arg_append(gst, "! fdsink fd=2")
+ self.gst_pid = Popen(gst, close_fds=True)
+ log.info("Running Gstreamer: %s" % gst);
+ except Exception, e:
+ msg = "Could not init Gstreamer: %s" % e
+ log.error(msg)
+ return (False, msg)
+
+
+ log.info("Starting Mencoder: %s" % self.args )
+ try:
+ if not self.gst_pipe:
+ self.stdin = open(self.filename)
+ else:
+ self.stdin = self.gst_pid.stdout
+
+ self.mencoder_pid = Popen(self.args, stdin=self.stdin, stdout=PIPE, close_fds=True)
+ except Exception, e:
+ msg = "Could not init Mencoder: %s" % e
+ log.error(msg)
+ return (False, msg)
+
+ if self.mencoder_old: self.pout = open(self.fifo)
+ else: self.pout = self.mencoder_pid.stdout
+
+ self.child_pid = os.fork()
+
+ if self.child_pid == 0:
+ conn, addr = self.socket.accept()
+
+ log.info("Sending Data to client: %s" % addr[0])
+ retry = self.play_loop(conn)
+
+ if retry < 5:
+ log.info("Finished sending Data to client: %s" % addr[0])
+ else:
+ log.error("Client timed out, retried more than %s times" % retry)
+
+ os.kill(self.mencoder_pid.pid, signal.SIGKILL)
+ sys.exit(0)
+
+ return (True, "")
+
+ # play()
+
+
+ def stop(self):
+ try:
+
+ if self.mencoder_pid:
+ os.kill(self.mencoder_pid.pid, signal.SIGTERM)
+ self.mencoder_pid = None
+
+ if self.mplayer_pid:
+ os.kill(self.mplayer_pid.pid, signal.SIGTERM)
+ self.mplayer_pid = None
+
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+
+ if self.child_pid:
+ os.kill(self.child_pid, signal.SIGTERM)
+ self.child_pid = None
+
+ if self.gst_pid:
+ os.kill(self.gst_pid.pid, signal.SIGTERM)
+ self.gst_pid = None
+
+ self.do_cleanup()
+
+ os.wait()
+
+ except Exception, e:
+ log.error("Stop error: %s" % e)
+
+ # stop()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/plugins/media/vlc.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/plugins/media/vlc.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,80 @@
+import os
+import sys
+import time
+import socket
+import ConfigParser
+
+class Media:
+
+ def __init__(self, config):
+
+ self.config = config
+ self.pipe = ""
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ self.path = config.get("Vlc", "path")
+ self.host = config.get("Vlc", "host")
+ self.port = int(config.get("Vlc", "port"))
+ self.pwd = config.get("Vlc", "pwd")
+
+ # exec VLC
+ pid = os.fork()
+ if (pid == 0):
+ #child
+ print "ESTOU EM CHILD"
+ self.path += " -I telnet -d 1> /dev/null 2> /dev/null &"
+ os.system(self.path)
+ sys.exit(0)
+ else:
+ print "ESTOU EM PARENT 1"
+ time.sleep(3)
+ print "ESTOU EM PARENT 2"
+ self.sock.connect( (self.host, self.port) )
+ self.sock.send("%s\n" % self.pwd)
+
+
+ def insert_file(self, filename):
+
+ self.sock.send("setup output0 input %s\n" % filename)
+
+
+
+ def setup(self, filename, mux, vcodec, vbitrate,\
+ fps, acodec, abitrate, width, height, port):
+
+ self.filename = filename
+ self.mux = mux
+ self.vcodec = vcodec
+ self.vbitrate = int(vbitrate)
+ self.fps = int(fps)
+ self.acodec = acodec
+ self.abitrate = int(abitrate)
+ self.width = int(width)
+ self.height = int(height)
+
+ self.port = int(port)
+
+
+ self.pipe = "#transcode{vcodec=%s,vb=%d,"\
+ "fps=25.0,scale=1,acodec=mpga,"\
+ "ab=64,channels=1,width=%d,height=%d}"\
+ ":duplicate{dst=std{access=http,"\
+ "mux=mpeg1,dst=:%d}}" % (self.vcodec, self.vbitrate,\
+ self.widht, self.height,\
+ self.port)
+
+ self.sock.send("setup output0 broadcast %s\n" % self.pipe)
+ self.insert_file(self.filename)
+
+ def play(self):
+
+ print "Trying to play: %s" % self.pipe
+ self.sock.send("control output0 play\n")
+
+
+ def stop(self):
+
+ print "Trying to stop: %s" % self.pipe
+ self.sock.send("control output0 stop\n")
+
+
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/stream.conf
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/stream.conf Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,23 @@
+[Comm]
+engine = tcp
+port = 50000
+
+
+[Media]
+engine = mencoder
+
+
+[Vlc]
+path = /usr/local/bin/vlc
+host = 127.0.0.1
+port = 4212
+pwd = admin
+
+
+[FFmpeg]
+path = /usr/local/bin/ffmpeg
+
+
+[Mencoder]
+path = /usr/local/bin/mencoder
+fifo_path = /tmp/teste
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.1/tests/client.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.1/tests/client.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,51 @@
+import os
+import sys
+import time
+import socket
+
+
+if len(sys.argv) < 2:
+ HOST = 'localhost'
+ PORT = 50000
+elif len(sys.argv) == 2:
+ HOST = sys.argv[1]
+ PORT = 50000
+else:
+ HOST = sys.argv[1]
+ PORT = int(sys.argv[2])
+
+socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
+socket.settimeout(10)
+
+try:
+ socket.connect( (HOST,PORT) )
+except:
+ print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT)
+ sys.exit(-1)
+
+
+mplayer = os.popen("which mplayer").read().strip()
+mplayer += " -idx - -vo x11 1> /dev/null"
+pin, pout = os.popen2(mplayer)
+
+#teste = open("teste.avi", "w")
+
+data = socket.recv(4096)
+i = 0
+
+while (data != ""):
+ pin.write(data)
+ #teste.write(data)
+ data = socket.recv(4096)
+ #if (i == 500):
+ # socket.send("OK")
+ i += 1
+
+pin.close()
+socket.close()
+#teste.close()
+
+# from select import select
+# r, w, x = select([pout], []. [], 0)
+# if pout in r:
+# pout.read(32)
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/gms.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.2/gms.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+import sys
+import os
+import logging as log
+from lib.server import serve_forever, load_plugins_transcoders
+
+log_level = log.INFO
+for p in sys.argv[1:]:
+ if p == "-v" or p == "--verbose":
+ log_level -= 10
+
+log.basicConfig(level=log_level,
+ format=("### %(asctime)s %(name)-18s %(levelname)-8s "
+ "\t%(message)s"),
+ datefmt="%Y-%m-%d %H:%M:%S")
+
+pd = os.path.join("plugins", "transcoders")
+load_plugins_transcoders(pd)
+serve_forever()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/lib/server.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.2/lib/server.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,464 @@
+#!/usr/bin/env python
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.3"
+
+import os
+import threading
+import SocketServer
+import BaseHTTPServer
+import socket
+import urlparse
+import cgi
+import lib.utils as utils
+import logging as log
+
+__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
+ "load_plugins_transcoders")
+
+class Transcoder(object):
+ log = log.getLogger("gmyth-stream.transcoder")
+ priority = 0 # negative values have higher priorities
+ name = None # to be used in requests
+
+ def __init__(self, params):
+ self.params = params
+ # __init__()
+
+
+ def params_first(self, key, default=None):
+ if default is None:
+ return self.params[key][0]
+ else:
+ try:
+ return self.params[key][0]
+ except:
+ return default
+ # params_first()
+
+
+ def get_mimetype(self):
+ mux = self.params_first("mux", "mpg")
+
+ if mux == "mpeg":
+ return "video/mpeg"
+ elif mux == "avi":
+ return "video/x-msvideo"
+ else:
+ return "application/octet-stream"
+ # get_mimetype()
+
+
+ def start(self, outfile):
+ return True
+ # start()
+
+
+ def stop(self):
+ return Tru
+ # stop()
+
+
+ def __str__(self):
+ return '%s("%s", mux="%s", params=%s)' % \
+ (self.__class__.__name__,
+ self.params_first("uri", "None"),
+ self.params_first("mux", "mpg"),
+ self.params)
+ # __str__()
+# Transcoder
+
+
+
+class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ log = log.getLogger("gmyth-stream.request")
+ def_transcoder = None
+ transcoders = utils.PluginSet(Transcoder)
+
+ @classmethod
+ def load_plugins_transcoders(cls, directory):
+ cls.transcoders.load_from_directory(directory)
+
+ if cls.def_transcoder is None and cls.transcoders:
+ cls.def_transcoder = cls.transcoders[0].name
+ # load_plugins_transcoders()
+
+
+ def do_dispatch(self, body):
+ self.url = self.path
+
+ pieces = urlparse.urlparse(self.path)
+ self.path = pieces[2]
+ self.query = cgi.parse_qs(pieces[4])
+
+ if self.path == "/":
+ self.serve_main(body)
+ elif self.path == "/shutdown.do":
+ self.serve_shutdown(body)
+ elif self.path == "/stop-transcoder.do":
+ self.serve_stop_transcoder(body)
+ elif self.path == "/status.do":
+ self.serve_status(body)
+ elif self.path == "/play.do":
+ self.serve_play(body)
+ elif self.path == "/stream.do":
+ self.serve_stream(body)
+ else:
+ self.send_error(404, "File not found")
+ # do_dispatch()
+
+
+ def do_GET(self):
+ self.do_dispatch(True)
+ # do_GET()
+
+
+ def do_HEAD(self):
+ self.do_dispatch(False)
+ # do_HEAD()
+
+
+ def _nav_items(self):
+ self.wfile.write("""\
+
Play
+ Status
+ Stop transcoders
+ Shutdown Server
+""")
+ # _nav_items()
+
+
+ def serve_main(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("""\
+
+ Catota Server
+
+Welcome to Catota Server
+
+""")
+ self._nav_items()
+ self.wfile.write("""\
+
+
+
+""")
+ # serve_main()
+
+
+ def serve_play(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("""\
+
+ Catota Server
+
+ Play
+
+
+""")
+ self._nav_items()
+ self.wfile.write("""\
+
+
+
+""")
+ # serve_play()
+
+
+ def serve_shutdown(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("""\
+
+ Catota Server Exited
+
+ Catota is not running anymore
+
+
+""")
+ self.server.server_close()
+ # serve_shutdown()
+
+
+ def serve_stop_all_transcoders(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.server.stop_transcoders()
+ self.wfile.write("""\
+
+ Catota Server Stopped Transcoders
+
+ Catota stopped running transcoders
+
+""")
+ self._nav_items()
+ self.wfile.write("""\
+
+
+
+ """)
+ # serve_stop_all_transcoders()
+
+
+ def serve_stop_selected_transcoders(self, body, requests):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("""\
+
+ Catota Server Stopped Transcoders
+
+ Catota stopped running transcoders:
+
+ """)
+ transcoders = self.server.get_transcoders()
+
+ for req in requests:
+ try:
+ host, port = req.split(":")
+ except IndexError:
+ continue
+
+ port = int(port)
+ addr = (host, port)
+
+ for t, r in transcoders:
+ if r.client_address == addr:
+ t.stop()
+ self.server.del_transcoders(self, t)
+ self.wfile.write("""\
+ - %s: %s:%s
+""" % (t, addr[0], addr[1]))
+ t.stop()
+ break
+ self.wfile.write("""\
+
+
+""")
+ self._nav_items()
+ self.wfile.write("""\
+
+
+
+""")
+ # serve_stop_selected_transcoders()
+
+
+ def serve_stop_transcoder(self, body):
+ req = self.query.get("request", None)
+ if req and "all" in req:
+ self.serve_stop_all_transcoders(body)
+ elif req:
+ self.serve_stop_selected_transcoders(body, req)
+ else:
+ self.serve_status(body)
+ # serve_stop_transcoder()
+
+
+ def serve_status(self, body):
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html")
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ if body:
+ self.wfile.write("""\
+
+ Catota Server Status
+
+ Catota Status
+""")
+ tl = self.server.get_transcoders()
+ if not tl:
+ self.wfile.write("No running transcoder.
\n")
+ else:
+ self.wfile.write("Running transcoders:
\n")
+ self.wfile.write("""\
+
+ - [STOP ALL]
+""")
+ for transcoder, request in tl:
+ self.wfile.write("""\
+ - %s: %s:%s [STOP]
+""" % (transcoder, request.client_address[0], request.client_address[1],
+ request.client_address[0], request.client_address[1]))
+
+ self.wfile.write("""\
+
+
+""")
+ self._nav_items()
+ self.wfile.write("""\
+
+
+
+""")
+ # serve_status()
+
+
+ def _get_transcoder(self):
+ # get transcoder option: mencoder is the default
+ request_transcoders = self.query.get("transcoder", ["mencoder"])
+
+ for t in request_transcoders:
+ transcoder = self.transcoders.get(t)
+ if transcoder:
+ return transcoder
+
+ if not transcoder:
+ return self.transcoders[self.def_transcoder]
+ # _get_transcoder()
+
+
+ def serve_stream(self, body):
+ transcoder = self._get_transcoder()
+ try:
+ obj = transcoder(self.query)
+ except Exception, e:
+ self.send_error(500, str(e))
+ return
+
+ self.send_response(200)
+ self.send_header("Content-Type", obj.get_mimetype())
+ self.send_header('Connection', 'close')
+ self.end_headers()
+
+ if body:
+ self.server.add_transcoders(self, obj)
+ obj.start(self.wfile)
+ self.server.del_transcoders(self, obj)
+ # serve_stream()
+
+
+ def log_request(self, code='-', size='-'):
+ self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
+ # log_request()
+
+
+ def log_error(self, format, *args):
+ self.log.error("%s: %s" % (self.address_string(), format % args))
+ # log_error()
+
+
+ def log_message(self, format, *args):
+ self.log.info("%s: %s" % (self.address_string(), format % args))
+ # log_message()
+# RequestHandler
+
+
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
+ log = log.getLogger("gmyth-streamer.server")
+ run = True
+ _transcoders = {}
+ _lock = threading.RLock()
+
+ def serve_forever(self):
+ self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
+ self.socket.getsockname())
+ try:
+ while self.run:
+ self.handle_request()
+ except KeyboardInterrupt, e:
+ pass
+
+ self.log.debug("Stopping all remaining transcoders...")
+ self.stop_transcoders()
+ self.log.debug("Transcoders stopped!")
+ # serve_forever()
+
+
+ def get_request(self):
+ skt = self.socket
+ old = skt.gettimeout()
+ skt.settimeout(0.5)
+ while self.run:
+ try:
+ r = skt.accept()
+ skt.settimeout(old)
+ return r
+ except socket.timeout, e:
+ pass
+ raise socket.error("Not running")
+ # get_request()
+
+
+ def server_close(self):
+ self.run = False
+ self.stop_transcoders()
+
+ BaseHTTPServer.HTTPServer.server_close(self)
+ # server_close()
+
+
+ def stop_transcoders(self):
+ self._lock.acquire()
+ for transcoder, request in self._transcoders.iteritems():
+ self.log.info("Stop transcoder: %s, client=%s" %
+ (transcoder, request.client_address))
+ transcoder.stop()
+ self._lock.release()
+ # stop_transcoders()
+
+
+ def get_transcoders(self):
+ self._lock.acquire()
+ try:
+ return self._transcoders.items()
+ finally:
+ self._lock.release()
+ # get_transcoders()
+
+
+ def add_transcoders(self, request, transcoder):
+ self._lock.acquire()
+ try:
+ self._transcoders[transcoder] = request
+ finally:
+ self._lock.release()
+ # add_transcoders()
+
+
+ def del_transcoders(self, request, transcoder):
+ self._lock.acquire()
+ try:
+ del self._transcoders[transcoder]
+ finally:
+ self._lock.release()
+ # del_transcoders()
+# Server
+
+
+
+def serve_forever(host="0.0.0.0", port=40000):
+ addr = (host, port)
+
+ RequestHandler.protocol_version = "HTTP/1.0"
+ httpd = Server(addr, RequestHandler)
+ httpd.serve_forever()
+# serve_forever()
+
+
+def load_plugins_transcoders(directory):
+ RequestHandler.load_plugins_transcoders(directory)
+# load_plugins_transcoders()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/lib/utils.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.2/lib/utils.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,148 @@
+#!/usr/bin/env
+
+__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
+__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
+__license__ = "GPL"
+__version__ = "0.3"
+
+import os
+import stat
+import sys
+import logging
+import urllib
+import gobject
+import imp
+
+log = logging.getLogger("gmyth-stream.utils")
+
+__all__ = ("which", "load_plugins", "PluginSet")
+
+def which(app):
+ """function to implement which(1) unix command"""
+ pl = os.environ["PATH"].split(os.pathsep)
+ for p in pl:
+ path = os.path.join(p, app)
+ if os.path.isfile(path):
+ st = os.stat(path)
+ if st[stat.ST_MODE] & 0111:
+ return path
+ return ""
+# which()
+
+
+def _load_module(pathlist, name):
+ fp, path, desc = imp.find_module(name, pathlist)
+ try:
+ module = imp.load_module(name, fp, path, desc)
+ return module
+ finally:
+ if fp:
+ fp.close()
+# _load_module()
+
+
+class PluginSet(object):
+ def __init__(self, basetype, *items):
+ self.basetype = basetype
+ self.map = {}
+ self.list = []
+
+ for i in items:
+ self._add(i)
+ self._sort()
+ # __init__()
+
+
+ def _add(self, item):
+ self.map[item.name] = item
+ self.list.append(item)
+ # _add()
+
+
+ def add(self, item):
+ self._add()
+ self._sort()
+ # add()
+
+
+ def __getitem__(self, spec):
+ if isinstance(spec, basestring):
+ return self.map[spec]
+ else:
+ return self.list[spec]
+ # __getitem__()
+
+
+ def get(self, name, default=None):
+ self.map.get(name, default)
+ # get()
+
+
+ def __iter__(self):
+ return self.list.__iter__()
+ # __iter__()
+
+
+ def __len__(self):
+ return len(self.list)
+ # __len__()
+
+
+ def _sort(self):
+ self.list.sort(lambda a, b: cmp(a.priority, b.priority))
+ # _sort()
+
+
+ def update(self, pluginset):
+ self.map.update(pluginset.map)
+ self.list.extend(pluginset.list)
+ self._sort()
+ # update()
+
+
+ def load_from_directory(self, directory):
+ for i in load_plugins(directory, self.basetype):
+ self._add(i)
+ self._sort()
+ # load_from_directory()
+
+
+ def __str__(self):
+ lst = []
+ for o in self.list:
+ lst.append('"%s" (%s)' % (o.name, o.__name__))
+
+ return "%s(basetype=%s, items=[%s])" % \
+ (self.__class__.__name__,
+ self.basetype.__name__,
+ ", ".join(lst))
+ # __str__()
+# PluginSet
+
+
+def load_plugins(directory, basetype):
+ tn = basetype.__name__
+ log.debug("Loading plugins from %s, type=%s" % (directory, tn))
+
+
+ plugins = []
+ for d in os.listdir(directory):
+ if not d.endswith(".py"):
+ continue
+
+ name = d[0: -3]
+ if name == "__init__":
+ continue
+
+ directory.replace(os.path.sep, ".")
+ mod = _load_module([directory], name)
+ for sym in dir(mod):
+ cls = getattr(mod, sym)
+ if isinstance(cls, type) and issubclass(cls, basetype) and \
+ cls != basetype:
+ plugins.append(cls)
+ log.info("Loaded %s (%s) from %s" % \
+ (cls.__name__, tn, os.path.join(directory, d)))
+
+ return plugins
+# load_plugins()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,305 @@
+#vim:ts=4:sw=4:et
+import pygst
+pygst.require("0.10")
+import gst
+import gobject
+import time
+import lib.utils as utils
+import lib.server as server
+
+from threading import Thread
+
+__all__ = ("TranscoderGstreamer",)
+
+class TranscoderGstreamer(server.Transcoder):
+ gstreamer_path = utils.which("gst-launch-0.10")
+ name = "gstreamer"
+ priority = -2
+
+ # StreamListener()
+
+ class StreamData:
+ stream_count = 0
+
+ def __init__(self, log, pipe, abin, vbin, sink):
+ self.log = log
+ self.stream_count += 1
+ self.Id = self.stream_count
+ self.Pipe = pipe
+ self.Abin = abin
+ self.Vbin = vbin
+ self.Sink = sink
+ self.Loop = gobject.MainLoop()
+ self.ACaps = ""
+ self.VCaps = ""
+ self.Ready = False
+ self.Connection = None
+ self.Addr = None
+ # __init__()
+
+ # StreamData()
+
+
+ def __init__(self, params):
+ server.Transcoder.__init__(self, params)
+ # set gstreamer basic options
+ self.connection = None
+ self.addr = None
+ self.ready = False
+ self.quit = False
+
+ self.log.info("Params for Gstreamer: %s" % self.params)
+ # __init__()
+
+
+ def _create_start_elements(self, uri):
+ self.log.info("Opening Uri:" + uri)
+ src = gst.element_make_from_uri(gst.URI_SRC, uri, "src")
+ decode = gst.element_factory_make("decodebin", "decode")
+ mux = gst.element_factory_make("avimux", "mux")
+ sink = gst.element_factory_make("fdsink", "sink")
+
+ return [src, decode, mux, sink]
+ # _create_start_elements()
+
+
+ def _setup_video_encode(self, vbin, width, height):
+ vqueue = gst.element_factory_make("queue", "vqueue")
+ colorspace = gst.element_factory_make("ffmpegcolorspace", "")
+ vrate = gst.element_factory_make("videorate", "vrate")
+ vencode = gst.element_factory_make("ffenc_mpeg4", "vencode")
+ vqueue_src = gst.element_factory_make("queue", "vqueue_src")
+
+ vencode.set_property("bitrate", 200)
+
+ if None in [vbin, vqueue, vrate, vencode, vqueue_src]:
+ self.log.info("Fail to create video encode elements.")
+ return False
+
+ vbin.add(vqueue)
+ if int(width) > 0 and int(height) > 0:
+ self.log.info(("Formating output to %d / %d" %(int(width), int(height))))
+
+ vscale = gst.element_factory_make("ffvideoscale", "vscale")
+
+ vbin.add(vscale);
+ if not vqueue.link(vscale):
+ self.log.info("Fail to link video elements")
+ return False
+
+ vbin.add(colorspace)
+
+ if not vscale.link(colorspace, \
+ gst.caps_from_string("video/x-raw-yuv,width=(int)%d,height=(int)%d" %(\
+ int(width), int(height)))):
+ self.log.info("Fail to link video elements")
+ return False
+ else:
+ vbin.add(colorspace)
+ vqueue.link(colorspace)
+
+ vbin.add(vrate, vencode, vqueue_src)
+ if not colorspace.link(vrate):
+ self.log.info("Fail to colorspace with vrate")
+ return False
+
+ if not vrate.link(vencode, \
+ gst.caps_from_string("video/x-raw-yuv,framerate=(fraction)10/1")):
+ self.log.info("Fail to link vrate element")
+ return False
+
+ if not vencode.link(vqueue_src):
+ self.log.info("Fail to link video encode with queue")
+ return False
+
+ vbin.add_pad(gst.GhostPad("sink", vqueue.get_pad("sink")))
+ vbin.add_pad(gst.GhostPad("src", vqueue_src.get_pad("src")))
+
+ return True
+ # _setup_video_encode()
+
+
+ def _setup_audio_encode(self, abin):
+ aqueue = gst.element_factory_make("queue", "aqueue")
+ aconvert = gst.element_factory_make("audioconvert", "aconvert")
+ arate = gst.element_factory_make("audioresample", "arate")
+ aencode = gst.element_factory_make("queue", "aencode")
+ aqueue_src = gst.element_factory_make("queue", "aqueue_src")
+
+ if None in [abin, aqueue, arate, aencode, aqueue_src]:
+ self.log.info("Fail to create video encode elements.")
+ return False
+
+ abin.add(aqueue, aconvert, arate, aencode, aqueue_src)
+
+ if not gst.element_link_many(aqueue, aconvert, arate, aencode, aqueue_src):
+ self.log.info("Fail to link video elements")
+ return False
+
+ abin.add_pad(gst.GhostPad("sink", aqueue.get_pad("sink")))
+ abin.add_pad(gst.GhostPad("src", aqueue_src.get_pad("src")))
+
+ return True
+ # _setup_audio_encode()
+
+
+ def setup(self, uri, mux, vcodec, vbitrate,
+ fps, acodec, abitrate, width, height, options):
+
+ ## Pipelines
+ pipe = gst.Pipeline()
+ src, decode, mux, sink = self._create_start_elements(uri)
+
+ if None in [src, decode, mux, sink]:
+ self.log.info("Problems with while starting basic elements");
+ return False
+
+ #video encode
+ #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate !
+ #ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1
+ #port=5000
+
+ vbin = gst.Bin()
+ if not self._setup_video_encode(vbin, width, height):
+ return False
+
+ #audio encode
+ #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
+ #udpsink name=upd_audio host=224.0.0.1 port=5002
+
+ abin = gst.Bin()
+ if not self._setup_audio_encode(abin):
+ return False
+
+ #Finish Pipeline
+ pipe.add(src, decode, abin, vbin, mux, sink)
+ gst.element_link_many(src, decode)
+ gst.element_link_many(mux, sink)
+
+ #Linking decode with mux
+ mux_audio = mux.get_pad("audio_0")
+ mux_video = mux.get_pad("video_0")
+
+ audio_pad = abin.get_pad("src")
+ video_pad = vbin.get_pad("src")
+
+ if audio_pad.link(mux_audio) != gst.PAD_LINK_OK:
+ self.log.info("Fail to link audio with mux")
+ return False
+
+ if video_pad.link(mux_video) != gst.PAD_LINK_OK:
+ self.log.info("Fail to link audio with mux")
+ return False
+
+ self.stream_data = self.StreamData(self.log, pipe, abin, vbin, sink)
+ bus = pipe.get_bus()
+ bus.add_signal_watch()
+ bus.connect("message", self.__on_bus_message, self.stream_data)
+
+ decode.connect("new-decoded-pad", self.__on_decode_new_pad, self.stream_data)
+ decode.connect("unknown-type", self.__on_decode_unknown_type, self.stream_data)
+
+ self.log.info("Setting PIPELINE state to PAUSED")
+ pipe.set_state(gst.STATE_PAUSED)
+ self.log.info("Running Loop")
+ self.stream_data.Loop.run()
+ # setup()
+
+ def __on_bus_message(self, bus, message, stream_data):
+
+ t = message.type
+
+ if t == gst.MESSAGE_STATE_CHANGED:
+ oldstate = -1
+ newstate = -1
+ pending = -1
+
+ oldstate, newstate, pending = message.parse_state_changed()
+
+ if oldstate == gst.STATE_READY and \
+ newstate == gst.STATE_PAUSED and \
+ pending == gst.STATE_VOID_PENDING and \
+ stream_data.Ready == False:
+
+ state_changed_status, current_state, pending_state = stream_data.Pipe.get_state()
+ if current_state == gst.STATE_PAUSED and \
+ pending_state == gst.STATE_VOID_PENDING:
+ self.log.info("Pipe paused")
+ stream_data.Loop.quit()
+ stream_data.Ready = True
+
+ elif t == gst.MESSAGE_EOS:
+ self.log.info("Pipe finished")
+ stream_data.Loop.quit()
+ self.quit = True
+
+ elif t == gst.MESSAGE_ERROR:
+ err, debug = message.parse_error()
+ self.log.error("Error: %s %s" %(err, debug))
+ stream_data.Loop.quit()
+ stream_data.Ready = False
+
+ return True
+ # __on_bus_message()
+
+ def __on_decode_unknown_type(self, decode, pad, caps, stream_data):
+ self.log.info("Unknown Type")
+ return None
+ # __on_decode_unknown_type
+
+ def __on_decode_new_pad(self, decode, pad, arg1, stream_data):
+
+ caps = pad.get_caps().to_string()
+ self.log.info("New pad " + caps)
+ if caps.rfind("audio") != -1:
+ apad = stream_data.Abin.get_pad("sink")
+ if pad.link(apad) != gst.PAD_LINK_OK:
+ self.log.info("Error on link audio pad")
+ return None
+ elif caps.rfind("video") != -1:
+ vpad = stream_data.Vbin.get_pad("sink")
+ if pad.link(vpad) != gst.PAD_LINK_OK:
+ self.log.info("Error on link video pad")
+ return None
+ else:
+ self.log.info("Invalid caps")
+ self.log.info("Linked")
+ # __on_decode_new_pad
+
+
+ def start(self, outfd):
+ params_first = self.params_first
+
+ self.setup(params_first("uri", ""), params_first("mux", "avi"),
+ params_first("vcodec", "ffenc_h263p"), params_first("vbitrate", 256000),
+ params_first("fps", 25), params_first("acodec", "faac"),
+ params_first("abitrate", 192000), params_first("width", 320),
+ params_first("height", 240), params_first("options", ""))
+
+ self.log.info("Play %s", outfd.fileno())
+ self.stream_data.Sink.set_property("fd", outfd.fileno())
+ self.log.info("Setting Pipeline state to PLAYING")
+ self.stream_data.Pipe.set_state(gst.STATE_PLAYING)
+
+ # keep playing until EOS
+ self.log.info("QUIT: %s" % self.quit)
+
+ i = 0
+ loop = gobject.MainLoop()
+ loop.run()
+
+ self.log.info("quit loop")
+
+ return True
+ # start()
+
+ def stop(self):
+ self.log.info("Stop stream_data: %s" % self.stream_data)
+
+ if self.stream_data:
+ self.stream_data.Pipe.set_state(gst.STATE_NULL)
+ self.quit = True
+
+ del self.stream_data
+ self.stream_data = None
+ # stop
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/0.2/plugins/transcoders/mencoder.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gmyth-stream/server/0.2/plugins/transcoders/mencoder.py Wed Apr 18 15:59:10 2007 +0100
@@ -0,0 +1,123 @@
+import lib.utils as utils
+import lib.server as server
+import os
+import signal
+import subprocess
+
+__all__ = ("TranscoderMencoder",)
+
+class TranscoderMencoder(server.Transcoder):
+ mencoder_path = utils.which("mencoder")
+ def_mencoder_outfile = os.path.join(os.path.sep, "tmp",
+ "mencoder-fifo-%(uid)s-%(pid)s")
+ name = "mencoder"
+ priority = -1
+
+ def __init__(self, params):
+ server.Transcoder.__init__(self, params)
+ self.proc = None
+ self.args = None
+
+ vars = {"uid": os.getuid(), "pid": os.getpid()}
+ mencoder_outfile_base = self.def_mencoder_outfile % vars
+ mencoder_outfile = mencoder_outfile_base
+ i = 0
+ while os.path.exists(mencoder_outfile):
+ i += 1
+ mencoder_outfile = mencoder_outfile_base + ".%s" % i
+
+ self.mencoder_outfile = mencoder_outfile
+ os.mkfifo(self.mencoder_outfile)
+
+ args = [self.mencoder_path, "-really-quiet",
+ "-o", self.mencoder_outfile]
+
+ params_first = self.params_first
+
+ type = params_first("type")
+ location = params_first("location")
+ args.append("%s://%s" % (type, location))
+
+ mux = params_first("mux", "avi")
+ args.extend(["-of", mux])
+
+ acodec = params_first("acodec", "mp3")
+ abitrate = params_first("abitrate", "128")
+ if acodec == "mp3lame":
+ args.extend(["-oac", "mp3lame", "-lameopts",
+ "cbr:br=%s" % abitrate])
+ else:
+ args.extend(["-oac", "lavc", "-lavcopts",
+ "acodec=%s:abitrate=%s" % (acodec, abitrate)])
+
+ vcodec = params_first("vcodec", "mpeg4")
+ vbitrate = params_first("vbitrate", "400")
+ args.extend(["-ovc", "lavc", "-lavcopts",
+ "vcodec=%s:vbitrate=%s" % (vcodec, vbitrate)])
+
+ fps = params_first("fps", "24")
+ args.extend(["-ofps", fps])
+
+ width = params_first("width", "320")
+ height = params_first("height", "240")
+ args.extend(["-vf", "scale=%s:%s" % (width, height)])
+
+ self.args = args
+ # __init__()
+
+
+ def _unlink_fifo(self):
+ try:
+ os.unlink(self.mencoder_outfile)
+ except Exception, e:
+ pass
+ # _unlink_fifo()
+
+
+ def start(self, outfd):
+ cmd = " ".join(self.args)
+ self.log.info("Mencoder: %s" % cmd)
+
+ try:
+ self.proc = subprocess.Popen(self.args, close_fds=True)
+ except Exception, e:
+ self.log.error("Error executing mencoder: %s" % cmd)
+ return False
+
+ try:
+ fifo_read = open(self.mencoder_outfile)
+ except Exception, e:
+ self.log.error("Error opening fifo: %s" % cmd)
+ return False
+
+ try:
+ while self.proc and self.proc.poll() == None:
+ d = fifo_read.read(1024)
+ outfd.write(d)
+ except Exception, e:
+ self.log.error("Problems handling data: %s" % e)
+ self._unlink_fifo()
+ return False
+
+ self._unlink_fifo()
+ return True
+ # start()
+
+
+ def stop(self):
+ if self.proc:
+ try:
+ os.kill(self.proc.pid, signal.SIGTERM)
+ except OSError, e:
+ pass
+
+ try:
+ self.proc.wait()
+ except Exception, e:
+ pass
+
+ self.proc = None
+
+ self._unlink_fifo()
+ # stop()
+# TranscoderMencoder
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/lib.py
--- a/gmyth-stream/server/lib.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,36 +0,0 @@
-import time
-import logging
-import os
-import stat
-
-ext = ['mpg', 'avi', 'mp4', 'nuv', 'mpeg', 'mov']
-
-def now():
- return time.strftime("%Y-%m-%d %H:%M:%S");
-
-def log(msg):
- logging.log(logging.DEBUG, msg)
- new_msg = "[%s] %s" % (now(), msg)
- return new_msg
-
-
-bin_path_list = os.environ["PATH"].split(os.pathsep)
-def which(prg):
- for d in bin_path_list:
- path = os.path.join(d, prg)
- if os.path.exists(path):
- st = os.stat(path)
- if st[stat.ST_MODE] & 0111:
- return path
- return ""
-
-def list_media_files(directory, file_list):
- for root, dirs, files in os.walk(directory):
- for name in files:
- if os.path.splitext(name)[1].strip(".") in ext:
- media = os.path.join(root,name)
- if media not in file_list:
- file_list.append(os.path.join(root,name))
-
- return True
-
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/main.py
--- a/gmyth-stream/server/main.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,185 +0,0 @@
-#!/usr/bin/python
-
-import os
-import lib
-import sys
-import imp
-import ConfigParser
-import logging as log
-
-log.basicConfig(level=log.DEBUG,
- format="%(asctime)s %(levelname)-8s %(message)s",
- datefmt='%Y-%m-%d %H:%M:%S')
-
-config = ConfigParser.ConfigParser()
-config.read("stream.conf")
-
-def load_module(pathlist, name):
- fp, path, desc = imp.find_module(name, pathlist)
- try:
- module = imp.load_module(name, fp, path, desc)
- return module
- finally:
- if fp:
- fp.close()
-
-
-media_plugin = config.get("Media", "engine")
-media_plugin_module = load_module(["./plugins/media"], media_plugin)
-media = media_plugin_module.Media(config)
-
-comm_plugin = config.get("Comm", "engine")
-comm_plugin_module = load_module(["./plugins/comm"], comm_plugin)
-server = comm_plugin_module.Server(config)
-
-log.info("Starting GMyth-Stream server")
-
-
-'''
-PROTOCOL DESCRIPTION
-=====================
-
-COMMAND OPTIONS
-
--> SETUP DESCRIPTION
-|-> used to setup transcoding and streaming parameters
-|-> must be used before any "PLAY" command
-|-> e.g:
-
-file://file_name mux vcodec vbitrate fps acodec abitrate width height options
-dvd://title_number mux vcodec vbitrate fps acodec abitrate width height options
-
--> PLAY DESCRIPTION
- |-> used to start transcoding and streaming of file
- |-> must be used just if SETUP was used before
- |-> after it, _must_ send STOP
-
--> STOP DESCRIPTION
- |-> used to stop transcoding and streaming process
- |-> must be used just if PLAY was used before
- |-> must be used after PLAY
-
--> QUIT DESCRIPTION
- |-> used to quit the main loop (quit program)
-
-'''
-nextport = 0
-setup = (False, "STOPPED")
-
-def do_setup(server, filename, mux, vcodec, vbitrate, fps, acodec, abitrate,
- width, height, *options):
- global nextport
- global setup
-
- if setup[1] != "PLAYING":
- nextport += 1
- ret = media.setup(filename, mux, vcodec, vbitrate, fps, acodec,
- abitrate, width, height, nextport, options)
- if ret[0]:
- server.sendOk()
- else:
- server.sendNotOk(ret[1])
-
- setup = (True, setup[1])
-
- else: server.sendNotOk("You must STOP before SETingUP again")
-
- return True
-
-def do_play(server):
- global setup
-
- if setup[0] and setup[1] == "STOPPED":
- setup = (setup[0], "PLAYING")
- ret = media.play()
- if ret[0]:
- server.sendOk("%d" % nextport)
- else:
- server.sendNotOk(ret[1])
-
- else:
- if setup[1] == "STOPPED":
- server.sendNotOk("You must SETUP before PLAYing")
- else:
- server.sendNotOk("You must STOP before PLAYing again")
-
- return True
-
-def do_stop(server):
- global setup
-
- media.stop()
- setup = (False, "STOPPED")
- server.sendOk()
- return True
-
-def do_list(server, *directory):
- file_list = []
- for j in directory:
- lib.list_media_files(j, file_list)
-
- server.sendOk(file_list)
- return True
-
-def do_quit(server):
- server.finish = 1
- media.stop()
- server.sendOk()
- return True
-
-
-mapping = {
- "SETUP": do_setup,
- "PLAY": do_play,
- "STOP": do_stop,
- "LIST": do_list,
- "QUIT": do_quit,
- }
-
-
-def dispatch(server, msg):
- pieces = msg.split()
- if len(pieces) < 1:
- log.error("Invalid client command format: %r" % msg)
- server.sendNotOk("Invalid Format")
- return False
-
- cmd = pieces[0]
- f = mapping.get(cmd, None)
- if not f:
- log.error("Unknow command: %r" % msg)
- server.sendNotOk("Unknow Command")
- return False
-
- try:
- return f(server, *pieces[1:])
- except Exception, e:
- log.error("Could not execute %r: %s" % (msg, e))
- server.sendNotOk(str(e))
- return False
-
-
-
-while not server.finish:
- conn, client, port = server.getRequest()
- if nextport == 0:
- nextport = port
-
- while not server.finish:
- msg = server.getMsg()
- if not msg:
- break
-
- log.info("Client %s sent command: %r" % (client, msg))
- dispatch(server, msg)
-
- log.info("Closing connection with %s" % (client,))
- server.disconnect_client(conn)
- try:
- os.wait()
- except Exception, e:
- log.error(e)
-
-server.stop()
-log.info("Server stopped. Closing...")
-
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/comm/tcp.py
--- a/gmyth-stream/server/plugins/comm/tcp.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-import lib
-import time
-import socket
-import logging as log
-
-class Server(object):
-
- def __init__(self, config):
- self.host = '0.0.0.0'
- self.port = int(config.get("Comm", "port"))
- self.finish = 0
-
- addr = (self.host, self.port)
- log.debug("Setup TCP server at %s:%s" % addr)
- self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.tcp.bind(addr)
- self.tcp.listen(1)
- log.info("TCP server listening at %s:%s (sock=%d)" %
- (self.host, self.port, self.tcp.fileno()))
-
- def getMsg(self):
- bytes = []
- try:
- while 1:
- c = self.con.recv(1)
- bytes.append(c)
- if not c or c == "\n":
- break
- except Exception, e:
- log.error("Error reading message from client: %s" % e)
- return None
-
- if not bytes or bytes[-1] != "\n":
- msg = "".join(bytes)
- log.error("Invalid message from client: %r" % msg)
- return None
-
- # remove \n and \r
- bytes.pop()
- if bytes[-1] == "\r":
- bytes.pop()
-
- msg = "".join(bytes)
- log.debug("RECV: %r" % msg)
- return msg
-
- def sendMsg(self, msg):
- log.debug("SEND: %r" % msg)
- self.con.send(msg + "\n")
-
- def sendOk(self, payload=None):
- self.sendMsg("OK %d" % bool(payload is not None))
- if payload is not None:
- if not isinstance(payload, (tuple, list)):
- payload = (payload,)
- for e in payload:
- self.sendMsg("+%s" % e)
- self.sendMsg(".")
-
- def sendNotOk(self, reason=""):
- self.sendMsg("NOTOK %r" % reason)
-
- def getRequest(self):
- log.debug("Wait for client request at %s:%s (sock=%d)" %
- (self.host, self.port, self.tcp.fileno()))
- self.con, self.client = self.tcp.accept()
- log.info("Incoming request from %s (con=%s)" %
- (self.client, self.con.fileno()))
- return (self.con, self.client, self.port)
-
- def disconnect_client(self, connection):
- log.info("Closed request from %s (con=%s)" %
- (self.client, self.con.fileno()))
- connection.close()
-
- def stop(self):
- log.debug("Stop")
- self.tcp.close()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/comm/xmlrpc.py
--- a/gmyth-stream/server/plugins/comm/xmlrpc.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,102 +0,0 @@
-import lib
-import SimpleXMLRPCServer
-
-
-class Handler:
-
- def __init__(self, recv_pool, send_pool):
- self.recv_pool = recv_pool
- self.send_pool = send_pool
- self.getMsg = self.sendMsg
-
- def _listMethods(self):
- return ['setup', 'play', 'stop', 'close', 'getMsg']
-
- def _methodHelp(self, method):
-
- if method == 'setup':
- return "Setup the Media: setup( filename, mux, vcodec, vbitrate,"\
- " fps, acodec, abitrate, width, height, port, options"
- elif method == 'play':
- return "Play the Media: play()"
- elif method == 'stop':
- return "Stop the Media: stop()"
- elif method == 'close':
- return "Close the connection: close()"
- elif method == 'getMsg':
- return "Return the first message in the pool: getMsg()"
- else:
- # By convention, return empty
- # string if no help is available
- return ""
-
- def setup(self, filename, mux, vcodec, vbitrate,\
- fps, acodec, abitrate, width, height, port, options):
-
- msg = "%s %s %s %s %s %s %s" % (filename, mux, vcodec, vbitrate,\
- fps, acodec, abitrate, width, height, port)
-
- if len(options) > 0:
- for opt in options:
- msg += " %s" % opt
-
- self.recv_pool.append("SETUP")
- self.recv_pool.append(msg)
- return self.sendMsg()
-
- def play(self):
- self.recv_pool.append("PLAY")
- return self.sendMsg()
-
- def stop(self):
- self.recv_pool.append("STOP")
- return self.sendMsg()
-
- def close(self):
- self.recv_pool.append("CLOSE")
- return self.sendMsg()
-
- def sendMsg(self):
- if self.send_pool != []:
- return self.send_pool.pop(0)
- else:
- return ""
-
-
-class Server:
-
- def __init__(self, config):
- self.host = 'localhost'
- self.port = int(config.get("Comm", "port"))
- self.finish = 0
- self.recv_pool = []
- self.send_pool = []
-
- self.handler = Handler(self.recv_pool, self.send_pool)
-
- self.xmlrpc = SimpleXMLRPCServer.SimpleXMLRPCServer((self.host, self.port))
- self.xmlrpc.register_instance(self.handler)
-
-
- def getMsg(self, size):
- if self.recv_pool != []:
- return self.recv_pool.pop(0)
- else:
- return ""
-
- def sendMsg(self, msg):
- self.send_pool.append(msg)
-
- def Ack(self, command):
- msg = "[%s] Command %s received" % (lib.now(), command)
- self.sendMsg(msg + "\n")
-
- def getRequest(self):
- self.xmlrpc.handle_request()
- return (0, "RPC Client")
-
- def disconnect_client(self, connection):
- connection = 0
-
- def stop(self):
- self.xmlrpc.server_close()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/ffmpeg.py
--- a/gmyth-stream/server/plugins/media/ffmpeg.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,91 +0,0 @@
-import os
-import sys
-import lib
-import time
-import socket
-import ConfigParser
-
-class Media:
-
- def __init__(self, config):
-
- self.config = config
- self.socket = None
- self.child_pid = None
-
- def setup(self, filename, mux, vcodec, vbitrate,\
- fps, acodec, abitrate, width, height, port):
-
- self.filename = filename
- self.mux = mux
- self.vcodec = vcodec
- self.vbitrate = int(vbitrate)
- self.fps = int(fps)
- self.acodec = acodec
- self.abitrate = int(abitrate)
- self.width = int(width)
- self.height = int(height)
-
- self.port = int(port)
-
- # good one: /tmp/mpg/cpm.mpg mpeg mpeg1video 400 25 mp2 192 320 240 5000
- self.path = self.config.get("FFmpeg", "path")
- self.path += " -i %s -f %s -vcodec %s -b %d -r %d -acodec %s -ab %d -s %dx%d -" % (
- self.filename, self.mux, self.vcodec, self.vbitrate,\
- self.fps, self.acodec, self.abitrate, self.width, self.height)
-
- if (self.socket != None):
- del(self.socket)
-
- self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.bind( ('', self.port) )
- self.socket.settimeout(10)
- self.socket.listen(1)
-
- def play(self):
-
- lib.log("Starting FFmpeg: %s" % self.path)
-
- # exec FFmpeg and get stdout
- child_stdin, child_stdout = os.popen2(self.path)
- child_stdin.close()
-
- self.child_pid = os.fork()
- if (self.child_pid == 0):
- #child
-
- conn,addr= self.socket.accept()
- lib.log("Sending Data to client: %s" % addr[0])
- data = child_stdout.read(1024)
- conn.settimeout(5)
- retry = 0
-
- while( data != "" and retry < 5):
- try:
- conn.send(data)
- except socket.error:
- lib.log("Socket error (maybe timeout ?)")
- retry = retry + 1
-
- data = child_stdout.read(1024)
-
- if (retry < 5):
- lib.log("Finished sending Data to client: %s" % addr[0])
- else:
- lib.log("Client timed out")
-
- child_stdout.close()
- #conn.close()
- #sys.exit()
-
-
- def stop(self):
-
- if (self.socket != None):
- lib.log("Closing socket")
- self.socket.close()
-
- lib.log("Trying to stop FFmpeg process")
- if (self.child_pid != None):
- os.kill(self.child_pid, 9)
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/gstreamer-rtp.py
--- a/gmyth-stream/server/plugins/media/gstreamer-rtp.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,218 +0,0 @@
-import pygst
-pygst.require("0.10")
-import gst
-import gobject
-
-class Media:
- class StreamData:
- stream_count = 0
-
- def __init__ (self, pipe, abin, vbin):
-
- self.stream_count += 1
- self.Id = self.stream_count
- self.Pipe = pipe
- self.Abin = abin
- self.Vbin = vbin
- self.Loop = gobject.MainLoop()
- self.ACaps = ""
- self.VCaps = ""
- self.Ready = False
-
-
- def __init__(self, config):
- # set gstreamer basic options
- self.config = config
- self.pipe = None
- self.streams = []
-
-
- def setup(self, filename, mux, vcodec, vbitrate,
- fps, acodec, abitrate, width, height, port, options):
-
- ## Pipelines
- self.pipe = gst.Pipeline ()
- uri = "file://" + filename
- print "Opening Uri:" + uri
- src = gst.element_make_from_uri (gst.URI_SRC, uri, "src")
- if (src is None):
- return None
-
- decode = gst.element_factory_make ("decodebin", "decode")
- if (decode is None):
- return None
-
-
- #video encode
- #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000
- vbin = gst.Bin ()
- vqueue = gst.element_factory_make ("queue", "vqueue")
- vscale = gst.element_factory_make ("videoscale", "vscale")
- vrate = gst.element_factory_make ("videorate", "vrate")
- vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode")
- vpay = gst.element_factory_make ("rtpmp4vpay", "vpay")
- vsink = gst.element_factory_make ("udpsink", "vsink")
-
- if (None in [vbin, vqueue, vscale, vrate, vencode, vpay, vsink]):
- print "Fail to create video encode elements."
- return None
-
- vscale_pad = vscale.get_pad("sink")
- if (vscale_pad is None):
- print "Fail to get vscale sink pad."
- return None
-
- vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height))
- if (vscale_caps is None):
- print "Fail to create video caps"
- return None
-
- if (not vscale_pad.set_caps (vscale_caps)):
- print "Fail to set video output caps"
- return None
-
- vencode.set_property ("bitrate", 256000)
- vencode.set_property ("me-method", 2)
-
- vsink.set_property ("host", "224.0.0.1")
- vsink.set_property ("port", 5000)
-
- vbin.add (vqueue, vscale, vrate, vencode, vpay, vsink)
- if (not gst.element_link_many (vqueue, vscale, vrate, vencode, vpay, vsink)):
- print "Fail to link video elements"
- return None
-
- vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink")))
-
- #audio encode
- #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002
- abin = gst.Bin ()
- aqueue = gst.element_factory_make ("queue", "vqueue")
- aconvert = gst.element_factory_make ("audioconvert", "aconvert")
- aencode = gst.element_factory_make ("faac", "aencode")
- apay = gst.element_factory_make ("rtpmp4gpay", "apay")
- asink = gst.element_factory_make ("udpsink", "asink")
-
- if (None in [abin, aqueue, aconvert, aencode, apay, asink]):
- print "Fail to create video encode elements."
- return None
-
- asink.set_property ("host", "224.0.0.1")
- asink.set_property ("port", 5002)
-
- abin.add (aqueue, aconvert, aencode, apay, asink)
- if (not gst.element_link_many (aqueue, aconvert, aencode, apay, asink)):
- print "Fail to link video elements"
- return None
-
- abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink")))
-
- self.pipe.add (src, decode, abin, vbin)
- gst.element_link_many (src, decode)
-
- stream_data = self.StreamData (self.pipe, abin, vbin)
-
- bus = self.pipe.get_bus()
- bus.add_signal_watch()
- bus.connect("message", self.__on_bus_message, stream_data)
-
- decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data)
- decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data)
-
-
- self.pipe.set_state (gst.STATE_PAUSED)
- print "Running Pipe"
- stream_data.Loop.run ()
- print "End run"
-
- a_caps = stream_data.ACaps
- v_caps = stream_data.VCaps
- stream_id = stream_data.Id
-
- self.streams.append (stream_data)
-
- def play(self):
-
- print "Trying to play pipeline: %s" % self.pipe
- try:
- if (self.pipe):
- self.pipe.set_state(gst.STATE_PLAYING)
- except gobject.GError, e:
- print "Error: " + str(e)
-
-
- def stop(self):
-
- print "Trying to stop pipeline: %s" % self.pipe
- try:
- if (self.pipeline):
- self.pipeline.set_state(gst.STATE_NULL)
- except gobject.GError, e:
- print "Error: " + str(e)
-
- def __on_bus_message (self, bus, message, stream_data):
-
- t = message.type
- if (t == gst.MESSAGE_STATE_CHANGED):
- oldstate = -1
- newstate = -1
- pending = -1
- oldstate, newstate, pending = message.parse_state_changed ()
- if ((oldstate == gst.STATE_READY) and \
- (newstate == gst.STATE_PAUSED) and \
- (pending == gst.STATE_VOID_PENDING) and \
- (stream_data.Ready == False)):
- state_changed_status, current_state, pending_state = stream_data.Pipe.get_state ()
- if ((current_state == gst.STATE_PAUSED) and \
- (pending_state == gst.STATE_VOID_PENDING)):
- print "Pipe paused"
- self.__fill_sink_pads (stream_data)
- stream_data.Loop.quit ()
- stream_data.Ready = True
- elif (t == gst.MESSAGE_ERROR):
- err, debug = message.parse_error()
- print "Error: %s" % err, debug
- stream_data.Loop.quit ()
- stream_data.Ready = False
-
- return True
-
-
- def __fill_sink_pads (self, stream_data):
-
- asink = stream_data.Abin.get_by_name ("asink")
- vsink = stream_data.Vbin.get_by_name ("vsink")
-
- asink_pad = asink.get_pad ("sink")
- stream_data.ACaps = asink_pad.get_negotiated_caps().to_string()
- print "ACAPS " + stream_data.ACaps
-
- vsink_pad = vsink.get_pad ("sink")
- stream_data.VCaps = vsink_pad.get_negotiated_caps().to_string()
- print "ACAPS " + stream_data.VCaps
-
-
-
- def __on_decode_unknown_type (self, decode, pad, caps, stream_data):
-
- print "Unknown Type"
- return None
-
- def __on_decode_new_pad (self, decode, pad, arg1, stream_data):
-
- caps = pad.get_caps().to_string()
- print "New pad " + caps
- if (caps.rfind ("audio") != -1):
- apad = stream_data.Abin.get_pad ("sink")
- if (pad.link (apad) != gst.PAD_LINK_OK):
- print "Error on link audio pad"
- return None
- elif (caps.rfind ("video") != -1):
- vpad = stream_data.Vbin.get_pad ("sink")
- if (pad.link (vpad) != gst.PAD_LINK_OK):
- print "Error on link video pad"
- return None
- else:
- print "Invalid caps"
-
-
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/gstreamer.py
--- a/gmyth-stream/server/plugins/media/gstreamer.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,290 +0,0 @@
-#vim:ts=4:sw=4:et
-import pygst
-pygst.require("0.10")
-import gst
-import gobject
-import socket
-import time
-from threading import Thread
-
-class Media:
- class StreamListener(Thread):
- def __init__ (self, stream_data):
- Thread.__init__(self)
- self.stream = stream_data
- print "Thread Created"
-
- def run (self):
- #Create socket
- print "Waiting connection"
- self.stream.Socket.listen(1)
- self.stream.Connection, self.stream.Addr = self.stream.Socket.accept ()
- print "Connection requested"
- self.stream.Sink.set_property ("fd", self.stream.Connection.fileno())
- self.stream.Pipe.set_state(gst.STATE_PLAYING)
- print "PLAYING"
-
-
- class StreamData:
- stream_count = 0
-
- def __init__ (self, pipe, abin, vbin, sink):
- self.stream_count += 1
- self.Id = self.stream_count
- self.Pipe = pipe
- self.Abin = abin
- self.Vbin = vbin
- self.Sink = sink
- self.Loop = gobject.MainLoop()
- self.ACaps = ""
- self.VCaps = ""
- self.Ready = False
- self.Socket = None
- self.Connection = None
- self.Addr = None
-
- def __init__(self, config):
- # set gstreamer basic options
- self.config = config
- self.streams = []
- self.socket = None
- self.connection = None
- self.addr = None
- self.ready = False
- self.current = None
-
-
- def setup(self, uri, mux, vcodec, vbitrate,
- fps, acodec, abitrate, width, height, port, options):
-
- ## Pipelines
- pipe = gst.Pipeline ()
- print "Opening Uri:" + uri
- src = gst.element_make_from_uri (gst.URI_SRC, uri, "src")
- #src = gst.element_factory_make ("gnomevfssrc", "src")
- src.set_property ("location", uri)
- if (src is None):
- print "Fail to create src element"
- return None
-
- print ("Create source")
- decode = gst.element_factory_make ("decodebin", "decode")
- if (decode is None):
- print "Fail to create decodebin"
- return None
-
- print ("Create source")
- mux = gst.element_factory_make ("avimux", "mux")
- if (mux is None):
- print "Fail to create mux"
- return None
-
- sink = gst.element_factory_make ("fdsink", "sink")
- if (sink is None):
- print "Fail to create fdsink"
- return None
-
- print ("Create source")
-
- #video encode
- #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1 port=5000
- vbin = gst.Bin ()
- vqueue = gst.element_factory_make ("queue", "vqueue")
- colorspace = gst.element_factory_make ("ffmpegcolorspace", "")
- vrate = gst.element_factory_make ("videorate", "vrate")
- vencode = gst.element_factory_make ("ffenc_mpeg4", "vencode")
- #vencode = gst.element_factory_make ("ffenc_msmpeg4v1", "vencode")
- vqueue_src = gst.element_factory_make ("queue", "vqueue_src")
-
- #if (int(vbitrate) > 0):
- vencode.set_property ("bitrate", 200)
- #vencode.set_property ("quant-type", 1)
- vencode.set_property ("pass", 2)
- vencode.set_property ("quantizer", 5)
- #vencode.set_property ("me-method", 1)
-
-
- if (None in [vbin, vqueue, vrate, vencode, vqueue_src]):
- print "Fail to create video encode elements."
- return None
-
- vbin.add (vqueue)
- if ((int(width) > 0) and (int(height) > 0)):
- print ("formating output to %d / %d" % (int(width), int(height)))
-
- vscale = gst.element_factory_make ("ffvideoscale", "vscale")
-
- vbin.add (vscale);
- if (not vqueue.link (vscale)):
- print "Fail to link video elements"
- return None
-
- vbin.add (colorspace)
-
- if (not vscale.link (colorspace, \
- gst.caps_from_string ("video/x-raw-yuv,width=(int)%d,height=(int)%d" % (int(width), int(height))))):
- print "Fail to link video elements"
- return None
- else:
- vbin.add (colorspace)
- vqueue.link (colorspace)
-
- vbin.add (vrate, vencode, vqueue_src)
- if (not colorspace.link (vrate)):
- print "Fail to colorspace with vrate"
- return None
-
-
- if (not vrate.link (vencode, \
- gst.caps_from_string ("video/x-raw-yuv,framerate=(fraction)10/1"))):
- print "Fail to link vrate element"
- return None
-
- if (not vencode.link (vqueue_src)):
- print "Fail to link video encode with queue"
- return None
-
- vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink")))
- vbin.add_pad (gst.GhostPad ("src", vqueue_src.get_pad ("src")))
-
- #audio encode
- #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay ! udpsink name=upd_audio host=224.0.0.1 port=5002
- abin = gst.Bin ()
- aqueue = gst.element_factory_make ("queue", "aqueue")
- aconvert = gst.element_factory_make ("audioconvert", "aconvert")
- arate = gst.element_factory_make ("audioresample", "arate")
- #aencode = gst.element_factory_make ("ffenc_ac3", "aencode")
- aencode = gst.element_factory_make ("queue", "aencode")
- #aencode = gst.element_factory_make ("lame", "aencode")
- #aencode = gst.element_factory_make ("ffenc_mp2", "aencode")
- aqueue_src = gst.element_factory_make ("queue", "aqueue_src")
-
- if (None in [abin, aqueue, arate, aencode, aqueue_src]):
- print "Fail to create video encode elements."
- return None
-
- abin.add (aqueue, aconvert, arate, aencode, aqueue_src)
- if (not gst.element_link_many (aqueue, aconvert, arate, aencode, aqueue_src)):
- print "Fail to link video elements"
- return None
-
- abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink")))
- abin.add_pad (gst.GhostPad ("src", aqueue_src.get_pad ("src")))
-
- #Finish Pipeline
- pipe.add (src, decode, abin, vbin, mux, sink)
- gst.element_link_many (src, decode)
- gst.element_link_many (mux, sink)
-
- #Linking decode with mux
- mux_audio = mux.get_pad ("audio_0")
- mux_video = mux.get_pad ("video_0")
-
- audio_pad = abin.get_pad ("src")
- video_pad = vbin.get_pad ("src")
-
- if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK):
- print "Fail to link audio with mux"
- return None
-
- if (video_pad.link (mux_video) != gst.PAD_LINK_OK):
- print "Fail to link audio with mux"
- return None
-
- stream_data = self.StreamData (pipe, abin, vbin, sink)
- bus = pipe.get_bus()
- bus.add_signal_watch()
- bus.connect ("message", self.__on_bus_message, stream_data)
-
- decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data)
- decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data)
-
- print ("Create source")
- pipe.set_state (gst.STATE_PAUSED)
- print "Running Pipe"
- stream_data.Loop.run ()
- print "End run"
-
-
- #Create socket
- stream_data.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- print "Bind on port %d" % port
- stream_data.Socket.bind(('', int (port)))
- self.streams.append (stream_data)
- return (True, "")
-
- def play(self):
- print "Play"
- stream = self.streams[0]
- self.current = self.StreamListener(stream)
- self.current.start ()
- time.sleep (1)
- return (True, "")
-
- def stop(self):
- self.current.join ()
- self.current = None
- stream = self.streams[0]
- stream.Pipe.set_state(gst.STATE_NULL)
- del (stream.Pipe)
- stream.Pipe = None
- stream.Abin = None
- stream.Vbin = None
- stream.Sink = None
- if (stream.Connection != None):
- stream.Connection.close ()
-
- self.streams = []
- time.sleep (5)
- return (True, "")
-
-
- def __on_bus_message (self, bus, message, stream_data):
-
- t = message.type
- if (t == gst.MESSAGE_STATE_CHANGED):
- oldstate = -1
- newstate = -1
- pending = -1
- oldstate, newstate, pending = message.parse_state_changed ()
- if ((oldstate == gst.STATE_READY) and \
- (newstate == gst.STATE_PAUSED) and \
- (pending == gst.STATE_VOID_PENDING) and \
- (stream_data.Ready == False)):
- state_changed_status, current_state, pending_state = stream_data.Pipe.get_state ()
- if ((current_state == gst.STATE_PAUSED) and \
- (pending_state == gst.STATE_VOID_PENDING)):
- print "Pipe paused"
- stream_data.Loop.quit ()
- stream_data.Ready = True
- elif (t == gst.MESSAGE_ERROR):
- err, debug = message.parse_error()
- print "Error: %s" % err, debug
- stream_data.Loop.quit ()
- stream_data.Ready = False
-
- return True
-
- def __on_decode_unknown_type (self, decode, pad, caps, stream_data):
-
- print "Unknown Type"
- return None
-
- def __on_decode_new_pad (self, decode, pad, arg1, stream_data):
-
- caps = pad.get_caps().to_string()
- print "New pad " + caps
- if (caps.rfind ("audio") != -1):
- apad = stream_data.Abin.get_pad ("sink")
- if (pad.link (apad) != gst.PAD_LINK_OK):
- print "Error on link audio pad"
- return None
- elif (caps.rfind ("video") != -1):
- vpad = stream_data.Vbin.get_pad ("sink")
- if (pad.link (vpad) != gst.PAD_LINK_OK):
- print "Error on link video pad"
- return None
- else:
- print "Invalid caps"
- print "Linked"
-
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/mencoder.py
--- a/gmyth-stream/server/plugins/media/mencoder.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,411 +0,0 @@
-from __future__ import division
-
-import os
-import sys
-import lib
-import time
-import shlex
-import signal
-import socket
-import ConfigParser
-import logging as log
-
-from select import *
-from subprocess import *
-
-class Media(object):
-
- def __init__(self, config):
-
- self.config = config
- self.do_cleanup()
-
- # __init__()
-
-
- def do_cleanup(self):
- self.path = ""
- self.args = []
- self.language = None
- self.subtitle = None
- self.mpegopts = None
- self.socket = None
- self.child_pid = None
- self.mplayer = None
- self.mencoder_pid = None
- self.mplayer_pid = None
- self.audio_opts = None
- self.video_opts = None
- self.gst_pipe = None
- self.gst_pid = None
- self.transcode_local = None
-
- # do_cleanup()
-
-
- def setup_opts(self, options):
-
- for opt in options:
-
- if opt == "local":
- self.mplayer = lib.which("mplayer")
-
- elif opt.find("language=") >= 0:
- try:
- lan = opt.split("=")[1]
- if len(lan) < 2:
- self.language = lan
- except Exception, e:
- log.error("Bad language option: %s" % opt)
-
- elif opt.find("subtitle=") >= 0:
- try:
- sub = opt.split("=")[1]
- if len(sub) < 2:
- self.language = sub
- except Exception, e:
- log.error("Bad subtitle option: %s" % opt)
-
- elif opt.find("format=") >= 0:
- try:
- self.mpegopts = opt.split("=")[1]
- except Exception, e:
- log.error("Bad format option: %s" % opt)
-
- elif opt.find("outfile=") >= 0:
- try:
- self.transcode_local = opt.split("=")[1]
- except Exception, e:
- log.error("Bad outfile option: %s" % opt)
-
- # setup_opts()
-
-
- def run_mplayer(self):
- msg = self.filename
-
- if self.kind == "dvd":
- msg = "dvd://" + msg
-
- self.mplayer_pid = Popen([self.mplayer, self.filename, "1> %s" % os.devnull,\
- "2> %s" % os.devnull], stdout=PIPE, close_fds=True)
-
- # run_mplayer()
-
-
- def setup_mencoder(self):
- self.path = self.config.get("Mencoder", "path")
- mp = Popen([self.path], stdout=PIPE, close_fds=True)
-
- version = mp.stdout.read().split("MEncoder ")[1].split(" (C)")[0].split("-")[-1]
-
- if version > "4.1.1": self.mencoder_old = False
- else: self.mencoder_old = True
-
- os.kill(mp.pid, signal.SIGKILL)
- log.info("Mencoder version: %s" % version)
-
- if self.mencoder_old:
- try:
- self.fifo = self.config.get("Mencoder", "fifo_path")
- os.mkfifo(self.fifo)
- except Exception, e:
- log.info("Fifo: %s" % e)
- else:
- self.fifo = "-"
-
- # setup_mencoder()
-
-
- def setup_audio(self):
-
- if self.acodec == "mp3lame":
- return "-oac mp3lame -lameopts cbr:br=%s vol=5" % self.abitrate
- else:
- return "-oac lavc -lavcopts acodec=%s:abitrate=%s" % (\
- self.acodec, self.abitrate)
-
- # setup_audio()
-
-
- def setup_video(self):
-
- video = ""
-
- video += " -of %s" % self.mux
- video += " -ofps %s" % self.fps
-
- if self.vcodec == "nuv" or self.vcodec == "xvid"\
- or self.vcodec == "qtvideo" or self.vcodec == "copy":
- video += " -ovc %s" % self.vcodec
- else:
- video += " -ovc lavc -lavcopts vcodec=%s:vbitrate=%s" % (
- self.vcodec, self.vbitrate)
-
- if self.mux == "mpeg" and self.mpegopts is not None:
- video += " -mpegopts format=%s" % self.mpegopts
-
- video += " -vf scale=%s:%s" % (self.width, self.height)
-
- return video
-
- # setup_video()
-
-
- def arg_append(self, args, options):
- l = shlex.split(options)
- for i in l:
- args.append(i)
-
- # arg_append()
-
-
- def setup_args(self, args):
-
- args.append(self.path)
-
- #args.append(self.filename)
- args.append("-")
-
- if self.language != None:
- self.arg_append(args, "-alang %s" % self.language)
-
- if self.subtitle != None:
- self.arg_append(args, "-slang %s" % self.subtitle)
- self.arg_append(args, "-subfps %s" % self.fps)
-
- self.arg_append(args, "-idx")
- self.arg_append(args, self.audio_opts)
- self.arg_append(args, self.video_opts)
-
- self.arg_append(args, "-really-quiet")
- self.arg_append(args, "-o %s" % self.fifo)
- self.arg_append(args, "2> %s" % os.devnull)
-
- # setup_args()
-
-
- def setup_filename(self, filename):
- try:
- self.kind, self.filename = filename.split("://")
- except:
- return (False, "Wrong filename protocol")
-
- if self.kind == "file":
- if not os.path.exists(self.filename):
- msg = "File requested does not exist. SETUP failed."
- log.error(msg)
- return (False, msg)
-
- elif self.kind == "dvd":
- self.filename = "dvd://" + filename
-
- elif self.kind == "myth":
- self.filename = filename
- self.gst_pipe = os.pipe()
- print self.gst_pipe[0]
- print self.gst_pipe[1]
-
- return (True, "")
-
- # setup_filename()
-
-
- def setup_socket(self):
- if self.socket != None:
- self.socket = None
-
- self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- try:
- self.socket.bind( ('', self.port) )
- self.socket.listen(1)
- except Exception, e:
- log.error("Could not create socket: %s" % e)
- return (False, e)
-
- return (True, "")
-
- # setup_socket()
-
-
- '''
- MENCODER SETUP DESCRIPTION
- ===========================
-
- -> mux, vcodecs and acodecs
- |-> mencoder (-of | -ovc | -oac) help
-
- -> if used mpeg as mux:
- |-> to setup format: format=%s as an option at the end
-
- '''
-
-
- # good one: /tmp/dvb.mpg avi mpeg4 400 25 mp3lame 192 320 240
- # file:///tmp/dvb.mpg mpeg mpeg1video 400 25 mp2 192 320 240 format=mpeg1
- # dvd://4 mpeg mpeg1video 400 25 mp3lame 192 400 240 language=en local
- # file:///tmp/mpg/bad_day.mpg avi mpeg4 400 25 mp3 192 320 240
-
- def setup(self, filename, mux, vcodec, vbitrate,\
- fps, acodec, abitrate, width, height, port, options):
-
- if self.args != []:
- self.do_cleanup()
-
- self.mux = mux
- self.vcodec = vcodec
- self.vbitrate = vbitrate
- self.fps = fps
- self.acodec = acodec
- self.abitrate = abitrate
- self.width = width
- self.height = height
- self.port = int(port)
-
- self.setup_mencoder()
-
- ret_val = self.setup_filename(filename)
-
- if not ret_val[0]:
- return ret_val
-
- self.setup_opts(options)
- self.audio_opts = self.setup_audio()
- self.video_opts = self.setup_video()
- self.setup_args(self.args)
-
- ret_val = self.setup_socket()
- return ret_val
-
- # setup()
-
- def play_loop(self, conn):
- data = self.pout.read(4096)
-
- conn.settimeout(5)
- retry = 0
-
- if not self.transcode_local:
- while data != "" and retry < 5:
- try:
- conn.send(data)
- r, w, x = select([conn], [], [], 0)
- if conn in r:
- back = conn.recv(1024)
- if back == "OK" and self.mplayer and not self.mplayer_pid:
- self.run_mplayer()
-
- except socket.error, e:
- log.error("Socket error: %s" % e)
- retry += 1
-
- data = self.pout.read(4096)
-
- else:
- local = open(self.transcode_local, "w")
- total = os.path.getsize(self.filename)
- partial = 4096
-
- while data != "":
- try:
- local.write(data)
- except Exception, e:
- log.error("Write error: %s" % e)
-
- data = self.pout.read(4096)
- partial += len(data)
- conn.send("%.2f\n" % (partial * 100 / total) )
-
- local.close()
- conn.send("DONE\n")
-
- return retry
-
- # play_loop()
-
-
- def play(self):
-
- if self.gst_pipe:
- try:
- gst = [ lib.which("gst-launch-0.10"), "--gst-debug-level=0" ]
- self.arg_append(gst, "mythtvsrc location=%s" % self.filename)
- self.arg_append(gst, "! fdsink fd=2")
- self.gst_pid = Popen(gst, close_fds=True)
- log.info("Running Gstreamer: %s" % gst);
- except Exception, e:
- msg = "Could not init Gstreamer: %s" % e
- log.error(msg)
- return (False, msg)
-
-
- log.info("Starting Mencoder: %s" % self.args )
- try:
- if not self.gst_pipe:
- self.stdin = open(self.filename)
- else:
- self.stdin = self.gst_pid.stdout
-
- self.mencoder_pid = Popen(self.args, stdin=self.stdin, stdout=PIPE, close_fds=True)
- except Exception, e:
- msg = "Could not init Mencoder: %s" % e
- log.error(msg)
- return (False, msg)
-
- if self.mencoder_old: self.pout = open(self.fifo)
- else: self.pout = self.mencoder_pid.stdout
-
- self.child_pid = os.fork()
-
- if self.child_pid == 0:
- conn, addr = self.socket.accept()
-
- log.info("Sending Data to client: %s" % addr[0])
- retry = self.play_loop(conn)
-
- if retry < 5:
- log.info("Finished sending Data to client: %s" % addr[0])
- else:
- log.error("Client timed out, retried more than %s times" % retry)
-
- os.kill(self.mencoder_pid.pid, signal.SIGKILL)
- sys.exit(0)
-
- return (True, "")
-
- # play()
-
-
- def stop(self):
- try:
-
- if self.mencoder_pid:
- os.kill(self.mencoder_pid.pid, signal.SIGTERM)
- self.mencoder_pid = None
-
- if self.mplayer_pid:
- os.kill(self.mplayer_pid.pid, signal.SIGTERM)
- self.mplayer_pid = None
-
- if self.socket:
- self.socket.close()
- self.socket = None
-
- if self.child_pid:
- os.kill(self.child_pid, signal.SIGTERM)
- self.child_pid = None
-
- if self.gst_pid:
- os.kill(self.gst_pid.pid, signal.SIGTERM)
- self.gst_pid = None
-
- self.do_cleanup()
-
- os.wait()
-
- except Exception, e:
- log.error("Stop error: %s" % e)
-
- # stop()
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/plugins/media/vlc.py
--- a/gmyth-stream/server/plugins/media/vlc.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,80 +0,0 @@
-import os
-import sys
-import time
-import socket
-import ConfigParser
-
-class Media:
-
- def __init__(self, config):
-
- self.config = config
- self.pipe = ""
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- self.path = config.get("Vlc", "path")
- self.host = config.get("Vlc", "host")
- self.port = int(config.get("Vlc", "port"))
- self.pwd = config.get("Vlc", "pwd")
-
- # exec VLC
- pid = os.fork()
- if (pid == 0):
- #child
- print "ESTOU EM CHILD"
- self.path += " -I telnet -d 1> /dev/null 2> /dev/null &"
- os.system(self.path)
- sys.exit(0)
- else:
- print "ESTOU EM PARENT 1"
- time.sleep(3)
- print "ESTOU EM PARENT 2"
- self.sock.connect( (self.host, self.port) )
- self.sock.send("%s\n" % self.pwd)
-
-
- def insert_file(self, filename):
-
- self.sock.send("setup output0 input %s\n" % filename)
-
-
-
- def setup(self, filename, mux, vcodec, vbitrate,\
- fps, acodec, abitrate, width, height, port):
-
- self.filename = filename
- self.mux = mux
- self.vcodec = vcodec
- self.vbitrate = int(vbitrate)
- self.fps = int(fps)
- self.acodec = acodec
- self.abitrate = int(abitrate)
- self.width = int(width)
- self.height = int(height)
-
- self.port = int(port)
-
-
- self.pipe = "#transcode{vcodec=%s,vb=%d,"\
- "fps=25.0,scale=1,acodec=mpga,"\
- "ab=64,channels=1,width=%d,height=%d}"\
- ":duplicate{dst=std{access=http,"\
- "mux=mpeg1,dst=:%d}}" % (self.vcodec, self.vbitrate,\
- self.widht, self.height,\
- self.port)
-
- self.sock.send("setup output0 broadcast %s\n" % self.pipe)
- self.insert_file(self.filename)
-
- def play(self):
-
- print "Trying to play: %s" % self.pipe
- self.sock.send("control output0 play\n")
-
-
- def stop(self):
-
- print "Trying to stop: %s" % self.pipe
- self.sock.send("control output0 stop\n")
-
-
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/stream.conf
--- a/gmyth-stream/server/stream.conf Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,23 +0,0 @@
-[Comm]
-engine = tcp
-port = 50000
-
-
-[Media]
-engine = mencoder
-
-
-[Vlc]
-path = /usr/local/bin/vlc
-host = 127.0.0.1
-port = 4212
-pwd = admin
-
-
-[FFmpeg]
-path = /usr/local/bin/ffmpeg
-
-
-[Mencoder]
-path = /usr/local/bin/mencoder
-fifo_path = /tmp/teste
diff -r 1b897f699097 -r ed34b1dab103 gmyth-stream/server/tests/client.py
--- a/gmyth-stream/server/tests/client.py Wed Apr 18 15:47:40 2007 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-import os
-import sys
-import time
-import socket
-
-
-if len(sys.argv) < 2:
- HOST = 'localhost'
- PORT = 50000
-elif len(sys.argv) == 2:
- HOST = sys.argv[1]
- PORT = 50000
-else:
- HOST = sys.argv[1]
- PORT = int(sys.argv[2])
-
-socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
-socket.settimeout(10)
-
-try:
- socket.connect( (HOST,PORT) )
-except:
- print "\n--> Could not connect to ('%s':'%d')\n" % (HOST,PORT)
- sys.exit(-1)
-
-
-mplayer = os.popen("which mplayer").read().strip()
-mplayer += " -idx - -vo x11 1> /dev/null"
-pin, pout = os.popen2(mplayer)
-
-#teste = open("teste.avi", "w")
-
-data = socket.recv(4096)
-i = 0
-
-while (data != ""):
- pin.write(data)
- #teste.write(data)
- data = socket.recv(4096)
- #if (i == 500):
- # socket.send("OK")
- i += 1
-
-pin.close()
-socket.close()
-#teste.close()
-
-# from select import select
-# r, w, x = select([pout], []. [], 0)
-# if pout in r:
-# pout.read(32)