gmyth-stream/server/plugins/media/gstreamer.py
author rosfran
Mon Apr 09 15:20:37 2007 +0100 (2007-04-09)
branchtrunk
changeset 513 12a61040588d
child 521 605cbc483e09
permissions -rw-r--r--
[svn r518] Newly added GMythFileLocal, to deal with local stream files.
     1 import pygst
     2 pygst.require("0.10")
     3 import gst
     4 import gobject
     5 import socket
     6 
     7 class Media:
     8     class StreamData:
     9         stream_count = 0
    10 		
    11         def __init__ (self, pipe, abin, vbin, sink):
    12 
    13 	    self.stream_count += 1
    14 	    self.Id = self.stream_count
    15 	    self.Pipe = pipe
    16 	    self.Abin = abin
    17 	    self.Vbin = vbin
    18             self.Sink = sink
    19 	    self.Loop = gobject.MainLoop()
    20 	    self.ACaps = ""
    21 	    self.VCaps = ""
    22 	    self.Ready = False
    23 
    24 
    25     def __init__(self, config):
    26         # set gstreamer basic options
    27         self.config = config
    28         self.pipe = None
    29         self.streams = []
    30 	self.socket = None
    31 	self.connection = None
    32 	self.addr = None
    33 
    34 
    35     def setup(self, filename, mux, vcodec, vbitrate,
    36               fps, acodec, abitrate, width, height, port, options):
    37 
    38         ## Pipelines
    39         self.pipe = gst.Pipeline ()
    40         uri = "file://" + filename
    41         print "Opening Uri:" + uri
    42         src = gst.element_make_from_uri (gst.URI_SRC, uri, "src")
    43         if (src is None):
    44             return None
    45         
    46         decode = gst.element_factory_make ("decodebin", "decode")
    47         if (decode is None):
    48             return None
    49 
    50 	mux = gst.element_factory_make ("avimux", "mux")
    51         if (mux is None):
    52             return None
    53 
    54         sink = gst.element_factory_make ("fdsink", "sink")
    55         if (sink is None):
    56             return None
    57 
    58         #Create socket
    59         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    60 	self.socket.bind(('', int (port)))
    61         
    62         #video encode 
    63         #queue ! videoscale ! video/x-raw-yuv,width=240,height=144 ! videorate ! ffenc_h263p bitrate=256000 me-method=2 ! rtph263ppay ! udpsink  host=224.0.0.1 port=5000
    64         vbin = gst.Bin ()
    65         vqueue = gst.element_factory_make ("queue", "vqueue")
    66         vscale = gst.element_factory_make ("videoscale", "vscale")
    67         vrate = gst.element_factory_make ("videorate", "vrate")
    68         vencode = gst.element_factory_make ("ffenc_h263p", "vencode")
    69 
    70         if (None in [vbin, vqueue, vscale, vrate, vencode]):
    71             print "Fail to create video encode elements."
    72             return None
    73 
    74         vscale_pad = vscale.get_pad("sink")
    75         if (vscale_pad is None):
    76             print "Fail to get vscale sink pad."
    77             return None
    78 
    79         vscale_caps = gst.caps_from_string ("video/x-raw-yuv, width=%s, height=%s" % (width, height))
    80         if (vscale_caps is None):
    81             print "Fail to create video caps"
    82             return None
    83 
    84         if (not vscale_pad.set_caps (vscale_caps)):
    85             print "Fail to set video output caps"
    86             return None
    87         
    88         vbin.add (vqueue, vscale, vrate, vencode)
    89         if (not gst.element_link_many (vqueue,  vscale, vrate, vencode)):
    90             print "Fail to link video elements"
    91             return None
    92         
    93         vbin.add_pad (gst.GhostPad ("sink", vqueue.get_pad ("sink")))
    94 	vbin.add_pad (gst.GhostPad ("src", vencode.get_pad ("src")))
    95 
    96         #audio encode
    97         #audio/x-raw-int ! queue ! audioconvert ! faac ! rtpmp4gpay !  udpsink name=upd_audio host=224.0.0.1 port=5002
    98         abin = gst.Bin ()
    99         aqueue = gst.element_factory_make ("queue", "vqueue")
   100         aconvert = gst.element_factory_make ("audioconvert", "aconvert")
   101         aencode = gst.element_factory_make ("ffenc_ac3", "aencode")
   102 
   103         if (None in [abin, aqueue, aconvert, aencode]):
   104             print "Fail to create video encode elements."
   105             return None
   106 
   107         abin.add (aqueue, aconvert, aencode)
   108         if (not gst.element_link_many (aqueue, aconvert, aencode)):
   109             print "Fail to link video elements"
   110             return None
   111         
   112         abin.add_pad (gst.GhostPad ("sink", aqueue.get_pad ("sink")))
   113         abin.add_pad (gst.GhostPad ("src", aencode.get_pad ("src")))
   114 
   115         #Finish Pipeline
   116 
   117 	self.pipe.add (src, decode, abin, vbin, mux, sink)
   118 	gst.element_link_many (src, decode)
   119         gst.element_link_many (mux, sink)
   120 
   121         #Linking decode with mux
   122         mux_audio = mux.get_pad ("audio_0")
   123         mux_video = mux.get_pad ("video_0")
   124 
   125 	audio_pad = abin.get_pad ("src")
   126         video_pad = vbin.get_pad ("src")
   127 
   128         if (audio_pad.link (mux_audio) != gst.PAD_LINK_OK):
   129             print "Fail to link audio with mux"
   130             return None
   131  
   132         if (video_pad.link (mux_video) != gst.PAD_LINK_OK):
   133             print "Fail to link audio with mux"
   134             return None
   135 
   136 	stream_data = self.StreamData (self.pipe, abin, vbin, sink)
   137 
   138 	bus = self.pipe.get_bus()
   139 	bus.add_signal_watch()
   140 	bus.connect("message", self.__on_bus_message, stream_data)
   141 	
   142 	decode.connect("new-decoded-pad", self.__on_decode_new_pad, stream_data)
   143 	decode.connect("unknown-type", self.__on_decode_unknown_type, stream_data)
   144 
   145 	
   146 	self.pipe.set_state (gst.STATE_PAUSED)
   147         print "Running Pipe"
   148 	stream_data.Loop.run ()
   149         print "End run"
   150 
   151 	a_caps = stream_data.ACaps
   152 	v_caps = stream_data.VCaps
   153 	stream_id = stream_data.Id
   154 
   155         self.streams.append (stream_data)
   156 
   157     def play(self):
   158 
   159         print "Trying to play pipeline: %s" % self.pipe
   160         try:
   161             if (self.pipe):
   162 		print "Waiting for connection"
   163 		self.socket.listen(1)
   164 		print "Connection Requested"
   165 	        #Create socket
   166 		self.connection, self.addr = self.socket.accept ()
   167 
   168 		stream_data = self.streams[0]
   169 		stream_data.Sink.set_property ("fd", self.connection.fileno());
   170                 print "Connected"
   171 
   172                 self.pipe.set_state(gst.STATE_PLAYING)
   173         except gobject.GError, e:
   174             print "Error: " + str(e)
   175 
   176 
   177     def stop(self):
   178 
   179         print "Trying to stop pipeline: %s" % self.pipe
   180         try:
   181             if (self.pipeline):
   182                 self.connection.close ()
   183                 self.pipeline.set_state(gst.STATE_NULL)
   184         except gobject.GError, e:
   185             print "Error: " + str(e)
   186 
   187     def __on_bus_message (self, bus, message, stream_data):
   188 
   189         t = message.type
   190         if (t == gst.MESSAGE_STATE_CHANGED):
   191             oldstate = -1
   192             newstate = -1
   193             pending = -1
   194             oldstate, newstate, pending = message.parse_state_changed ()
   195             if ((oldstate == gst.STATE_READY) and \
   196                 (newstate == gst.STATE_PAUSED) and \
   197                 (pending == gst.STATE_VOID_PENDING) and \
   198                 (stream_data.Ready == False)):
   199                 state_changed_status, current_state, pending_state = stream_data.Pipe.get_state () 
   200 		if ((current_state == gst.STATE_PAUSED) and \
   201                     (pending_state == gst.STATE_VOID_PENDING)):
   202                     print "Pipe paused"
   203                     stream_data.Loop.quit ()
   204                     stream_data.Ready = True
   205         elif (t == gst.MESSAGE_ERROR):
   206             err, debug = message.parse_error()
   207 	    print "Error: %s" % err, debug
   208             stream_data.Loop.quit ()
   209             stream_data.Ready = False
   210 
   211         return True
   212  
   213     def __on_decode_unknown_type (self, decode, pad, caps, stream_data):
   214 
   215         print "Unknown Type"
   216         return None
   217 
   218     def __on_decode_new_pad (self, decode, pad, arg1, stream_data):
   219         
   220         caps = pad.get_caps().to_string()
   221         print "New pad " + caps
   222 	if (caps.rfind ("audio") != -1):
   223             apad = stream_data.Abin.get_pad ("sink")
   224             if (pad.link (apad) != gst.PAD_LINK_OK):
   225                 print "Error on link audio pad"
   226                 return None
   227         elif (caps.rfind ("video") != -1):
   228             vpad = stream_data.Vbin.get_pad ("sink")
   229             if (pad.link (vpad) != gst.PAD_LINK_OK):
   230                 print "Error on link video pad"
   231                 return None
   232         else:
   233             print "Invalid caps"
   234 
   235