[svn r658] Added g_main_context_iteration to while loop on reading from FileTransfer.
3 __author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
4 __author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
15 import lib.utils as utils
18 __all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
19 "load_plugins_transcoders")
21 class Transcoder(object):
22 log = log.getLogger("gms.transcoder")
23 priority = 0 # negative values have higher priorities
24 name = None # to be used in requests
27 def __init__(self, params):
32 def params_first(self, key, default=None):
34 return self.params[key][0]
37 return self.params[key][0]
43 def get_mimetype(self):
44 mux = self.params_first("mux", "mpg")
49 return "video/x-msvideo"
51 return "application/octet-stream"
55 def start(self, outfile):
66 return '%s("%s", mux="%s", params=%s, addr=%s)' % \
67 (self.__class__.__name__,
68 self.params_first("uri", "None"),
69 self.params_first("mux", "mpg"),
77 class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
78 log = log.getLogger("gms.request")
80 transcoders = utils.PluginSet(Transcoder)
83 "Stop": "/stop-transcoder.do",
84 "Status": "/status.do",
85 "Version": "/version.do",
86 "Shutdown": "/shutdown.do"
90 def load_plugins_transcoders(cls, directory):
91 cls.transcoders.load_from_directory(directory)
93 if cls.def_transcoder is None and cls.transcoders:
94 cls.def_transcoder = cls.transcoders[0].name
95 # load_plugins_transcoders()
98 def do_dispatch(self, body):
101 pieces = urlparse.urlparse(self.path)
102 self.path = pieces[2]
103 self.query = cgi.parse_qs(pieces[4])
106 self.serve_main(body)
107 elif self.path == "/shutdown.do":
108 self.serve_shutdown(body)
109 elif self.path == "/stop-transcoder.do":
110 self.serve_stop_transcoder(body)
111 elif self.path == "/status.do":
112 self.serve_status(body)
113 elif self.path == "/version.do":
114 self.serve_version(body)
115 elif self.path == "/stream.do":
116 self.serve_stream(body)
118 action = self.query.get("action", None)
119 if action is not None:
120 self.serve_stream(body)
122 self.send_error(404, "File not found")
127 self.do_dispatch(True)
132 self.do_dispatch(False)
136 def _nav_items(self):
138 for name, url in self.menu.items():
139 ret += utils.getHTML("menu", {"name": name, "url": url})
144 def _create_html_item(self, opt):
145 return "<li>%s</li>\n" % opt
148 def serve_main(self, body):
149 self.send_response(200)
150 self.send_header("Content-Type", "text/html")
151 self.send_header('Connection', 'close')
154 self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
157 def serve_version(self, body):
158 self.send_response(200)
159 self.send_header("Content-Type", "text/html")
160 self.send_header('Connection', 'close')
163 self.wfile.write("Version: %s" % __version__)
166 def serve_shutdown(self, body):
167 self.send_response(200)
168 self.send_header("Content-Type", "text/html")
169 self.send_header('Connection', 'close')
172 self.wfile.write(utils.getHTML("shutdown"))
173 self.server.server_close()
177 def serve_stop_all_transcoders(self, body):
178 self.send_response(200)
179 self.send_header("Content-Type", "text/html")
180 self.send_header('Connection', 'close')
183 self.server.stop_transcoders()
184 self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
185 # serve_stop_all_transcoders()
188 def serve_stop_selected_transcoders(self, body, requests):
189 self.send_response(200)
190 self.send_header("Content-Type", "text/html")
191 self.send_header('Connection', 'close')
195 transcoders = self.server.get_transcoders()
199 host, port = req.split(":")
206 for t, r in transcoders:
207 if r.client_address == addr:
211 self.log.info("Plugin already stopped")
213 opts += self._create_html_item("%s: %s:%s" % (
214 t, addr[0], addr[1]))
218 self.wfile.write(utils.getHTML("stop_selected",
219 {"menu": self._nav_items(),
221 # serve_stop_selected_transcoders()
224 def serve_stop_transcoder(self, body):
225 req = self.query.get("request", None)
226 if req and "all" in req:
227 self.serve_stop_all_transcoders(body)
229 self.serve_stop_selected_transcoders(body, req)
231 self.serve_status(body)
232 # serve_stop_transcoder()
235 def serve_status(self, body):
236 self.send_response(200)
237 self.send_header("Content-Type", "text/html")
238 self.send_header('Connection', 'close')
242 tl = self.server.get_transcoders()
244 running = "<p>No running transcoder.</p>\n"
248 running = "<p>Running transcoders:</p>\n"
249 stopall = self._create_html_item("<a href='%s?request=all'>[STOP ALL]</a>" %
252 for transcoder, request in tl:
253 stopone = self._create_html_item("%s: %s:%s<a href='%s?request=%s:%s'>"
254 "[STOP]</a> - Status: %s%%" % (
255 transcoder, request.client_address[0], request.client_address[1],
256 self.menu["Stop"], request.client_address[0], request.client_address[1],
259 self.wfile.write(utils.getHTML("status",
260 {"menu": self._nav_items(),
263 "stopone": stopone}))
267 def _get_transcoder(self):
268 # get transcoder option: mencoder is the default
269 request_transcoders = self.query.get("transcoder", ["mencoder"])
271 for t in request_transcoders:
272 transcoder = self.transcoders.get(t)
277 return self.transcoders[self.def_transcoder]
281 def serve_stream(self, body):
282 transcoder = self._get_transcoder()
284 obj = transcoder(self.query)
286 self.send_error(500, str(e))
289 self.send_response(200)
290 self.send_header("Content-Type", obj.get_mimetype())
291 self.send_header('Connection', 'close')
295 self.server.add_transcoders(self, obj)
296 obj.start(self.wfile)
297 self.server.del_transcoders(self, obj)
def log_request(self, code='-', size='-'):
    """Log the request line with its status code and size at info level.

    code -- status code of the response; '-' when not given.
    size -- size of the response; '-' when not given.
    """
    code_text = str(code)
    size_text = str(size)
    self.log.info('"%s" %s %s', self.requestline, code_text, size_text)
def log_error(self, format, *args):
    """Log an error message prefixed with the client address string.

    format -- %-style format string (the parameter name is kept as-is,
              since callers may pass it by keyword).
    args   -- values interpolated into *format*.

    The final text is "<address>: <format % args>". The two parts are
    handed to the logger as arguments, the same way log_request() does,
    so the outer message is only assembled when the record is emitted.
    """
    self.log.error("%s: %s", self.address_string(), format % args)
def log_message(self, format, *args):
    """Log an informational message prefixed with the client address string.

    format -- %-style format string (the parameter name is kept as-is,
              since callers may pass it by keyword).
    args   -- values interpolated into *format*.

    The final text is "<address>: <format % args>". The two parts are
    handed to the logger as arguments, the same way log_request() does,
    so the outer message is only assembled when the record is emitted.
    """
    self.log.info("%s: %s", self.address_string(), format % args)
318 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
319 log = log.getLogger("gms.server")
322 _lock = threading.RLock()
324 def serve_forever(self):
325 self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
326 self.socket.getsockname())
329 self.handle_request()
330 except KeyboardInterrupt, e:
333 self.log.debug("Stopping all remaining transcoders...")
334 self.stop_transcoders()
335 self.log.debug("Transcoders stopped!")
339 def get_request(self):
341 old = skt.gettimeout()
348 except socket.timeout, e:
350 raise socket.error("Not running")
354 def server_close(self):
356 self.stop_transcoders()
358 BaseHTTPServer.HTTPServer.server_close(self)
362 def stop_transcoders(self):
364 for transcoder, request in self._transcoders.iteritems():
365 self.log.info("Stop transcoder: %s, client=%s" %
366 (transcoder, request.client_address))
372 def get_transcoders(self):
375 return self._transcoders.items()
381 def add_transcoders(self, request, transcoder):
384 self._transcoders[transcoder] = request
390 def del_transcoders(self, request, transcoder):
393 del self._transcoders[transcoder]
401 def serve_forever(host="0.0.0.0", port=40000):
403 RequestHandler.protocol_version = "HTTP/1.0"
404 httpd = Server(addr, RequestHandler)
405 httpd.serve_forever()
def load_plugins_transcoders(directory):
    """Load transcoder plugins from *directory*.

    Module-level convenience wrapper: forwards *directory* unchanged to
    RequestHandler.load_plugins_transcoders().
    """
    RequestHandler.load_plugins_transcoders(directory)
# load_plugins_transcoders()