005-网络层协议
网络层是OSI模型的第三层,负责数据包的路由选择和转发。本文介绍了网络层的主要功能(路径选择、数据包转发、拥塞控制等)和服务模型(数据报服务和虚电路服务)。重点讲解了IPv4协议的数据包格式和头部字段,包括版本、头部长度、总长度、标识、标志、片偏移等关键字段。通过Python示例展示了IPv4数据包的创建、校验和计算等核心处理逻辑。此外还涉及路由功能(静态/动态路由)、IP地址管理等内容,为理解网
005-网络层协议
难度:🟡 | 预计时间:200分钟 | 前置:004-物理层与数据链路层
学习目标
- 深入理解网络层的功能和服务模型
- 掌握IP协议的工作原理和数据包格式
- 了解IPv4地址分类和子网划分技术
- 理解路由算法和路由协议的工作机制
- 掌握ICMP、ARP等辅助协议的功能
- 了解IPv6协议的特点和过渡技术
- 理解网络地址转换(NAT)技术
内容正文
网络层概述
网络层(Network Layer)是OSI模型的第三层,负责在不同网络之间转发数据包,实现端到端的通信。网络层的核心任务是路径选择和数据包转发。
网络层的主要功能
- 路径选择:确定数据包从源到目标的最佳路径
- 数据包转发:根据路由表转发数据包
- 拥塞控制:避免网络过载
- 网络互连:连接不同类型的网络
- 地址管理:分配和管理网络地址
网络层服务模型
1. 数据报服务(无连接)
- 每个数据包独立路由
- 不保证顺序和可靠性
- 网络简单,适应性强
- Internet采用此模型
2. 虚电路服务(面向连接)
- 建立虚拟连接路径
- 保证顺序,可提供QoS
- 网络复杂,状态维护
- ATM、Frame Relay采用此模型
IP协议详解
IP协议(Internet Protocol)是网络层的核心协议,提供无连接的数据包传输服务。
IPv4协议
IPv4数据包格式
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Version| IHL |Type of Service| Total Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Identification |Flags| Fragment Offset |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Time to Live | Protocol | Header Checksum |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Source Address |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Destination Address |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Options | Padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
IPv4头部字段详解
# 文件路径: ipv4_packet.py
# IPv4数据包处理
import struct
import socket
import binascii
from typing import Dict, Optional, Tuple
class IPv4Packet:
def __init__(self):
self.PROTOCOL_MAP = {
1: 'ICMP',
2: 'IGMP',
6: 'TCP',
17: 'UDP',
41: 'IPv6',
89: 'OSPF'
}
self.FLAG_NAMES = {
0x4000: 'DF (Don\'t Fragment)',
0x2000: 'MF (More Fragments)',
0x8000: 'Reserved'
}
def create_packet(self, src_ip: str, dst_ip: str, protocol: int,
payload: bytes, ttl: int = 64, tos: int = 0) -> bytes:
"""创建IPv4数据包"""
# IPv4头部字段
version = 4
ihl = 5 # 头部长度(20字节)
total_length = 20 + len(payload)
identification = 0x1234
flags_and_fragment = 0x4000 # DF标志
checksum = 0 # 先设为0,后面计算
# 转换IP地址
src_addr = struct.unpack('!I', socket.inet_aton(src_ip))[0]
dst_addr = struct.unpack('!I', socket.inet_aton(dst_ip))[0]
# 构建头部(不包括校验和)
header = struct.pack('!BBHHHBBHII',
(version << 4) | ihl, # Version + IHL
tos, # Type of Service
total_length, # Total Length
identification, # Identification
flags_and_fragment, # Flags + Fragment Offset
ttl, # Time to Live
protocol, # Protocol
checksum, # Header Checksum (0 for now)
src_addr, # Source Address
dst_addr) # Destination Address
# 计算头部校验和
checksum = self.calculate_checksum(header)
# 重新构建头部(包含校验和)
header = struct.pack('!BBHHHBBHII',
(version << 4) | ihl,
tos,
total_length,
identification,
flags_and_fragment,
ttl,
protocol,
checksum,
src_addr,
dst_addr)
return header + payload
def calculate_checksum(self, header: bytes) -> int:
"""计算IPv4头部校验和"""
# 确保长度为偶数
if len(header) % 2:
header += b'\x00'
checksum = 0
for i in range(0, len(header), 2):
word = (header[i] << 8) + header[i + 1]
checksum += word
# 处理进位
while checksum >> 16:
checksum = (checksum & 0xFFFF) + (checksum >> 16)
return ~checksum & 0xFFFF
def parse_packet(self, packet_data: bytes) -> Optional[Dict]:
"""解析IPv4数据包"""
if len(packet_data) < 20:
return None
try:
# 解析固定头部(20字节)
header = struct.unpack('!BBHHHBBHII', packet_data[:20])
version_ihl = header[0]
version = (version_ihl >> 4) & 0xF
ihl = version_ihl & 0xF
if version != 4:
return None
header_length = ihl * 4
if len(packet_data) < header_length:
return None
tos = header[1]
total_length = header[2]
identification = header[3]
flags_fragment = header[4]
ttl = header[5]
protocol = header[6]
checksum = header[7]
src_addr = header[8]
dst_addr = header[9]
# 解析标志位
flags = (flags_fragment >> 13) & 0x7
fragment_offset = flags_fragment & 0x1FFF
# 转换IP地址
src_ip = socket.inet_ntoa(struct.pack('!I', src_addr))
dst_ip = socket.inet_ntoa(struct.pack('!I', dst_addr))
# 验证校验和
header_bytes = packet_data[:header_length]
calculated_checksum = self.verify_checksum(header_bytes)
# 提取载荷
payload = packet_data[header_length:total_length] if total_length <= len(packet_data) else packet_data[header_length:]
# 解析选项(如果有)
options = b''
if header_length > 20:
options = packet_data[20:header_length]
return {
'version': version,
'header_length': header_length,
'tos': tos,
'total_length': total_length,
'identification': identification,
'flags': flags,
'fragment_offset': fragment_offset,
'ttl': ttl,
'protocol': protocol,
'protocol_name': self.PROTOCOL_MAP.get(protocol, 'Unknown'),
'checksum': checksum,
'checksum_valid': calculated_checksum,
'src_ip': src_ip,
'dst_ip': dst_ip,
'options': options,
'payload': payload,
'payload_length': len(payload)
}
except Exception as e:
print(f"解析数据包时出错: {e}")
return None
def verify_checksum(self, header: bytes) -> bool:
"""验证IPv4头部校验和"""
checksum = self.calculate_checksum(header)
return checksum == 0
def fragment_packet(self, packet: bytes, mtu: int) -> list:
"""分片IPv4数据包"""
packet_info = self.parse_packet(packet)
if not packet_info:
return []
header_length = packet_info['header_length']
payload = packet_info['payload']
# 计算每个分片的最大载荷长度(必须是8的倍数)
max_payload_per_fragment = ((mtu - header_length) // 8) * 8
if len(payload) <= max_payload_per_fragment:
return [packet] # 不需要分片
fragments = []
offset = 0
identification = packet_info['identification']
while offset < len(payload):
# 计算当前分片的载荷
fragment_payload = payload[offset:offset + max_payload_per_fragment]
# 设置标志位
more_fragments = (offset + len(fragment_payload)) < len(payload)
flags = 0x2000 if more_fragments else 0x0000 # MF标志
# 创建分片
fragment = self.create_fragment(
packet_info['src_ip'],
packet_info['dst_ip'],
packet_info['protocol'],
fragment_payload,
identification,
flags,
offset // 8, # 分片偏移以8字节为单位
packet_info['ttl'],
packet_info['tos']
)
fragments.append(fragment)
offset += len(fragment_payload)
return fragments
def create_fragment(self, src_ip: str, dst_ip: str, protocol: int,
payload: bytes, identification: int, flags: int,
fragment_offset: int, ttl: int = 64, tos: int = 0) -> bytes:
"""创建IPv4分片"""
version = 4
ihl = 5
total_length = 20 + len(payload)
flags_and_fragment = flags | fragment_offset
checksum = 0
src_addr = struct.unpack('!I', socket.inet_aton(src_ip))[0]
dst_addr = struct.unpack('!I', socket.inet_aton(dst_ip))[0]
header = struct.pack('!BBHHHBBHII',
(version << 4) | ihl,
tos,
total_length,
identification,
flags_and_fragment,
ttl,
protocol,
checksum,
src_addr,
dst_addr)
checksum = self.calculate_checksum(header)
header = struct.pack('!BBHHHBBHII',
(version << 4) | ihl,
tos,
total_length,
identification,
flags_and_fragment,
ttl,
protocol,
checksum,
src_addr,
dst_addr)
return header + payload
def display_packet(self, packet_info: Dict):
"""显示数据包信息"""
print("IPv4数据包信息:")
print(f" 版本: {packet_info['version']}")
print(f" 头部长度: {packet_info['header_length']} 字节")
print(f" 服务类型: 0x{packet_info['tos']:02x}")
print(f" 总长度: {packet_info['total_length']} 字节")
print(f" 标识: 0x{packet_info['identification']:04x}")
print(f" 标志: 0x{packet_info['flags']:x}")
print(f" 分片偏移: {packet_info['fragment_offset']}")
print(f" 生存时间: {packet_info['ttl']}")
print(f" 协议: {packet_info['protocol']} ({packet_info['protocol_name']})")
print(f" 头部校验和: 0x{packet_info['checksum']:04x} ({'有效' if packet_info['checksum_valid'] else '无效'})")
print(f" 源IP: {packet_info['src_ip']}")
print(f" 目标IP: {packet_info['dst_ip']}")
print(f" 载荷长度: {packet_info['payload_length']} 字节")
if packet_info['options']:
print(f" 选项: {binascii.hexlify(packet_info['options']).decode()}")
# 使用示例
ip_handler = IPv4Packet()
# 创建一个ICMP数据包
src_ip = "192.168.1.100"
dst_ip = "8.8.8.8"
protocol = 1 # ICMP
payload = b"Hello, IPv4!" + b"\x00" * 20
print("创建IPv4数据包:")
packet = ip_handler.create_packet(src_ip, dst_ip, protocol, payload)
print(f"数据包长度: {len(packet)} 字节")
print(f"数据包内容: {binascii.hexlify(packet[:40]).decode()}...")
print("\n" + "="*60 + "\n")
# 解析数据包
print("解析IPv4数据包:")
packet_info = ip_handler.parse_packet(packet)
if packet_info:
ip_handler.display_packet(packet_info)
print("\n" + "="*60 + "\n")
# 测试分片
print("测试IPv4分片:")
large_payload = b"A" * 2000 # 大载荷
large_packet = ip_handler.create_packet(src_ip, dst_ip, protocol, large_payload)
fragments = ip_handler.fragment_packet(large_packet, mtu=1500)
print(f"原始数据包长度: {len(large_packet)} 字节")
print(f"分片数量: {len(fragments)}")
for i, fragment in enumerate(fragments):
frag_info = ip_handler.parse_packet(fragment)
if frag_info:
print(f"分片 {i+1}: 长度={frag_info['total_length']}, 偏移={frag_info['fragment_offset']}, MF={'是' if frag_info['flags'] & 0x1 else '否'}")
IPv4地址分类和特殊地址
地址分类
# 文件路径: ipv4_addressing.py
# IPv4地址分类和管理
import ipaddress
from typing import Dict, List, Tuple
class IPv4AddressManager:
def __init__(self):
# 地址类别定义
self.address_classes = {
'A': {'range': (1, 126), 'default_mask': '/8', 'hosts_per_network': 16777214},
'B': {'range': (128, 191), 'default_mask': '/16', 'hosts_per_network': 65534},
'C': {'range': (192, 223), 'default_mask': '/24', 'hosts_per_network': 254},
'D': {'range': (224, 239), 'default_mask': 'N/A', 'hosts_per_network': 'Multicast'},
'E': {'range': (240, 255), 'default_mask': 'N/A', 'hosts_per_network': 'Reserved'}
}
# 私有地址范围
self.private_ranges = [
ipaddress.IPv4Network('10.0.0.0/8'),
ipaddress.IPv4Network('172.16.0.0/12'),
ipaddress.IPv4Network('192.168.0.0/16')
]
# 特殊地址
self.special_addresses = {
'0.0.0.0/8': '本网络',
'127.0.0.0/8': '环回地址',
'169.254.0.0/16': '链路本地地址(APIPA)',
'224.0.0.0/4': '多播地址',
'255.255.255.255/32': '有限广播地址'
}
def classify_address(self, ip_str: str) -> Dict:
"""分类IP地址"""
try:
ip = ipaddress.IPv4Address(ip_str)
first_octet = int(str(ip).split('.')[0])
# 确定地址类别
address_class = None
for class_name, class_info in self.address_classes.items():
if class_info['range'][0] <= first_octet <= class_info['range'][1]:
address_class = class_name
break
# 检查特殊属性
is_private = any(ip in network for network in self.private_ranges)
is_loopback = ip.is_loopback
is_multicast = ip.is_multicast
is_reserved = ip.is_reserved
is_link_local = ip.is_link_local
# 检查特殊地址范围
special_type = None
for addr_range, description in self.special_addresses.items():
if ip in ipaddress.IPv4Network(addr_range):
special_type = description
break
return {
'ip': str(ip),
'class': address_class,
'default_mask': self.address_classes[address_class]['default_mask'] if address_class else 'N/A',
'hosts_per_network': self.address_classes[address_class]['hosts_per_network'] if address_class else 'N/A',
'is_private': is_private,
'is_loopback': is_loopback,
'is_multicast': is_multicast,
'is_reserved': is_reserved,
'is_link_local': is_link_local,
'special_type': special_type
}
except ValueError as e:
return {'error': str(e)}
def subnet_calculator(self, network_str: str) -> Dict:
"""子网计算器"""
try:
network = ipaddress.IPv4Network(network_str, strict=False)
# 基本信息
network_address = network.network_address
broadcast_address = network.broadcast_address
netmask = network.netmask
prefix_length = network.prefixlen
# 主机信息
num_addresses = network.num_addresses
num_hosts = num_addresses - 2 if num_addresses > 2 else 0
# 第一个和最后一个主机地址
first_host = network_address + 1 if num_hosts > 0 else None
last_host = broadcast_address - 1 if num_hosts > 0 else None
# 通配符掩码
wildcard_mask = ipaddress.IPv4Address(int(netmask) ^ 0xFFFFFFFF)
return {
'network': str(network),
'network_address': str(network_address),
'broadcast_address': str(broadcast_address),
'netmask': str(netmask),
'wildcard_mask': str(wildcard_mask),
'prefix_length': prefix_length,
'num_addresses': num_addresses,
'num_hosts': num_hosts,
'first_host': str(first_host) if first_host else 'N/A',
'last_host': str(last_host) if last_host else 'N/A',
'address_class': self.classify_address(str(network_address))['class']
}
except ValueError as e:
return {'error': str(e)}
def vlsm_subnetting(self, network_str: str, host_requirements: List[int]) -> List[Dict]:
"""可变长子网掩码(VLSM)子网划分"""
try:
main_network = ipaddress.IPv4Network(network_str, strict=False)
# 按主机需求降序排序
sorted_requirements = sorted(host_requirements, reverse=True)
subnets = []
current_network = main_network
for hosts_needed in sorted_requirements:
# 计算所需的主机位数(+2为网络地址和广播地址)
total_addresses_needed = hosts_needed + 2
# 计算所需的主机位数
host_bits = 0
while (2 ** host_bits) < total_addresses_needed:
host_bits += 1
# 计算新的前缀长度
new_prefix_length = 32 - host_bits
# 检查是否可以从当前网络中分配
if new_prefix_length < current_network.prefixlen:
subnets.append({
'error': f'无法为 {hosts_needed} 个主机分配子网',
'hosts_needed': hosts_needed
})
continue
# 创建子网
try:
subnet = list(current_network.subnets(new_prefix=new_prefix_length))[0]
subnet_info = self.subnet_calculator(str(subnet))
subnet_info['hosts_needed'] = hosts_needed
subnet_info['hosts_available'] = subnet_info['num_hosts']
subnet_info['efficiency'] = f"{(hosts_needed / subnet_info['num_hosts'] * 100):.1f}%" if subnet_info['num_hosts'] > 0 else 'N/A'
subnets.append(subnet_info)
# 更新当前可用网络(下一个子网)
remaining_subnets = list(current_network.subnets(new_prefix=new_prefix_length))
if len(remaining_subnets) > 1:
# 找到下一个可用的网络空间
next_network_start = subnet.broadcast_address + 1
remaining_space = ipaddress.IPv4Network(f"{next_network_start}/{current_network.prefixlen}", strict=False)
current_network = remaining_space
else:
# 没有更多空间
current_network = None
break
except ValueError:
subnets.append({
'error': f'无法创建子网,主机需求: {hosts_needed}',
'hosts_needed': hosts_needed
})
return subnets
except Exception as e:
return [{'error': str(e)}]
def supernetting(self, networks: List[str]) -> Dict:
"""超网聚合(路由汇总)"""
try:
# 转换为网络对象
network_objects = [ipaddress.IPv4Network(net, strict=False) for net in networks]
# 找到最小的公共超网
supernet = ipaddress.collapse_addresses(network_objects)
supernet_list = list(supernet)
if len(supernet_list) == 1:
# 可以完全聚合
result_network = supernet_list[0]
return {
'supernet': str(result_network),
'original_networks': networks,
'aggregated': True,
'prefix_length': result_network.prefixlen,
'address_space_saved': len(networks) - 1
}
else:
# 无法完全聚合
return {
'supernet': [str(net) for net in supernet_list],
'original_networks': networks,
'aggregated': False,
'reason': '网络不连续,无法完全聚合'
}
except Exception as e:
return {'error': str(e)}
# 使用示例
addr_mgr = IPv4AddressManager()
# 地址分类测试
test_addresses = [
'10.0.0.1', # 私有A类
'172.16.1.1', # 私有B类
'192.168.1.1', # 私有C类
'8.8.8.8', # 公网地址
'127.0.0.1', # 环回地址
'224.0.0.1', # 多播地址
'169.254.1.1' # 链路本地地址
]
print("IPv4地址分类测试:")
for addr in test_addresses:
info = addr_mgr.classify_address(addr)
print(f"{addr}: 类别={info.get('class', 'N/A')}, 私有={info.get('is_private', False)}, 特殊={info.get('special_type', '无')}")
print("\n" + "="*60 + "\n")
# 子网计算测试
print("子网计算测试:")
test_networks = ['192.168.1.0/24', '10.0.0.0/8', '172.16.0.0/16']
for network in test_networks:
info = addr_mgr.subnet_calculator(network)
if 'error' not in info:
print(f"网络: {info['network']}")
print(f" 网络地址: {info['network_address']}")
print(f" 广播地址: {info['broadcast_address']}")
print(f" 子网掩码: {info['netmask']}")
print(f" 可用主机: {info['num_hosts']}")
print(f" 主机范围: {info['first_host']} - {info['last_host']}")
print()
print("\n" + "="*60 + "\n")
# VLSM子网划分测试
print("VLSM子网划分测试:")
network = '192.168.1.0/24'
host_requirements = [50, 25, 10, 5] # 不同部门的主机需求
print(f"原网络: {network}")
print(f"主机需求: {host_requirements}")
print("\n子网划分结果:")
subnets = addr_mgr.vlsm_subnetting(network, host_requirements)
for i, subnet in enumerate(subnets):
if 'error' not in subnet:
print(f"子网 {i+1}: {subnet['network']}")
print(f" 需求主机: {subnet['hosts_needed']}")
print(f" 可用主机: {subnet['hosts_available']}")
print(f" 利用率: {subnet['efficiency']}")
print(f" 主机范围: {subnet['first_host']} - {subnet['last_host']}")
print()
else:
print(f"错误: {subnet['error']}")
路由算法和协议
路由是网络层的核心功能,负责为数据包选择从源到目标的最佳路径。
路由算法分类
距离向量算法(Bellman-Ford)
# 文件路径: routing_algorithms.py
# 路由算法实现
import sys
from typing import Dict, List, Tuple, Optional
import json
class DistanceVectorRouter:
def __init__(self, router_id: str):
self.router_id = router_id
self.routing_table = {} # {destination: (next_hop, distance)}
self.neighbors = {} # {neighbor_id: link_cost}
self.network_topology = {} # 网络拓扑信息
self.max_distance = float('inf')
def add_neighbor(self, neighbor_id: str, link_cost: int):
"""添加邻居路由器"""
self.neighbors[neighbor_id] = link_cost
# 直连路由
self.routing_table[neighbor_id] = (neighbor_id, link_cost)
def update_routing_table(self, neighbor_id: str, neighbor_table: Dict) -> bool:
"""根据邻居的路由表更新自己的路由表"""
updated = False
for destination, (next_hop, distance) in neighbor_table.items():
if destination == self.router_id:
continue # 忽略到自己的路由
# 计算通过该邻居到目标的总距离
new_distance = self.neighbors[neighbor_id] + distance
# 检查是否需要更新路由表
if (destination not in self.routing_table or
new_distance < self.routing_table[destination][1]):
self.routing_table[destination] = (neighbor_id, new_distance)
updated = True
print(f"路由器 {self.router_id}: 更新到 {destination} 的路由,下一跳={neighbor_id}, 距离={new_distance}")
return updated
def get_routing_table_copy(self) -> Dict:
"""获取路由表副本(用于发送给邻居)"""
return self.routing_table.copy()
def print_routing_table(self):
"""打印路由表"""
print(f"\n路由器 {self.router_id} 的路由表:")
print("目标\t\t下一跳\t\t距离")
print("-" * 40)
for destination, (next_hop, distance) in sorted(self.routing_table.items()):
print(f"{destination}\t\t{next_hop}\t\t{distance}")
class LinkStateRouter:
def __init__(self, router_id: str):
self.router_id = router_id
self.neighbors = {} # {neighbor_id: link_cost}
self.link_state_db = {} # 链路状态数据库
self.routing_table = {} # 最短路径树
self.sequence_number = 0
def add_neighbor(self, neighbor_id: str, link_cost: int):
"""添加邻居"""
self.neighbors[neighbor_id] = link_cost
def generate_lsa(self) -> Dict:
"""生成链路状态通告(LSA)"""
self.sequence_number += 1
return {
'router_id': self.router_id,
'sequence_number': self.sequence_number,
'neighbors': self.neighbors.copy()
}
def update_link_state_db(self, lsa: Dict):
"""更新链路状态数据库"""
router_id = lsa['router_id']
# 检查序列号(简化版本)
if (router_id not in self.link_state_db or
lsa['sequence_number'] > self.link_state_db[router_id]['sequence_number']):
self.link_state_db[router_id] = lsa
print(f"路由器 {self.router_id}: 更新 {router_id} 的链路状态信息")
return True
return False
def dijkstra_algorithm(self):
"""Dijkstra最短路径算法"""
# 初始化
distances = {self.router_id: 0}
previous = {}
unvisited = set([self.router_id])
# 添加所有已知的路由器
for router_id in self.link_state_db:
if router_id != self.router_id:
distances[router_id] = float('inf')
unvisited.add(router_id)
# 添加直连邻居
for neighbor_id in self.neighbors:
if neighbor_id not in distances:
distances[neighbor_id] = float('inf')
unvisited.add(neighbor_id)
while unvisited:
# 选择距离最小的未访问节点
current = min(unvisited, key=lambda x: distances[x])
if distances[current] == float('inf'):
break # 剩余节点不可达
unvisited.remove(current)
# 更新邻居的距离
current_neighbors = {}
if current == self.router_id:
current_neighbors = self.neighbors
elif current in self.link_state_db:
current_neighbors = self.link_state_db[current]['neighbors']
for neighbor, link_cost in current_neighbors.items():
if neighbor in unvisited:
new_distance = distances[current] + link_cost
if new_distance < distances[neighbor]:
distances[neighbor] = new_distance
previous[neighbor] = current
# 构建路由表
self.routing_table = {}
for destination, distance in distances.items():
if destination != self.router_id and distance != float('inf'):
# 找到下一跳
next_hop = destination
path = [destination]
while next_hop in previous:
next_hop = previous[next_hop]
path.append(next_hop)
if next_hop == self.router_id:
# 找到了路径,下一跳是路径中倒数第二个节点
if len(path) > 1:
next_hop = path[-2]
else:
next_hop = destination
break
self.routing_table[destination] = (next_hop, distance)
def print_routing_table(self):
"""打印路由表"""
print(f"\n路由器 {self.router_id} 的路由表 (OSPF):")
print("目标\t\t下一跳\t\t距离")
print("-" * 40)
for destination, (next_hop, distance) in sorted(self.routing_table.items()):
print(f"{destination}\t\t{next_hop}\t\t{distance}")
def print_link_state_db(self):
"""打印链路状态数据库"""
print(f"\n路由器 {self.router_id} 的链路状态数据库:")
for router_id, lsa in self.link_state_db.items():
print(f"路由器 {router_id} (序列号: {lsa['sequence_number']}):")
for neighbor, cost in lsa['neighbors'].items():
print(f" -> {neighbor}: {cost}")
class NetworkSimulator:
def __init__(self):
self.dv_routers = {} # 距离向量路由器
self.ls_routers = {} # 链路状态路由器
def create_dv_network(self):
"""创建距离向量路由网络"""
# 创建路由器
self.dv_routers = {
'A': DistanceVectorRouter('A'),
'B': DistanceVectorRouter('B'),
'C': DistanceVectorRouter('C'),
'D': DistanceVectorRouter('D')
}
# 配置网络拓扑
# A -- 1 -- B -- 3 -- D
# | | |
# 4 2 1
# | | |
# C ------- 1 ------- +
self.dv_routers['A'].add_neighbor('B', 1)
self.dv_routers['A'].add_neighbor('C', 4)
self.dv_routers['B'].add_neighbor('A', 1)
self.dv_routers['B'].add_neighbor('C', 2)
self.dv_routers['B'].add_neighbor('D', 3)
self.dv_routers['C'].add_neighbor('A', 4)
self.dv_routers['C'].add_neighbor('B', 2)
self.dv_routers['C'].add_neighbor('D', 1)
self.dv_routers['D'].add_neighbor('B', 3)
self.dv_routers['D'].add_neighbor('C', 1)
def simulate_dv_convergence(self, max_iterations: int = 10):
"""模拟距离向量路由收敛过程"""
print("开始距离向量路由收敛仿真...")
for iteration in range(max_iterations):
print(f"\n=== 迭代 {iteration + 1} ===")
updated = False
# 每个路由器与邻居交换路由表
for router_id, router in self.dv_routers.items():
for neighbor_id in router.neighbors:
neighbor_table = self.dv_routers[neighbor_id].get_routing_table_copy()
if router.update_routing_table(neighbor_id, neighbor_table):
updated = True
# 打印当前路由表
for router in self.dv_routers.values():
router.print_routing_table()
if not updated:
print(f"\n路由收敛完成,共用了 {iteration + 1} 次迭代")
break
else:
print(f"\n达到最大迭代次数 {max_iterations},可能未完全收敛")
def create_ls_network(self):
"""创建链路状态路由网络"""
# 创建路由器
self.ls_routers = {
'A': LinkStateRouter('A'),
'B': LinkStateRouter('B'),
'C': LinkStateRouter('C'),
'D': LinkStateRouter('D')
}
# 配置相同的网络拓扑
self.ls_routers['A'].add_neighbor('B', 1)
self.ls_routers['A'].add_neighbor('C', 4)
self.ls_routers['B'].add_neighbor('A', 1)
self.ls_routers['B'].add_neighbor('C', 2)
self.ls_routers['B'].add_neighbor('D', 3)
self.ls_routers['C'].add_neighbor('A', 4)
self.ls_routers['C'].add_neighbor('B', 2)
self.ls_routers['C'].add_neighbor('D', 1)
self.ls_routers['D'].add_neighbor('B', 3)
self.ls_routers['D'].add_neighbor('C', 1)
def simulate_ls_convergence(self):
"""模拟链路状态路由收敛过程"""
print("\n开始链路状态路由收敛仿真...")
# 1. 每个路由器生成LSA
lsas = {}
for router_id, router in self.ls_routers.items():
lsas[router_id] = router.generate_lsa()
# 2. 泛洪LSA到所有路由器
for router_id, router in self.ls_routers.items():
for lsa_id, lsa in lsas.items():
router.update_link_state_db(lsa)
# 3. 每个路由器运行Dijkstra算法
print("\n运行Dijkstra算法计算最短路径...")
for router in self.ls_routers.values():
router.dijkstra_algorithm()
# 4. 显示结果
for router in self.ls_routers.values():
router.print_link_state_db()
router.print_routing_table()
# 使用示例
simulator = NetworkSimulator()
# 距离向量路由仿真
print("=" * 60)
print("距离向量路由算法仿真")
print("=" * 60)
simulator.create_dv_network()
simulator.simulate_dv_convergence()
# 链路状态路由仿真
print("\n" + "=" * 60)
print("链路状态路由算法仿真")
print("=" * 60)
simulator.create_ls_network()
simulator.simulate_ls_convergence()
常用路由协议
RIP(Routing Information Protocol)
RIP特点:
- 基于距离向量算法
- 最大跳数15(16表示不可达)
- 30秒周期性更新
- 简单但收敛慢
OSPF(Open Shortest Path First)
OSPF特点:
- 基于链路状态算法
- 支持VLSM和CIDR
- 快速收敛
- 支持区域划分
- 支持负载均衡
BGP(Border Gateway Protocol)
BGP特点:
- 基于路径向量算法
- 用于自治系统间路由
- 支持策略路由
- 防止路由环路
- 互联网的核心路由协议
ICMP协议
ICMP(Internet Control Message Protocol)用于报告网络错误和提供网络诊断信息。
ICMP消息类型
# 文件路径: icmp_protocol.py
# ICMP协议实现
import struct
import socket
import time
import os
from typing import Dict, Optional, Tuple
class ICMPProtocol:
def __init__(self):
# ICMP消息类型
self.MESSAGE_TYPES = {
0: 'Echo Reply',
3: 'Destination Unreachable',
4: 'Source Quench',
5: 'Redirect',
8: 'Echo Request',
11: 'Time Exceeded',
12: 'Parameter Problem',
13: 'Timestamp Request',
14: 'Timestamp Reply'
}
# 目标不可达代码
self.DEST_UNREACH_CODES = {
0: 'Network Unreachable',
1: 'Host Unreachable',
2: 'Protocol Unreachable',
3: 'Port Unreachable',
4: 'Fragmentation Needed and DF Set',
5: 'Source Route Failed'
}
# 超时代码
self.TIME_EXCEEDED_CODES = {
0: 'TTL Exceeded in Transit',
1: 'Fragment Reassembly Time Exceeded'
}
def create_icmp_packet(self, msg_type: int, code: int, data: bytes = b'') -> bytes:
"""创建ICMP数据包"""
# ICMP头部:类型(1) + 代码(1) + 校验和(2) + 其他(4)
checksum = 0 # 先设为0
other = 0 # 根据消息类型设置
# 对于Echo Request/Reply,使用标识符和序列号
if msg_type in [0, 8]:
identifier = os.getpid() & 0xFFFF
sequence = 1
other = (identifier << 16) | sequence
# 构建头部(不包括校验和)
header = struct.pack('!BBHI', msg_type, code, checksum, other)
# 计算校验和
packet = header + data
checksum = self.calculate_checksum(packet)
# 重新构建头部(包含校验和)
header = struct.pack('!BBHI', msg_type, code, checksum, other)
return header + data
def calculate_checksum(self, data: bytes) -> int:
"""计算ICMP校验和"""
# 确保长度为偶数
if len(data) % 2:
data += b'\x00'
checksum = 0
for i in range(0, len(data), 2):
word = (data[i] << 8) + data[i + 1]
checksum += word
# 处理进位
while checksum >> 16:
checksum = (checksum & 0xFFFF) + (checksum >> 16)
return ~checksum & 0xFFFF
def parse_icmp_packet(self, packet_data: bytes) -> Optional[Dict]:
"""解析ICMP数据包"""
if len(packet_data) < 8:
return None
try:
# 解析ICMP头部
msg_type, code, checksum, other = struct.unpack('!BBHI', packet_data[:8])
# 提取数据部分
data = packet_data[8:]
# 验证校验和
packet_copy = packet_data[:2] + b'\x00\x00' + packet_data[4:]
calculated_checksum = self.calculate_checksum(packet_copy)
checksum_valid = (calculated_checksum == 0)
# 解析其他字段(根据消息类型)
identifier = None
sequence = None
if msg_type in [0, 8]: # Echo Request/Reply
identifier = (other >> 16) & 0xFFFF
sequence = other & 0xFFFF
return {
'type': msg_type,
'type_name': self.MESSAGE_TYPES.get(msg_type, 'Unknown'),
'code': code,
'code_name': self.get_code_name(msg_type, code),
'checksum': checksum,
'checksum_valid': checksum_valid,
'identifier': identifier,
'sequence': sequence,
'data': data,
'data_length': len(data)
}
except Exception as e:
print(f"解析ICMP数据包时出错: {e}")
return None
def get_code_name(self, msg_type: int, code: int) -> str:
"""获取代码名称"""
if msg_type == 3:
return self.DEST_UNREACH_CODES.get(code, f'Unknown Code {code}')
elif msg_type == 11:
return self.TIME_EXCEEDED_CODES.get(code, f'Unknown Code {code}')
else:
return f'Code {code}'
def ping(self, host: str, count: int = 4, timeout: int = 3) -> Dict:
"""实现ping功能"""
try:
# 解析主机名
dest_addr = socket.gethostbyname(host)
print(f"PING {host} ({dest_addr})")
# 创建原始套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
sock.settimeout(timeout)
statistics = {
'packets_sent': 0,
'packets_received': 0,
'packet_loss': 0,
'min_rtt': float('inf'),
'max_rtt': 0,
'avg_rtt': 0,
'rtts': []
}
for i in range(count):
# 创建ICMP Echo Request
data = f"Hello, ICMP! {i}".encode() + struct.pack('!d', time.time())
packet = self.create_icmp_packet(8, 0, data) # Echo Request
# 发送数据包
send_time = time.time()
sock.sendto(packet, (dest_addr, 0))
statistics['packets_sent'] += 1
try:
# 接收响应
while True:
recv_data, addr = sock.recvfrom(1024)
recv_time = time.time()
# 跳过IP头部(通常20字节)
icmp_data = recv_data[20:]
# 解析ICMP响应
icmp_info = self.parse_icmp_packet(icmp_data)
if icmp_info and icmp_info['type'] == 0: # Echo Reply
rtt = (recv_time - send_time) * 1000
statistics['packets_received'] += 1
statistics['rtts'].append(rtt)
print(f"Reply from {addr[0]}: bytes={len(icmp_info['data'])} time={rtt:.1f}ms TTL=64")
break
except socket.timeout:
print(f"Request timeout for icmp_seq {i}")
if i < count - 1:
time.sleep(1)
sock.close()
# 计算统计信息
if statistics['rtts']:
statistics['min_rtt'] = min(statistics['rtts'])
statistics['max_rtt'] = max(statistics['rtts'])
statistics['avg_rtt'] = sum(statistics['rtts']) / len(statistics['rtts'])
statistics['packet_loss'] = ((statistics['packets_sent'] - statistics['packets_received']) / statistics['packets_sent']) * 100
# 打印统计信息
print(f"\n--- {host} ping statistics ---")
print(f"{statistics['packets_sent']} packets transmitted, {statistics['packets_received']} received, {statistics['packet_loss']:.1f}% packet loss")
if statistics['rtts']:
print(f"round-trip min/avg/max = {statistics['min_rtt']:.1f}/{statistics['avg_rtt']:.1f}/{statistics['max_rtt']:.1f} ms")
return statistics
except Exception as e:
print(f"Ping错误: {e}")
return {'error': str(e)}
def traceroute(self, host: str, max_hops: int = 30) -> List[Dict]:
"""实现traceroute功能"""
try:
dest_addr = socket.gethostbyname(host)
print(f"traceroute to {host} ({dest_addr}), {max_hops} hops max")
results = []
for ttl in range(1, max_hops + 1):
# 创建套接字
send_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
recv_sock.settimeout(3)
# 设置TTL
send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
# 创建ICMP Echo Request
data = f"traceroute {ttl}".encode()
packet = self.create_icmp_packet(8, 0, data)
# 发送数据包
send_time = time.time()
send_sock.sendto(packet, (dest_addr, 0))
hop_info = {'hop': ttl, 'ip': None, 'hostname': None, 'rtt': None, 'type': None}
try:
# 接收响应
recv_data, addr = recv_sock.recvfrom(1024)
recv_time = time.time()
rtt = (recv_time - send_time) * 1000
# 解析响应
icmp_data = recv_data[20:] # 跳过IP头部
icmp_info = self.parse_icmp_packet(icmp_data)
hop_info['ip'] = addr[0]
hop_info['rtt'] = rtt
try:
hop_info['hostname'] = socket.gethostbyaddr(addr[0])[0]
except:
hop_info['hostname'] = addr[0]
if icmp_info:
if icmp_info['type'] == 11: # Time Exceeded
hop_info['type'] = 'Time Exceeded'
print(f"{ttl:2d} {hop_info['hostname']} ({hop_info['ip']}) {rtt:.1f} ms")
elif icmp_info['type'] == 0: # Echo Reply
hop_info['type'] = 'Destination Reached'
print(f"{ttl:2d} {hop_info['hostname']} ({hop_info['ip']}) {rtt:.1f} ms")
results.append(hop_info)
break
except socket.timeout:
hop_info['type'] = 'Timeout'
print(f"{ttl:2d} * * *")
results.append(hop_info)
send_sock.close()
recv_sock.close()
return results
except Exception as e:
print(f"Traceroute错误: {e}")
return [{'error': str(e)}]
def display_icmp_info(self, icmp_info: Dict):
"""显示ICMP数据包信息"""
print("ICMP数据包信息:")
print(f" 类型: {icmp_info['type']} ({icmp_info['type_name']})")
print(f" 代码: {icmp_info['code']} ({icmp_info['code_name']})")
print(f" 校验和: 0x{icmp_info['checksum']:04x} ({'有效' if icmp_info['checksum_valid'] else '无效'})")
if icmp_info['identifier'] is not None:
print(f" 标识符: {icmp_info['identifier']}")
if icmp_info['sequence'] is not None:
print(f" 序列号: {icmp_info['sequence']}")
print(f" 数据长度: {icmp_info['data_length']} 字节")
# 使用示例(需要管理员权限)
if __name__ == "__main__":
icmp = ICMPProtocol()
# 创建ICMP Echo Request
print("创建ICMP Echo Request:")
data = b"Hello, ICMP!"
packet = icmp.create_icmp_packet(8, 0, data)
print(f"数据包长度: {len(packet)} 字节")
# 解析数据包
print("\n解析ICMP数据包:")
icmp_info = icmp.parse_icmp_packet(packet)
if icmp_info:
icmp.display_icmp_info(icmp_info)
# 注意:ping和traceroute需要管理员权限
# icmp.ping('8.8.8.8', count=3)
# icmp.traceroute('8.8.8.8', max_hops=15)
ARP协议
ARP(Address Resolution Protocol)用于将IP地址解析为MAC地址,是网络层与数据链路层之间的桥梁。
ARP工作原理
ARP数据包格式
# 文件路径: arp_protocol.py
# ARP协议实现
import struct
import socket
import time
from typing import Dict, List, Optional
class ARPProtocol:
def __init__(self):
# ARP操作类型
self.ARP_OPERATIONS = {
1: 'ARP Request',
2: 'ARP Reply',
3: 'RARP Request',
4: 'RARP Reply'
}
# 硬件类型
self.HARDWARE_TYPES = {
1: 'Ethernet'
}
# 协议类型
self.PROTOCOL_TYPES = {
0x0800: 'IPv4'
}
# ARP缓存表
self.arp_cache = {} # {ip: {'mac': mac, 'timestamp': time, 'static': bool}}
self.cache_timeout = 300 # 5分钟超时
def create_arp_packet(self, operation: int, sender_mac: str, sender_ip: str,
target_mac: str, target_ip: str) -> bytes:
"""创建ARP数据包"""
# ARP头部字段
hardware_type = 1 # Ethernet
protocol_type = 0x0800 # IPv4
hardware_len = 6 # MAC地址长度
protocol_len = 4 # IP地址长度
# 转换MAC地址
sender_mac_bytes = bytes.fromhex(sender_mac.replace(':', ''))
target_mac_bytes = bytes.fromhex(target_mac.replace(':', ''))
# 转换IP地址
sender_ip_bytes = socket.inet_aton(sender_ip)
target_ip_bytes = socket.inet_aton(target_ip)
# 构建ARP数据包
packet = struct.pack('!HHBBH',
hardware_type,
protocol_type,
hardware_len,
protocol_len,
operation)
packet += sender_mac_bytes
packet += sender_ip_bytes
packet += target_mac_bytes
packet += target_ip_bytes
return packet
def parse_arp_packet(self, packet_data: bytes) -> Optional[Dict]:
"""解析ARP数据包"""
if len(packet_data) < 28:
return None
try:
# 解析ARP头部
hardware_type, protocol_type, hardware_len, protocol_len, operation = struct.unpack('!HHBBH', packet_data[:8])
# 解析地址字段
offset = 8
sender_mac = packet_data[offset:offset + hardware_len]
offset += hardware_len
sender_ip = packet_data[offset:offset + protocol_len]
offset += protocol_len
target_mac = packet_data[offset:offset + hardware_len]
offset += hardware_len
target_ip = packet_data[offset:offset + protocol_len]
# 格式化地址
sender_mac_str = ':'.join(f'{b:02x}' for b in sender_mac)
target_mac_str = ':'.join(f'{b:02x}' for b in target_mac)
sender_ip_str = socket.inet_ntoa(sender_ip)
target_ip_str = socket.inet_ntoa(target_ip)
return {
'hardware_type': hardware_type,
'hardware_type_name': self.HARDWARE_TYPES.get(hardware_type, 'Unknown'),
'protocol_type': protocol_type,
'protocol_type_name': self.PROTOCOL_TYPES.get(protocol_type, 'Unknown'),
'hardware_len': hardware_len,
'protocol_len': protocol_len,
'operation': operation,
'operation_name': self.ARP_OPERATIONS.get(operation, 'Unknown'),
'sender_mac': sender_mac_str,
'sender_ip': sender_ip_str,
'target_mac': target_mac_str,
'target_ip': target_ip_str
}
except Exception as e:
print(f"解析ARP数据包时出错: {e}")
return None
def add_arp_entry(self, ip: str, mac: str, static: bool = False):
"""添加ARP缓存条目"""
self.arp_cache[ip] = {
'mac': mac,
'timestamp': time.time(),
'static': static
}
print(f"ARP缓存添加: {ip} -> {mac} ({'静态' if static else '动态'})")
def get_arp_entry(self, ip: str) -> Optional[str]:
"""获取ARP缓存条目"""
if ip in self.arp_cache:
entry = self.arp_cache[ip]
# 检查是否超时(静态条目不超时)
if entry['static'] or (time.time() - entry['timestamp']) < self.cache_timeout:
return entry['mac']
else:
# 删除超时条目
del self.arp_cache[ip]
print(f"ARP缓存条目超时删除: {ip}")
return None
def clear_arp_cache(self):
"""清空ARP缓存"""
# 只删除动态条目
dynamic_entries = [ip for ip, entry in self.arp_cache.items() if not entry['static']]
for ip in dynamic_entries:
del self.arp_cache[ip]
print(f"清空动态ARP缓存,删除 {len(dynamic_entries)} 个条目")
def display_arp_cache(self):
"""显示ARP缓存表"""
print("\nARP缓存表:")
print("IP地址\t\t\tMAC地址\t\t\t类型\t剩余时间")
print("-" * 70)
current_time = time.time()
for ip, entry in sorted(self.arp_cache.items()):
if entry['static']:
remaining = "永久"
entry_type = "静态"
else:
remaining_seconds = self.cache_timeout - (current_time - entry['timestamp'])
if remaining_seconds > 0:
remaining = f"{remaining_seconds:.0f}秒"
entry_type = "动态"
else:
remaining = "已过期"
entry_type = "动态"
print(f"{ip:<15}\t{entry['mac']}\t{entry_type}\t{remaining}")
def simulate_arp_resolution(self, source_ip: str, source_mac: str, target_ip: str) -> Optional[str]:
"""模拟ARP地址解析过程"""
print(f"\n开始ARP地址解析: {source_ip} 查询 {target_ip} 的MAC地址")
# 1. 检查ARP缓存
cached_mac = self.get_arp_entry(target_ip)
if cached_mac:
print(f"ARP缓存命中: {target_ip} -> {cached_mac}")
return cached_mac
print(f"ARP缓存未命中,发送ARP请求")
# 2. 创建ARP请求
arp_request = self.create_arp_packet(
operation=1, # ARP Request
sender_mac=source_mac,
sender_ip=source_ip,
target_mac="00:00:00:00:00:00", # 未知MAC地址
target_ip=target_ip
)
print(f"发送ARP请求广播: 谁有 {target_ip}?告诉 {source_ip}")
# 3. 解析ARP请求(用于验证)
request_info = self.parse_arp_packet(arp_request)
if request_info:
print(f"ARP请求详情: {request_info['operation_name']}")
print(f" 发送者: {request_info['sender_ip']} ({request_info['sender_mac']})")
print(f" 目标: {request_info['target_ip']} ({request_info['target_mac']})")
# 4. 模拟ARP响应(实际环境中由目标主机发送)
# 这里我们模拟一个响应
import random
simulated_target_mac = f"aa:bb:cc:{random.randint(0,255):02x}:{random.randint(0,255):02x}:{random.randint(0,255):02x}"
arp_reply = self.create_arp_packet(
operation=2, # ARP Reply
sender_mac=simulated_target_mac,
sender_ip=target_ip,
target_mac=source_mac,
target_ip=source_ip
)
print(f"接收到ARP响应: {target_ip} 在 {simulated_target_mac}")
# 5. 解析ARP响应
reply_info = self.parse_arp_packet(arp_reply)
if reply_info:
print(f"ARP响应详情: {reply_info['operation_name']}")
print(f" 发送者: {reply_info['sender_ip']} ({reply_info['sender_mac']})")
print(f" 目标: {reply_info['target_ip']} ({reply_info['target_mac']})")
# 6. 更新ARP缓存
self.add_arp_entry(reply_info['sender_ip'], reply_info['sender_mac'])
return reply_info['sender_mac']
return None
def display_arp_packet(self, arp_info: Dict):
"""显示ARP数据包信息"""
print("ARP数据包信息:")
print(f" 硬件类型: {arp_info['hardware_type']} ({arp_info['hardware_type_name']})")
print(f" 协议类型: 0x{arp_info['protocol_type']:04x} ({arp_info['protocol_type_name']})")
print(f" 硬件地址长度: {arp_info['hardware_len']}")
print(f" 协议地址长度: {arp_info['protocol_len']}")
print(f" 操作: {arp_info['operation']} ({arp_info['operation_name']})")
print(f" 发送者MAC: {arp_info['sender_mac']}")
print(f" 发送者IP: {arp_info['sender_ip']}")
print(f" 目标MAC: {arp_info['target_mac']}")
print(f" 目标IP: {arp_info['target_ip']}")
# 使用示例
arp = ARPProtocol()
# 添加一些静态ARP条目
arp.add_arp_entry("192.168.1.1", "aa:bb:cc:dd:ee:ff", static=True)
arp.add_arp_entry("192.168.1.100", "11:22:33:44:55:66")
# 显示ARP缓存
arp.display_arp_cache()
print("\n" + "="*60 + "\n")
# 模拟ARP地址解析
source_ip = "192.168.1.10"
source_mac = "aa:bb:cc:11:22:33"
target_ip = "192.168.1.20"
resolved_mac = arp.simulate_arp_resolution(source_ip, source_mac, target_ip)
if resolved_mac:
print(f"\n地址解析成功: {target_ip} -> {resolved_mac}")
# 再次显示ARP缓存
arp.display_arp_cache()
print("\n" + "="*60 + "\n")
# 测试ARP缓存命中
print("测试ARP缓存命中:")
cached_mac = arp.simulate_arp_resolution(source_ip, source_mac, target_ip)
if cached_mac:
print(f"缓存命中: {target_ip} -> {cached_mac}")
IPv6协议
IPv6(Internet Protocol version 6)是下一代互联网协议,解决了IPv4地址耗尽问题。
IPv6特点
- 地址空间:128位地址,提供340万亿亿亿个地址
- 简化头部:固定40字节头部,提高处理效率
- 自动配置:支持无状态地址自动配置(SLAAC)
- 内置安全:强制支持IPSec
- 移动性:更好的移动IP支持
- QoS支持:流标签字段支持服务质量
IPv6地址格式
# 文件路径: ipv6_addressing.py
# IPv6地址处理
import ipaddress
import socket
from typing import Dict, List, Optional
class IPv6AddressManager:
def __init__(self):
# IPv6地址类型
self.address_types = {
'unicast': {
'global': '2000::/3',
'link_local': 'fe80::/10',
'unique_local': 'fc00::/7',
'loopback': '::1/128'
},
'multicast': {
'all_nodes': 'ff02::1',
'all_routers': 'ff02::2',
'solicited_node': 'ff02::1:ff00:0/104'
},
'anycast': {
'subnet_router': '::0/128'
}
}
# 知名端口前缀
self.well_known_prefixes = {
'2001:db8::/32': '文档用途(RFC 3849)',
'::1/128': '环回地址',
'::/128': '未指定地址',
'fe80::/10': '链路本地地址',
'ff00::/8': '多播地址',
'2001::/16': '特殊用途地址'
}
def format_ipv6_address(self, addr_str: str) -> Dict:
"""格式化IPv6地址"""
try:
addr = ipaddress.IPv6Address(addr_str)
# 不同表示形式
full_form = str(addr.exploded) # 完整形式
compressed_form = str(addr.compressed) # 压缩形式
# 地址分析
is_global = addr.is_global
is_link_local = addr.is_link_local
is_loopback = addr.is_loopback
is_multicast = addr.is_multicast
is_private = addr.is_private
is_reserved = addr.is_reserved
is_unspecified = addr.is_unspecified
# 确定地址类型
addr_type = self.classify_ipv6_address(addr)
return {
'original': addr_str,
'full_form': full_form,
'compressed_form': compressed_form,
'address_type': addr_type,
'is_global': is_global,
'is_link_local': is_link_local,
'is_loopback': is_loopback,
'is_multicast': is_multicast,
'is_private': is_private,
'is_reserved': is_reserved,
'is_unspecified': is_unspecified
}
except ValueError as e:
return {'error': str(e)}
def classify_ipv6_address(self, addr: ipaddress.IPv6Address) -> str:
"""分类IPv6地址"""
if addr.is_multicast:
return 'Multicast'
elif addr.is_link_local:
return 'Link-Local Unicast'
elif addr.is_loopback:
return 'Loopback'
elif addr.is_unspecified:
return 'Unspecified'
elif addr.is_global:
return 'Global Unicast'
elif addr.is_private:
return 'Unique Local Unicast'
else:
return 'Unknown'
def subnet_ipv6_network(self, network_str: str, new_prefix_len: int) -> List[str]:
"""IPv6子网划分"""
try:
network = ipaddress.IPv6Network(network_str, strict=False)
if new_prefix_len <= network.prefixlen:
return [str(network)] # 无法进一步划分
subnets = list(network.subnets(new_prefix=new_prefix_len))
return [str(subnet) for subnet in subnets]
except ValueError as e:
return [f'Error: {e}']
def generate_eui64_address(self, prefix: str, mac_address: str) -> str:
"""生成EUI-64接口标识符"""
try:
# 解析MAC地址
mac_parts = mac_address.replace(':', '').replace('-', '')
if len(mac_parts) != 12:
raise ValueError("无效的MAC地址格式")
# 分割MAC地址
oui = mac_parts[:6] # 组织唯一标识符
nic = mac_parts[6:] # 网络接口控制器
# 插入FFFE
eui64 = oui + 'fffe' + nic
# 翻转第7位(U/L位)
first_byte = int(eui64[:2], 16)
first_byte ^= 0x02 # 翻转第7位
eui64 = f'{first_byte:02x}' + eui64[2:]
# 格式化为IPv6接口标识符
interface_id = ':'.join([eui64[i:i+4] for i in range(0, 16, 4)])
# 组合前缀和接口标识符
network = ipaddress.IPv6Network(prefix, strict=False)
prefix_part = str(network.network_address).split('::')[0]
ipv6_address = f"{prefix_part}::{interface_id}"
# 验证并格式化
addr = ipaddress.IPv6Address(ipv6_address)
return {
'mac_address': mac_address,
'eui64_interface_id': interface_id,
'ipv6_address': str(addr.compressed),
'full_address': str(addr.exploded)
}
except Exception as e:
return {'error': str(e)}
def display_address_info(self, addr_info: Dict):
"""显示IPv6地址信息"""
if 'error' in addr_info:
print(f"错误: {addr_info['error']}")
return
print("IPv6地址信息:")
print(f" 原始地址: {addr_info['original']}")
print(f" 完整形式: {addr_info['full_form']}")
print(f" 压缩形式: {addr_info['compressed_form']}")
print(f" 地址类型: {addr_info['address_type']}")
print(f" 全球唯一: {addr_info['is_global']}")
print(f" 链路本地: {addr_info['is_link_local']}")
print(f" 环回地址: {addr_info['is_loopback']}")
print(f" 多播地址: {addr_info['is_multicast']}")
# 使用示例
ipv6_mgr = IPv6AddressManager()
# 测试不同类型的IPv6地址
test_addresses = [
'2001:db8:85a3::8a2e:370:7334', # 全球单播
'fe80::1', # 链路本地
'::1', # 环回
'ff02::1', # 多播
'::', # 未指定
'fc00::1' # 唯一本地
]
print("IPv6地址分类测试:")
for addr in test_addresses:
print(f"\n地址: {addr}")
info = ipv6_mgr.format_ipv6_address(addr)
ipv6_mgr.display_address_info(info)
print("\n" + "="*60 + "\n")
# EUI-64地址生成测试
print("EUI-64地址生成测试:")
prefix = "2001:db8::/64"
mac = "aa:bb:cc:dd:ee:ff"
eui64_result = ipv6_mgr.generate_eui64_address(prefix, mac)
if 'error' not in eui64_result:
print(f"MAC地址: {eui64_result['mac_address']}")
print(f"EUI-64接口ID: {eui64_result['eui64_interface_id']}")
print(f"IPv6地址: {eui64_result['ipv6_address']}")
print(f"完整地址: {eui64_result['full_address']}")
else:
print(f"错误: {eui64_result['error']}")
print("\n" + "="*60 + "\n")
# 子网划分测试
print("IPv6子网划分测试:")
network = "2001:db8::/48"
new_prefix = 64
subnets = ipv6_mgr.subnet_ipv6_network(network, new_prefix)
print(f"原网络: {network}")
print(f"划分为 /{new_prefix} 子网:")
for i, subnet in enumerate(subnets[:10]): # 只显示前10个
print(f" 子网 {i+1}: {subnet}")
if len(subnets) > 10:
print(f" ... 还有 {len(subnets) - 10} 个子网")
网络地址转换(NAT)
NAT(Network Address Translation)允许私有网络中的多个设备共享一个公网IP地址。
NAT类型
NAT实现
# 文件路径: nat_implementation.py
# NAT网络地址转换实现
import time
import random
from typing import Dict, List, Optional, Tuple
class NATTable:
def __init__(self):
self.static_mappings = {} # 静态映射
self.dynamic_mappings = {} # 动态映射
self.port_mappings = {} # 端口映射
self.next_port = 1024 # 下一个可用端口
self.timeout = 300 # 5分钟超时
def add_static_mapping(self, internal_ip: str, external_ip: str):
"""添加静态NAT映射"""
self.static_mappings[internal_ip] = {
'external_ip': external_ip,
'type': 'static',
'timestamp': time.time()
}
print(f"添加静态NAT映射: {internal_ip} -> {external_ip}")
def add_dynamic_mapping(self, internal_ip: str, external_ip_pool: List[str]) -> Optional[str]:
"""添加动态NAT映射"""
# 检查是否已有映射
if internal_ip in self.dynamic_mappings:
mapping = self.dynamic_mappings[internal_ip]
if time.time() - mapping['timestamp'] < self.timeout:
return mapping['external_ip']
# 从地址池中选择可用地址
used_ips = {mapping['external_ip'] for mapping in self.dynamic_mappings.values()}
available_ips = [ip for ip in external_ip_pool if ip not in used_ips]
if not available_ips:
return None # 地址池耗尽
external_ip = random.choice(available_ips)
self.dynamic_mappings[internal_ip] = {
'external_ip': external_ip,
'type': 'dynamic',
'timestamp': time.time()
}
print(f"添加动态NAT映射: {internal_ip} -> {external_ip}")
return external_ip
def add_port_mapping(self, internal_ip: str, internal_port: int,
external_ip: str, protocol: str = 'TCP') -> Optional[int]:
"""添加端口地址转换映射(PAT/NAPT)"""
# 生成映射键
internal_key = f"{internal_ip}:{internal_port}:{protocol}"
# 检查是否已有映射
if internal_key in self.port_mappings:
mapping = self.port_mappings[internal_key]
if time.time() - mapping['timestamp'] < self.timeout:
return mapping['external_port']
# 分配外部端口
external_port = self.allocate_external_port(external_ip, protocol)
if external_port is None:
return None
# 创建映射
external_key = f"{external_ip}:{external_port}:{protocol}"
mapping_info = {
'internal_ip': internal_ip,
'internal_port': internal_port,
'external_ip': external_ip,
'external_port': external_port,
'protocol': protocol,
'timestamp': time.time(),
'type': 'port'
}
self.port_mappings[internal_key] = mapping_info
self.port_mappings[external_key] = mapping_info
print(f"添加端口映射: {internal_ip}:{internal_port} -> {external_ip}:{external_port} ({protocol})")
return external_port
def allocate_external_port(self, external_ip: str, protocol: str) -> Optional[int]:
"""分配外部端口"""
# 查找可用端口
for port in range(self.next_port, 65536):
external_key = f"{external_ip}:{port}:{protocol}"
if external_key not in self.port_mappings:
self.next_port = port + 1
return port
# 从1024开始重新查找
for port in range(1024, self.next_port):
external_key = f"{external_ip}:{port}:{protocol}"
if external_key not in self.port_mappings:
return port
return None # 端口耗尽
def translate_outbound(self, internal_ip: str, internal_port: int,
external_ip: str, protocol: str = 'TCP') -> Optional[Tuple[str, int]]:
"""出站地址转换"""
# 1. 检查静态映射
if internal_ip in self.static_mappings:
mapped_ip = self.static_mappings[internal_ip]['external_ip']
return (mapped_ip, internal_port)
# 2. 检查动态映射
if internal_ip in self.dynamic_mappings:
mapping = self.dynamic_mappings[internal_ip]
if time.time() - mapping['timestamp'] < self.timeout:
return (mapping['external_ip'], internal_port)
# 3. 使用端口地址转换
external_port = self.add_port_mapping(internal_ip, internal_port, external_ip, protocol)
if external_port:
return (external_ip, external_port)
return None
def translate_inbound(self, external_ip: str, external_port: int,
protocol: str = 'TCP') -> Optional[Tuple[str, int]]:
"""入站地址转换"""
external_key = f"{external_ip}:{external_port}:{protocol}"
if external_key in self.port_mappings:
mapping = self.port_mappings[external_key]
if time.time() - mapping['timestamp'] < self.timeout:
return (mapping['internal_ip'], mapping['internal_port'])
else:
# 删除过期映射
self.remove_mapping(external_key)
return None
def remove_mapping(self, key: str):
"""删除映射"""
if key in self.port_mappings:
mapping = self.port_mappings[key]
internal_key = f"{mapping['internal_ip']}:{mapping['internal_port']}:{mapping['protocol']}"
external_key = f"{mapping['external_ip']}:{mapping['external_port']}:{mapping['protocol']}"
self.port_mappings.pop(internal_key, None)
self.port_mappings.pop(external_key, None)
print(f"删除过期映射: {internal_key} <-> {external_key}")
def cleanup_expired_mappings(self):
"""清理过期映射"""
current_time = time.time()
expired_keys = []
# 检查动态映射
for internal_ip, mapping in list(self.dynamic_mappings.items()):
if current_time - mapping['timestamp'] > self.timeout:
expired_keys.append(internal_ip)
for key in expired_keys:
del self.dynamic_mappings[key]
print(f"删除过期动态映射: {key}")
# 检查端口映射
expired_keys = []
for key, mapping in list(self.port_mappings.items()):
if current_time - mapping['timestamp'] > self.timeout:
expired_keys.append(key)
for key in expired_keys:
self.remove_mapping(key)
def display_nat_table(self):
"""显示NAT表"""
print("\nNAT转换表:")
print("=" * 80)
# 静态映射
if self.static_mappings:
print("静态映射:")
print("内部IP\t\t外部IP\t\t类型")
print("-" * 50)
for internal_ip, mapping in self.static_mappings.items():
print(f"{internal_ip}\t\t{mapping['external_ip']}\t\t{mapping['type']}")
print()
# 动态映射
if self.dynamic_mappings:
print("动态映射:")
print("内部IP\t\t外部IP\t\t剩余时间")
print("-" * 50)
current_time = time.time()
for internal_ip, mapping in self.dynamic_mappings.items():
remaining = self.timeout - (current_time - mapping['timestamp'])
print(f"{internal_ip}\t\t{mapping['external_ip']}\t\t{remaining:.0f}秒")
print()
# 端口映射
port_mappings_display = {}
for key, mapping in self.port_mappings.items():
if ':' in key and mapping['type'] == 'port':
internal_key = f"{mapping['internal_ip']}:{mapping['internal_port']}"
external_key = f"{mapping['external_ip']}:{mapping['external_port']}"
if internal_key not in port_mappings_display:
port_mappings_display[internal_key] = {
'external': external_key,
'protocol': mapping['protocol'],
'timestamp': mapping['timestamp']
}
if port_mappings_display:
print("端口映射 (PAT/NAPT):")
print("内部地址:端口\t\t外部地址:端口\t\t协议\t剩余时间")
print("-" * 70)
current_time = time.time()
for internal, info in port_mappings_display.items():
remaining = self.timeout - (current_time - info['timestamp'])
print(f"{internal}\t\t{info['external']}\t\t{info['protocol']}\t{remaining:.0f}秒")
# 使用示例
nat_table = NATTable()
# 配置静态NAT
nat_table.add_static_mapping("192.168.1.100", "203.0.113.10")
# 配置动态NAT地址池
external_pool = ["203.0.113.20", "203.0.113.21", "203.0.113.22"]
# 模拟网络连接
print("\n模拟网络连接:")
print("=" * 50)
# 内部主机发起连接
internal_connections = [
("192.168.1.10", 12345, "TCP"),
("192.168.1.20", 23456, "TCP"),
("192.168.1.30", 34567, "UDP"),
("192.168.1.10", 45678, "TCP") # 同一主机的另一个连接
]
external_ip = "203.0.113.1" # NAT设备的外部IP
for internal_ip, internal_port, protocol in internal_connections:
print(f"\n内部连接: {internal_ip}:{internal_port} ({protocol})")
# 出站转换
result = nat_table.translate_outbound(internal_ip, internal_port, external_ip, protocol)
if result:
ext_ip, ext_port = result
print(f"转换后: {ext_ip}:{ext_port}")
# 模拟入站响应
inbound_result = nat_table.translate_inbound(ext_ip, ext_port, protocol)
if inbound_result:
in_ip, in_port = inbound_result
print(f"入站转换: {ext_ip}:{ext_port} -> {in_ip}:{in_port}")
else:
print("转换失败")
# 显示NAT表
nat_table.display_nat_table()
# 清理过期映射
print("\n等待5秒后清理过期映射...")
time.sleep(5)
nat_table.cleanup_expired_mappings()
nat_table.display_nat_table()
实践练习
练习1:IP数据包分析
- 使用Wireshark捕获并分析IP数据包
- 识别IP头部各字段的值
- 观察IP分片和重组过程
- 验收标准:能够正确解读IP头部信息,理解分片机制
练习2:路由表配置
- 在虚拟机中配置静态路由
- 测试不同网段间的连通性
- 使用traceroute跟踪数据包路径
- 验收标准:成功配置路由,理解路由选择过程
练习3:ICMP协议测试
- 使用ping命令测试网络连通性
- 分析ICMP错误消息类型
- 实现简单的网络诊断工具
- 验收标准:掌握ICMP协议的应用和故障诊断
练习4:IPv6地址配置
- 配置IPv6地址和路由
- 测试IPv6连通性
- 比较IPv4和IPv6的性能差异
- 验收标准:成功部署IPv6网络,理解双栈配置
练习5:NAT配置实验
- 在路由器上配置NAT规则
- 测试内网到外网的访问
- 分析NAT转换表的变化
- 验收标准:理解NAT工作原理,能够排查NAT相关问题
常见问题
Q1:为什么需要IP分片?如何避免分片?
A:IP分片是为了适应不同网络的MTU限制。避免分片的方法:
- 使用Path MTU Discovery
- 设置合适的MSS值
- 应用层控制数据包大小
Q2:什么是路由环路?如何检测和避免?
A:路由环路是数据包在网络中循环转发的现象。检测方法:
- TTL字段递减到0
- 使用traceroute观察路径
- 路由协议的环路检测机制
Q3:IPv4地址耗尽后有哪些解决方案?
A:主要解决方案包括:
- NAT技术延缓地址耗尽
- IPv6协议提供更大地址空间
- CIDR和VLSM提高地址利用率
- 私有地址和地址回收
Q4:ICMP协议有哪些安全风险?
A:ICMP的安全风险:
- Ping洪水攻击
- ICMP重定向攻击
- 信息泄露(网络拓扑探测)
- 防护措施:限制ICMP流量,过滤敏感消息类型
Q5:NAT对网络应用有什么影响?
A:NAT的影响:
- 破坏端到端连接模型
- 影响P2P应用
- 需要ALG支持某些协议
- 解决方案:UPnP、STUN、TURN等穿透技术
Q6:如何选择合适的路由协议?
A:选择考虑因素:
- 网络规模(RIP适合小型,OSPF适合大型)
- 收敛速度要求
- 带宽和CPU资源
- 管理复杂度
- 厂商支持情况
总结
本章深入学习了网络层协议的核心内容:
- IP协议:理解了IPv4头部结构、地址分类、分片机制和路由原理
- 路由协议:掌握了距离向量和链路状态算法的工作原理
- ICMP协议:学习了网络诊断和错误报告机制
- ARP协议:理解了地址解析的工作过程
- IPv6协议:了解了下一代互联网协议的特点和优势
- NAT技术:掌握了网络地址转换的实现原理
网络层是TCP/IP协议栈的核心,负责数据包的路由和转发。理解网络层协议对于网络工程师和系统管理员至关重要。
下一步
- 前往:006-传输层协议
- 扩展学习:
- 高级路由协议(BGP、IS-IS)
- IPv6过渡技术
- SDN和网络虚拟化
- 网络安全协议(IPSec)
参考与引用
- RFC 791 - Internet Protocol
- RFC 2460 - Internet Protocol, Version 6 (IPv6)
- RFC 792 - Internet Control Message Protocol
- RFC 826 - Ethernet Address Resolution Protocol
- RFC 3022 - Traditional IP Network Address Translator
- 《计算机网络:自顶向下方法》第7版 - James F. Kurose
- 《TCP/IP详解 卷1:协议》- W. Richard Stevens
- Cisco网络技术学院教程:路由和交换基础
更新记录
- 更新时间: 2024-12-19 | 更新内容: 创建网络层协议章节,包含IP、路由、ICMP、ARP、IPv6、NAT等内容 | 更新人: Assistant
更多推荐
所有评论(0)