Python端口检测案例实操?

wen python案例 2

本文目录导读:

  1. 扫描特定端口并输出结果
  2. 快速扫描

我来为您介绍一个Python端口检测的实操案例,包含完整的代码示例和多种实现方式。

基础TCP端口扫描器

import socket
import threading
from datetime import datetime
class PortScanner:
    def __init__(self, target, start_port=1, end_port=1024):
        self.target = target
        self.start_port = start_port
        self.end_port = end_port
        self.open_ports = []
    def scan_port(self, port):
        """扫描单个端口"""
        try:
            # 创建socket对象
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)  # 设置超时时间
            # 尝试连接
            result = sock.connect_ex((self.target, port))
            if result == 0:  # 连接成功
                self.open_ports.append(port)
                print(f"端口 {port} 开放")
            sock.close()
        except socket.gaierror:
            print("主机名解析错误")
        except socket.error:
            print(f"无法连接到端口 {port}")
    def scan_multithreaded(self, threads=100):
        """多线程扫描端口"""
        print(f"\n开始扫描 {self.target}")
        print(f"端口范围: {self.start_port}-{self.end_port}")
        print(f"开始时间: {datetime.now()}")
        print("-" * 50)
        # 创建线程池
        thread_list = []
        for port in range(self.start_port, self.end_port + 1):
            thread = threading.Thread(target=self.scan_port, args=(port,))
            thread_list.append(thread)
            # 控制线程数量
            if len(thread_list) >= threads:
                for t in thread_list:
                    t.start()
                for t in thread_list:
                    t.join()
                thread_list = []
        # 处理剩余的线程
        if thread_list:
            for t in thread_list:
                t.start()
            for t in thread_list:
                t.join()
        print("-" * 50)
        print(f"扫描完成时间: {datetime.now()}")
        print(f"开放端口总数: {len(self.open_ports)}")
        return self.open_ports
# 使用示例
if __name__ == "__main__":
    scanner = PortScanner("127.0.0.1", 1, 1000)
    open_ports = scanner.scan_multithreaded(threads=50)
    print(f"开放的端口: {open_ports}")

增强版端口扫描(带服务识别)

import socket
import threading
from datetime import datetime
import ipaddress
class AdvancedPortScanner:
    def __init__(self):
        self.results = {}
        self.known_services = {
            21: "FTP",
            22: "SSH",
            23: "Telnet",
            25: "SMTP",
            53: "DNS",
            80: "HTTP",
            110: "POP3",
            135: "RPC",
            139: "NetBIOS",
            143: "IMAP",
            443: "HTTPS",
            445: "SMB",
            993: "IMAPS",
            995: "POP3S",
            1433: "MSSQL",
            1521: "Oracle",
            3306: "MySQL",
            3389: "RDP",
            5432: "PostgreSQL",
            6379: "Redis",
            8080: "HTTP-Proxy",
            8443: "HTTPS-Alt",
            27017: "MongoDB"
        }
    def get_service_name(self, port):
        """获取常见端口对应的服务名称"""
        return self.known_services.get(port, "Unknown")
    def get_banner(self, host, port):
        """尝试获取服务banner信息"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(2)
            sock.connect((host, port))
            # 发送探测包
            if port == 80 or port == 8080:
                sock.send(b"GET / HTTP/1.0\r\n\r\n")
            # 接收banner
            banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            sock.close()
            return banner[:100] if banner else "No banner"
        except:
            return "Unable to get banner"
    def scan_port(self, host, port):
        """扫描端口并获取信息"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            result = sock.connect_ex((host, port))
            if result == 0:
                service = self.get_service_name(port)
                banner = self.get_banner(host, port)
                self.results[port] = {
                    'service': service,
                    'banner': banner
                }
                print(f"端口 {port:5d} 开放 - {service:15s} - {banner}")
            sock.close()
        except Exception as e:
            pass
    def scan_network(self, network, ports=None):
        """扫描整个网络"""
        if ports is None:
            ports = [21, 22, 23, 25, 80, 443, 8080, 3306, 3389]
        try:
            net = ipaddress.ip_network(network, strict=False)
        except:
            print(f"无效的网络地址: {network}")
            return
        print(f"扫描网络: {network}")
        print(f"检查端口: {ports}")
        print(f"开始时间: {datetime.now()}")
        print("=" * 60)
        threads = []
        for ip in net.hosts():
            host = str(ip)
            for port in ports:
                thread = threading.Thread(
                    target=self.scan_port, 
                    args=(host, port)
                )
                threads.append(thread)
                thread.start()
                # 控制并发数
                if len(threads) >= 100:
                    for t in threads:
                        t.join()
                    threads = []
        # 等待所有线程完成
        for t in threads:
            t.join()
        print("=" * 60)
        print(f"扫描完成时间: {datetime.now()}")
        self.print_summary()
    def print_summary(self):
        """打印扫描结果汇总"""
        if not self.results:
            print("未发现开放端口")
            return
        print(f"\n发现 {len(self.results)} 个开放端口:")
        for port, info in sorted(self.results.items()):
            print(f"  {port}: {info['service']}")
