实现一个简单的网络嗅探。
我们想写一个网络嗅探器,希望它有很好的通用性,能适应不同的嗅探场景。根据这样的需求,我们可以在功能是做分离。一个嗅探器可以分为两部分:
嗅探的原理
我们所熟知的TCP/IP网络协议栈中,数据时同网络接口层出入网络层然后传入运输层,然后根据端口号分流到不同的进程中。这是一个多路复用的过程。但,正是这种层层上传的过程,数据被层层解包:上层接收的数据丢失了底层网络的数据包结构信息。例如,IP层分组进入到TCP层变为报文段丢失了IP地址等。在网络编程中,一种方法叫“混杂模式”的套接字编程,可以绕过上述的过程,直接在TCP层接收网络接口层的信息。我们称其为原生套接字。
关键是在建立套接字时指定SOCK_RAW:
1 2 3
   |  sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
 
 
  | 
还有要考虑系统的兼容性,不同系统在元素套接字上有不同的处理。详细差异见代码注释。
定义一个嗅探器的通用接口,采用OOP的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   |  class Sniffer:
      def __init__(self, raw_sniffer):         self.sniffer = raw_sniffer         self.target_address = None
      def recv_data(self, length=65536):         data, address = self.sniffer.recvfrom(length)         self.target_address = address         return address, data
      recvfrom = self.recv_data
      def get_current_target_address(self):         return self.target_address
 
 
  | 
 
考虑到网络嗅探采集数据的通用性,这里采用上下文管理的方式实现该函数。具体如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | import socket import os import contextlib
  @contextlib.contextmanager  def make_sniffer(host=None, port=None):     if host is None:         host = socket.gethostbyname(socket.gethostname())     if port is None:         port = 0     if os.name == 'nt':         protocol = socket.IPPROTO_IP     else:         protocol = socket.IPPROTO_ICMP              sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)     sniffer.bind((host, port))     sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)          if os.name == 'nt':         sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)     try:         yield Sniffer(sniffer)     finally:         if os.name == 'nt':             sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
 
   | 
 
这样,当我们要实现具体的场景需求是,只要在这个sniffer上下文下就可以了。例如:
1 2 3 4
   | with make_sniffer() as sniffer:          
 
   | 
 
基于通用嗅探器的一个例子
我们编写一个统计局域网下各IP的数据流量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
   |  import socket import os import contextlib
  def ip_counter():     with make_sniffer() as sniffer:         import collections         import time         import threading
          c = collections.Counter()         def task(c, interval):                          while True:                 time.sleep(interval)                 os.system("cls")                 for ip, count  in sorted(c.items(), key=lambda x: x[1], reverse=True):                     print(ip, count)         threading.Thread(target=task, args=(c, 1)).start()
          while True:             _, address = sniffer.recvfrom(65536)             ip = address[0]             c.update({ip: 1})
  if __name__ == '__main__':     ip_counter()
 
 
  | 
 
运行:
1 2 3 4 5 6 7
   | 192.168.0.111 17 14.114.244.1 11 220.249.245.146 2 192.168.0.1 1 120.4.90.229 1 64.233.188.188 1 101.27.78.51 1
   | 
 
可以看到局域网下不同主机的数据活跃度。
详细代码见simple-sniffer。
转载请包括本文地址:https://allenwind.github.io/blog/2396
更多文章请参考:https://allenwind.github.io/blog/archives/