zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱
互联网故障一般表现为丢包和时延增大。持续性故障不难排查,难的是间歇性故障或发生在凌晨的故障:这类故障往往在我们来得及测试之前就已恢复正常,拿不到故障发生时的 mtr 结果,也就无法判断故障点在哪里。
因此有了根据丢包率和时延的变化自动联动 mtr 的需求。
此前曾用 MySQL 实现过这个功能:优点是数据可复用,能做多种形式的展示;缺点是占用系统资源过多,脚本也比较繁重。
后来改用 socket + deque 实现,资源占用低、结构轻量,也可以通过开放互联网 API 来做分布式监控;缺点是不保留历史数据,用完即丢。
系统环境
Ubuntu 18.04.5 LTS+Python 3.6.9
python库
仅使用 Python 自带的标准库;考虑到系统权限问题,没有使用第三方库。
ip查询
http://ip-api.com:免费版,请求频率限制为 45 次/分钟;国外 IP 的归属地准确率较高,国内 IP 的查询结果则很不准确,国内推荐使用 ipip。
1 #!/usr/bin/env python3
2 #-*-coding:utf-8-*-
3 from collections import deque
4 import itertools,time
5 import queue,json
6 import argparse,sys,re,os,subprocess
7 import time,socket,random,string
8 import threading
9 from functools import reduce
10 import logging
11
# Targets registered by Server.server() that still need a producer thread;
# Server.create() drains this queue.
ipqli = deque()