# 使用示例
if __name__ == "__main__":
    scanner = AdvancedPortScanner()
    # 扫描单个主机
    print("=== 扫描本地主机 ===")
    for port in [80, 443, 3306, 8080, 22, 21]:
        scanner.scan_port("127.0.0.1", port)
    # 扫描整个网络
    # scanner.scan_network("192.168.1.0/24")

端口可用性检查工具

import socket
import sys
import time
class PortChecker:
    def __init__(self, timeout=2):
        self.timeout = timeout
    def check_port(self, host, port):
        """检查端口是否可用"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            result = sock.connect_ex((host, port))
            if result == 0:
                return True, "端口开放"
            else:
                return False, f"端口关闭 (错误码: {result})"
        except socket.gaierror:
            return False, "域名解析失败"
        except socket.timeout:
            return False, "连接超时"
        except ConnectionRefusedError:
            return False, "连接被拒绝"
        except Exception as e:
            return False, f"未知错误: {str(e)}"
        finally:
            sock.close()
    def check_multiple_ports(self, host, ports, delay=0.1):
        """检查多个端口"""
        results = {}
        print(f"正在检查主机: {host}")
        print(f"检查端口: {ports}")
        print("-" * 40)
        for port in ports:
            success, message = self.check_port(host, port)
            status = "✓ 开放" if success else "✗ 关闭"
            results[port] = success
            print(f"端口 {port:5d}: {status:10s} - {message}")
            time.sleep(delay)  # 避免扫描过快
        return results
    def wait_for_port(self, host, port, timeout=30):
        """等待端口变为可用"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            success, _ = self.check_port(host, port)
            if success:
                print(f"端口 {port} 在 {time.time() - start_time:.1f} 秒后可用")
                return True
            time.sleep(1)
        print(f"超时: {timeout} 秒内端口 {port} 未开放")
        return False
# 使用示例
if __name__ == "__main__":
    checker = PortChecker()
    # 检查单个端口
    host = "localhost"
    port = 80
    is_open, message = checker.check_port(host, port)
    print(f"端口 {port} 在 {host} 上的状态: {message}")
    # 检查多个端口
    ports_to_check = [80, 443, 3306, 5432, 6379]
    results = checker.check_multiple_ports(host, ports_to_check)
    # 等待端口可用
    # checker.wait_for_port("localhost", 8080, timeout=10)

实际应用案例:监控Web服务

import socket
import time
import smtplib
from email.mime.text import MIMEText
import json
from datetime import datetime
class ServiceMonitor:
    def __init__(self, config_file='monitor_config.json'):
        self.config_file = config_file
        self.services = self.load_config()
        self.check_interval = 60  # 默认60秒检查一次
    def load_config(self):
        """加载配置文件"""
        default_config = {
            "services": [
                {"host": "localhost", "port": 80, "name": "Web Server"},
                {"host": "localhost", "port": 3306, "name": "MySQL"},
                {"host": "google.com", "port": 443, "name": "Google HTTPS"}
            ],
            "email_alerts": {
                "enabled": False,
                "smtp_server": "smtp.gmail.com",
                "smtp_port": 587,
                "from_addr": "",
                "to_addr": "",
                "password": ""
            }
        }
        try:
            with open(self.config_file, 'r') as f:
                return json.load(f)
        except:
            print(f"配置文件不存在,使用默认配置")
            return default_config
    def check_service(self, host, port, timeout=5):
        """检查服务是否可用"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            result = sock.connect_ex((host, port))
            return result == 0
        except:
            return False
        finally:
            sock.close()
    def send_alert(self, service_name, host, port, status):
        """发送告警通知(示例用邮件)"""
        if not self.services.get('email_alerts', {}).get('enabled'):
            return
        msg = MIMEText(f"""
