实现一个简单的网络嗅探。

我们想写一个网络嗅探器,希望它有很好的通用性,能适应不同的嗅探场景。根据这样的需求,我们可以在功能是做分离。一个嗅探器可以分为两部分:

  • 网络嗅探部分

  • 对嗅探内容进行处理部分。前者需要系统兼容,后者根据不同的系统需求编写不同的处理嗅探内容的插件。

嗅探的原理

我们所熟知的TCP/IP网络协议栈中,数据时同网络接口层出入网络层然后传入运输层,然后根据端口号分流到不同的进程中。这是一个多路复用的过程。但,正是这种层层上传的过程,数据被层层解包:上层接收的数据丢失了底层网络的数据包结构信息。例如,IP层分组进入到TCP层变为报文段丢失了IP地址等。在网络编程中,一种方法叫“混杂模式”的套接字编程,可以绕过上述的过程,直接在TCP层接收网络接口层的信息。我们称其为原生套接字。

关键是在建立套接字时指定SOCK_RAW

1
2
3

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)

还有要考虑系统的兼容性,不同系统在元素套接字上有不同的处理。详细差异见代码注释。

定义一个嗅探器的通用接口,采用OOP的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

class Sniffer:

def __init__(self, raw_sniffer):
self.sniffer = raw_sniffer
self.target_address = None

def recv_data(self, length=65536):
data, address = self.sniffer.recvfrom(length)
self.target_address = address
return address, data

recvfrom = self.recv_data

def get_current_target_address(self):
return self.target_address

考虑到网络嗅探采集数据的通用性,这里采用上下文管理的方式实现该函数。具体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import socket
import os
import contextlib

@contextlib.contextmanager # 上下文管理装饰器
def make_sniffer(host=None, port=None):
if host is None:
host = socket.gethostbyname(socket.gethostname())
if port is None:
port = 0
if os.name == 'nt':
protocol = socket.IPPROTO_IP
else:
protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
sniffer.bind((host, port))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
try:
yield Sniffer(sniffer)
finally:
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

这样,当我们要实现具体的场景需求是,只要在这个sniffer上下文下就可以了。例如:

1
2
3
4
with make_sniffer() as sniffer:
# 调用recv_data接口接收数据和address,
# 根据这些信息实现满足特定场景的需求

基于通用嗅探器的一个例子

我们编写一个统计局域网下各IP的数据流量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

import socket
import os
import contextlib

def ip_counter():
with make_sniffer() as sniffer:
import collections
import time
import threading

c = collections.Counter()
def task(c, interval):
# add bit counter
while True:
time.sleep(interval)
os.system("cls")
for ip, count in sorted(c.items(), key=lambda x: x[1], reverse=True):
print(ip, count)
threading.Thread(target=task, args=(c, 1)).start()

while True:
_, address = sniffer.recvfrom(65536)
ip = address[0]
c.update({ip: 1})

if __name__ == '__main__':
ip_counter()

运行:

1
2
3
4
5
6
7
192.168.0.111 17
14.114.244.1 11
220.249.245.146 2
192.168.0.1 1
120.4.90.229 1
64.233.188.188 1
101.27.78.51 1

可以看到局域网下不同主机的数据活跃度。

详细代码见simple-sniffer

转载请包括本文地址:https://allenwind.github.io/blog/2396
更多文章请参考:https://allenwind.github.io/blog/archives/