蜜罐系统配置与攻击者绕圈策略实现方案
蜜罐系统是一种主动防御技术,通过模拟易受攻击的服务来诱骗攻击者,同时收集攻击信息。下面我将提供一套完整的蜜罐系统配置方案,包括如何让攻击者在系统中 "绕圈"(即延长攻击者在蜜罐中的停留时间,增加攻击成本)的详细策略和代码实现。
一、蜜罐系统基础架构设计
1. 系统架构选择
蜜罐系统通常分为低交互和高交互两种类型:
低交互蜜罐:模拟有限的服务和响应,资源消耗低但欺骗性较弱
高交互蜜罐:提供完整的系统环境,欺骗性高但风险和管理成本也高
对于让攻击者 "绕圈" 的场景,建议采用中高交互蜜罐,结合以下组件:
服务模拟层:模拟 SSH、HTTP、FTP 等服务
延迟响应模块:人为增加响应时间
虚假数据生成:创建看似有价值但无实际意义的虚假数据
行为分析引擎:实时分析攻击者行为并动态调整策略
2. 技术栈选择
基于 Python 的实现方案(易于扩展和调试):
# 基础依赖 requirements.txt
Flask==2.2.5
SQLAlchemy==2.0.7
paramiko==3.1.0 # SSH模拟
twisted==22.10.0 # 异步网络框架
faker==15.3.4 # 虚假数据生成二、核心绕圈策略实现
1. 延迟响应技术
通过在服务响应中引入人为延迟,延长攻击者的操作时间:
import time
import random
def delayed_response(min_delay=2, max_delay=10):
"""随机延迟响应装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
delay = random.uniform(min_delay, max_delay)
time.sleep(delay) # 随机延迟
return func(*args, **kwargs)
return wrapper
return decorator
# 在SSH命令处理中使用
@delayed_response(1, 5)
def handle_ssh_command(cmd):
if "ls" in cmd:
return "file1.txt\nfile2.log\nbackup.tar.gz"
elif "cat" in cmd:
return "This file contains sensitive data..." # 虚假内容2. 多层虚假文件系统
创建嵌套的虚假目录结构和看似有价值的文件,诱使攻击者深入探索:
from faker import Faker
fake = Faker()
def generate_fake_filesystem(depth=3, width=5):
"""生成虚假文件系统结构"""
if depth <= 0:
return {"type": "file", "name": fake.file_name(), "content": fake.text()}
fs = {}
for i in range(width):
dir_name = fake.word() + "_" + str(random.randint(1000, 9999))
fs[dir_name] = generate_fake_filesystem(depth-1, width)
return {"type": "directory", "content": fs}
# 示例输出
{
"projects_4521": {
"type": "directory",
"content": {
"src_7812": {"type": "directory", ...},
"config_9923": {"type": "file", "name": "database.conf", ...}
}
}
}3. 动态服务行为调整
根据攻击者行为实时调整蜜罐响应策略:
class BehaviorAnalyzer:
def __init__(self):
self.attack_patterns = {
'bruteforce': 0,
'exploit_attempt': 0,
'data_exfiltration': 0
}
def analyze(self, command):
if "passwd" in command or "shadow" in command:
self.attack_patterns['exploit_attempt'] += 1
return "exploit_attempt"
elif "wget" in command or "curl" in command:
self.attack_patterns['data_exfiltration'] += 1
return "data_exfiltration"
return None
def get_response_strategy(self):
"""根据攻击模式调整响应策略"""
if self.attack_patterns['exploit_attempt'] > 3:
return {"delay": (5, 15), "error_rate": 0.3} # 增加延迟和错误率
return {"delay": (1, 5), "error_rate": 0.1}三、完整蜜罐服务实现
1. SSH蜜罐实现
import socket
import threading
import paramiko
from io import StringIO
class SSHHoneypot(paramiko.ServerInterface):
def __init__(self):
self.behavior_analyzer = BehaviorAnalyzer()
self.fake_fs = generate_fake_filesystem()
def check_auth_password(self, username, password):
# 记录认证尝试
log_authentication_attempt(username, password)
return paramiko.AUTH_FAILED # 始终返回认证失败但允许连接
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
def get_allowed_auths(self, username):
return "password,publickey"
def handle_session(self, channel):
try:
channel.send("Welcome to Ubuntu 18.04.6 LTS (GNU/Linux 4.15.0-176-generic x86_64)\r\n")
while True:
channel.send("$ ")
command = ""
while not command.endswith("\n"):
recv = channel.recv(1024).decode("utf-8")
command += recv
command = command.strip()
if command == "exit":
channel.send("logout\r\n")
break
# 分析行为并调整策略
attack_type = self.behavior_analyzer.analyze(command)
strategy = self.behavior_analyzer.get_response_strategy()
# 应用延迟
time.sleep(random.uniform(*strategy['delay']))
# 随机错误响应
if random.random() < strategy['error_rate']:
channel.send("bash: {}: command not found\r\n".format(command.split())
continue
# 处理命令
response = self.process_command(command)
channel.send(response + "\r\n")
except Exception as e:
print("Session error:", e)
finally:
channel.close()
def process_command(self, command):
"""处理SSH命令并返回虚假响应"""
if command.startswith("ls"):
return self.fake_ls(command)
elif command.startswith("cat"):
return self.fake_cat(command)
elif command.startswith("cd"):
return "" # cd命令不返回内容
else:
return "{}: command executed successfully".format(command)
def fake_ls(self, command):
"""生成虚假的ls命令输出"""
parts = command.split()
if len(parts) > 1:
path = parts
# 模拟路径解析
return "file1.txt\nfile2.log\nbackup_{}.tar.gz".format(random.randint(100,999))
return "Documents\nDownloads\nPictures\nbackup_{}.tar.gz".format(random.randint(100,999))
def fake_cat(self, command):
"""生成虚假的文件内容"""
return fake.text(max_nb_chars=random.randint(100,500))2. HTTP蜜罐实现
from flask import Flask, request, Response
import random
app = Flask(__name__)
# 虚假用户数据库
fake_users = [
{"username": "admin", "password": "admin123", "role": "administrator"},
{"username": "test", "password": "test123", "role": "user"}
]
@app.route('/login', methods=['POST'])
def login():
# 延迟响应
time.sleep(random.uniform(2, 5))
username = request.form.get('username')
password = request.form.get('password')
# 记录登录尝试
log_http_request(request, username, password)
# 随机返回成功或失败
if random.random() < 0.3: # 30%几率返回成功
return {"status": "success", "token": fake.sha256()}
return {"status": "error", "message": "Invalid credentials"}, 401
@app.route('/api/users')
def list_users():
# 需要"认证"才能访问
if not request.headers.get('Authorization'):
time.sleep(random.uniform(3, 8)) # 更长的延迟
return {"error": "Unauthorized"}, 401
# 返回虚假用户列表
time.sleep(random.uniform(1, 3))
return {
"users": [
{"id": i, "username": fake.user_name(), "email": fake.email()}
for i in range(1, random.randint(5, 15))
]
}
@app.route('/download/<filename>')
def download(filename):
# 模拟大文件下载,实际返回垃圾数据
def generate():
for _ in range(random.randint(100, 500)):
time.sleep(0.1) # 增加传输延迟
yield fake.binary(length=1024) # 每次1KB假数据
return Response(generate(), mimetype='application/octet-stream')四、部署与监控方案
1. Docker容器化部署
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
# SSH蜜罐端口
EXPOSE 22
# HTTP蜜罐端口
EXPOSE 80 443
CMD ["python", "src/main.py"]2. Kubernetes集群部署(适合大规模部署)
参考智云蜜罐的 k8s 配置方案:
# pot-web-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: honeypot-web
namespace: honeypot-system
spec:
replicas: 3 # 多个副本应对高并发
selector:
matchLabels:
app: honeypot-web
template:
metadata:
labels:
app: honeypot-web
spec:
containers:
- name: honeypot
image: honeypot:latest
ports:
- containerPort: 80
- containerPort: 22
resources:
limits:
cpu: "1"
memory: "512Mi"3. 监控与日志分析
# 日志记录模块
def log_authentication_attempt(username, password, ip=None):
ip = ip or request.remote_addr
timestamp = datetime.now().isoformat()
log_entry = {
"timestamp": timestamp,
"event": "auth_attempt",
"username": username,
"password": password,
"source_ip": ip,
"service": "ssh"
}
# 存储到数据库或文件
save_to_database(log_entry)
# 实时分析
if is_bruteforce_attempt(ip):
alert_security_team(ip)
maybe_block_ip(ip) # 可选:在防火墙中临时封禁五、高级绕圈策略
1. 动态端口跳转
def dynamic_port_redirect(original_port):
"""动态端口重定向,让攻击者不断寻找服务"""
new_port = random.choice([8080, 8443, 2222, 3306])
time.sleep(2)
return f"Service moved to port {new_port}. Temporary redirect."2. 虚假漏洞诱导
def fake_vulnerability_response():
"""模拟存在漏洞的服务响应"""
return {
"error": "Database connection failed",
"detail": "MySQL server at 192.168.1.100 is not available",
"debug": "Try using --skip-grant-tables option"
}3. 蜜罐链式反应
当检测到高级攻击者时,可以将其引导至另一个蜜罐系统:
def redirect_to_secondary_honeypot(ip):
"""将攻击者重定向到二级蜜罐"""
secondary_ip = "192.168.100.100" # 另一个蜜罐IP
return f"Connection terminated. Try {secondary_ip} instead."六、安全注意事项
严格网络隔离:确保蜜罐系统与真实业务网络完全隔离
数据保护:所有收集的日志和攻击数据应加密存储
法律合规:在部署前了解当地关于蜜罐使用的法律法规
资源限制:为蜜罐容器设置资源限制,防止被用作 DDoS 跳板
定期更新:及时更新蜜罐指纹和模拟服务,保持欺骗性
通过以上方案,您可以构建一个能够有效延长攻击者停留时间、增加其攻击成本的蜜罐系统。实际部署时,可根据具体需求调整参数和策略,并建议结合 HFish 等成熟蜜罐平台进行扩展。
蜜罐系统配置与攻击者绕圈策略实现方案
https://uniomo.com/archives/mi-guan-xi-tong-pei-zhi-yu-gong-ji-zhe-rao-quan-ce-lue-shi-xian-fang-an