gmyth-stream/server/0.2/lib/server.py
author rosfran
Fri May 18 01:34:31 2007 +0100 (2007-05-18)
branchtrunk
changeset 689 74c1f4270cf0
parent 686 b29ea6deb6f8
child 717 24db16480456
permissions -rw-r--r--
[svn r695] Fixed erroneous maximum iteration count on reading zero-sized buffers on FileTransfer.
morphbr@565
     1
#!/usr/bin/env python
morphbr@565
     2
morphbr@565
     3
__author__ = "Gustavo Sverzut Barbieri / Artur Duque de Souza"
morphbr@565
     4
__author_email__ = "barbieri@gmail.com / artur.souza@indt.org.br"
morphbr@565
     5
__license__ = "GPL"
morphbr@565
     6
__version__ = "0.3"
morphbr@565
     7
morphbr@565
     8
import os
morphbr@565
     9
import threading
morphbr@565
    10
import SocketServer
morphbr@565
    11
import BaseHTTPServer
morphbr@565
    12
import socket
morphbr@565
    13
import urlparse
morphbr@565
    14
import cgi
morphbr@565
    15
import lib.utils as utils
morphbr@565
    16
import logging as log
morphbr@565
    17
morphbr@565
    18
__all__ = ("Transcoder", "RequestHandler", "Server", "serve_forever",
morphbr@565
    19
           "load_plugins_transcoders")
morphbr@565
    20
morphbr@565
    21
class Transcoder(object):
    """Base class for the transcoder plugins used by RequestHandler.

    An instance is built from the parsed query string of a request (a
    mapping of parameter name to a list of values).  The base
    implementation of start() and stop() does no work and just reports
    success; plugins are presumably expected to override them.
    """
    log = log.getLogger("gms.transcoder")
    priority = 0   # negative values have higher priorities
    name = None # to be used in requests
    status = None  # value printed by the status pages ("Status: ...%")

    def __init__(self, params):
        """Store the request parameters for later lookups.

        params -- mapping of parameter name to a sequence of values.
        """
        self.params = params
    # __init__()


    def params_first(self, key, default=None):
        """Return the first value stored under key.

        When default is None a missing entry raises the underlying
        lookup error; otherwise default is returned instead.
        """
        if default is None:
            return self.params[key][0]

        try:
            return self.params[key][0]
        except (KeyError, IndexError, TypeError):
            # Missing key, empty value list or unusable params: fall back.
            return default
    # params_first()


    def get_mimetype(self):
        """Return the Content-Type matching the requested "mux" parameter."""
        # NOTE(review): the default "mpg" matches no branch below, so a
        # request without "mux" gets application/octet-stream -- confirm
        # whether "mpeg" was intended as the default.
        mux = self.params_first("mux", "mpg")

        if mux == "mpeg":
            return "video/mpeg"
        elif mux == "avi":
            return "video/x-msvideo"
        else:
            return "application/octet-stream"
    # get_mimetype()


    def start(self, outfile):
        """Start transcoding into outfile; the base class only returns True."""
        return True
    # start()


    def stop(self):
        """Stop transcoding; the base class only returns True."""
        return True
    # stop()


    def __str__(self):
        """Return 'ClassName( params=... )' for logs and status pages."""
        return '%s( params=%s )' % \
               (self.__class__.__name__,
                self.params)
    # __str__()
# Transcoder
morphbr@565
    71
morphbr@565
    72
morphbr@565
    73
