前言
最近刚结束了一个HVV蓝队,比较头疼,甲方内部设备简直太多了,目前各个大厂都是这么玩儿的,本地MSS不好好适配不同厂家的产品,弹起来适配的问题最好的借口就是需要开发介入调整适配,不单单是在适配不同厂商的防火墙上扯皮还是在适配不同厂商探针上同样扯皮,有这扯皮功夫还不如直接写个工具一键封禁得了。其实就目前防守状态来讲,通过告警事件联动不同区域的防火墙这种技术手段太简单了,本地MSS在态感的基础上优化剧本再加上GPT介入研判已经基本上可以解决绝大部分的告警攻击了,可能目前唯一的问题可能就是出现在厂商态感底层告警逻辑上,目前探针获取的流量也只能获取非SSL流量,不做SSL卸载或者SSL证书解密的话一部分告警是无法获取的,另外常见的横向行为和隐藏流量也只能基于厂商设备底层逻辑或者剧本编排。
这次的工具功能是封禁共享情报的ip,或者是基于不同边界的不同品牌的防火墙和WAF的封禁。
下载地址:
功能介绍
- 支持多种防火墙设备统一管理(H3C,QAX,SXF,山石,K01,DP等)
- 网防K01提供黑名单批量查询、添加、删除功能
- 支持 IP 规则化、过滤、清洗等功能
- 界面简洁,操作便捷,可以选择性的根据不同的情报源头选择不同边界的设备进行封禁
- 关于产品型号可支持大部分型号,除网防K01外其它产品封禁实现原理是基于添加地址到地址簿,封禁策略需手动处理
工具介绍和代码逻辑
配置文件
配置文件存放在config/config.json中,需要提前配置json文件,负责无法使用程序。这里关于config的文件内容务必按照格式规则设置,否则无法解析。

DP防火墙
DP防火墙的登录方式是telnet,逻辑是通过添加ip地址进入地址簿,添加ip地址
package devices
import (
"BT_supertoolsV2/utils"
"fmt"
"strings"
"time"
"github.com/reiver/go-telnet"
)
type DPConfig struct {
Name string `json:"name"`
DeviceIP string `json:"device_ip"`
TelnetPort int `json:"telnet_port"`
Username string `json:"username"`
Password string `json:"password"`
AddressGroup string `json:"address_group"`
Type string `json:"type"`
}
func AddIPsToDP(cfg DPConfig, ips []string) string {
var output strings.Builder
output.WriteString(fmt.Sprintf("=== 开始配置 DP 设备 %s ===\n", cfg.DeviceIP))
// IP 规则化处理
normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
if normalizedIPs == "" {
return "没有有效的 IP 地址!"
}
ipList := strings.Split(normalizedIPs, "\n")
// 连接 Telnet
address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
conn, err := telnet.DialTo(address)
if err != nil {
return fmt.Sprintf("Telnet 连接失败: %v\n", err)
}
defer conn.Close()
// 封装发送命令函数
send := func(cmd string) {
conn.Write([]byte(cmd + "\n"))
time.Sleep(500 * time.Millisecond)
output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
}
// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
send(cfg.Username)
send(cfg.Password)
send("conf")
for _, ip := range ipList {
send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
}
send("exit")
output.WriteString("\n=== 配置完成 ===\n")
return output.String()
}
这里没有直接使用回显显示状态,没有监听服务器回显状态再输入命令,具体使用该方式是新增一个地址簿,再策略位置设置封禁策略,引用添加的地址簿即可。界面效果