# Absolute, symlink-resolved path of this script; used to launch the
# background server and to detect whether it is already running.
filename = os.path.realpath(sys.argv[0])
def logger(log_name=None):
    """Attach a file handler to the root logger and return the root logger.

    :param log_name: path of the log file; defaults to a file named ``log`` in
        the directory containing the running script.
    :returns: the root ``logging.Logger``, with its level set to DEBUG.

    Calling this more than once with the same path does not attach a second
    handler (which would write every record twice).
    """
    if log_name is None:
        base_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
        log_name = os.path.join(base_dir, "log")
    target = os.path.abspath(log_name)

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in root.handlers
    )
    if not already_attached:
        # delay=True opens the file on the first record instead of here, so an
        # unwritable directory does not abort the module at import time.
        fh = logging.FileHandler(target, encoding="utf-8", delay=True)
        fh.setFormatter(logging.Formatter(
            "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
        root.addHandler(fh)
    return root


log = logger()
# Probe implementation. ICMP shells out to the system `ping` binary instead of
# using ping3, to avoid system-permission issues.
class Ping:
    """Measure latency and packet loss towards one target.

    ``ip`` is the 5-tuple ``(source_ip, target_ip, type, port, interval)`` sent by
    the client; ``type`` is one of icmp/tcp/udp. The module-level globals
    ``restime_deque<key>``, ``pkloss_deque<key>`` and ``event<key>`` for the same
    tuple must already exist (``Server.server`` creates them); they are the
    server-side result buffers and the "enough data" event.
    """

    # Extracts the round-trip time from ping output such as "time=12.3 ms".
    _TIME_RE = re.compile(r"(?<=time=)\d+(?:\.\d+)?(?= ms)")

    @staticmethod
    def _key(ip):
        """Return the suffix used to name this target's module-level globals."""
        # A missing source IP (None) becomes "" instead of raising TypeError.
        return "".join("" if part is None else str(part) for part in ip).replace(".", "")

    def __init__(self, ip, count=20, udp_length=64):
        ip = tuple(ip)
        self.sip, self.tip, self.type, self.port, self.inver = ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count = count
        self.inver = float(self.inver)
        self.udp_length = udp_length
        key = self._key(ip)
        # Raw per-probe samples owned by this object (RTT in ms; 1 = lost).
        self.restime_deque = deque(maxlen=60)
        self.pkloss_deque = deque(maxlen=60)
        # Shared with the server: aggregated results and the readiness event.
        self.ret_restime_deque = globals()["restime_deque" + key]
        self.ret_pkloss_deque = globals()["pkloss_deque" + key]
        self.ipqevent = globals()["event" + key]
        # Kept for backward compatibility; _icmp uses the compiled _TIME_RE.
        self.compile = self._TIME_RE.pattern

    def _sleep_time(self, start_time):
        """Return how long to wait before the next probe so probes are ``inver`` s apart."""
        usetime = time.time() - start_time
        # If the probe overran the interval, wait one full interval (original pacing rule).
        return self.inver - usetime if usetime < self.inver else self.inver

    def _record(self, rtt_ms):
        """Store one sample; ``rtt_ms`` is None when the probe got no answer."""
        if rtt_ms is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
        else:
            self.restime_deque.append(rtt_ms)
            self.pkloss_deque.append(0)

    def _tcp(self):
        """One TCP connect probe. Returns ``(sleep_time, success)``; ``(0, 0)`` on socket errors."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    s.bind((self.sip, 0))
                s.connect((self.tip, self.port))
                s.shutdown(socket.SHUT_RD)
                rtt = (time.time() - start_time) * 1000
            except socket.timeout:
                rtt = None
            except OSError as e:
                # e.g. connection refused / bad source address: no sample recorded.
                log.debug(e)
                return 0, 0
        self._record(rtt)
        return self._sleep_time(start_time), int(rtt is not None)

    def _udp(self):
        """One UDP echo probe (random payload). Returns ``(sleep_time, success)``; ``(0, 0)`` on socket errors."""
        payload = "".join(
            random.choice(string.ascii_letters + string.digits)
            for _ in range(self.udp_length)
        ).encode("utf-8")
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    # Same source-address selection as the TCP and ICMP probes.
                    s.bind((self.sip, 0))
                s.sendto(payload, (self.tip, self.port))
                s.recv(1024)
                rtt = (time.time() - start_time) * 1000
            except socket.timeout:
                rtt = None
            except OSError as e:
                log.debug(e)
                return 0, 0
        self._record(rtt)
        return self._sleep_time(start_time), int(rtt is not None)

    def _icmp(self):
        """One ICMP echo via the system ``ping``. Returns ``(sleep_time, success)``."""
        start_time = time.time()
        # Argument list, no shell: target/source come from a network request.
        cmd = ["ping", "-i", str(self.inver), "-c", "1", "-W", "1"]
        if self.sip:
            cmd += ["-I", self.sip]
        cmd.append(self.tip)
        try:
            out = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE).communicate()[0]
        except OSError as e:  # e.g. ping binary not found: counts as a lost probe
            log.debug(e)
            out = b""
        match = self._TIME_RE.search(out.decode("utf-8", "replace"))
        self._record(float(match.group()) if match else None)
        return self._sleep_time(start_time), int(match is not None)

    def fastping(self):
        """Run a single probe of the configured type (used by the concurrent fast path)."""
        getattr(self, "_" + self.type)()

    def slow_ping(self):
        """Send up to ``count`` probes spaced by the probe interval.

        Sets ``ipqevent`` first: this path only runs when the server buffers hold
        at least two results, which is what ``Server.sendvalue`` waits for.
        Stops early once a buffer drops below two entries so the fast path can
        refill it. Returns ``(attempts, successes)``.
        """
        self.ipqevent.set()
        attempts = successes = 0
        while attempts < self.count:
            sleep_time, ok = getattr(self, "_" + self.type)()
            attempts += 1
            successes += ok
            if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
                break
            time.sleep(sleep_time)
        return attempts, successes

    def ping_value(self):
        """Run one measurement round; return ``(avg_rtt_ms, loss_percent, seconds_used)``.

        While a server buffer holds fewer than two results, ``count`` probes are
        fired concurrently to fill it quickly; otherwise they are paced by
        ``slow_ping``. The average RTT is over successful probes only; loss is
        over the samples actually recorded. Returns ``(0, 0, 0)`` when no sample
        was recorded at all.
        """
        start_time = time.time()
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            threads = [threading.Thread(target=self.fastping) for _ in range(self.count)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            attempts = self.count
        else:
            attempts, _ = self.slow_ping()
        use_time = round(time.time() - start_time, 4)

        # Probes that hit an OSError record nothing, so never pop more than exists.
        n = min(attempts, len(self.restime_deque), len(self.pkloss_deque))
        rtts = [float(self.restime_deque.pop()) for _ in range(n)]
        losses = [int(self.pkloss_deque.pop()) for _ in range(n)]
        if not losses:
            return 0, 0, 0
        received = losses.count(0)
        restime = sum(rtts) / received if received else 0.0
        pkloss = sum(losses) / len(losses) * 100
        return round(restime, 2), round(pkloss, 2), use_time
# Server-side code: a local TCP service on 127.0.0.1:6590 that runs one probe
# thread per monitored target and answers client queries from its buffers.
class Server():
    """Background probe server.

    Per target it keeps three module-level globals named after the request's
    ``ip`` tuple: ``restime_deque<key>`` and ``pkloss_deque<key>`` (each holding
    up to 30 ``(value, seconds_used)`` results) and ``event<key>``. ``Ping``
    looks up the same names.
    """

    # Upper bound (seconds) a request waits for a target's first results, so a
    # dead producer cannot leave handler threads blocked forever.
    WAIT_TIMEOUT = 30

    @staticmethod
    def _key(ip):
        """Return the suffix used to name a target's module-level globals."""
        # A missing source IP (None) becomes "" instead of raising TypeError.
        return "".join("" if part is None else str(part) for part in ip).replace(".", "")

    def __init__(self, sock):
        global ipqli
        self.ipqli = ipqli            # targets waiting for a producer thread
        self.thli = []                # long-running server/create threads
        self.ipli = []                # targets that currently have a producer
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.env = threading.Event()  # set when ipqli received new targets
        # Guards ipli, which request handlers and makevalue threads both modify.
        self._lock = threading.Lock()

    @classmethod
    def start(cls):
        """Bind 127.0.0.1:6590, run the accept loop and the producer launcher, and block."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("127.0.0.1", 6590))
        obj = cls(s)
        for target in (obj.server, obj.create):
            t = threading.Thread(target=target)
            t.start()
            obj.thli.append(t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept client connections forever; each one is answered on its own thread."""
        self.sock.listen(100)
        while True:
            conn, _ = self.sock.accept()
            # sendvalue may wait for a target's first results; a thread per
            # connection keeps one slow target from stalling every other client.
            threading.Thread(target=self._handle, args=(conn,), daemon=True).start()

    def _handle(self, conn):
        """Parse one ``[ip, item]`` JSON request, register the target if new, and reply."""
        try:
            ip, item = json.loads(conn.recv(1024).decode("utf-8"))
            key = self._key(ip)
            restime_ipq = "restime_deque" + key
            pkloss_ipq = "pkloss_deque" + key
            with self._lock:
                if ip not in self.ipli:
                    globals()[restime_ipq] = deque(maxlen=30)
                    globals()[pkloss_ipq] = deque(maxlen=30)
                    globals()["event" + key] = threading.Event()
                    self.ipqli.append(ip)
                    self.ipli.append(ip)
                    log.debug("create ipdeque %s %s" % (restime_ipq, pkloss_ipq))
                    self.env.set()
            self.sendvalue(conn, ip, item)
        except Exception as e:  # a malformed request or a gone client must not kill the server
            log.debug(str(e))
        finally:
            conn.close()

    def create(self):
        """Start a makevalue producer thread for every target queued by the request handlers."""
        while True:
            self.env.wait()
            # Clear before draining: a target queued after this point sets the
            # event again, so it cannot be missed.
            self.env.clear()
            while self.ipqli:
                try:
                    ip = self.ipqli.popleft()
                    log.debug("create %s" % (ip,))
                    threading.Thread(target=self.makevalue, args=(ip,)).start()
                except Exception as a:
                    log.debug(str(a))

    def makevalue(self, ip):
        """Produce results for one target until both its buffers hold 30 unread entries.

        Each Ping.ping_value() round appends ``(value, seconds_used)`` to the
        restime/pkloss buffers. When they are full (nobody is polling) or the
        probe fails, the target is unregistered; the next request re-registers it.
        """
        key = self._key(ip)
        restime_name = "restime_deque" + key
        pkloss_name = "pkloss_deque" + key
        restime_ipq = globals()[restime_name]
        pkloss_ipq = globals()[pkloss_name]
        ipqevent = globals()["event" + key]
        try:
            obj = Ping(ip)
            while len(restime_ipq) < 30 or len(pkloss_ipq) < 30:
                restime, pkloss, use_time = obj.ping_value()
                restime_ipq.append((restime, use_time))
                pkloss_ipq.append((pkloss, use_time))
        except Exception as e:
            log.debug(str(e))
        finally:
            # Wake any request still waiting on this target, then unregister it.
            ipqevent.set()
            with self._lock:
                if ip in self.ipli:
                    self.ipli.remove(ip)
        log.debug("delete ipdeque %s %s" % (restime_name, pkloss_name))

    @staticmethod
    def _run_mtr(cmd):
        """Launch mtr.py in the background, discarding its output."""
        try:
            # DEVNULL rather than never-read PIPEs, which could fill and block mtr.py.
            subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:  # missing / non-executable mtr.py must not replace the reply
            log.debug(str(e))

    def sendvalue(self, conn, ip, item):
        """Pop the newest ``item`` ("restime" or "pkloss") result for ``ip`` and send it.

        Triggers mtr.py when latency jumped by more than 20 ms versus the
        previous result, or when loss is strictly between 20 % and 100 %.
        On any error the exception text is sent instead.
        """
        key = self._key(ip)
        tip = ip[1]
        restime_ipq = globals()["restime_deque" + key]
        pkloss_ipq = globals()["pkloss_deque" + key]
        ipqevent = globals()["event" + key]
        mtr_log = os.path.join(
            self.basedir, "mtr_log",
            tip + "-" + time.strftime("%Y-%m-%d", time.localtime()) + ".log")
        # Argument list, no shell: tip arrives over the socket.
        mtr_cmd = [os.path.join(self.basedir, "mtr.py"), tip, mtr_log]
        # Fewer than two buffered results in either buffer: wait until
        # Ping.slow_ping signals that both have at least two.
        if len(restime_ipq) < 2 or len(pkloss_ipq) < 2:
            ipqevent.clear()
        try:
            ipqevent.wait(self.WAIT_TIMEOUT)
            if item == "restime":
                ret, _ = restime_ipq.pop()
                hisret, _ = restime_ipq[-1]
                if ret - hisret > 20:
                    self._run_mtr(mtr_cmd)
            elif item == "pkloss":
                ret, _ = pkloss_ipq.pop()
                if 100 > ret > 20:
                    self._run_mtr(mtr_cmd)
            else:
                raise ValueError("unsupported item: %s" % item)
        except Exception as a:
            ret = a
            log.debug(str(ret))
        conn.sendall(str(ret).encode())

# Format check of the user-supplied command-line parameters.
class Ipcheck():
    """Validate probe parameters before they are sent to the server."""

    # Dotted-quad shape only; octet range is checked separately by check_fun.
    _IPV4_RE = re.compile(r"\d+\.\d+\.\d+\.\d+")

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            # Unparseable interval: remembered as invalid so check() returns False.
            self.inver = None

    def check(self):
        """Return True if item, probe type, interval (>= 0.2 s) and IP addresses are valid."""
        if self.item not in ["restime", "pkloss"] or self.type not in ["icmp", "tcp", "udp"]:
            return False
        # `not >=` also rejects NaN.
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True if one octet string/int is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True if the target IP (and the source IP, when given) are valid IPv4 addresses."""
        addresses = [self.tip] + ([self.sip] if self.sip else [])
        try:
            # fullmatch: unlike `^...$`, it does not accept a trailing newline.
            if not all(self._IPV4_RE.fullmatch(a) for a in addresses):
                return False
            octets = [octet for a in addresses for octet in a.split(".")]
            return self.checkiplength(octets)
        except (AttributeError, TypeError, ValueError):
            return False

    def checkiplength(self, check_ipli):
        """Return True if every octet in ``check_ipli`` is below 256."""
        return not list(itertools.filterfalse(self.check_fun, check_ipli))
def run():
    """Launch this script as the background probe server (``python3 <script> -S server``).

    The child's stdout/stderr go to /dev/null: the server logs to its log
    file, and pipes that nobody reads would eventually fill and block it.
    """
    # Argument list instead of a shell string, so paths with spaces work.
    subprocess.Popen(
        ["python3", filename, "-S", "server"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
# Client side: request one value from the server and print it for the user (Zabbix).
def socket_client(ip, item, host="127.0.0.1", port=6590, timeout=3):
    """Send ``[ip, item]`` as JSON to the probe server and print its reply.

    On a timeout nothing is printed. On any other error "server will start" is
    printed (main launches the server when it is not running). Both error paths
    log the error and exit with status 0.
    """
    try:
        # `with` closes the socket on every path, including unexpected errors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((host, port))
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        sys.exit(0)
    except Exception as e:
        print("server will start")
        log.debug(str(e))
        sys.exit(0)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="icmp for monitor")
    parser.add_argument("-S", action="store", dest="server")
    parser.add_argument("-t", action="store", dest="tip")
    parser.add_argument("-s", action="store", dest="sip")
    parser.add_argument("-I", action="store", dest="item")
    parser.add_argument("-i", action="store", dest="inver", default="1")
    parser.add_argument("-T", action="store", dest="ping_type", default="icmp")
    parser.add_argument("-p", action="store", dest="port", default="0")
    args = parser.parse_args()

    # Is "<this script> -S server" already running? This scans `ps -ef` in
    # Python; the former `ps | grep | grep -v grep` string had broken quoting.
    marker = "%s -S server" % filename
    try:
        ps_out = subprocess.Popen(["ps", "-ef"], stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE).communicate()[0]
    except OSError:
        ps_out = b""
    server_status = any(marker in line
                        for line in ps_out.decode("utf-8", "replace").splitlines())
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)

    try:
        tip = socket.gethostbyname(args.tip)
    except Exception:
        # Missing -t or unresolvable name: stop here instead of using undefined values.
        print("format error")
        sys.exit(0)
    sip = args.sip
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    ip = (sip, tip, ping_type, port, inver)

    check = Ipcheck(sip, tip, item, ping_type, inver)
    if not check.check():
        print("""---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
""")
        sys.exit(0)
    socket_client(ip, item)
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱
自学咖网 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱


