现在的位置: 首页 > 综合 > 正文

Python 连接 Scribe 收集 Apache 日志

2013年10月22日 ⁄ 综合 ⁄ 共 3636字 ⁄ 字号 评论关闭
#!/usr/bin/env python

import optparse
import os
import sys
import time
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport.TTransport import TTransportException
from thrift.protocol import TBinaryProtocol
from scribe import scribe

class Error(Exception):
    """Base class for the errors raised by this script."""


class FileError(Error):
    """Raised when the followed file cannot be stat'ed or opened."""

class Tail(object):
    """Follow a file as it grows, re-opening it when it is replaced.

    Call :meth:`open` first, then iterate over the instance to receive lines
    as they are appended.  Iteration never ends on its own: at end of file the
    iterator sleeps, and if the path now refers to a different file (or the
    file became shorter than the current offset) it re-opens the path and
    continues from the start of the new file.
    """

    def __init__(self, path, sleep=1.0, reopen_count=5):
        """Remember the settings; the file itself is opened by :meth:`open`.

        Args:
            path: path of the file to follow.
            sleep: seconds to pause between polls and between reopen attempts.
            reopen_count: number of additional reopen attempts made after the
                first attempt fails.
        """
        self.path = path
        self.sleep = sleep
        self.reopen_count = reopen_count
        # Populated by open(); defined here so close() and check() are safe
        # to call even when open() never succeeded.
        self.file = None
        self.real_path = None
        self.inode = None

    def __iter__(self):
        """Yield lines from the file forever, waiting whenever EOF is reached.

        A line is yielded as soon as ``readline`` returns any data, so a line
        that is still being written may be delivered in more than one piece.
        """
        while True:
            pos = self.file.tell()
            line = self.file.readline()
            if line:
                yield line
            else:
                self.wait(pos)

    def open(self, tail=True):
        """Open the file and record its resolved path and inode.

        Args:
            tail: when true, start reading at the current end of the file;
                otherwise start at the beginning.

        Raises:
            FileError: the path could not be opened or stat'ed.
        """
        try:
            real_path = os.path.realpath(self.path)
            fobj = open(real_path)
        except (IOError, OSError) as error:
            raise FileError(error)
        try:
            # Take the inode from the handle we actually hold rather than
            # stat'ing the path separately: if the file is rotated between
            # the two calls the recorded inode would not match the open file.
            inode = os.fstat(fobj.fileno()).st_ino
        except OSError as error:
            fobj.close()
            raise FileError(error)
        if tail:
            fobj.seek(0, 2)
        # Commit the new state only once every step has succeeded.
        self.real_path = real_path
        self.inode = inode
        self.file = fobj

    def close(self):
        """Close the current file, if any.  Safe to call repeatedly."""
        if self.file is None:
            return
        try:
            self.file.close()
        except (IOError, OSError):
            # Best effort: a failure to close must not mask the caller's
            # own error handling.
            pass
        finally:
            self.file = None

    def reopen(self):
        """Close the file and try to open it again from the beginning.

        Makes ``reopen_count + 1`` attempts, sleeping ``sleep`` seconds after
        each failed one.

        Returns:
            True if the file was opened, False if every attempt failed.
        """
        self.close()
        attempts_left = self.reopen_count
        while attempts_left >= 0:
            attempts_left -= 1
            try:
                self.open(tail=False)
            except FileError:
                time.sleep(self.sleep)
            else:
                return True
        return False

    def check(self, pos):
        """Report whether the file must be re-opened.

        Args:
            pos: the offset the reader has reached in the open file.

        Returns:
            True when the path resolves to a different location, has a
            different inode, is now shorter than ``pos``, or cannot be
            stat'ed; False otherwise.
        """
        try:
            if self.real_path != os.path.realpath(self.path):
                return True
            stat = os.stat(self.path)
            if self.inode != stat.st_ino:
                return True
            if pos > stat.st_size:
                return True
        except OSError:
            return True
        return False

    def wait(self, pos):
        """Handle end of file: re-open if needed, otherwise pause and retry.

        Args:
            pos: the offset at which the last read returned no data.

        Raises:
            Error: the file needed re-opening and could not be re-opened.
        """
        if self.check(pos):
            if not self.reopen():
                raise Error('Unable to reopen file: %s' % self.path)
        else:
            # Seek back to the same offset before sleeping so the next
            # readline() looks for newly appended data.
            self.file.seek(pos)
            time.sleep(self.sleep)