H3c防火墙
采用ssh登录使用命令操作
package devices
import (
"fmt"
"strings"
"time"
"golang.org/x/crypto/ssh"
)
func AddIPsToH3C(cfg H3CConfig, ips []string) string {
var output strings.Builder
output.WriteString(fmt.Sprintf("=== 开始配置 H3C 设备 %s ===\n\n", cfg.DeviceIP))
config := &ssh.ClientConfig{
User: cfg.Username,
Auth: []ssh.AuthMethod{
ssh.Password(cfg.Password),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 10 * time.Second,
}
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
if err != nil {
return fmt.Sprintf("SSH连接失败: %v\n", err)
}
defer client.Close()
session, err := client.NewSession()
if err != nil {
return fmt.Sprintf("创建会话失败: %v\n", err)
}
defer session.Close()
stdin, err := session.StdinPipe()
if err != nil {
return fmt.Sprintf("获取输入管道失败: %v\n", err)
}
modes := ssh.TerminalModes{
ssh.ECHO: 0,
ssh.TTY_OP_ISPEED: 14400,
ssh.TTY_OP_OSPEED: 14400,
}
if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
return fmt.Sprintf("设置终端失败: %v\n", err)
}
if err := session.Shell(); err != nil {
return fmt.Sprintf("启动 shell 失败: %v\n", err)
}
commands := []string{
"system-view",
fmt.Sprintf("object-group ip %s", cfg.AddressGroup),
}
for _, ip := range ips {
commands = append(commands, fmt.Sprintf("network host address %s", ip))
}
commands = append(commands, "exit")
for _, cmd := range commands {
fmt.Fprintf(stdin, "%s\n", cmd)
time.Sleep(300 * time.Millisecond)
output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
}
output.WriteString("\n=== 配置完成 ===\n")
return output.String()
}
这里采用了监听回显,核心代码是
commands := []string{
"system-view",
fmt.Sprintf("object-group ip %s", cfg.AddressGroup),
}
for _, ip := range ips {
commands = append(commands, fmt.Sprintf("network host address %s", ip))
}
commands = append(commands, "exit")
当然这里可以增加优化,代码内关于ip地址的添加以及删除等操作都可以做,工具后期可以依托命令功能增加多个模块化设计,当然为了降低操作风险的话,这里黑名单封禁的操作足够使用了。

山石防火墙
山石登录方式是telnet登录使用命令操作
import (
"BT_supertoolsV2/utils"
"fmt"
"strings"
"time"
"github.com/reiver/go-telnet"
)
type HSConfig struct {
Name string `json:"name"`
DeviceIP string `json:"device_ip"`
TelnetPort int `json:"telnet_port"`
Username string `json:"username"`
Password string `json:"password"`
AddressGroup string `json:"address_group"`
Type string `json:"type"`
}
func AddIPsToHillstone(cfg HSConfig, ips []string) string {
var output strings.Builder
output.WriteString(fmt.Sprintf("=== 开始配置 Hillstone 设备 %s ===\n", cfg.DeviceIP))
// IP 规则化处理
normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
if normalizedIPs == "" {
return "没有有效的 IP 地址!"
}
ipList := strings.Split(normalizedIPs, "\n")
// 连接 Telnet
address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
conn, err := telnet.DialTo(address)
if err != nil {
return fmt.Sprintf("Telnet 连接失败: %v\n", err)
}
defer conn.Close()
// 封装发送命令函数
send := func(cmd string) {
conn.Write([]byte(cmd + "\n"))
time.Sleep(500 * time.Millisecond)
output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
}
// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
send(cfg.Username)
send(cfg.Password)
send("conf")
for _, ip := range ipList {
send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
}
send("exit")
output.WriteString("\n=== 配置完成 ===\n")
return output.String()
}
山石防火墙的命令基本和华三防火墙的命令基本一致,目前基本上可以支持大多数版本型号的防火墙,使用前可以做测试。