morphbr@565
    74
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP request handler of the streamer.

    Serves the HTML control pages (main, status, version, stop, shutdown)
    and the transcoded stream itself.  Transcoder plugins are kept in the
    class-wide `transcoders` plugin set.
    """
    log = log.getLogger("gms.request")
    def_transcoder = None   # name of the transcoder used as fallback
    transcoders = utils.PluginSet(Transcoder)

    # Entries of the navigation menu: label -> URL.
    menu = {
        "Stop": "/stop-transcoder.do",
        "Status": "/status.do",
        "Version": "/version.do",
        "Shutdown": "/shutdown.do"
        }

    @classmethod
    def load_plugins_transcoders(cls, directory):
        """Load transcoder plugins from directory.

        The first loaded plugin becomes the default transcoder unless a
        default was already chosen.
        """
        cls.transcoders.load_from_directory(directory)

        if cls.def_transcoder is None and cls.transcoders:
            cls.def_transcoder = cls.transcoders[0].name
    # load_plugins_transcoders()


    def do_dispatch(self, body):
        """Split the request URL and call the handler matching its path.

        body -- True when the response body must be sent (GET), False
                when only the headers are wanted (HEAD).

        Unknown paths are still streamed when the query carries
        "action=stream.do"; anything else is answered with 404.
        """
        self.url = self.path

        pieces = urlparse.urlparse(self.path)
        self.path = pieces[2]
        self.query = cgi.parse_qs(pieces[4])

        handlers = {
            "/": self.serve_main,
            "/shutdown.do": self.serve_shutdown,
            "/stop-transcoder.do": self.serve_stop_transcoder,
            "/status.do": self.serve_status,
            "/version.do": self.serve_version,
            "/stream.do": self.serve_stream,
            }

        handler = handlers.get(self.path)
        if handler is None:
            # Default to an empty list: a request without "action" must
            # end up as a 404, not as a TypeError on "in None".
            if "stream.do" in self.query.get("action", []):
                handler = self.serve_stream

        if handler is None:
            self.send_error(404, "File not found")
        else:
            handler(body)
    # do_dispatch()


    def do_GET(self):
        """Handle GET: dispatch and send the response body."""
        self.do_dispatch(True)
    # do_GET()


    def do_HEAD(self):
        """Handle HEAD: dispatch but send headers only."""
        self.do_dispatch(False)
    # do_HEAD()


    def _start_response(self, content_type="text/html"):
        """Send the "200 OK" status line and the headers shared by all pages."""
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header('Connection', 'close')
        self.end_headers()
    # _start_response()


    def _nav_items(self):
        """Return the rendered navigation menu (one "menu" snippet per entry)."""
        return "".join(utils.getHTML("menu", {"name": name, "url": url})
                       for name, url in self.menu.items())
    # _nav_items()


    def serve_main(self, body):
        """Serve the index page."""
        self._start_response()
        if body:
            self.wfile.write(utils.getHTML("index", {"menu": self._nav_items()}))
    # serve_main()


    def serve_version(self, body):
        """Serve the module version as "Version: <__version__>"."""
        self._start_response()
        if body:
            self.wfile.write("Version: %s" %  __version__)
    # serve_version()


    def serve_shutdown(self, body):
        """Serve the shutdown page, then close the server.

        The server is closed for HEAD requests as well; only the page
        body depends on `body`.
        """
        self._start_response()
        if body:
            self.wfile.write(utils.getHTML("shutdown"))
        self.server.server_close()
    # serve_shutdown()


    def serve_stop_all_transcoders(self, body):
        """Stop every running transcoder and serve the confirmation page."""
        self._start_response()
        if body:
            self.server.stop_transcoders()
            self.wfile.write(utils.getHTML("stop_all", {"menu": self._nav_items()}))
    # serve_stop_all_transcoders()


    def serve_stop_selected_transcoders(self, body, requests):
        """Stop the transcoders serving the given clients.

        requests -- list of "host:port" strings identifying the client
                    address of each transcoder to stop; malformed
                    entries are skipped.

        The result page lists every transcoder that was matched and is
        written once, after all entries were processed.
        """
        self._start_response()
        if not body:
            return

        opts = ""
        transcoders = self.server.get_transcoders()

        for req in requests:
            try:
                host, port = req.split(":")
                addr = (host, int(port))
            except ValueError:
                # Not exactly "host:port", or the port is not a number.
                continue

            for t, r in transcoders:
                if r.client_address == addr:
                    try:
                        t.stop()
                    except Exception:
                        self.log.info("Plugin already stopped")

                    opts += utils._create_html_item("%s: %s:%s" % (
                        t, addr[0], addr[1]))
                    break

        self.wfile.write(utils.getHTML("stop_selected",
                                       {"menu": self._nav_items(),
                                        "opts": opts}))
    # serve_stop_selected_transcoders()


    def serve_stop_transcoder(self, body):
        """Route a stop request according to its "request" parameter.

        "all" stops everything, any other value is treated as a list of
        "host:port" clients, and no value at all shows the status page.
        """
        req = self.query.get("request", None)
        if req and "all" in req:
            self.serve_stop_all_transcoders(body)
        elif req:
            self.serve_stop_selected_transcoders(body, req)
        else:
            self.serve_status(body)
    # serve_stop_transcoder()


    def serve_status(self, body):
        """Serve the status of the running transcoders.

        With both "ip" and "file" in the query (and at least one running
        transcoder) only a plain "Status: N %" line is written for the
        transcoder whose "uri" contains the file name and whose client
        has that address; True is returned when one matched, False
        otherwise.  In every other case the HTML status page is written
        and None is returned.
        """
        self._start_response()
        if not body:
            return

        stopall = ""
        stopone = ""
        tl = self.server.get_transcoders()

        if not tl:
            running = "<p>No running transcoder.</p>\n"

        elif self.query.get("ip") and self.query.get("file"):
            filename = "%s" % self.query.get("file")[0]
            ip = self.query.get("ip")[0]

            for transcoder, request in tl:
                # A transcoder without "uri" simply cannot match.
                tfilename = "%s" % transcoder.params_first("uri", "")

                if filename in tfilename and \
                       request.client_address[0] == ip:
                    self.wfile.write("Status: %s %%" % transcoder.status)
                    return True

            return False

        else:
            running = "<p>Running transcoders:</p>\n"
            stopall = utils._create_html_item("<a href='%s?request=all'>"
                                              "[STOP ALL]</a>" %
                                              self.menu["Stop"])

            for transcoder, request in tl:
                host, port = request.client_address[0], request.client_address[1]
                stopone += utils._create_html_item("%s: %s:%s<a href='%s?"
                                                   "request=%s:%s'>"
                                                   "[STOP]</a> - "
                                                   "Status: %s%%"
                                                   % (transcoder, host, port,
                                                      self.menu["Stop"],
                                                      host, port,
                                                      transcoder.status))

        # Written for the "nothing running" case too, so the client
        # always receives a page.
        self.wfile.write(utils.getHTML("status",
                                       {"menu": self._nav_items(),
                                        "running": running,
                                        "stopall": stopall,
                                        "stopone": stopone}))
    # serve_status()


    def _get_transcoder(self):
        """Return the transcoder plugin selected by the request.

        The first known name in the "transcoder" query parameter wins
        ("mencoder" when the parameter is absent); otherwise the default
        transcoder is returned.
        """
        # get transcoder option: mencoder is the default
        for t in self.query.get("transcoder", ["mencoder"]):
            transcoder = self.transcoders.get(t)
            if transcoder:
                return transcoder

        return self.transcoders[self.def_transcoder]
    # _get_transcoder()


    def serve_stream(self, body):
        """Create a transcoder for this request and stream its output.

        A failure while creating the transcoder is reported as a 500
        error.  The transcoder is registered with the server while it
        runs and always unregistered afterwards.
        """
        transcoder = self._get_transcoder()
        try:
            obj = transcoder(self.query)
        except Exception as e:
            self.send_error(500, str(e))
            return

        self._start_response(obj.get_mimetype())

        if body:
            self.server.add_transcoders(self, obj)
            try:
                obj.start(self.wfile)
            finally:
                # Unregister even when start() raises, so no stale entry
                # is left in the server's registry.
                self.server.del_transcoders(self, obj)
    # serve_stream()


    def log_request(self, code='-', size='-'):
        """Log the request line, status code and size through self.log."""
        self.log.info('"%s" %s %s', self.requestline, str(code), str(size))
    # log_request()


    def log_error(self, format, *args):
        """Log an error message prefixed with the client address."""
        self.log.error("%s: %s" % (self.address_string(), format % args))
    # log_error()


    def log_message(self, format, *args):
        """Log an informational message prefixed with the client address."""
        self.log.info("%s: %s" % (self.address_string(), format % args))
    # log_message()
# RequestHandler
morphbr@565
   327
morphbr@565
   328
morphbr@565
   329
morphbr@565
   330
class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """Threaded HTTP server that keeps a registry of running transcoders.

    The registry maps each transcoder to the request handler that
    started it and is guarded by a reentrant lock.
    """
    log = log.getLogger("gms.server")
    run = True
    # NOTE(review): both attributes live on the class, so every Server
    # instance shares the same registry and lock.
    _transcoders = {}
    _lock = threading.RLock()

    def serve_forever(self):
        """Handle requests until `run` is cleared or Ctrl-C is pressed.

        Transcoders that are still registered are stopped before
        returning.
        """
        self.log.info("GMyth-Streamer serving HTTP on %s:%s" %
                      self.socket.getsockname())
        try:
            while self.run:
                self.handle_request()
        except KeyboardInterrupt:
            pass

        self.log.debug("Stopping all remaining transcoders...")
        self.stop_transcoders()
        self.log.debug("Transcoders stopped!")
    # serve_forever()


    def get_request(self):
        """Accept a connection, re-checking `run` every half second.

        The listening socket gets a 0.5s timeout while waiting, and its
        previous timeout is restored once a connection is accepted.
        Raises socket.error when the server is no longer running.
        """
        skt = self.socket
        old = skt.gettimeout()
        skt.settimeout(0.5)
        while self.run:
            try:
                r = skt.accept()
                skt.settimeout(old)
                return r
            except socket.timeout:
                # Nothing arrived in time; loop to re-check self.run.
                pass
        raise socket.error("Not running")
    # get_request()


    def server_close(self):
        """Clear `run`, stop the transcoders and close the socket."""
        self.run = False
        self.stop_transcoders()

        BaseHTTPServer.HTTPServer.server_close(self)
    # server_close()


    def stop_transcoders(self):
        """Call stop() on every registered transcoder.

        An exception raised by a transcoder propagates to the caller,
        but the lock is always released.
        """
        self._lock.acquire()
        try:
            # Iterate over a snapshot of the registry.
            for transcoder, request in list(self._transcoders.items()):
                self.log.info("Stop transcoder: %s, client=%s" %
                              (transcoder, request.client_address))
                transcoder.stop()
        finally:
            self._lock.release()
    # stop_transcoders()


    def get_transcoders(self):
        """Return a list of (transcoder, request) pairs taken under the lock."""
        self._lock.acquire()
        try:
            return list(self._transcoders.items())
        finally:
            self._lock.release()
    # get_transcoders()


    def add_transcoders(self, request, transcoder):
        """Register transcoder as started by request."""
        self._lock.acquire()
        try:
            self._transcoders[transcoder] = request
        finally:
            self._lock.release()
    # add_transcoders()


    def del_transcoders(self, request, transcoder):
        """Remove transcoder from the registry (KeyError if unknown).

        request is accepted for symmetry with add_transcoders() and is
        not used.
        """
        self._lock.acquire()
        try:
            del self._transcoders[transcoder]
        finally:
            self._lock.release()
    # del_transcoders()
# Server
morphbr@565
   410
morphbr@565
   411
morphbr@565
   412
morphbr@565
   413
def serve_forever(host="0.0.0.0", port=40000):
    """Build a Server listening on (host, port) and run it.

    RequestHandler is switched to HTTP/1.0 before the server is created;
    the call returns when Server.serve_forever() returns.
    """
    RequestHandler.protocol_version = "HTTP/1.0"
    Server((host, port), RequestHandler).serve_forever()
# serve_forever()
morphbr@565
   419
morphbr@565
   420
morphbr@565
   421
def load_plugins_transcoders(directory):
    """Module-level shortcut: load the transcoder plugins found in
    directory into RequestHandler."""
    handler_cls = RequestHandler
    handler_cls.load_plugins_transcoders(directory)
# load_plugins_transcoders()