蜜罐系统配置与攻击者绕圈策略实现方案

蜜罐系统是一种主动防御技术,通过模拟易受攻击的服务来诱骗攻击者,同时收集攻击信息。下面我将提供一套完整的蜜罐系统配置方案,包括如何让攻击者在系统中 "绕圈"(即延长攻击者在蜜罐中的停留时间,增加攻击成本)的详细策略和代码实现。

一、蜜罐系统基础架构设计

1. 系统架构选择

蜜罐系统通常分为低交互和高交互两种类型:

  • 低交互蜜罐:模拟有限的服务和响应,资源消耗低但欺骗性较弱

  • 高交互蜜罐:提供完整的系统环境,欺骗性高但风险和管理成本也高

对于让攻击者 "绕圈" 的场景,建议采用中高交互蜜罐,结合以下组件:

  • 服务模拟层:模拟 SSH、HTTP、FTP 等服务

  • 延迟响应模块:人为增加响应时间

  • 虚假数据生成:创建看似有价值但无实际意义的虚假数据

  • 行为分析引擎:实时分析攻击者行为并动态调整策略

2. 技术栈选择

基于 Python 的实现方案(易于扩展和调试):

# 基础依赖 requirements.txt
Flask==2.2.5
SQLAlchemy==2.0.7
paramiko==3.1.0  # SSH模拟
twisted==22.10.0  # 异步网络框架
faker==15.3.4  # 虚假数据生成

二、核心绕圈策略实现

1. 延迟响应技术

通过在服务响应中引入人为延迟,延长攻击者的操作时间:

import time
import random