网盾K01
网盾K01的话这里采用的是关于api的利用,目前关于网盾K01的黑名单添加的模式是和防火墙不一致的,可以采用接口的方式增加。目前代码中关于黑名单添加的事件设置的是3600小时。
// 批量封禁
func AddBlacklist(devices []K01Device, ips []string, output *widget.Entry) {
for _, device := range devices {
appendOutput(output, fmt.Sprintf("🔑 正在登录设备 [https://%s]...", device.IP))
// 登录并获取 Token
url := fmt.Sprintf("https://%s", device.IP)
token := Login(url, device.Username, device.Password, output, device.Name)
if token == "" {
appendOutput(output, fmt.Sprintf("❌ 登录失败: %s", device.Name))
continue
}
// 逐个 IP 添加到黑名单
for _, ip := range ips {
// 如果 IP 地址没有 CIDR 后缀,添加 "/32"
if !strings.Contains(ip, "/") {
ip += "/32"
}
// 打印调试日志,确认每个 IP 地址
appendOutput(output, fmt.Sprintf("🔍 封禁 IP: %s", ip))
// 准备请求 payload
payload := map[string]interface{}{
"color": 0,
"device_mask": []int{224},
"items": []map[string]interface{}{
{
"type": 0,
"ip": ip, // 这里单独封禁每个 IP 地址
"timeout": 336,
"time_type": "3600",
"device_mask": []int{224},
"comment": "自动封禁",
},
},
"method": "add",
}
// 打印调试日志,确认请求 Payload
payloadBytes, _ := json.Marshal(payload)
// appendOutput(output, fmt.Sprintf("🔍 请求 Payload: %s", string(payloadBytes)))
// 请求封禁
req, err := http.NewRequest("POST", url+"/api/v1/security/iplist/save", bytes.NewBuffer(payloadBytes))
if err != nil {
appendOutput(output, fmt.Sprintf("❌ 创建封禁请求失败: %s", err.Error()))
continue
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
// 执行请求
resp, err := insecureClient.Do(req)
if err != nil {
appendOutput(output, fmt.Sprintf("❌ 封禁请求失败: %s", err.Error()))
continue
}
defer resp.Body.Close()
// // 打印调试日志,确认响应代码
// appendOutput(output, fmt.Sprintf("🔍 响应状态: %s", resp.Status))
// 解析响应
body, _ := ioutil.ReadAll(resp.Body)
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
appendOutput(output, fmt.Sprintf("❌ 解析封禁响应失败: %s", err.Error()))
continue
}
// // 打印调试日志,确认响应结果
// appendOutput(output, fmt.Sprintf("🔍 响应结果: %v", result))
// 根据结果显示成功或失败
if success, ok := result["success"].(bool); ok && success {
appendOutput(output, fmt.Sprintf("✅ 添加成功: %s", ip))
} else {
appendOutput(output, fmt.Sprintf("❌ 添加失败: %v", result["msg"]))
}
}
}
}
// DeleteBlacklist 删除 IP 从黑名单
func DeleteBlacklist(devices []K01Device, selected []int, ipList []string, outputBox *widget.Entry) {
for _, idx := range selected {
device := devices[idx] // 获取当前选择的设备
url := fmt.Sprintf("https://%s", device.IP)
// 输出正在登录设备信息
appendOutput(outputBox, fmt.Sprintf("🔄 正在登录设备【%s】...", device.Name))
// 登录设备,获取 token
token := Login(url, device.Username, device.Password, outputBox, device.Name)
if token == "" {
appendOutput(outputBox, fmt.Sprintf("❌ 设备【%s】登录失败,无法删除黑名单", device.Name))
continue
}
// 遍历 IP 列表,直接进行删除操作
for _, ip := range ipList {
apiUrl := fmt.Sprintf("%s/api/v1/security/iplist/save", url)
payload := fmt.Sprintf(`{
"color": 0,
"items": [{"id": "%s/32;0;0", "type": 0, "ip": "%s/32", "device_mask": [224]}],
"method": "delete"
}`, ip, ip)
// 创建请求
req, err := http.NewRequest("POST", apiUrl, bytes.NewBuffer([]byte(payload)))
if err != nil {
appendOutput(outputBox, "❌ 创建删除请求失败: "+err.Error())
continue
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
// 发送请求
resp, err := insecureClient.Do(req)
if err != nil {
appendOutput(outputBox, "❌ 删除请求失败: "+err.Error())
continue
}
defer resp.Body.Close()
// 读取响应
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
appendOutput(outputBox, "❌ 读取删除响应失败: "+err.Error())
continue
}
// 解析响应
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
appendOutput(outputBox, "❌ 解析删除响应失败: "+err.Error())
continue
}
// 检查删除是否成功
if success, ok := result["success"].(bool); ok && success {
appendOutput(outputBox, fmt.Sprintf("✅ 删除成功,IP: %s", ip))
} else {
// 如果失败,输出错误信息
if msg, ok := result["msg"].(string); ok {
appendOutput(outputBox, fmt.Sprintf("❌ 删除失败: %s", msg))
} else {
appendOutput(outputBox, "❌ 删除失败,未知错误")
}
}
}
}
关于api的调用的话是有其自己的参数规则的,因为认证方式是https,所以关于身份认证的话需要忽略ssl证书。因为考虑到删除函数的逻辑需先查询要封禁的黑名单是否在黑名单列表,所以这里就执行的是直接删除,否则就删除失败,但是对于功能性的查询模块的功能是有的,由于网盾k01的产品特性,一点发现全网阻断,所以这里基本上用不到删除黑名单的功能。