服务状态变更通知
时间: {datetime.now()}
服务: {service_name}
主机: {host}:{port}
状态: {status}
请及时处理!
        """)
        msg['Subject'] = f"服务告警: {service_name} - {status}"
        msg['From'] = self.services['email_alerts']['from_addr']
        msg['To'] = self.services['email_alerts']['to_addr']
        try:
            server = smtplib.SMTP(
                self.services['email_alerts']['smtp_server'],
                self.services['email_alerts']['smtp_port']
            )
            server.starttls()
            server.login(
                self.services['email_alerts']['from_addr'],
                self.services['email_alerts']['password']
            )
            server.send_message(msg)
            server.quit()
            print("告警邮件已发送")
        except Exception as e:
            print(f"发送告警失败: {e}")
    def monitor(self):
        """启动监控"""
        print("=" * 60)
        print(f"服务监控启动 - {datetime.now()}")
        print("监控的服务:")
        for service in self.services['services']:
            print(f"  {service['name']:20s} - {service['host']:15s}:{service['port']}")
        print("=" * 60)
        errors = {}
        while True:
            for service in self.services['services']:
                host = service['host']
                port = service['port']
                name = service['name']
                is_available = self.check_service(host, port)
                status = "✓ 可用" if is_available else "✗ 不可用"
                print(f"{datetime.now().strftime('%H:%M:%S')} {name:20s} - {status}")
                # 检测状态变化
                if not is_available:
                    if name not in errors:
                        errors[name] = 0
                    errors[name] += 1
                    # 连续3次失败则告警
                    if errors[name] >= 3:
                        self.send_alert(name, host, port, "服务异常")
                else:
                    if name in errors:
                        self.send_alert(name, host, port, "服务恢复")
                        del errors[name]
            time.sleep(self.check_interval)
# 使用示例
if __name__ == "__main__":
    monitor = ServiceMonitor()
    # monitor.monitor()  # 启动监控(建议在后台运行)
    # 测试单次检查
    print("\n=== 单次服务检查 ===")
    for service in monitor.services['services']:
        available = monitor.check_service(service['host'], service['port'])
        print(f"{service['name']:20s}: {'可用' if available else '不可用'}")

命令行工具版本

#!/usr/bin/env python3
import socket
import sys
import argparse
import json
from concurrent.futures import ThreadPoolExecutor
def scan_port(host, port):
    """扫描单个端口(用于线程池)"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        sock.close()
        return port, result == 0
    except:
        return port, False
def scan_ports(host, start_port, end_port, threads=50):
    """使用线程池扫描端口范围"""
    print(f"正在扫描 {host} 的端口 {start_port}-{end_port}")
    print(f"使用 {threads} 个线程")
    open_ports = []
    ports = range(start_port, end_port + 1)
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(scan_port, host, port) for port in ports]
        for future in futures:
            port, is_open = future.result()
            if is_open:
                open_ports.append(port)
                print(f"端口 {port} 开放")
    return sorted(open_ports)
def main():
    parser = argparse.ArgumentParser(description='Python端口扫描工具')
    parser.add_argument('host', help='目标主机IP或域名')
    parser.add_argument('-p', '--ports', default='1-1024', 
                       help='端口范围,如 80,443 或 1-1024')
    parser.add_argument('-t', '--threads', type=int, default=50,
                       help='线程数(默认50)')
    parser.add_argument('-o', '--output', help='输出文件(JSON格式)')
    parser.add_argument('--timeout', type=int, default=1,
                       help='超时时间(默认1秒)')
    args = parser.parse_args()
    # 解析端口范围
    if '-' in args.ports:
        start_port, end_port = map(int, args.ports.split('-'))
    else:
        start_port = end_port = int(args.ports)
    # 设置全局超时
    socket.setdefaulttimeout(args.timeout)
    # 扫描端口
    open_ports = scan_ports(args.host, start_port, end_port, args.threads)
    # 输出结果
    print(f"\n扫描完成!发现 {len(open_ports)} 个开放端口")
    print(f"开放端口列表: {open_ports}")
    # 保存到文件
    if args.output:
        result = {
            'host': args.host,
            'port_range': f"{start_port}-{end_port}",
            'scan_time': str(datetime.now()),
            'open_ports': open_ports
        }
        with open(args.output, 'w') as f:
            json.dump(result, f, indent=2)
        print(f"结果已保存到 {args.output}")
if __name__ == "__main__":
    main()

使用说明

安装依赖

# 基本版本不需要额外安装
# 邮件告警需要
pip install secure-smtplib

运行示例

  1. 基础端口扫描

    # 扫描本地主机端口1-1000
    scanner = PortScanner("127.0.0.1", 1, 1000)
    open_ports = scanner.scan_multithreaded(threads=50)
  2. 命令行工具

    # 扫描本地主机常用端口
    python port_scanner.py 127.0.0.1 -p 20-100

扫描特定端口并输出结果

python port_scanner.py example.com -p 80,443,3306 -o results.json

快速扫描

python port_scanner.py 192.168.1.1 -p 1-1000 -t 100


3. **服务监控**:
```python
# 创建配置文件和监控
monitor = ServiceMonitor()
monitor.monitor()

注意事项

  1. 法律合规:仅扫描自己拥有或授权的服务器
  2. 网络影响:大量扫描可能影响网络性能
  3. 安全考虑:某些端口扫描可能被IDS/IPS检测
  4. 性能优化:根据网络状况调整线程数和超时时间
  5. 错误处理:生产环境需要完善的错误处理机制

这个案例提供了从基础到高级的完整端口检测解决方案,可以根据实际需求选择合适的版本使用。

标签: Python

抱歉,评论功能暂时关闭!