扩展 AuditAI¶
本文介绍如何为 AuditAI 添加新的检测器、工具、MCP 工具和知识源。
添加新检测器¶
1. 创建工具¶
在 src/tools/ 中创建新文件:
# src/tools/my_detector.py
from ..utils.logger import get_logger
logger = get_logger(__name__)
class MyDetector:
    """Skeleton detector: replace the body of ``analyze`` with real logic."""

    def __init__(self):
        # Nothing to configure in this skeleton.
        pass

    def analyze(self, contract_path: str) -> list[dict]:
        """Analyze a contract and return findings.

        Args:
            contract_path: Path of the contract to analyze. The placeholder
                implementation below does not read it.

        Returns:
            A list of finding dicts, each with the keys ``check``,
            ``impact``, ``location`` and ``description``.
        """
        # Your detection logic here
        return [
            {
                "check": "my-finding-type",
                "impact": "High",
                "location": "function:line",
                "description": "Description of the finding",
            }
        ]
2. 在 ToolKit 中注册¶
添加到 src/tools/__init__.py:
from .my_detector import MyDetector
class ToolKit:
    """Holds tool instances as attributes."""

    def __init__(self):
        # ... existing tools ...
        # Expose the new detector as ``toolkit.my_detector``.
        self.my_detector = MyDetector()
3. 在 Auditor 中使用¶
在 src/agents/auditor.py 中添加新方法:
async def _run_my_detector(self, contract_path: str) -> list[dict]:
    """Run MyDetector and convert its output to the auditor's finding format.

    Args:
        contract_path: Path of the contract handed to ``MyDetector.analyze``.

    Returns:
        One dict per detector result with the keys ``id``, ``type``,
        ``severity``, ``location``, ``description`` and ``source``.
    """
    import asyncio

    logger.info("Running MyDetector analysis")
    # analyze() is synchronous; run it in a worker thread so this coroutine
    # does not block the event loop while other detectors are awaited.
    results = await asyncio.to_thread(
        self.tools.my_detector.analyze, contract_path
    )
    return [
        {
            "id": f"my-detector-{index}",
            "type": finding["check"],
            "severity": finding["impact"],
            "location": finding["location"],
            "description": finding["description"],
            "source": "my-detector",
        }
        for index, finding in enumerate(results)
    ]
4. 添加到并行检测¶
在 detect() 中将检测器添加到 asyncio.gather():
slither_results, aderyn_results, llm_results, my_results = await asyncio.gather(
self._run_slither(contract_path),
self._run_aderyn(contract_path),
self._run_llm_analysis(sanitized_code),
self._run_my_detector(contract_path),
)
5. 注册到检测器家族¶
将检测器的 source 标识(此处为 "my-detector")添加到 _DETECTOR_FAMILIES,使其参与共识评分:
# Detector families used for consensus scoring; the new detector goes last.
_DETECTOR_FAMILIES = (
    "slither",
    "aderyn",
    "mimo",
    "ba",
    "ta",
    "expert1",
    "expert2",
    "triager",
    "my-detector",
)
添加新工具到 ToolKit¶
1. 创建工具¶
# src/tools/my_tool.py
class MyTool:
    """Skeleton tool: replace the body of ``run`` with real logic."""

    def run(self, **kwargs):
        """Run the tool and return its result.

        Args:
            **kwargs: Arbitrary keyword arguments; the placeholder
                implementation below ignores them.

        Returns:
            A dict with a single ``result`` key.
        """
        # Your tool logic
        return {"result": "..."}
2. 注册¶
# src/tools/__init__.py
from .my_tool import MyTool
class ToolKit:
    """Holds tool instances as attributes."""

    def __init__(self):
        # Expose the new tool as ``toolkit.my_tool``.
        self.my_tool = MyTool()
添加新 MCP 工具¶
1. 定义工具¶
在 src/mcp/mcp_server.py 中,添加到 _register_default_tools():
# Tool definition: name, description and the schema for its arguments.
self.tools["my_tool"] = {
    "name": "my_tool",
    "description": "Description of what my_tool does",
    "inputSchema": {
        "type": "object",
        "properties": {
            "input_param": {
                "type": "string",
                "description": "Description of input_param",
            },
        },
        "required": ["input_param"],
    },
}
2. 添加处理程序¶
async def _handle_my_tool(self, arguments: dict) -> dict:
    """Handle a ``my_tool`` call.

    Args:
        arguments: Tool arguments; ``input_param`` is read from it and is
            ``None`` when the key is absent.

    Returns:
        A dict with a single ``result`` key describing the processed input.
    """
    input_param = arguments.get("input_param")
    # Your logic here
    return {"result": f"Processed {input_param}"}
3. 在调度中注册¶
在 _dispatch_tool() 中:
async def _dispatch_tool(self, tool_name: str, arguments: dict) -> dict:
    """Route a tool call to the handler registered for ``tool_name``.

    Args:
        tool_name: Name of the tool being invoked.
        arguments: Arguments forwarded unchanged to the handler.

    Returns:
        Whatever the selected handler returns.
    """
    if tool_name == "analyze_contract":
        return await self._handle_analyze_contract(arguments)
    elif tool_name == "my_tool":
        # New branch for the tool added in this guide.
        return await self._handle_my_tool(arguments)
    # ... etc
扩展知识库¶
添加新文档¶
import asyncio

from src.knowledge import KnowledgeBase


async def main() -> None:
    """Initialize the knowledge base and add one reference document."""
    kb = KnowledgeBase()
    await kb.initialize()
    await kb.add_document({
        "id": "my-doc-1",
        "content": "Reentrancy occurs when an external call...",
        "metadata": {
            "type": "vulnerability_reference",
            "language": "solidity",
            "vuln_category": "reentrancy",
        },
    })


if __name__ == "__main__":
    # ``await`` is only valid inside a coroutine, so run the example
    # through asyncio.run() instead of awaiting at module level.
    asyncio.run(main())
添加新数据源¶
在 src/knowledge/knowledge_base.py 中创建加载方法:
async def _load_my_source(self):
    """Load documents from my custom source.

    Walks ``data/my-source`` recursively for ``*.md`` files, splits each
    file with ``self._chunk_markdown`` and appends one document per chunk to
    ``self.documents``. Does nothing when the directory does not exist.
    """
    source_dir = Path("data/my-source")
    if not source_dir.exists():
        return
    for md_file in source_dir.rglob("*.md"):
        # Read as UTF-8 explicitly; the platform default encoding varies.
        content = md_file.read_text(encoding="utf-8")
        chunks = self._chunk_markdown(content)
        for i, chunk in enumerate(chunks):
            self.documents.append({
                "id": f"my-source-{md_file.stem}-{i}",
                "content": chunk,
                "metadata": {"type": "my_source", "source_path": str(md_file)},
            })
然后在 initialize() 中调用该方法:
async def initialize(self):
    """Run the loaders, then the embedding and collection setup, in order."""
    await self._load_context_repo()
    await self._load_knowledge_dir()
    await self._load_solodit_reports()
    await self._load_my_source()  # <-- add this call
    await self._initialize_embeddings()
    await self._create_collection()