基礎監控的同比告警主要是針對服務器監控采集的指標,包括負載(load1/load5/load15)、平均CPU使用率、內存使用率、內外網流量、端口數量等,具體采集方法可參考《基礎監控-服務器監控》。
一、告警原理
多個指標每分鐘1個數據,比較當前分鐘的前10分鐘的7天平均值,如果幅度超過100%並且絕對值相差達到M則算是一次異常,包括上升/下降異常,如果一個指標持續兩次上升異常或者持續兩次下降異常(不包括先上升後下降或者先下降後上升情況)則開始告警給機器的對應負責人(運維/開發)。例如當前是10號11:00,則比較的是10:59,10:58...10:50共10分鐘的最近7天的平均值,例如10:59則是10號、9、8...4共7天的10:59這個點的數據的平均值。實際情況一般是第二分鐘才解析了前一分鐘的數據,相當於當前是11點,則解析的是10:59這個點和它之前的10分鐘的7天平均值的比較。
二、數據來源
基礎監控-服務器監控每分鐘會采集一份數據保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},則每分鐘一個redis hash,共7天 7*1440=10080個數據。實際情況保存的時候會多保留10分鐘的數據,即7天+10分鐘;由於reportTime是根據機器的實際時間來上報(這樣畫圖才能保證是准確的),而某些機器沒有NTP服務器或者其他原因導致時間不准,則reportTime則又會多種多樣,所以導致的結果是redis的hash會變多一些,當然這並不影響我們的數據獲取,因為整個同比告警就是根據畫圖來比較的,畫圖采用的是reportTime。采用hash保存到redis則不用每個ip讀取一次redis,可以減少N次網絡IO,大大提高程序速度。由於redis占據內存較多,大概10G左右,需要調整redis配置文件maxmemory的大小,不然redis會隨機刪除設置自動過期的key。
三、程序設計
1、DB設計
需要數據來源保存(redis)、異常展示表(mysql)、阈值配置表(mysql)、上次狀態(redis)。異常展示表保存所有ip的異常描述信息、異常持續時間,可在頁面展示;阈值配置表保存所有ip的阈值配置信息,每個ip的異常比率、各個指標的絕對值差、是否需要監控等;上次狀態則用來判斷是否需要告警的,持續2次同類異常則告警,使用redis保存即可。
mysql> show tables; +-----------------------------------+ | Tables_in_machineMonitor_overLast | +-----------------------------------+ | currentDisplay | | monitorConf | +-----------------------------------+
2、導入測試數據
測試數據是使用mysql和redis將數據從mysql導入redis中,線上數據是等程序完成後修改"服務器監控"的上報CGI來導入的。測試導入的時候遇到的坑參考《python處理json和redis hash的坑》
def initRedis(client):
if client not in CONF.redisInfo:
raise RedisNotFound("can not found redis %s in config" % client)
try:
pool = redis.ConnectionPool(**CONF.redisInfo[client]) # 線程安全
red = redis.Redis(connection_pool=pool)
red.randomkey() # check
except Exception, e:
raise RedisException("connect redis %s failed: %s" % (client, str(e)))
return red
def initDb(client):
if client not in CONF.dbInfo:
raise MysqlNotFound("can not found mysql db %s in config" % client)
try:
db = opMysql.OP_MYSQL(**CONF.dbInfo[client])
except Exception, e:
raise MysqlException("connect mysql %s failed: %s" % (client, str(e)))
code, errMsg = db.connect()
if 0 != code:
raise MysqlException("connect mysql %s failed: %s" % (client, errMsg))
return db
3、告警接口
調用"告警平台"接口,項目配置後可發送RTX/SMS/Wechat,可在頁面查看歷史記錄,輕松修改配置和臨時屏蔽等。調用接口直接使用urllib/urllib2庫即可
postData = urllib.urlencode(values) req = urllib2.Request(CONF.amcUrl, postData) response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT) amcDict = json.loads(response.read()) code = int(amcDict["returnCode"]) errMsg = amcDict["errorMsg"]
4、解析來源的數據
將數據轉成可識別字典,並做排錯處理,將錯誤數據拒絕掉
5、使用numpy和panda求平均值
將來源數據解析後存入panda.DataFrame中,如果數據不存在則使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值並保留2位數。如果某一分鐘的7天數據全部獲取不到則拒絕解析當前值,如果是7天數據的部分數據獲取不到則剔除該點並在平均值的除數相對減一,使用NAN代替該點則在mean中可以解決這種情況。增加自定義函數比較每個列是否滿足幅度上升/下降100%(可配置)並且絕對值差達到M(可配置),是則返回True,否則返回False,判斷所有返回值是否均為True或者均為False,是則符合異常場景。
初始化DataFrame
for item in vd:
if value is None or value[item] is None:
vd[item][lastDayKey] = numpy.nan
else:
vd[item][lastDayKey] = value[item]
vf = pandas.DataFrame(vd)
columns.append(vf.mean().round(2))
indexes.append(lastMinKey)
self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)
panda自定義函數比較並且判斷是否需要告警
for item in curValue:
if curValue[item] is None: # error
continue
else:
curValue[item] = round(curValue[item], 2)
def overLastCompute(v2, absSub):
"""
:param v2: float
:param absSub: absolute subtract
:return: high/low/null
"""
v1 = curValue[item]
v2 = round(v2, 2)
if 0 == v2:
if v1 > absSub:
return "HIGH"
if v1 < -absSub:
return "LOW"
return "NULL"
subVal = abs(v1 - v2)
if subVal / v2 > CONF.RATIO and subVal > absSub:
if v1 > v2:
return "HIGH"
return "LOW"
return "NULL"
self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item])
res = self.ipInfo[ip]["result"][item] == "HIGH" # Series
if all(i for i in res):
resErr[item] = CONF.HIGH_ERR
if CONF.HIGH_ERR == self.lastCache[str(ip)][item]:
# will Alert if switch on
pass
else:
res = self.ipInfo[ip]["result"][item] == "LOW"
if all(i for i in res):
resErr[item] = CONF.LOW_ERR
if CONF.LOW_ERR == self.lastCache[str(ip)][item]:
# will Alert if switch on
pass
6、由於IP較多,並且主要邏輯在解析數據和panda的計算上,使用CPU比較多,則需要使用多進程,並且結合線程池將進程跑滿,別浪費進程資源。
step = ipNums / multiprocessing.cpu_count()
ipList = list()
i = 0
j = 1
processList = list()
for ip in self.ipInfo:
ipS = str(ip)
if ipS not in self.lastCache:
self.lastCache[ipS] = copy.deepcopy(self.value)
ipList.append(ip)
i += 1
if i == step * j or i == ipNums:
j += 1
def innerRun():
wm = Pool.ThreadPool(CONF.POOL_SIZE)
for myIp in ipList:
kw = dict(ip=myIp, handlerKey=myIp)
wm.addJob(self.handleOne, **kw)
wm.waitForComplete()
ipListNums = len(ipList)
for tmp in xrange(ipListNums):
res = wm.getResult()
if res:
handlerKey, code, handlerRet, errMsg = res
if 0 != code:
continue
self.lastCache[str(handlerKey)] = handlerRet
process = multiprocessing.Process(target=innerRun)
process.start()
processList.append(process)
ipList = list()
for process in processList:
process.join()
四、優化
指標監控v2則是同比告警的升級版,數據量將會大好幾倍,目前想到的優化如下
1、使用hbase替代redis
2、將程序做京廣容災,改成分布式運行,橫向擴展多套程序並列運行
http://xxxxxx/Linuxjc/1173650.html TechArticle