QAX网神防火墙
网神联动利用的是ssh登录执行命令,所以这里权限也相对来说比较大,对于蓝队来讲不建议增加其它代码功能,原理也是利用策略引用地址簿进行封禁
func AddIPsToQAX(cfg QAXConfig, ips []string) string {
var output strings.Builder
output.WriteString(fmt.Sprintf("=== 开始配置奇安信设备 %s ===\n\n", cfg.DeviceIP))
config := &ssh.ClientConfig{
User: cfg.Username,
Auth: []ssh.AuthMethod{
ssh.Password(cfg.Password),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 15 * time.Second,
}
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
if err != nil {
return fmt.Sprintf("SSH连接失败: %v\n", err)
}
defer client.Close()
session, err := client.NewSession()
if err != nil {
return fmt.Sprintf("创建会话失败: %v\n", err)
}
defer session.Close()
stdin, err := session.StdinPipe()
if err != nil {
return fmt.Sprintf("获取输入管道失败: %v\n", err)
}
modes := ssh.TerminalModes{
ssh.ECHO: 0,
ssh.TTY_OP_ISPEED: 14400,
ssh.TTY_OP_OSPEED: 14400,
}
if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
return fmt.Sprintf("设置终端失败: %v\n", err)
}
if err := session.Shell(); err != nil {
return fmt.Sprintf("启动shell失败: %v\n", err)
}
commands := []string{
"config terminal",
fmt.Sprintf("object address %s", cfg.AddressGroup),
}
for _, ip := range ips {
commands = append(commands, fmt.Sprintf("network %s 32", ip))
}
commands = append(commands, "exit")
for _, cmd := range commands {
fmt.Fprintf(stdin, "%s\n", cmd)
time.Sleep(500 * time.Millisecond)
output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
}
output.WriteString("\n=== 配置完成 ===\n")
return output.String()
}
向地址簿中添加ip地址核心代码
commands := []string{
"config terminal",
fmt.Sprintf("object address %s", cfg.AddressGroup),
}
for _, ip := range ips {
commands = append(commands, fmt.Sprintf("network %s 32", ip))
}
commands = append(commands, "exit")
目前我也是查询了多款网神防火墙的手册,关于地址簿添加这块儿的命令版本是没有变化的,基本上支持大多数版本型号。
SXF_AF和WAF
这里SXF的设备是有两种模式可供选择的,但是目前SXF的ssh登录一般是由两段密码组成还有可能经常性的修改,所以这里使用的API的方式向地址簿中添加要封禁的IP地址,但是这里的话,策略务必要配置正确。
// 新增的结构体,用于表示包含 devices 字段的 JSON 数据结构
type DeviceList struct {
Devices []SXFDevice `json:"devices"`
}
// 登录响应结构体
type loginResponse struct {
Code int `json:"code"`
Data struct {
LoginResult struct {
Token string `json:"token"`
} `json:"loginResult"`
} `json:"data"`
Message string `json:"message"`
}
// 添加 IP 请求结构体
type ipRange struct {
Start string `json:"start"`
}
type addIPRequest struct {
IPRanges []ipRange `json:"ipRanges"`
}
// ✅ 防冲突:明确为 SXF 的登录函数
// 登录获取 token
func SXFLogin(device SXFDevice) (string, error) {
// 打印设备信息,确保 IP、用户名和密码正确
fmt.Printf("设备信息: IP=%s, Username=%s, Password=%s\n", device.IP, device.Username, device.Password)
url := fmt.Sprintf("https://%s/api/v1/namespaces/public/login", device.IP)
payload := map[string]string{
"name": device.Username,
"password": device.Password,
}
jsonData, _ := json.Marshal(payload)
client := &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return "", fmt.Errorf("登录请求失败: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
// 打印响应内容进行调试
fmt.Println("响应体内容:", string(body))
var result struct {
Code int `json:"code"`
Message string `json:"message"`
Data struct {
LoginResult struct {
Token string `json:"token"`
} `json:"loginResult"`
} `json:"data"`
}
if err := json.Unmarshal(body, &result); err != nil {
return "", fmt.Errorf("解析登录响应失败: %v", err)
}
// 检查返回的 code
if result.Code != 0 {
return "", fmt.Errorf("登录失败: %s", result.Message)
}
// 返回 token
return result.Data.LoginResult.Token, nil
}
// 加载 JSON 文件并解析为设备切片
// 加载 JSON 文件并解析为设备切片
func loadSXFDevices(filePath string) ([]SXFDevice, error) {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("设备配置文件加载失败: %v", err)
}
// 输出加载的 JSON 数据(用于调试)
fmt.Printf("加载的 JSON 数据: %s\n", string(data))
var devices []SXFDevice
err = json.Unmarshal(data, &devices)
if err != nil {
return nil, fmt.Errorf("设备解析失败: %v", err)
}
// 输出解析后的设备信息(调试)
fmt.Printf("加载的设备数量: %d\n", len(devices))
for _, device := range devices {
// 输出每台设备的详细信息,特别是 AddressGroup 字段
fmt.Printf("设备信息:Name=%s, IP=%s, AddressGroup=%s\n", device.Name, device.IP, device.AddressGroup)
}
return devices, nil
}
// ✅ 添加多个 IP 到 SXF 地址组(支持日志回调)
// 在 AddToSXFBlacklist 函数内部,确保请求头正确设置
// 设备添加到黑名单的函数
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
// 添加多个 IP 到 SXF 地址组(支持日志回调)
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
func AddToSXFBlacklist(devices []SXFDevice, ips []string, logFn func(string)) error {
for _, device := range devices {
// 1. 登录
token, err := SXFLogin(device)
if err != nil {
logFn(fmt.Sprintf("【%s】❌ 登录失败: %v", device.Name, err))
continue
}
logFn(fmt.Sprintf("【%s】✅ 登录成功", device.Name))
// 2. 添加每个IP
for _, ip := range ips {
ipRanges := []map[string]string{{"start": ip, "end": ip}}
requestData := map[string]interface{}{"ipRanges": ipRanges}
requestDataJSON, _ := json.Marshal(requestData)
// 3. 发送请求
url := fmt.Sprintf(
"https://%s/api/v1/namespaces/public/ipgroups/%s?_arrayop=add",
device.IP,
device.AddressGroup,
)
req, _ := http.NewRequest("PATCH", url, bytes.NewBuffer(requestDataJSON))
req.Header = http.Header{
"token": []string{token},
"Content-Type": []string{"application/json"},
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
resp, err := client.Do(req)
if err != nil {
logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %v", device.Name, ip, err))
continue
}
defer resp.Body.Close()
// 4. 处理响应
body, _ := io.ReadAll(resp.Body)
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
logFn(fmt.Sprintf("【%s】❌ IP %s 响应解析失败: %v", device.Name, ip, err))
continue
}
if code, ok := result["code"].(float64); ok && code == 0 {
logFn(fmt.Sprintf("【%s】✅ IP %s 已添加到地址组[%s]",
device.Name, ip, device.AddressGroup))
} else {
msg, _ := result["message"].(string)
logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %s", device.Name, ip, msg))
}
}
}
return nil
}
如果想要扩展功能的话不建议使用API去扩展,毕竟太麻烦了,各种参数比较麻烦,没有ssh的方式登录使用命令操作简便。

整体界面效果

总结
其它厂商的防火墙确实没找到手册,有的是找到手册没有测试的设备,目前上述的设备和代码是完全没有问题的,如果想添加其它厂商的设备,可以给我操作手册,我这边更新新版本发布。每次更新后的授权时间是3个月
。