歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux綜合 >> 學習Linux >> 基礎監控-同比告警,監控同比告警

基礎監控-同比告警,監控同比告警

日期:2017/3/3 18:06:50   编辑:學習Linux

基礎監控-同比告警,監控同比告警

基礎監控-同比告警,監控同比告警


  基礎監控的同比告警主要是針對服務器監控采集的指標,包括負載(load1/load5/load15)、平均CPU使用率、內存使用率、內外網流量、端口數量等,具體采集方法可參考《基礎監控-服務器監控》。

一、告警原理

  多個指標每分鐘1個數據,比較當前分鐘的前10分鐘的7天平均值,如果幅度超過100%並且絕對值相差達到M則算是一次異常,包括上升/下降異常,如果一個指標持續兩次上升異常或者持續兩次下降異常(不包括先上升後下降或者先下降後上升情況)則開始告警給機器的對應負責人(運維/開發)。例如當前是10號11:00,則比較的是10:59,10:58...10:50共10分鐘的最近7天的平均值,例如10:59則是10號、9、8...4共7天的10:59這個點的數據的平均值。實際情況一般是第二分鐘才解析了前一分鐘的數據,相當於當前是11點,則解析的是10:59這個點和它之前的10分鐘的7天平均值的比較。

二、數據來源

  基礎監控-服務器監控每分鐘會采集一份數據保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},則每分鐘一個redis hash,共7天 7*1440=10080個數據。實際情況保存的時候會多保留10分鐘的數據,即7天+10分鐘;由於reportTime是根據機器的實際時間來上報(這樣畫圖才能保證是准確的),而某些機器沒有NTP服務器或者其他原因導致時間不准,則reportTime則又會多種多樣,所以導致的結果是redis的hash會變多一些,當然這並不影響我們的數據獲取,因為整個同比告警就是根據畫圖來比較的,畫圖采用的是reportTime。采用hash保存到redis則不用每個ip讀取一次redis,可以減少N次網絡IO,大大提高程序速度。由於redis占據內存較多,大概10G左右,需要調整redis配置文件maxmemory的大小,不然redis會隨機刪除設置自動過期的key。

三、程序設計

1、DB設計

需要數據來源保存(redis)、異常展示表(mysql)、阈值配置表(mysql)、上次狀態(redis)。異常展示表保存所有ip的異常描述信息、異常持續時間,可在頁面展示;阈值配置表保存所有ip的阈值配置信息,每個ip的異常比率、各個指標的絕對值差、是否需要監控等;上次狀態則用來判斷是否需要告警的,持續2次同類異常則告警,使用redis保存即可。

mysql> show tables;
+-----------------------------------+
| Tables_in_machineMonitor_overLast |
+-----------------------------------+
| currentDisplay                    |
| monitorConf                       |
+-----------------------------------+

2、導入測試數據

測試數據是使用mysql和redis將數據從mysql導入redis中,線上數據是等程序完成後修改"服務器監控"的上報CGI來導入的。測試導入的時候遇到的坑參考《python處理json和redis hash的坑》

def initRedis(client):
    if client not in CONF.redisInfo:
        raise RedisNotFound("can not found redis %s in config" % client)
    try:
        pool = redis.ConnectionPool(**CONF.redisInfo[client])  # 線程安全
        red = redis.Redis(connection_pool=pool)
        red.randomkey()  # check
    except Exception, e:
        raise RedisException("connect redis %s failed: %s" % (client, str(e)))
    return red


def initDb(client):
    if client not in CONF.dbInfo:
        raise MysqlNotFound("can not found mysql db %s in config" % client)
    try:
        db = opMysql.OP_MYSQL(**CONF.dbInfo[client])
    except Exception, e:
        raise MysqlException("connect mysql %s failed: %s" % (client, str(e)))
    code, errMsg = db.connect()
    if 0 != code:
        raise MysqlException("connect mysql %s failed: %s" % (client, errMsg))
    return db

3、告警接口

調用"告警平台"接口,項目配置後可發送RTX/SMS/Wechat,可在頁面查看歷史記錄,輕松修改配置和臨時屏蔽等。調用接口直接使用urllib/urllib2庫即可