def delayed_response(min_delay=2, max_delay=10):
    """随机延迟响应装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            delay = random.uniform(min_delay, max_delay)
            time.sleep(delay)  # 随机延迟
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 在SSH命令处理中使用
@delayed_response(1, 5)
def handle_ssh_command(cmd):
    if "ls" in cmd:
        return "file1.txt\nfile2.log\nbackup.tar.gz"
    elif "cat" in cmd:
        return "This file contains sensitive data..."  # 虚假内容

2. 多层虚假文件系统

创建嵌套的虚假目录结构和看似有价值的文件,诱使攻击者深入探索:

from faker import Faker

fake = Faker()

def generate_fake_filesystem(depth=3, width=5):
    """生成虚假文件系统结构"""
    if depth <= 0:
        return {"type": "file", "name": fake.file_name(), "content": fake.text()}
    
    fs = {}
    for i in range(width):
        dir_name = fake.word() + "_" + str(random.randint(1000, 9999))
        fs[dir_name] = generate_fake_filesystem(depth-1, width)
    return {"type": "directory", "content": fs}

# 示例输出
{
    "projects_4521": {
        "type": "directory",
        "content": {
            "src_7812": {"type": "directory", ...},
            "config_9923": {"type": "file", "name": "database.conf", ...}
        }
    }
}

3. 动态服务行为调整

根据攻击者行为实时调整蜜罐响应策略:

class BehaviorAnalyzer:
    def __init__(self):
        self.attack_patterns = {
            'bruteforce': 0,
            'exploit_attempt': 0,
            'data_exfiltration': 0
        }
    
    def analyze(self, command):
        if "passwd" in command or "shadow" in command:
            self.attack_patterns['exploit_attempt'] += 1
            return "exploit_attempt"
        elif "wget" in command or "curl" in command:
            self.attack_patterns['data_exfiltration'] += 1
            return "data_exfiltration"
        return None

    def get_response_strategy(self):
        """根据攻击模式调整响应策略"""
        if self.attack_patterns['exploit_attempt'] > 3:
            return {"delay": (5, 15), "error_rate": 0.3}  # 增加延迟和错误率
        return {"delay": (1, 5), "error_rate": 0.1}

三、完整蜜罐服务实现

1. SSH蜜罐实现

import socket
import threading
import paramiko
from io import StringIO

class SSHHoneypot(paramiko.ServerInterface):
    def __init__(self):
        self.behavior_analyzer = BehaviorAnalyzer()
        self.fake_fs = generate_fake_filesystem()
    
    def check_auth_password(self, username, password):
        # 记录认证尝试
        log_authentication_attempt(username, password)
        return paramiko.AUTH_FAILED  # 始终返回认证失败但允许连接
    
    def check_channel_request(self, kind, chanid):
        return paramiko.OPEN_SUCCEEDED
    
    def get_allowed_auths(self, username):
        return "password,publickey"
    
    def handle_session(self, channel):
        try:
            channel.send("Welcome to Ubuntu 18.04.6 LTS (GNU/Linux 4.15.0-176-generic x86_64)\r\n")
            while True:
                channel.send("$ ")
                command = ""
                while not command.endswith("\n"):
                    recv = channel.recv(1024).decode("utf-8")
                    command += recv
                
                command = command.strip()
                if command == "exit":
                    channel.send("logout\r\n")
                    break
                
                # 分析行为并调整策略
                attack_type = self.behavior_analyzer.analyze(command)
                strategy = self.behavior_analyzer.get_response_strategy()
                
                # 应用延迟
                time.sleep(random.uniform(*strategy['delay']))
                
                # 随机错误响应
                if random.random() < strategy['error_rate']:
                    channel.send("bash: {}: command not found\r\n".format(command.split())
                    continue
                
                # 处理命令
                response = self.process_command(command)
                channel.send(response + "\r\n")
        except Exception as e:
            print("Session error:", e)
        finally:
            channel.close()
    
    def process_command(self, command):
        """处理SSH命令并返回虚假响应"""
        if command.startswith("ls"):
            return self.fake_ls(command)
        elif command.startswith("cat"):
            return self.fake_cat(command)
        elif command.startswith("cd"):
            return ""  # cd命令不返回内容
        else:
            return "{}: command executed successfully".format(command)
    
    def fake_ls(self, command):
        """生成虚假的ls命令输出"""
        parts = command.split()
        if len(parts) > 1:
            path = parts
            # 模拟路径解析
            return "file1.txt\nfile2.log\nbackup_{}.tar.gz".format(random.randint(100,999))
        return "Documents\nDownloads\nPictures\nbackup_{}.tar.gz".format(random.randint(100,999))
    
    def fake_cat(self, command):
        """生成虚假的文件内容"""
        return fake.text(max_nb_chars=random.randint(100,500))

2. HTTP蜜罐实现

from flask import Flask, request, Response
import random

app = Flask(__name__)

# 虚假用户数据库
fake_users = [
    {"username": "admin", "password": "admin123", "role": "administrator"},
    {"username": "test", "password": "test123", "role": "user"}
]

@app.route('/login', methods=['POST'])
def login():
    # 延迟响应
    time.sleep(random.uniform(2, 5))
    
    username = request.form.get('username')
    password = request.form.get('password')
    
    # 记录登录尝试
    log_http_request(request, username, password)
    
    # 随机返回成功或失败
    if random.random() < 0.3:  # 30%几率返回成功
        return {"status": "success", "token": fake.sha256()}
    return {"status": "error", "message": "Invalid credentials"}, 401

@app.route('/api/users')
def list_users():
    # 需要"认证"才能访问
    if not request.headers.get('Authorization'):
        time.sleep(random.uniform(3, 8))  # 更长的延迟
        return {"error": "Unauthorized"}, 401
    
    # 返回虚假用户列表
    time.sleep(random.uniform(1, 3))
    return {
        "users": [
            {"id": i, "username": fake.user_name(), "email": fake.email()}
            for i in range(1, random.randint(5, 15))
        ]
    }

@app.route('/download/<filename>')
def download(filename):
    # 模拟大文件下载,实际返回垃圾数据
    def generate():
        for _ in range(random.randint(100, 500)):
            time.sleep(0.1)  # 增加传输延迟
            yield fake.binary(length=1024)  # 每次1KB假数据
    return Response(generate(), mimetype='application/octet-stream')

四、部署与监控方案

1. Docker容器化部署

# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY . .

RUN pip install -r requirements.txt

# SSH蜜罐端口
EXPOSE 22
# HTTP蜜罐端口
EXPOSE 80 443

CMD ["python", "src/main.py"]

2. Kubernetes集群部署(适合大规模部署)

参考智云蜜罐的 k8s 配置方案:

# pot-web-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: honeypot-web
  namespace: honeypot-system
spec:
  replicas: 3  # 多个副本应对高并发
  selector:
    matchLabels:
      app: honeypot-web
  template:
    metadata:
      labels:
        app: honeypot-web
    spec:
      containers:
      - name: honeypot
        image: honeypot:latest
        ports:
        - containerPort: 80
        - containerPort: 22
        resources:
          limits:
            cpu: "1"
            memory: "512Mi"

3. 监控与日志分析

# 日志记录模块
def log_authentication_attempt(username, password, ip=None):
    ip = ip or request.remote_addr
    timestamp = datetime.now().isoformat()
    log_entry = {
        "timestamp": timestamp,
        "event": "auth_attempt",
        "username": username,
        "password": password,
        "source_ip": ip,
        "service": "ssh"
    }
    # 存储到数据库或文件
    save_to_database(log_entry)
    
    # 实时分析
    if is_bruteforce_attempt(ip):
        alert_security_team(ip)
        maybe_block_ip(ip)  # 可选:在防火墙中临时封禁

五、高级绕圈策略

1. 动态端口跳转

def dynamic_port_redirect(original_port):
    """动态端口重定向,让攻击者不断寻找服务"""
    new_port = random.choice([8080, 8443, 2222, 3306])
    time.sleep(2)
    return f"Service moved to port {new_port}. Temporary redirect."

2. 虚假漏洞诱导

def fake_vulnerability_response():
    """模拟存在漏洞的服务响应"""
    return {
        "error": "Database connection failed",
        "detail": "MySQL server at 192.168.1.100 is not available",
        "debug": "Try using --skip-grant-tables option"
    }

3. 蜜罐链式反应

当检测到高级攻击者时,可以将其引导至另一个蜜罐系统:

def redirect_to_secondary_honeypot(ip):
    """将攻击者重定向到二级蜜罐"""
    secondary_ip = "192.168.100.100"  # 另一个蜜罐IP
    return f"Connection terminated. Try {secondary_ip} instead."

六、安全注意事项

  1. 严格网络隔离:确保蜜罐系统与真实业务网络完全隔离

  2. 数据保护:所有收集的日志和攻击数据应加密存储

  3. 法律合规:在部署前了解当地关于蜜罐使用的法律法规

  4. 资源限制:为蜜罐容器设置资源限制,防止被用作 DDoS 跳板

  5. 定期更新:及时更新蜜罐指纹和模拟服务,保持欺骗性

通过以上方案,您可以构建一个能够有效延长攻击者停留时间、增加其攻击成本的蜜罐系统。实际部署时,可根据具体需求调整参数和策略,并建议结合 HFish 等成熟蜜罐平台进行扩展。


蜜罐系统配置与攻击者绕圈策略实现方案
https://uniomo.com/archives/mi-guan-xi-tong-pei-zhi-yu-gong-ji-zhe-rao-quan-ce-lue-shi-xian-fang-an
作者
雨落秋垣
发布于
2025年10月16日
许可协议