def scribe_fix_legacy():
    """Make ``scribe.LogEntry`` callable with keyword arguments.

    Replaces ``scribe.LogEntry`` with a wrapper that collects its keyword
    arguments into a dict and passes that dict as the single positional
    argument to the previous ``scribe.LogEntry``.
    """
    legacy_log_entry = scribe.LogEntry

    def keyword_log_entry(**fields):
        # NOTE(review): presumably older generated bindings accept one dict
        # argument instead of keywords -- confirm against the scribe module.
        return legacy_log_entry(fields)

    scribe.LogEntry = keyword_log_entry

def handle(path, category, host='127.0.0.1', port=1463, prefix='', postfix=''):
    """Follow *path* and send every new line to a Scribe server.

    Opens a framed Thrift connection to ``host:port``, starts following the
    file from its current end, and sends each line as one log entry.  The
    loop only stops when an exception is raised (including
    ``KeyboardInterrupt``); the file and the transport are closed on the way
    out.

    Args:
        path: file to follow.
        category: Scribe category set on every log entry.
        host: Scribe server host.
        port: Scribe server port.
        prefix: text prepended to each line before sending.
        postfix: text appended to each line before sending.

    Raises:
        Error: the server answered a ``Log`` call with anything other than
            ``ResultCode.OK``, or the file could not be opened or re-opened.
    """
    sock = TSocket.TSocket(host=host, port=port)
    transport = TTransport.TFramedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocol(
        trans=transport,
        strictRead=False,
        strictWrite=False,
    )
    client = scribe.Client(iprot=protocol, oprot=protocol)

    try:
        transport.open()

        tail = Tail(path)
        try:
            tail.open()
            for line in tail:
                message = prefix + line + postfix
                try:
                    log_entry = scribe.LogEntry(
                        category=category,
                        message=message,
                    )
                except TypeError:
                    # NOTE(review): presumably raised by bindings whose
                    # LogEntry does not take keywords; patch it and retry.
                    scribe_fix_legacy()
                    log_entry = scribe.LogEntry(
                        category=category,
                        message=message,
                    )
                result = client.Log(messages=[log_entry])

                # Check every reply here: the loop over ``tail`` never ends
                # normally, so a check placed after it would never run and
                # rejected entries would be dropped silently.
                if result == scribe.ResultCode.OK:
                    continue
                if result == scribe.ResultCode.TRY_LATER:
                    raise Error('Scribe Error: TRY LATER')
                raise Error(
                    'Scribe Error: Unknown error code (%s)' % result
                )
        finally:
            tail.close()
    finally:
        try:
            transport.close()
        except Exception:
            # Best effort: a failure while closing must not replace the
            # exception that is already propagating.
            pass

if __name__ == '__main__':
    # Command-line entry point: parse the options, then follow --file and
    # send its lines to Scribe under --category until interrupted.
    parser = optparse.OptionParser()
    parser.add_option(
        '--file',
        dest='file',
        help='file to tail into Scribe',
        metavar='FILE',
    )
    parser.add_option(
        '--category',
        dest='category',
        help='Scribe category',
        metavar='CATEGORY',
    )
    parser.add_option(
        '--host',
        default='127.0.0.1',
        dest='host',
        help='destination Scribe host server',
        metavar='HOST',
    )
    parser.add_option(
        '--port',
        default=1463,
        dest='port',
        help='destination Scribe port',
        metavar='PORT',
        type='int',
    )
    parser.add_option(
        '--prefix',
        default='',
        dest='prefix',
        help='add to the beginning of each log line',
        metavar='PREFIX',
    )
    parser.add_option(
        '--postfix',
        default='',
        dest='postfix',
        help='add to the end of each log line',
        metavar='POSTFIX',
    )
    options, args = parser.parse_args()

    # Both --file and --category are required; without them only the help
    # text is shown.
    if options.file and options.category:
        try:
            handle(
                path=options.file,
                category=options.category,
                host=options.host,
                port=options.port,
                prefix=options.prefix,
                postfix=options.postfix,
            )
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop following the file.
            sys.exit(0)
        except (Error, TTransportException) as error:
            # ``except ... as`` and an explicit write to stderr are valid on
            # both Python 2.6+ and Python 3, unlike the comma form and
            # ``print >>``.
            sys.stderr.write('%s\n' % (error,))
            sys.exit(1)
    else:
        parser.print_help()

usage: scribe_log [options]

options:
  -h, --help           show this help message and exit
  --file=FILE          file to tail into Scribe
  --category=CATEGORY  Scribe category
  --host=HOST          destination Scribe host server
  --port=PORT          destination Scribe port
  --prefix=PREFIX      add to the beginning of each log line
  --postfix=POSTFIX    add to the end of each log line

抱歉!评论已关闭.