实现一个简单的网络嗅探。
我们想写一个网络嗅探器,希望它有很好的通用性,能适应不同的嗅探场景。根据这样的需求,我们可以在功能是做分离。一个嗅探器可以分为两部分:
嗅探的原理
我们所熟知的TCP/IP网络协议栈中,数据时同网络接口层出入网络层然后传入运输层,然后根据端口号分流到不同的进程中。这是一个多路复用的过程。但,正是这种层层上传的过程,数据被层层解包:上层接收的数据丢失了底层网络的数据包结构信息。例如,IP层分组进入到TCP层变为报文段丢失了IP地址等。在网络编程中,一种方法叫“混杂模式”的套接字编程,可以绕过上述的过程,直接在TCP层接收网络接口层的信息。我们称其为原生套接字。
关键是在建立套接字时指定SOCK_RAW
:
1 2 3
| sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
|
还有要考虑系统的兼容性,不同系统在元素套接字上有不同的处理。详细差异见代码注释。
定义一个嗅探器的通用接口,采用OOP的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class Sniffer:
def __init__(self, raw_sniffer): self.sniffer = raw_sniffer self.target_address = None
def recv_data(self, length=65536): data, address = self.sniffer.recvfrom(length) self.target_address = address return address, data
recvfrom = self.recv_data
def get_current_target_address(self): return self.target_address
|
考虑到网络嗅探采集数据的通用性,这里采用上下文管理的方式实现该函数。具体如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import socket import os import contextlib
@contextlib.contextmanager def make_sniffer(host=None, port=None): if host is None: host = socket.gethostbyname(socket.gethostname()) if port is None: port = 0 if os.name == 'nt': protocol = socket.IPPROTO_IP else: protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol) sniffer.bind((host, port)) sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: yield Sniffer(sniffer) finally: if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
|
这样,当我们要实现具体的场景需求是,只要在这个sniffer上下文下就可以了。例如:
1 2 3 4
| with make_sniffer() as sniffer:
|
基于通用嗅探器的一个例子
我们编写一个统计局域网下各IP的数据流量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import socket import os import contextlib
def ip_counter(): with make_sniffer() as sniffer: import collections import time import threading
c = collections.Counter() def task(c, interval): while True: time.sleep(interval) os.system("cls") for ip, count in sorted(c.items(), key=lambda x: x[1], reverse=True): print(ip, count) threading.Thread(target=task, args=(c, 1)).start()
while True: _, address = sniffer.recvfrom(65536) ip = address[0] c.update({ip: 1})
if __name__ == '__main__': ip_counter()
|
运行:
1 2 3 4 5 6 7
| 192.168.0.111 17 14.114.244.1 11 220.249.245.146 2 192.168.0.1 1 120.4.90.229 1 64.233.188.188 1 101.27.78.51 1
|
可以看到局域网下不同主机的数据活跃度。
详细代码见simple-sniffer。
转载请包括本文地址:https://allenwind.github.io/blog/2396
更多文章请参考:https://allenwind.github.io/blog/archives/