现在的位置: 首页 > 综合 > 正文

变通的思维方式-记一次业务需求的处理过程

2013年09月01日 ⁄ 综合 ⁄ 共 3898字 ⁄ 字号 评论关闭

事情起于一个业务需求,具体的需求是这样的:

总体来说是一个监控需求。

问题的背景是这样的:我们有一个用scribe搭建起的分布式日志收集系统,通过scribe服务从各个IDC的产品中收集业务事件日志(按照不同的category分开),存储于数据仓库系统,供数据分析使用。我们的数据仓库系统是基于Hadoop搭建的,所以我们最终就直接使用scribe直接将各个方向汇集来的日志按一定频率同步到Hadoop集群中。受限于其实现机制,我们不能将数据实时地同步到的机群众,当然,这其中有效率的考量,毕竟HDFS是被设计成一次写入,多次读取的
;所以我们采取了一个折中的办法,每隔10分钟通过scribe的reload操作,强制缓存的数据更新到集群中。

需求是这样的:由于业务系统不可避免地会出现各种各样的问题,比如宕机,或者程序本身有bug,导致一些数据不能成功报送到数据中心。为此,我们需要建立一种监控机制,来监测不能报送的情况。

尝试过的机制

最初,我们的OLAP工程师使用Hive来手动查询最新的数据,来判断最近是否有新的数据进入到特定的category中,但是这种机制有一个明显的缺陷,首先基于Hive的查询,其粒度在我们的使用模式下最小就是一个category的数据,也就是说我们每进行一次判断就要至少扫描一个category的数据 ,这从效率上来说就已经大打折扣了,何况这样的检测会按照一定的频率持续进行。

后来,我对方案做了一些改进:记录每个category数据的最后的进入时间到memcached中,然后根据存储category最后更新时间来进行就爱内控;我们通过一个实现了scribe协议的python应用,对其源代码进行了一些修改,从而实现了一个类似于钩子的东西,我们通过将scribe的配置分出一份到这个python版的scribe服务,来截取我们需要的数据。但是具体的使用中我们遇到了服务进程运行占用资源过高,并且python版本的scribe实现的bug导致了我们数据的损坏(未同步的并行写入);后来我直接hack了scribe的C++源代码,使用的时候还是会导致进程占用资源过高,因为,毕竟每秒进来的数据太多了!

在有了以上的经验和教训之后,我把目光转向了scribe在reload时产生的日志!是的,scribe在reload时会打印出每个category和HDFS目录的一些详细信息,在这里我们仅仅是需要category信息。所以我决定跟踪scribe的日志,来获取我们想要的数据。想到跟踪日志,我很自然地想到了tail命令,想到了tail命令我想到了它会把最新的数据打印到屏幕上,接着我又想到屏幕仅仅是一个文件,所以我想输出的信息通过管道给其它的程序作为输入,所以我们很自然地想到了些一个程序来“消费”这些信息,所以我拿起我最擅长的Python写了一个只有几十行的程序,最后通过shell脚本将scribe日志和程序通过管道(|)结合起来。程序运行之后,一切都显得很正常,负载和时效性都非常令人满意,虽然这个方案牺牲了一定的实时性,但是我们这里不需要真正的实时,每10分钟一次的检测够了!所以在有的时候要做适当的折衷,思路要open。

其实和大家分享这个历程也是想和大家分享一种变通的思维方式,找到与问题关联的因素,看看能否从这些因素上找到问题的切入点,也许一些令人头疼的问题的解决方案就隐藏在其中。

最后附上shell脚本的内容:

tail -f var/log/scribe.log | ./monitor.py

python处理代码,其中涉及到逻辑处理的部分不超过20行-_-:

#!/usr/local/services/biutility/bin/python
import sys
import re
import time

from sqlalchemy.engine import create_engine
from sqlalchemy.schema import MetaData
from sqlalchemy.schema import Table
from sqlalchemy.schema import Column
from sqlalchemy.schema import Index
from sqlalchemy.schema import ForeignKey
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.schema import ColumnDefault

from sqlalchemy.types import INT
from sqlalchemy.types import VARCHAR
from sqlalchemy.types import DATE
from sqlalchemy.types import DATETIME
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.types import FLOAT
from sqlalchemy.types import BLOB
from sqlalchemy.types import Enum
from sqlalchemy.types import BOOLEAN
from sqlalchemy.types import BIGINT
from sqlalchemy.sql import func

DATABASE = {
    'ENGINE':'mysql',
    'NAME':'monitor',
    'USER':'username',
    'PASSWORD':'password',
    'HOST':'localhost',
    'PORT':3306,
}

connection_str = '%(ENGINE)s://%(USER)s:%(PASSWORD)s@%(HOST)s:%(PORT)s/%(NAME)s?charset=utf8' % DATABASE
engine = create_engine(connection_str, echo=False)
metadata = MetaData(bind=engine, reflect=True) # automatically load all tables from the bound database

table_scribe = Table('scribe',metadata,
                Column('id', INT, primary_key=True, autoincrement=True, nullable=False),
                Column('metric', VARCHAR(64), nullable=False),
                Column('snid', INT, nullable=False),
                Column('clientid', INT, nullable=False),
                Column('gameid', INT, nullable=False),
                Column('clientdate', VARCHAR(64), nullable=False),
                Column('lastupdatetime', TIMESTAMP, server_default=func.current_timestamp(), nullable=Fal
se),
                UniqueConstraint('metric', 'snid', 'clientid', 'gameid', 'clientdate'),
                useexisting = True
                )
Index('table_scribeindex', table_scribe.c.snid, table_scribe.c.clientid, table_scribe.c.gameid, table_scr
ibe.c.clientdate, table_scribe.c.lastupdatetime)

metadata.create_all() #create all tables


cols = ['metric', 'snid', 'clientid', 'gameid', 'clientdate']
pattern = r"\[([a-z_?a-z]+)_(\d+)_(\d+)_(\d+)_(\d{4}-\d{2}-\d{2})\]"
for line in sys.stdin:
    try:
        r = re.findall(pattern, line)
        if r:
            kwargs = dict(zip(cols, r[0]))
            kwargs['lastupdatetime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
            monitor_items = table_scribe.select().where('clientid=:clientid and metric=:metric and snid=:snid and gameid=:gameid and clientdate=:clientdate').execute(**kwargs)
            if monitor_items.rowcount == 0:
                table_scribe.insert().values(**kwargs).execute()
            else:
                table_scribe.update().where(table_scribe.c.id==monitor_items.fetchone().id).values(**kwargs).execute()
    except Exception, e:
        print 'Exception [%s] triggered when processing [%s]' % (e, line)

抱歉!评论已关闭.