1.1 --- a/gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py Tue May 15 22:14:04 2007 +0100
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,324 +0,0 @@
1.4 -#vim:ts=4:sw=4:et
1.5 -import pygst
1.6 -pygst.require("0.10")
1.7 -import gst
1.8 -import gobject
1.9 -import time
1.10 -import lib.utils as utils
1.11 -import lib.server as server
1.12 -
1.13 -from threading import Thread
1.14 -
1.15 -__all__ = ("TranscoderGstreamer",)
1.16 -
1.17 -class TranscoderGstreamer(server.Transcoder):
1.18 - gstreamer_path = utils.which("gst-launch-0.10")
1.19 - name = "gstreamer"
1.20 - priority = -2
1.21 -
1.22 - # StreamListener()
1.23 -
1.24 - class StreamData:
1.25 - stream_count = 0
1.26 -
1.27 - def __init__(self, log, pipe, abin, vbin, sink):
1.28 - self.log = log
1.29 - self.stream_count += 1
1.30 - self.Id = self.stream_count
1.31 - self.Pipe = pipe
1.32 - self.Abin = abin
1.33 - self.Vbin = vbin
1.34 - self.Sink = sink
1.35 - self.Loop = gobject.MainLoop()
1.36 - self.Ready = False
1.37 - # __init__()
1.38 -
1.39 - # StreamData()
1.40 -
1.41 -
1.42 - def __init__(self, params):
1.43 - server.Transcoder.__init__(self, params)
1.44 - gobject.threads_init ()
1.45 - # set gstreamer basic options
1.46 - self.connection = None
1.47 - self.addr = None
1.48 - self.ready = False
1.49 - self.log.debug("Params for Gstreamer: %s" % self.params)
1.50 - # __init__()
1.51 -
1.52 -
1.53 - def _create_start_elements(self, uri):
1.54 - self.log.debug("Opening Uri:" + uri)
1.55 - src = gst.element_make_from_uri(gst.URI_SRC, uri, "src")
1.56 - decode = gst.element_factory_make("decodebin", "decode")
1.57 - mux = gst.element_factory_make("ffmux_mpeg", "mux")
1.58 - sink = gst.element_factory_make("fdsink", "sink")
1.59 -
1.60 - return [src, decode, mux, sink]
1.61 - # _create_start_elements()
1.62 -
1.63 -
1.64 - def _setup_video_encode(self, vbin, width, height):
1.65 - vqueue = gst.element_factory_make("queue", "vqueue")
1.66 - colorspace = gst.element_factory_make("ffmpegcolorspace", "")
1.67 - vrate = gst.element_factory_make("videorate", "vrate")
1.68 - #vencode = gst.element_factory_make("ffenc_mpeg4", "vencode")
1.69 - vencode = gst.element_factory_make("ffenc_mpeg1video", "vencode")
1.70 - vqueue_src = gst.element_factory_make("queue", "vqueue_src")
1.71 -
1.72 - vencode.set_property("bitrate", 200)
1.73 - vencode.set_property ("pass", 2)
1.74 - vencode.set_property ("quantizer", 5)
1.75 -
1.76 - if None in [vbin, vqueue, vrate, vencode, vqueue_src]:
1.77 - self.log.error("Fail to create video encode elements.")
1.78 - return False
1.79 -
1.80 - vbin.add(vqueue)
1.81 - if int(width) > 0 and int(height) > 0:
1.82 - self.log.debug("Formating output to %d / %d" % ( int(width), int(height)))
1.83 -
1.84 - vscale = gst.element_factory_make("ffvideoscale", "vscale")
1.85 -
1.86 - vbin.add(vscale);
1.87 - if not vqueue.link(vscale):
1.88 - self.log.error("Fail to link video elements")
1.89 - return False
1.90 -
1.91 - vbin.add(colorspace)
1.92 -
1.93 - if not vscale.link(colorspace, \
1.94 - gst.caps_from_string("video/x-raw-yuv,width=(int)%d,height=(int)%d" %(\
1.95 - int(width), int(height)))):
1.96 - self.log.error("Fail to link video elements")
1.97 - return False
1.98 - else:
1.99 - vbin.add(colorspace)
1.100 - vqueue.link(colorspace)
1.101 -
1.102 - vbin.add(vrate, vencode, vqueue_src)
1.103 - if not colorspace.link(vrate):
1.104 - self.log.error("Fail to colorspace with vrate")
1.105 - return False
1.106 -
1.107 - if not vrate.link(vencode, \
1.108 - gst.caps_from_string("video/x-raw-yuv,framerate=(fraction)10/1")):
1.109 - self.log.error("Fail to link vrate element")
1.110 - return False
1.111 -
1.112 - if not vencode.link(vqueue_src):
1.113 - self.log.error("Fail to link video encode with queue")
1.114 - return False
1.115 -
1.116 - vbin.add_pad(gst.GhostPad("sink", vqueue.get_pad("sink")))
1.117 - vbin.add_pad(gst.GhostPad("src", vqueue_src.get_pad("src")))
1.118 -
1.119 - return True
1.120 - # _setup_video_encode()
1.121 -
1.122 -
1.123 - def _setup_audio_encode(self, abin):
1.124 - aqueue = gst.element_factory_make("queue", "aqueue")
1.125 - aconvert = gst.element_factory_make("audioconvert", "aconvert")
1.126 - #aencode = gst.element_factory_make("queue", "aencode")
1.127 - #aencode = gst.element_factory_make("ffenc_mp2", "aencode")
1.128 - aencode = gst.element_factory_make("lame", "aencode")
1.129 - aqueue_src = gst.element_factory_make("queue", "aqueue_src")
1.130 -
1.131 - if None in [abin, aqueue, aencode, aqueue_src]:
1.132 - self.log.error("Fail to create video encode elements.")
1.133 - return False
1.134 -
1.135 - #aencode.set_property ("bitrate", 32)
1.136 - #aencode.set_property ("vbr-quality", 2)
1.137 -
1.138 - abin.add(aqueue, aconvert, aencode, aqueue_src)
1.139 -
1.140 - self.log.debug("Link queue -> aconvert")
1.141 - if not aqueue.link (aconvert):
1.142 - self.log.error("Fail to link queue video")
1.143 - return False
1.144 -
1.145 - self.log.debug("Link aconvert -> aencode")
1.146 - if not aconvert.link (aencode):
1.147 - self.log.error("Fail to link video elements")
1.148 - return False
1.149 -
1.150 - self.log.debug("Link aencode -> aqueue_src")
1.151 - if not aencode.link (aqueue_src):
1.152 - self.log.error("Fail to link aencode -> aqueue_src")
1.153 - return False
1.154 -
1.155 - self.log.debug("Link:OK")
1.156 -
1.157 - ghost_sink = gst.GhostPad("sink", aqueue.get_pad("sink"))
1.158 - ghost_src = gst.GhostPad("src", aqueue_src.get_pad("src"))
1.159 - #ghost_src.set_caps (gst.caps_from_string ("audio/mpeg,mpegversion=(int)1,layer=(int)3,rate=(int)32000"))
1.160 - abin.add_pad(ghost_sink)
1.161 - abin.add_pad(ghost_src)
1.162 -
1.163 - return True
1.164 - # _setup_audio_encode()
1.165 -
1.166 -
1.167 - def setup(self, uri, mux, vcodec, vbitrate,
1.168 - fps, acodec, abitrate, width, height, options):
1.169 -
1.170 - ## Pipelines
1.171 - pipe = gst.Pipeline()
1.172 - src, decode, mux, sink = self._create_start_elements(uri)
1.173 -
1.174 - if None in [src, decode, mux, sink]:
1.175 - self.log.info("Problems with while starting basic elements");
1.176 - return False
1.177 -
1.178 - #video encode
1.179 - #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate !
1.180 - #ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink host=224.0.0.1
1.181 - #port=5000
1.182 -
1.183 - vbin = gst.Bin()
1.184 - if not self._setup_video_encode(vbin, width, height):
1.185 - return False
1.186 -
1.187 - #audio encode
1.188 - #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
1.189 - #udpsink name=upd_audio host=224.0.0.1 port=5002
1.190 -
1.191 - abin = gst.Bin()
1.192 - if not self._setup_audio_encode(abin):
1.193 - return False
1.194 -
1.195 - #Finish Pipeline
1.196 - pipe.add(src, decode, abin, vbin, mux, sink)
1.197 - gst.element_link_many(src, decode)
1.198 - gst.element_link_many(mux, sink)
1.199 -
1.200 - #Linking decode with mux
1.201 - mux_audio = mux.get_pad("audio_0")
1.202 - mux_video = mux.get_pad("video_0")
1.203 -
1.204 - audio_pad = abin.get_pad("src")
1.205 - video_pad = vbin.get_pad("src")
1.206 -
1.207 - if audio_pad.link(mux_audio) != gst.PAD_LINK_OK:
1.208 - self.log.error("Fail to link audio with mux")
1.209 - return False
1.210 -
1.211 - if video_pad.link(mux_video) != gst.PAD_LINK_OK:
1.212 - self.log.error("Fail to link audio with mux")
1.213 - return False
1.214 -
1.215 - self.stream_data = self.StreamData(self.log, pipe, abin, vbin, sink)
1.216 - bus = pipe.get_bus()
1.217 - bus.add_signal_watch()
1.218 - bus.connect("message", self.__on_bus_message, self.stream_data)
1.219 -
1.220 - decode.connect("new-decoded-pad", self.__on_decode_new_pad, self.stream_data)
1.221 - decode.connect("unknown-type", self.__on_decode_unknown_type, self.stream_data)
1.222 -
1.223 - self.log.info("Setting PIPELINE state to PAUSED")
1.224 - pipe.set_state(gst.STATE_PAUSED)
1.225 - self.log.info("Running Loop")
1.226 - self.stream_data.Loop.run()
1.227 - # setup()
1.228 -
1.229 - def __on_bus_message(self, bus, message, stream_data):
1.230 -
1.231 - t = message.type
1.232 - self.log.debug("__on_bus_message")
1.233 -
1.234 - if t == gst.MESSAGE_STATE_CHANGED:
1.235 - oldstate = -1
1.236 - newstate = -1
1.237 - pending = -1
1.238 -
1.239 - oldstate, newstate, pending = message.parse_state_changed()
1.240 -
1.241 - if oldstate == gst.STATE_READY and \
1.242 - newstate == gst.STATE_PAUSED and \
1.243 - pending == gst.STATE_VOID_PENDING and \
1.244 - stream_data.Ready == False:
1.245 -
1.246 - state_changed_status, current_state, pending_state = stream_data.Pipe.get_state()
1.247 - if current_state == gst.STATE_PAUSED and \
1.248 - pending_state == gst.STATE_VOID_PENDING:
1.249 - self.log.info("Pipe paused")
1.250 - stream_data.Loop.quit()
1.251 - stream_data.Ready = True
1.252 -
1.253 - elif t == gst.MESSAGE_EOS:
1.254 - self.log.info("Pipe finished")
1.255 - if stream_data.Ready:
1.256 - self.stop()
1.257 - else:
1.258 - stream_data.Loop.quit()
1.259 -
1.260 - elif t == gst.MESSAGE_ERROR:
1.261 - err, debug = message.parse_error()
1.262 - self.log.error("Error: %s %s" %(err, debug))
1.263 - if stream_data.Ready:
1.264 - self.stop()
1.265 - else:
1.266 - stream_data.Loop.quit()
1.267 -
1.268 - return True
1.269 - # __on_bus_message()
1.270 -
1.271 - def __on_decode_unknown_type(self, decode, pad, caps, stream_data):
1.272 - self.log.info("Unknown Type")
1.273 - return None
1.274 - # __on_decode_unknown_type
1.275 -
1.276 - def __on_decode_new_pad(self, decode, pad, arg1, stream_data):
1.277 -
1.278 - caps = pad.get_caps().to_string()
1.279 - self.log.debug("New pad " + caps)
1.280 - if caps.rfind("audio") != -1:
1.281 - apad = stream_data.Abin.get_pad("sink")
1.282 - if pad.link(apad) != gst.PAD_LINK_OK:
1.283 - self.log.error("Error on link audio pad")
1.284 - return None
1.285 - elif caps.rfind("video") != -1:
1.286 - vpad = stream_data.Vbin.get_pad("sink")
1.287 - if pad.link(vpad) != gst.PAD_LINK_OK:
1.288 - self.log.error("Error on link video pad")
1.289 - return None
1.290 - else:
1.291 - self.log.error("Invalid caps")
1.292 - self.log.debug("Linked")
1.293 - # __on_decode_new_pad
1.294 -
1.295 -
1.296 - def start(self, outfd):
1.297 - params_first = self.params_first
1.298 -
1.299 - uri = '%s://%s' % (params_first("uri_prefix", ""), params_first("uri_path", ""))
1.300 - self.setup(uri, params_first("mux", "avi"),
1.301 - params_first("vcodec", "ffenc_h263p"), params_first("vbitrate", 256000),
1.302 - params_first("fps", 25), params_first("acodec", "faac"),
1.303 - params_first("abitrate", 192000), params_first("width", 320),
1.304 - params_first("height", 240), params_first("options", ""))
1.305 -
1.306 - self.log.debug("Play %s", outfd.fileno())
1.307 - self.stream_data.Sink.set_property("fd", outfd.fileno())
1.308 - self.log.info("Setting Pipeline state to PLAYING")
1.309 - self.stream_data.Pipe.set_state(gst.STATE_PLAYING)
1.310 -
1.311 - # keep playing until EOS
1.312 - self.stream_data.Loop.run()
1.313 - return True
1.314 - # start()
1.315 -
1.316 - def stop(self):
1.317 - self.log.info("Stop stream_data: %s" % self.stream_data)
1.318 -
1.319 - if self.stream_data:
1.320 - self.stream_data.Pipe.set_state(gst.STATE_NULL)
1.321 -
1.322 - self.stream_data.Ready = False
1.323 - self.stream_data.Loop.quit ()
1.324 - del self.stream_data
1.325 - self.stream_data = None
1.326 - time.sleep (2)
1.327 - # stop