gmyth-stream/server/0.2/plugins/transcoders/gstreamer.py
author rosfran
Wed Apr 18 23:13:26 2007 +0100 (2007-04-18)
branch trunk
changeset 568 f5ef83bbe8b5
child 570 00ecee7b9625
permissions -rw-r--r--
[svn r573] New function to get information about the program info path directory.
     1 #vim:ts=4:sw=4:et
     2 import pygst
     3 pygst.require("0.10")
     4 import gst
     5 import gobject
     6 import time
     7 import lib.utils as utils
     8 import lib.server as server
     9 
    10 from threading import Thread
    11 
    12 __all__ = ("TranscoderGstreamer",)
    13 
    14 class TranscoderGstreamer(server.Transcoder):
    15     gstreamer_path = utils.which("gst-launch-0.10")
    16     name = "gstreamer"
    17     priority = -2
    18 
    19     # StreamListener()
    20 
    class StreamData:
        """Per-stream state shared by the pipeline setup and its callbacks.

        Groups the pipeline, the audio and video bins and the sink together
        with a gobject main loop and a few bookkeeping fields.
        """

        # Number of StreamData objects created so far (shared by all
        # instances; used to hand out increasing Ids).
        stream_count = 0

        def __init__(self, log, pipe, abin, vbin, sink):
            """Store the stream objects and assign the next stream Id.

            log  -- logger kept on the instance
            pipe -- pipeline, stored as ``Pipe``
            abin -- audio bin, stored as ``Abin``
            vbin -- video bin, stored as ``Vbin``
            sink -- sink element, stored as ``Sink``
            """
            self.log = log
            # Increment the counter on the class.  ``self.stream_count += 1``
            # would only create an instance attribute, leaving the shared
            # counter at 0 and giving every stream the same Id.
            self.__class__.stream_count += 1
            self.Id = self.stream_count
            self.Pipe = pipe
            self.Abin = abin
            self.Vbin = vbin
            self.Sink = sink
            self.Loop = gobject.MainLoop()
            self.ACaps = ""
            self.VCaps = ""
            self.Ready = False
            self.Connection = None
            self.Addr = None
        # __init__()

    # StreamData()
    41 
    42 
    def __init__(self, params):
        """Initialise the transcoder.

        params -- passed unchanged to ``server.Transcoder.__init__``
        """
        server.Transcoder.__init__(self, params)
        # set gstreamer basic options
        self.connection = None
        self.addr = None
        self.ready = False
        self.quit = False
        # Assigned by setup().  Defined here so that stop(), which reads
        # this attribute, does not fail when no pipeline was ever built.
        self.stream_data = None

        self.log.info("Params for Gstreamer: %s" % self.params)
    # __init__()
    53 
    54 
    def _create_start_elements(self, uri):
        """Create the fixed elements of the pipeline.

        uri -- location handed to ``gst.element_make_from_uri`` to build
               the source element

        Returns the list ``[src, decode, mux, sink]``.
        """
        self.log.info("Opening Uri:" + uri)

        elements = [gst.element_make_from_uri(gst.URI_SRC, uri, "src")]
        # (factory, element name) pairs, in the order they are returned.
        for factory, element_name in (("decodebin", "decode"),
                                      ("avimux", "mux"),
                                      ("fdsink", "sink")):
            elements.append(gst.element_factory_make(factory, element_name))

        return elements
    # _create_start_elements()
    64 
    65 
    def _setup_video_encode(self, vbin, width, height):
        """Fill ``vbin`` with the video encoding chain.

        The chain is queue -> [ffvideoscale] -> ffmpegcolorspace ->
        videorate -> ffenc_mpeg4 -> queue.  The scaler is only inserted
        when both ``width`` and ``height`` are greater than zero.  The bin
        is exposed through ghost pads named "sink" and "src".

        vbin   -- bin that receives the elements
        width  -- requested output width (converted with ``int()``)
        height -- requested output height (converted with ``int()``)

        Returns True on success, False when an element is missing or a
        link cannot be made.
        """
        vqueue = gst.element_factory_make("queue", "vqueue")
        colorspace = gst.element_factory_make("ffmpegcolorspace", "")
        vrate = gst.element_factory_make("videorate", "vrate")
        vencode = gst.element_factory_make("ffenc_mpeg4", "vencode")
        vqueue_src = gst.element_factory_make("queue", "vqueue_src")

        # Validate every element (colorspace included) before touching any
        # of them.
        if None in [vbin, vqueue, colorspace, vrate, vencode, vqueue_src]:
            self.log.info("Fail to create video encode elements.")
            return False

        # NOTE(review): 200 looks very low if this property is expressed in
        # bits per second -- confirm the intended value.
        vencode.set_property("bitrate", 200)

        vbin.add(vqueue)
        if int(width) > 0 and int(height) > 0:
            out_width = int(width)
            out_height = int(height)
            self.log.info(("Formating output to %d / %d" % (out_width, out_height)))

            vscale = gst.element_factory_make("ffvideoscale", "vscale")
            if vscale is None:
                self.log.info("Fail to create video encode elements.")
                return False

            vbin.add(vscale)
            if not vqueue.link(vscale):
                self.log.info("Fail to link video elements")
                return False

            vbin.add(colorspace)

            # Filtered link: forces the scaler output to the requested size.
            size_caps = gst.caps_from_string(
                "video/x-raw-yuv,width=(int)%d,height=(int)%d" % (out_width, out_height))
            if not vscale.link(colorspace, size_caps):
                self.log.info("Fail to link video elements")
                return False
        else:
            vbin.add(colorspace)
            # The result of this link used to be ignored; report a failure
            # the same way the scaling branch does.
            if not vqueue.link(colorspace):
                self.log.info("Fail to link video elements")
                return False

        vbin.add(vrate, vencode, vqueue_src)
        if not colorspace.link(vrate):
            self.log.info("Fail to colorspace with vrate")
            return False

        # Filtered link: fixes the encoder input at 10 frames per second.
        rate_caps = gst.caps_from_string("video/x-raw-yuv,framerate=(fraction)10/1")
        if not vrate.link(vencode, rate_caps):
            self.log.info("Fail to link vrate element")
            return False

        if not vencode.link(vqueue_src):
            self.log.info("Fail to link video encode with queue")
            return False

        vbin.add_pad(gst.GhostPad("sink", vqueue.get_pad("sink")))
        vbin.add_pad(gst.GhostPad("src", vqueue_src.get_pad("src")))

        return True
    # _setup_video_encode()
   120 
   121 
   122     def _setup_audio_encode(self, abin):
   123         aqueue = gst.element_factory_make("queue", "aqueue")
   124         aconvert = gst.element_factory_make("audioconvert", "aconvert")
   125         arate = gst.element_factory_make("audioresample", "arate")
   126         aencode = gst.element_factory_make("queue", "aencode")
   127         aqueue_src = gst.element_factory_make("queue", "aqueue_src")
   128 
   129         if None in [abin, aqueue, arate, aencode, aqueue_src]:
   130             self.log.info("Fail to create video encode elements.")
   131             return False
   132 
   133         abin.add(aqueue, aconvert, arate, aencode, aqueue_src)
   134 
   135         if not gst.element_link_many(aqueue,  aconvert, arate, aencode, aqueue_src):
   136             self.log.info("Fail to link video elements")
   137             return False
   138 
   139         abin.add_pad(gst.GhostPad("sink", aqueue.get_pad("sink")))
   140         abin.add_pad(gst.GhostPad("src", aqueue_src.get_pad("src")))
   141 
   142         return True
   143     # _setup_audio_encode()
   144 
   145 
   146     def setup(self, uri, mux, vcodec, vbitrate,
   147               fps, acodec, abitrate, width, height, options):
   148 
   149         ## Pipelines
   150         pipe = gst.Pipeline()
   151         src, decode, mux, sink = self._create_start_elements(uri)
   152 
   153         if None in [src, decode, mux, sink]:
   154             self.log.info("Problems with while starting basic elements");
   155             return False
   156 
   157         #video encode
   158         #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate !
   159         #ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink  host=224.0.0.1
   160         #port=5000
   161 
   162         vbin = gst.Bin()
   163         if not self._setup_video_encode(vbin, width, height):
   164             return False
   165 
   166         #audio encode
   167         #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !
   168         #udpsink name=upd_audio host=224.0.0.1 port=5002
   169 
   170         abin = gst.Bin()
   171         if not self._setup_audio_encode(abin):
   172             return False
   173 
   174         #Finish Pipeline
   175         pipe.add(src, decode, abin, vbin, mux, sink)
   176         gst.element_link_many(src, decode)
   177         gst.element_link_many(mux, sink)
   178 
   179         #Linking decode with mux
   180         mux_audio = mux.get_pad("audio_0")
   181         mux_video = mux.get_pad("video_0")
   182 
   183         audio_pad = abin.get_pad("src")
   184         video_pad = vbin.get_pad("src")
   185 
   186         if audio_pad.link(mux_audio) != gst.PAD_LINK_OK:
   187             self.log.info("Fail to link audio with mux")
   188             return False
   189 
   190         if video_pad.link(mux_video) != gst.PAD_LINK_OK:
   191             self.log.info("Fail to link audio with mux")
   192             return False
   193 
   194         self.stream_data = self.StreamData(self.log, pipe, abin, vbin, sink)
   195         bus = pipe.get_bus()
   196         bus.add_signal_watch()
   197         bus.connect("message", self.__on_bus_message, self.stream_data)
   198 
   199         decode.connect("new-decoded-pad", self.__on_decode_new_pad, self.stream_data)
   200         decode.connect("unknown-type", self.__on_decode_unknown_type, self.stream_data)
   201 
   202         self.log.info("Setting PIPELINE state to PAUSED")
   203         pipe.set_state(gst.STATE_PAUSED)
   204         self.log.info("Running Loop")
   205         self.stream_data.Loop.run()
   206     # setup()
   207 
   208     def __on_bus_message(self, bus, message, stream_data):
   209 
   210         t = message.type
   211 
   212         if t == gst.MESSAGE_STATE_CHANGED:
   213             oldstate = -1
   214             newstate = -1
   215             pending = -1
   216 
   217             oldstate, newstate, pending = message.parse_state_changed()
   218 
   219             if oldstate == gst.STATE_READY and \
   220                newstate == gst.STATE_PAUSED and \
   221                pending == gst.STATE_VOID_PENDING and \
   222                stream_data.Ready == False:
   223 
   224                 state_changed_status, current_state, pending_state = stream_data.Pipe.get_state()
   225                 if current_state == gst.STATE_PAUSED and \
   226                     pending_state == gst.STATE_VOID_PENDING:
   227                     self.log.info("Pipe paused")
   228                     stream_data.Loop.quit()
   229                     stream_data.Ready = True
   230 
   231         elif t == gst.MESSAGE_EOS:
   232             self.log.info("Pipe finished")
   233             stream_data.Loop.quit()
   234             self.quit = True
   235 
   236         elif t == gst.MESSAGE_ERROR:
   237             err, debug = message.parse_error()
   238             self.log.error("Error: %s %s" %(err, debug))
   239             stream_data.Loop.quit()
   240             stream_data.Ready = False
   241 
   242         return True
   243     # __on_bus_message()
   244 
   245     def __on_decode_unknown_type(self, decode, pad, caps, stream_data):
   246         self.log.info("Unknown Type")
   247         return None
   248     # __on_decode_unknown_type
   249 
   250     def __on_decode_new_pad(self, decode, pad, arg1, stream_data):
   251 
   252         caps = pad.get_caps().to_string()
   253         self.log.info("New pad " + caps)
   254         if caps.rfind("audio") != -1:
   255             apad = stream_data.Abin.get_pad("sink")
   256             if pad.link(apad) != gst.PAD_LINK_OK:
   257                 self.log.info("Error on link audio pad")
   258                 return None
   259         elif caps.rfind("video") != -1:
   260             vpad = stream_data.Vbin.get_pad("sink")
   261             if pad.link(vpad) != gst.PAD_LINK_OK:
   262                 self.log.info("Error on link video pad")
   263                 return None
   264         else:
   265             self.log.info("Invalid caps")
   266         self.log.info("Linked")
   267     # __on_decode_new_pad
   268 
   269 
   270     def start(self, outfd):
   271         params_first = self.params_first
   272 
   273         self.setup(params_first("uri", ""), params_first("mux", "avi"),
   274                    params_first("vcodec", "ffenc_h263p"), params_first("vbitrate", 256000),
   275                    params_first("fps", 25),  params_first("acodec", "faac"),
   276                    params_first("abitrate", 192000),  params_first("width", 320),
   277                    params_first("height", 240), params_first("options", ""))
   278 
   279         self.log.info("Play %s", outfd.fileno())
   280         self.stream_data.Sink.set_property("fd", outfd.fileno())
   281         self.log.info("Setting Pipeline state to PLAYING")
   282         self.stream_data.Pipe.set_state(gst.STATE_PLAYING)
   283 
   284         # keep playing until EOS
   285         self.log.info("QUIT: %s" % self.quit)
   286 
   287         i = 0
   288         loop = gobject.MainLoop()
   289         loop.run()
   290 
   291         self.log.info("quit loop")
   292 
   293         return True
   294     # start()
   295 
   296     def stop(self):
   297         self.log.info("Stop stream_data: %s" % self.stream_data)
   298 
   299         if self.stream_data:
   300             self.stream_data.Pipe.set_state(gst.STATE_NULL)
   301             self.quit = True
   302 
   303         del self.stream_data
   304         self.stream_data = None
   305     # stop