当前位置: 首页 > news >正文

CAI:开源网络安全AI框架,打造自主安全测试智能体

项目概述

CAI(Cybersecurity AI)是一个开源的网络安全AI框架,旨在构建能够自主执行完整网络安全攻击链的智能体系统。该项目通过模块化的AI智能体架构,实现了从侦察到权限提升的全流程自动化安全测试,在CTF挑战中表现优异,平均比人类专家快11倍。

功能特性

  • 多智能体架构:包含红队、蓝队、DFIR、代码执行、内存分析等专业安全智能体
  • 自主攻击链执行:支持完整的网络安全攻击链自动化,包括侦察、漏洞利用、权限提升
  • 基准测试体系:集成CyberMetric、SecEval、CTIBench等多个安全评估基准
  • 隐私保护评估:提供CyberPII-Bench基准,专门评估LLM在安全场景中的PII处理能力
  • 并行执行模式:支持多智能体并行执行和协作模式
  • 记忆管理系统:基于RAG的记忆存储和检索,支持跨安全演练的知识复用
  • 工具集成丰富:集成Shodan、Nmap、Burp Suite等主流安全工具

安装指南

环境要求

  • Python 3.8+
  • Docker(可选,用于虚拟化环境)
  • 主流操作系统(Linux/macOS/Windows)

安装步骤

  1. 克隆项目仓库:
git clone https://github.com/aliasrobotics/CAI.git
cd CAI
  1. 安装依赖包:
pip install setuptools wheel twine build
  1. 安装核心框架:
python3 -m build
pip install -e .
  1. 安装基准测试依赖:
pip install cvss
git submodule update --init --recursive
  1. 配置环境变量(在.env文件中):
OPENAI_API_KEY="your_api_key"
ANTHROPIC_API_KEY="your_anthropic_key"
OPENROUTER_API_KEY="your_openrouter_key"
OLLAMA_API_BASE="http://localhost:11434"

使用说明

基本使用

运行基准测试评估:

python benchmarks/eval.py --model ollama/qwen2.5:14b \--dataset_file benchmarks/cybermetric/CyberMetric-80-v1.json \--eval cybermetric --backend ollama

启动CLI交互界面:

python -m cai.cli

智能体使用示例

红队智能体执行渗透测试:

from cai.agents.red_teamer import redteam_agent# 执行侦察任务
result = redteam_agent.run("对目标网络进行端口扫描和服务识别")

蓝队智能体进行防御分析:

from cai.agents.blue_team import blueteam_agent# 执行安全监控
result = blueteam_agent.run("检查系统日志中的异常登录行为")

并行执行模式

配置并行智能体执行:

from cai.repl.commands.parallel import ParallelConfig# 配置红蓝队并行执行
parallel_configs = [ParallelConfig("redteam_agent"),ParallelConfig("blueteam_agent")
]

核心代码

红队智能体实现

"""Red Team Base Agent"""
import os
from dotenv import load_dotenv
from cai.sdk.agents import Agent, OpenAIChatCompletionsModel
from openai import AsyncOpenAI
from cai.util import load_prompt_template, create_system_prompt_renderer
from cai.tools.command_and_control.sshpass import run_ssh_command_with_credentials
from cai.tools.reconnaissance.generic_linux_command import generic_linux_command
from cai.tools.web.search_web import make_google_search
from cai.tools.reconnaissance.exec_code import execute_code
from cai.tools.reconnaissance.shodan import shodan_search, shodan_host_infoload_dotenv()# 加载系统提示词
bug_bounter_system_prompt = load_prompt_template("prompts/system_bug_bounter.md")# 定义工具列表
tools = [generic_linux_command,execute_code,shodan_search,shodan_host_info
]if os.getenv('GOOGLE_SEARCH_API_KEY') and os.getenv('GOOGLE_SEARCH_CX'):tools.append(make_google_search)# 创建红队智能体实例
bug_bounter_agent = Agent(name="Bug Bounter",instructions=create_system_prompt_renderer(bug_bounter_system_prompt),description="""Agent that specializes in bug bounty hunting and vulnerability discovery.Expert in web security, API testing, and responsible disclosure.""",tools=tools,model=OpenAIChatCompletionsModel(model=os.getenv('CAI_MODEL', "alias0"),openai_client=AsyncOpenAI(),)
)

代码执行智能体

"""A Coding Agent (CodeAgent)"""
import copy
import platform
import re
import signal
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Unionfrom cai.agents.meta.local_python_executor import LocalPythonInterpreter
from cai.sdk.agents import Agent, Result, OpenAIChatCompletionsModel
from dotenv import load_dotenv
from openai import AsyncOpenAIclass CodeAgentException(Exception):"""Base exception class for CodeAgent-related errors."""passclass CodeAgent(Agent):"""CodeAgent that uses executable Python code to consolidate LLM agents' actions."""def __init__(self, model_name: str = "alias0"):# 初始化Python解释器self.interpreter = LocalPythonInterpreter()# 初始化父类super().__init__(name="CodeAgent",description="Agent that uses executable Python code for security testing",instructions="""You are a CodeAgent specialized in cybersecurity tasks.Use Python code to analyze, exploit, and assess security vulnerabilities.""",model=OpenAIChatCompletionsModel(model=model_name,openai_client=AsyncOpenAI(),),tools=[self.execute_python_code])def execute_python_code(self, code: str) -> str:"""Execute Python code in a secure environment."""try:result = self.interpreter.execute(code)return str(result)except Exception as e:return f"Execution error: {str(e)}"

