本文目录导读:
我来为您介绍一个Python端口检测的实操案例,包含完整的代码示例和多种实现方式。
基础TCP端口扫描器
import socket
import threading
from datetime import datetime
class PortScanner:
def __init__(self, target, start_port=1, end_port=1024):
self.target = target
self.start_port = start_port
self.end_port = end_port
self.open_ports = []
def scan_port(self, port):
"""扫描单个端口"""
try:
# 创建socket对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 设置超时时间
# 尝试连接
result = sock.connect_ex((self.target, port))
if result == 0: # 连接成功
self.open_ports.append(port)
print(f"端口 {port} 开放")
sock.close()
except socket.gaierror:
print("主机名解析错误")
except socket.error:
print(f"无法连接到端口 {port}")
def scan_multithreaded(self, threads=100):
"""多线程扫描端口"""
print(f"\n开始扫描 {self.target}")
print(f"端口范围: {self.start_port}-{self.end_port}")
print(f"开始时间: {datetime.now()}")
print("-" * 50)
# 创建线程池
thread_list = []
for port in range(self.start_port, self.end_port + 1):
thread = threading.Thread(target=self.scan_port, args=(port,))
thread_list.append(thread)
# 控制线程数量
if len(thread_list) >= threads:
for t in thread_list:
t.start()
for t in thread_list:
t.join()
thread_list = []
# 处理剩余的线程
if thread_list:
for t in thread_list:
t.start()
for t in thread_list:
t.join()
print("-" * 50)
print(f"扫描完成时间: {datetime.now()}")
print(f"开放端口总数: {len(self.open_ports)}")
return self.open_ports
# 使用示例
if __name__ == "__main__":
scanner = PortScanner("127.0.0.1", 1, 1000)
open_ports = scanner.scan_multithreaded(threads=50)
print(f"开放的端口: {open_ports}")
增强版端口扫描(带服务识别)
import socket
import threading
from datetime import datetime
import ipaddress
class AdvancedPortScanner:
def __init__(self):
self.results = {}
self.known_services = {
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
80: "HTTP",
110: "POP3",
135: "RPC",
139: "NetBIOS",
143: "IMAP",
443: "HTTPS",
445: "SMB",
993: "IMAPS",
995: "POP3S",
1433: "MSSQL",
1521: "Oracle",
3306: "MySQL",
3389: "RDP",
5432: "PostgreSQL",
6379: "Redis",
8080: "HTTP-Proxy",
8443: "HTTPS-Alt",
27017: "MongoDB"
}
def get_service_name(self, port):
"""获取常见端口对应的服务名称"""
return self.known_services.get(port, "Unknown")
def get_banner(self, host, port):
"""尝试获取服务banner信息"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((host, port))
# 发送探测包
if port == 80 or port == 8080:
sock.send(b"GET / HTTP/1.0\r\n\r\n")
# 接收banner
banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
sock.close()
return banner[:100] if banner else "No banner"
except:
return "Unable to get banner"
def scan_port(self, host, port):
"""扫描端口并获取信息"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
if result == 0:
service = self.get_service_name(port)
banner = self.get_banner(host, port)
self.results[port] = {
'service': service,
'banner': banner
}
print(f"端口 {port:5d} 开放 - {service:15s} - {banner}")
sock.close()
except Exception as e:
pass
def scan_network(self, network, ports=None):
"""扫描整个网络"""
if ports is None:
ports = [21, 22, 23, 25, 80, 443, 8080, 3306, 3389]
try:
net = ipaddress.ip_network(network, strict=False)
except:
print(f"无效的网络地址: {network}")
return
print(f"扫描网络: {network}")
print(f"检查端口: {ports}")
print(f"开始时间: {datetime.now()}")
print("=" * 60)
threads = []
for ip in net.hosts():
host = str(ip)
for port in ports:
thread = threading.Thread(
target=self.scan_port,
args=(host, port)
)
threads.append(thread)
thread.start()
# 控制并发数
if len(threads) >= 100:
for t in threads:
t.join()
threads = []
# 等待所有线程完成
for t in threads:
t.join()
print("=" * 60)
print(f"扫描完成时间: {datetime.now()}")
self.print_summary()
def print_summary(self):
"""打印扫描结果汇总"""
if not self.results:
print("未发现开放端口")
return
print(f"\n发现 {len(self.results)} 个开放端口:")
for port, info in sorted(self.results.items()):
print(f" {port}: {info['service']}")
# 使用示例
if __name__ == "__main__":
scanner = AdvancedPortScanner()
# 扫描单个主机
print("=== 扫描本地主机 ===")
for port in [80, 443, 3306, 8080, 22, 21]:
scanner.scan_port("127.0.0.1", port)
# 扫描整个网络
# scanner.scan_network("192.168.1.0/24")
端口可用性检查工具
import socket
import sys
import time
class PortChecker:
def __init__(self, timeout=2):
self.timeout = timeout
def check_port(self, host, port):
"""检查端口是否可用"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
try:
result = sock.connect_ex((host, port))
if result == 0:
return True, "端口开放"
else:
return False, f"端口关闭 (错误码: {result})"
except socket.gaierror:
return False, "域名解析失败"
except socket.timeout:
return False, "连接超时"
except ConnectionRefusedError:
return False, "连接被拒绝"
except Exception as e:
return False, f"未知错误: {str(e)}"
finally:
sock.close()
def check_multiple_ports(self, host, ports, delay=0.1):
"""检查多个端口"""
results = {}
print(f"正在检查主机: {host}")
print(f"检查端口: {ports}")
print("-" * 40)
for port in ports:
success, message = self.check_port(host, port)
status = "✓ 开放" if success else "✗ 关闭"
results[port] = success
print(f"端口 {port:5d}: {status:10s} - {message}")
time.sleep(delay) # 避免扫描过快
return results
def wait_for_port(self, host, port, timeout=30):
"""等待端口变为可用"""
start_time = time.time()
while time.time() - start_time < timeout:
success, _ = self.check_port(host, port)
if success:
print(f"端口 {port} 在 {time.time() - start_time:.1f} 秒后可用")
return True
time.sleep(1)
print(f"超时: {timeout} 秒内端口 {port} 未开放")
return False
# 使用示例
if __name__ == "__main__":
checker = PortChecker()
# 检查单个端口
host = "localhost"
port = 80
is_open, message = checker.check_port(host, port)
print(f"端口 {port} 在 {host} 上的状态: {message}")
# 检查多个端口
ports_to_check = [80, 443, 3306, 5432, 6379]
results = checker.check_multiple_ports(host, ports_to_check)
# 等待端口可用
# checker.wait_for_port("localhost", 8080, timeout=10)
实际应用案例:监控Web服务
import socket
import time
import smtplib
from email.mime.text import MIMEText
import json
from datetime import datetime
class ServiceMonitor:
def __init__(self, config_file='monitor_config.json'):
self.config_file = config_file
self.services = self.load_config()
self.check_interval = 60 # 默认60秒检查一次
def load_config(self):
"""加载配置文件"""
default_config = {
"services": [
{"host": "localhost", "port": 80, "name": "Web Server"},
{"host": "localhost", "port": 3306, "name": "MySQL"},
{"host": "google.com", "port": 443, "name": "Google HTTPS"}
],
"email_alerts": {
"enabled": False,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"from_addr": "",
"to_addr": "",
"password": ""
}
}
try:
with open(self.config_file, 'r') as f:
return json.load(f)
except:
print(f"配置文件不存在,使用默认配置")
return default_config
def check_service(self, host, port, timeout=5):
"""检查服务是否可用"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
result = sock.connect_ex((host, port))
return result == 0
except:
return False
finally:
sock.close()
def send_alert(self, service_name, host, port, status):
"""发送告警通知(示例用邮件)"""
if not self.services.get('email_alerts', {}).get('enabled'):
return
msg = MIMEText(f"""
服务状态变更通知
时间: {datetime.now()}
服务: {service_name}
主机: {host}:{port}
状态: {status}
请及时处理!
""")
msg['Subject'] = f"服务告警: {service_name} - {status}"
msg['From'] = self.services['email_alerts']['from_addr']
msg['To'] = self.services['email_alerts']['to_addr']
try:
server = smtplib.SMTP(
self.services['email_alerts']['smtp_server'],
self.services['email_alerts']['smtp_port']
)
server.starttls()
server.login(
self.services['email_alerts']['from_addr'],
self.services['email_alerts']['password']
)
server.send_message(msg)
server.quit()
print("告警邮件已发送")
except Exception as e:
print(f"发送告警失败: {e}")
def monitor(self):
"""启动监控"""
print("=" * 60)
print(f"服务监控启动 - {datetime.now()}")
print("监控的服务:")
for service in self.services['services']:
print(f" {service['name']:20s} - {service['host']:15s}:{service['port']}")
print("=" * 60)
errors = {}
while True:
for service in self.services['services']:
host = service['host']
port = service['port']
name = service['name']
is_available = self.check_service(host, port)
status = "✓ 可用" if is_available else "✗ 不可用"
print(f"{datetime.now().strftime('%H:%M:%S')} {name:20s} - {status}")
# 检测状态变化
if not is_available:
if name not in errors:
errors[name] = 0
errors[name] += 1
# 连续3次失败则告警
if errors[name] >= 3:
self.send_alert(name, host, port, "服务异常")
else:
if name in errors:
self.send_alert(name, host, port, "服务恢复")
del errors[name]
time.sleep(self.check_interval)
# 使用示例
if __name__ == "__main__":
monitor = ServiceMonitor()
# monitor.monitor() # 启动监控(建议在后台运行)
# 测试单次检查
print("\n=== 单次服务检查 ===")
for service in monitor.services['services']:
available = monitor.check_service(service['host'], service['port'])
print(f"{service['name']:20s}: {'可用' if available else '不可用'}")
命令行工具版本
#!/usr/bin/env python3
import socket
import sys
import argparse
import json
from concurrent.futures import ThreadPoolExecutor
def scan_port(host, port):
"""扫描单个端口(用于线程池)"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
sock.close()
return port, result == 0
except:
return port, False
def scan_ports(host, start_port, end_port, threads=50):
"""使用线程池扫描端口范围"""
print(f"正在扫描 {host} 的端口 {start_port}-{end_port}")
print(f"使用 {threads} 个线程")
open_ports = []
ports = range(start_port, end_port + 1)
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(scan_port, host, port) for port in ports]
for future in futures:
port, is_open = future.result()
if is_open:
open_ports.append(port)
print(f"端口 {port} 开放")
return sorted(open_ports)
def main():
parser = argparse.ArgumentParser(description='Python端口扫描工具')
parser.add_argument('host', help='目标主机IP或域名')
parser.add_argument('-p', '--ports', default='1-1024',
help='端口范围,如 80,443 或 1-1024')
parser.add_argument('-t', '--threads', type=int, default=50,
help='线程数(默认50)')
parser.add_argument('-o', '--output', help='输出文件(JSON格式)')
parser.add_argument('--timeout', type=int, default=1,
help='超时时间(默认1秒)')
args = parser.parse_args()
# 解析端口范围
if '-' in args.ports:
start_port, end_port = map(int, args.ports.split('-'))
else:
start_port = end_port = int(args.ports)
# 设置全局超时
socket.setdefaulttimeout(args.timeout)
# 扫描端口
open_ports = scan_ports(args.host, start_port, end_port, args.threads)
# 输出结果
print(f"\n扫描完成!发现 {len(open_ports)} 个开放端口")
print(f"开放端口列表: {open_ports}")
# 保存到文件
if args.output:
result = {
'host': args.host,
'port_range': f"{start_port}-{end_port}",
'scan_time': str(datetime.now()),
'open_ports': open_ports
}
with open(args.output, 'w') as f:
json.dump(result, f, indent=2)
print(f"结果已保存到 {args.output}")
if __name__ == "__main__":
main()
使用说明
安装依赖
# 基本版本不需要额外安装 # 邮件告警需要 pip install secure-smtplib
运行示例
-
基础端口扫描:
# 扫描本地主机端口1-1000 scanner = PortScanner("127.0.0.1", 1, 1000) open_ports = scanner.scan_multithreaded(threads=50) -
命令行工具:
# 扫描本地主机常用端口 python port_scanner.py 127.0.0.1 -p 20-100
扫描特定端口并输出结果
python port_scanner.py example.com -p 80,443,3306 -o results.json
快速扫描
python port_scanner.py 192.168.1.1 -p 1-1000 -t 100
3. **服务监控**:
```python
# 创建配置文件和监控
monitor = ServiceMonitor()
monitor.monitor()
注意事项
- 法律合规:仅扫描自己拥有或授权的服务器
- 网络影响:大量扫描可能影响网络性能
- 安全考虑:某些端口扫描可能被IDS/IPS检测
- 性能优化:根据网络状况调整线程数和超时时间
- 错误处理:生产环境需要完善的错误处理机制
这个案例提供了从基础到高级的完整端口检测解决方案,可以根据实际需求选择合适的版本使用。
标签: Python