postData = urllib.urlencode(values)
req = urllib2.Request(CONF.amcUrl, postData)
response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT)
amcDict = json.loads(response.read())
code = int(amcDict["returnCode"])
errMsg = amcDict["errorMsg"]

4、解析來源的數據

將數據轉成可識別字典,並做排錯處理,將錯誤數據拒絕掉

5、使用numpy和panda求平均值

將來源數據解析後存入panda.DataFrame中,如果數據不存在則使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值並保留2位數。如果某一分鐘的7天數據全部獲取不到則拒絕解析當前值,如果是7天數據的部分數據獲取不到則剔除該點並在平均值的除數相對減一,使用NAN代替該點則在mean中可以解決這種情況。增加自定義函數比較每個列是否滿足幅度上升/下降100%(可配置)並且絕對值差達到M(可配置),是則返回True,否則返回False,判斷所有返回值是否均為True或者均為False,是則符合異常場景。

初始化DataFrame

for item in vd:
    if value is None or value[item] is None:
        vd[item][lastDayKey] = numpy.nan
    else:
        vd[item][lastDayKey] = value[item]           
vf = pandas.DataFrame(vd)
            
columns.append(vf.mean().round(2))  
indexes.append(lastMinKey)   

self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)

panda自定義函數比較並且判斷是否需要告警

for item in curValue:
    if curValue[item] is None:  # error
        continue
    else:
        curValue[item] = round(curValue[item], 2)
    
        def overLastCompute(v2, absSub):
            """
            :param v2: float  
            :param absSub: absolute subtract 
            :return: high/low/null
            """
            v1 = curValue[item]
            v2 = round(v2, 2)
            if 0 == v2:
                if v1 > absSub:
                    return "HIGH"
                if v1 < -absSub:
                    return "LOW"
                return "NULL"
            subVal = abs(v1 - v2)
            if subVal / v2 > CONF.RATIO and subVal > absSub:
                if v1 > v2:
                    return "HIGH"
                return "LOW"
            return "NULL"
        self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item])
        res = self.ipInfo[ip]["result"][item] == "HIGH"  # Series
        if all(i for i in res):
            resErr[item] = CONF.HIGH_ERR
            if CONF.HIGH_ERR == self.lastCache[str(ip)][item]:
                # will  Alert if switch on 
                pass
        else:
            res = self.ipInfo[ip]["result"][item] == "LOW"
            if all(i for i in res):
                resErr[item] = CONF.LOW_ERR
                if CONF.LOW_ERR == self.lastCache[str(ip)][item]:
                    # will  Alert if switch on 
                    pass

6、由於IP較多,並且主要邏輯在解析數據和panda的計算上,使用CPU比較多,則需要使用多進程,並且結合線程池將進程跑滿,別浪費進程資源。

step = ipNums / multiprocessing.cpu_count()
ipList = list()
i = 0
j = 1
processList = list()

for ip in self.ipInfo:
    ipS = str(ip)
    if ipS not in self.lastCache:
        self.lastCache[ipS] = copy.deepcopy(self.value)
    ipList.append(ip)
    i += 1
    if i == step * j or i == ipNums:
        j += 1

        def innerRun():
            wm = Pool.ThreadPool(CONF.POOL_SIZE)
            for myIp in ipList:
                kw = dict(ip=myIp, handlerKey=myIp)
                wm.addJob(self.handleOne, **kw)
            wm.waitForComplete()
            ipListNums = len(ipList)
            for tmp in xrange(ipListNums):
                res = wm.getResult()
                if res:
                    handlerKey, code, handlerRet, errMsg = res
                    if 0 != code:
                        continue
                    self.lastCache[str(handlerKey)] = handlerRet
        process = multiprocessing.Process(target=innerRun)
        process.start()
        processList.append(process)
        ipList = list()

for process in processList:
    process.join()

四、優化

指標監控v2則是同比告警的升級版,數據量將會大好幾倍,目前想到的優化如下

1、使用hbase替代redis

2、將程序做京廣容災,改成分布式運行,橫向擴展多套程序並列運行

http://xxxxxx/Linuxjc/1173650.html TechArticle

Copyright © Linux教程網 All Rights Reserved