基准测试评估系统

"""Benchmark Evaluation Script"""
import argparse
import json
from typing import Dict, Listdef evaluate_model(model_name: str, dataset_file: str, eval_type: str, backend: str):"""评估语言模型在网络安全相关多选题基准上的表现Args:model_name: 模型名称(如 "gpt-4", "qwen2.5:14b")dataset_file: 数据集文件路径(JSON或TSV格式)eval_type: 评估类型("cybermetric", "seceval", "cti_bench")backend: 后端类型("openai", "openrouter", "ollama", "anthropic")"""# 加载数据集if dataset_file.endswith('.json'):with open(dataset_file, 'r') as f:dataset = json.load(f)else:dataset = load_tsv_dataset(dataset_file)# 初始化评估器evaluator = get_evaluator(eval_type)# 运行评估results = []for item in dataset:response = query_model(model_name, item['question'], backend)score = evaluator.evaluate(response, item['answer'])results.append(score)# 计算总体得分final_score = sum(results) / len(results)return final_scoredef main():parser = argparse.ArgumentParser(description='网络安全AI基准评估')parser.add_argument('-m', '--model', required=True, help='要评估的模型名称')parser.add_argument('-d', '--dataset_file', required=True, help='数据集文件路径')parser.add_argument('-e', '--eval', required=True, help='评估基准类型')parser.add_argument('-B', '--backend', required=True, help='后端类型')args = parser.parse_args()score = evaluate_model(args.model, args.dataset_file, args.eval, args.backend)print(f"模型 {args.model} 在 {args.eval} 基准上的得分: {score:.2%}")

记忆管理系统

"""Memory agent for CAI with RAG support"""
import os
from typing import Dict, List
from cai.sdk.agents import Agent
from cai.rag.vector_db import VectorDatabaseclass MemoryAgent(Agent):"""记忆管理智能体,支持长期经验存储和检索"""def __init__(self):# 初始化向量数据库self.vector_db = VectorDatabase()super().__init__(name="Memory Agent",description="Manages long-term memory experiences across security exercises",instructions="""You manage episodic and semantic memory for security exercises.Store experiences in vector database and retrieve using RAG pipeline.""",tools=[self.store_memory, self.retrieve_memory])def store_memory(self, experience: str, exercise_type: str) -> str:"""存储安全演练经验到记忆库"""embedding = self.vector_db.embed_text(experience)memory_id = self.vector_db.store(embedding, experience, exercise_type)return f"Memory stored with ID: {memory_id}"def retrieve_memory(self, query: str, exercise_type: str = None) -> List[Dict]:"""从记忆库检索相关经验"""query_embedding = self.vector_db.embed_text(query)results = self.vector_db.search(query_embedding, exercise_type)return results

更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
公众号二维码

http://www.wxhsa.cn/company.asp?id=5025

相关文章:

  • GAS中,负责封装技能所影响的目标数据(如 Actor、位置、碰撞结果等)-FGameplayAbilityTargetData
  • 详细介绍:Maven入门_简介、安装与配置
  • 实用指南:立体校正原理
  • train-labels.idx1-ubyte里是什么
  • 滑动窗口最大值-leetcode
  • 创建预测窗口-ScopedPredictionWindow();
  • 95. 不同的二叉搜索树 II
  • lc1028-从先序遍历还原二叉树
  • P12558 [UOI 2024] Heroes and Monsters 题解
  • 加把劲——2025 年中总结
  • Ability-GetCurrentActorInfo()-IsLocallyControlled()和APawn::IsLocallyControlled()
  • 应该遵守的代码规范与读《数学之美》有感
  • AbilitySystemComponent和AbilityTask
  • AT_arc171_c [ARC171C] Swap on Tree
  • 202509_QQ_冷门的Base家族
  • SpawnActorDeferred()和SpawnActorOfClass()
  • 【QT】信号和槽
  • 学习日报|线程池专题学习总结 - 详解
  • 如何设计业务架构 - 智慧园区
  • snmp协议
  • 刷题复习(四)二分搜索
  • aardio | 通过点击checkbox复选框本身判断是否勾选
  • 项目介绍
  • 新媒体运营用AI排版工具|10分钟搞定公众号图文的全流程指南
  • 练习第一天学习的内容
  • 常见小错误 FREQUENTLY MADE MISTAKES IN OI
  • ctf工具整理
  • 力扣39题 组合总和
  • 250915 jave se简单过完一遍
  • 详细介绍:Linux相关概念和易错知识点(44)(IP地址、子网和公网、NAPT、代理)