参考资料:RuleGo 官方文档 | GitHub

什么是 RuleGo?

RuleGo 是一个基于 Go 语言的轻量级、高性能、嵌入式的新一代组件编排规则引擎。它通过将业务逻辑拆解为可复用的组件,以规则链(Rule Chain)的方式灵活编排,实现业务逻辑与代码的解耦,支持运行时动态调整,无需重启服务。

RuleGo 在 2024 年荣获 Gitee 最有价值开源项目(GVP)G-Star 项目毕业认证,是 Go 生态中规则引擎领域的标杆项目。

核心特性

特性 说明
🪶 轻量级 无外部中间件依赖,可下沉到边缘设备
高性能 协程池 + 对象池,有向无环图消息路由
🔀 双模式 嵌入式组件 或 独立中间件部署
🧩 组件化 所有业务逻辑皆为组件,灵活配置复用
🔄 动态编排 运行时热更新规则链,无需重启
🔌 扩展简单 实现接口即可自定义组件
🔁 规则链嵌套 支持子规则链,实现流程复用
🛡️ 上下文隔离 高并发下数据安全,无串流风险
🎯 AOP 机制 无侵入式增强规则链执行行为
📡 数据集成 动态配置 HTTP/MQTT/Kafka/TCP 等 Endpoint

典型应用场景

  • 边缘计算:在边缘服务器预处理 IoT 数据,动态调整处理规则,可替代 node-red
  • 物联网:设备数据上报 → 规则判断 → 触发告警/联动
  • 数据分发:按消息类型路由到 HTTP、MQTT、gRPC 等不同系统
  • 应用集成:作为”胶水”连接 SSH、Webhook、Kafka、数据库、ChatGPT 等
  • 微服务编排:动态编排和驱动微服务调用链
  • 低代码平台:iPaaS、ETL、类 LangFlow 系统
  • 自动化:CI/CD、营销自动化、量化交易系统
  • 风控/积分:业务代码与业务逻辑彻底解耦

快速开始

安装

1
go get github.com/rulego/rulego

三步上手

第一步:用 JSON 定义规则链

规则链是 RuleGo 的核心概念,描述了消息如何在各节点间流转:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
{
"ruleChain": {
"id": "rule01",
"name": "温度告警规则链",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "jsFilter",
"name": "温度过滤",
"debugMode": true,
"configuration": {
"jsScript": "return msg.temperature > 50;"
}
},
{
"id": "s2",
"type": "jsTransform",
"name": "数据转换",
"configuration": {
"jsScript": "metadata['alert']='high_temp'; msg['level']='warning'; return {'msg':msg,'metadata':metadata,'msgType':msgType};"
}
},
{
"id": "s3",
"type": "restApiCall",
"name": "推送告警",
"configuration": {
"restEndpointUrlPattern": "http://alert-service/api/notify",
"requestMethod": "POST",
"maxParallelRequestsCount": 200
}
}
],
"connections": [
{ "fromId": "s1", "toId": "s2", "type": "True" },
{ "fromId": "s2", "toId": "s3", "type": "Success" }
]
}
}

消息流向:温度过滤(>50°C)→ 数据转换 → HTTP 推送告警

第二步:创建规则引擎实例

1
2
3
4
5
6
7
8
9
10
import "github.com/rulego/rulego"

// 加载规则链 JSON
ruleFile, _ := os.ReadFile("rule_chain.json")

// 创建规则引擎实例
ruleEngine, err := rulego.New("rule01", ruleFile)
if err != nil {
panic(err)
}

第三步:投递消息

1
2
3
4
5
6
7
8
9
10
11
// 构造消息元数据
metaData := types.NewMetadata()
metaData.PutValue("deviceId", "device-001")
metaData.PutValue("productType", "sensor")

// 构造消息(负荷 + 类型)
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData,
`{"temperature": 65, "humidity": 80}`)

// 投递给规则引擎处理
ruleEngine.OnMsg(msg)

规则链 DSL 详解

规则链 DSL 是一个 JSON 文件,完整结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"ruleChain": {
"id": "rule01", // 规则链唯一ID
"name": "示例规则链",
"root": true, // 是否为根规则链
"debugMode": false, // 调试模式(覆盖节点配置)
"configuration": {
"vars": { // 变量列表
"apiUrl": "http://api.example.com"
},
"secrets": { // 密钥列表
"apiKey": "xxx"
}
}
},
"metadata": {
"firstNodeIndex": 0, // 第一个执行节点的索引
"nodes": [], // 节点列表
"connections": [], // 连接关系列表
"endpoints": [] // 接入端点(触发器)
}
}

节点配置字段

字段 类型 说明
id string 节点唯一标识
type string 节点类型(对应组件)
name string 节点名称
debugMode boolean 是否开启调试回调
configuration object 节点配置参数
additionalInfo object 可视化坐标等扩展信息

连接关系

1
2
3
4
5
{
"fromId": "s1",
"toId": "s2",
"type": "True" // 连接类型:True/False/Success/Failure 等
}

内置标准组件

过滤器(filter)

jsFilter - JS 脚本过滤

根据 JS 脚本返回值(true/false)决定消息路由方向:

1
2
3
4
5
6
{
"type": "jsFilter",
"configuration": {
"jsScript": "return msg.temperature > 50;"
}
}

switch - 条件分支

使用 expr 表达式引擎进行多条件路由:

1
2
3
4
5
6
7
8
9
{
"type": "switch",
"configuration": {
"cases": [
{ "case": "msg.temperature > 80", "then": "critical" },
{ "case": "msg.temperature > 50", "then": "warning" }
]
}
}

内置变量idtsdatamsgmetadatatypedataType

转换器(transform)

jsTransform - JS 脚本转换

对消息内容进行灵活转换:

1
2
3
4
5
6
{
"type": "jsTransform",
"configuration": {
"jsScript": "metadata['name']='test01'; msg['addField']='value1'; return {'msg':msg,'metadata':metadata,'msgType':msgType};"
}
}

x/luaTransform - Lua 脚本转换

使用 Lua 5.1 语法进行转换(适合高性能场景):

1
2
3
4
5
6
{
"type": "x/luaTransform",
"configuration": {
"script": "msg.temperature = msg.temperature * 1.8 + 32\nmetadata.unit = 'F'\nreturn msg, metadata, msgType"
}
}

动作(action)

restApiCall - HTTP 客户端

调用外部 REST API,支持 SSE 流式响应(可对接大模型 API):

1
2
3
4
5
6
7
8
9
10
{
"type": "restApiCall",
"configuration": {
"restEndpointUrlPattern": "http://api.example.com/notify",
"requestMethod": "POST",
"headers": { "Authorization": "Bearer ${token}" },
"readTimeoutMs": 5000,
"maxParallelRequestsCount": 200
}
}

mqttClient - MQTT 推送

将消息发布到 MQTT Broker:

1
2
3
4
5
6
7
8
{
"type": "mqttClient",
"configuration": {
"server": "127.0.0.1:1883",
"topic": "/device/msg/${deviceId}",
"qos": 1
}
}

log - 日志记录

1
2
3
4
5
6
{
"type": "log",
"configuration": {
"jsScript": "return 'msg: ' + JSON.stringify(msg) + ', metadata: ' + JSON.stringify(metadata);"
}
}

高级特性

动态更新规则链

无需重启,运行时热更新:

1
2
3
4
5
6
7
8
// 更新整个规则链
err := ruleEngine.ReloadSelf(newRuleFile)

// 更新某个节点
ruleEngine.ReloadChild("node01", newNodeFile)

// 获取当前规则链 DSL
dsl := ruleEngine.DSL()

规则引擎池管理

1
2
3
4
5
6
7
8
// 批量加载文件夹下所有规则链
rulego.Load("/rules", rulego.WithConfig(config))

// 按 ID 获取规则引擎实例
ruleEngine, ok := rulego.Get("rule01")

// 删除规则引擎实例
rulego.Del("rule01")

全局配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
config := rulego.NewConfig()

// 节点调试回调(节点配置 debugMode:true 时触发)
config.OnDebug = func(chainId, flowType, nodeId string,
msg types.RuleMsg, relationType string, err error) {
log.Printf("[%s] node=%s, relation=%s, msg=%s",
flowType, nodeId, relationType, msg.Data)
}

// 脚本执行超时(默认 2000ms)
config.ScriptMaxExecutionTime = 5 * time.Second

// 全局属性(节点配置中通过 ${global.key} 引用)
config.Properties.PutValue("apiUrl", "http://api.example.com")

ruleEngine, err := rulego.New("rule01", ruleFile, rulego.WithConfig(config))

注册自定义函数(UDF)

让 JS/Lua 脚本可以调用 Go 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 注册 Go 函数
config.RegisterUdf("add", func(a, b int) int {
return a + b
})

// 注册结构体方法
type DBClient struct{}
func (d *DBClient) Query(id string) string { return "result:" + id }

config.RegisterUdf("db", &DBClient{})
// JS 中调用:let result = db.Query("123")

// 注册 JS 工具函数
config.RegisterUdf("utils", types.Script{
Type: types.Js,
Content: `var utils = { format: function(d) { return d.toString(); } }`,
})

缓存机制

跨节点、跨规则链共享数据:

1
2
3
4
5
6
7
8
// JS 脚本中使用缓存
let cache = $ctx.ChainCache(); // 规则链级别缓存
// let cache = $ctx.GlobalCache(); // 全局级别缓存

cache.Set("key", "value"); // 永不过期
cache.Set("key2", "value2", "10m"); // 10 分钟后过期
let value = cache.Get("key");
cache.Delete("key");

AOP 切面

无侵入式增强规则链行为:

1
2
3
4
5
6
7
8
9
// 添加故障降级切面:错误超过 3 次则跳过该节点 10 秒
config := rulego.NewConfig(
types.WithAspects(
&aspect.SkipFallbackAspect{
ErrorCountLimit: 3,
LimitDuration: time.Second * 10,
},
),
)

自定义组件开发

实现 types.Node 接口,三步完成自定义组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1. 实现 Node 接口
type UpperNode struct{}

func (n *UpperNode) Type() string { return "test/upper" }
func (n *UpperNode) New() types.Node { return &UpperNode{} }

func (n *UpperNode) Init(config types.Config, cfg types.Configuration) error {
// 解析节点配置
return nil
}

func (n *UpperNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
msg.Data = strings.ToUpper(msg.Data)
ctx.TellSuccess(msg) // 成功 → 下一节点
// ctx.TellFailure(msg, err) // 失败
// ctx.TellNext(msg, "Custom") // 自定义关系
}

func (n *UpperNode) Destroy() {}

// 2. 注册到组件库
rulego.Registry.Register(&UpperNode{})

// 3. 在规则链 JSON 中使用
// "type": "test/upper"

Endpoint 数据集成

通过 Endpoint DSL 动态配置数据接入端点,无需编写代码:

HTTP Endpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"id": "e1",
"type": "http",
"configuration": { "server": ":9090" },
"routers": [{
"id": "r1",
"params": ["post"],
"from": { "path": "/api/v1/msg/:chainId" },
"to": {
"path": "${chainId}",
"wait": true,
"processors": ["responseToBody"]
}
}]
}

MQTT Endpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"id": "e_mqtt",
"type": "mqtt",
"configuration": {
"server": "127.0.0.1:1883",
"username": "admin",
"password": "admin"
},
"routers": [{
"from": { "path": "#" },
"to": { "path": "default" }
}]
}

定时任务 Endpoint

1
2
3
4
5
6
7
8
{
"id": "schedule_e1",
"type": "schedule",
"routers": [{
"from": { "path": "*/1 * * * * *" },
"to": { "path": "default" }
}]
}

启动 Endpoint:

1
2
3
4
5
6
7
endpointBuf, _ := os.ReadFile("endpoint.json")
ep, err := endpoint.New("", endpointBuf,
endpoint.DynamicEndpointOptions.WithConfig(config))
ep.Start()

// 动态更新
ep.Reload([]byte(newDsl))

组件生态

组件库 说明
rulego-components 标准扩展组件库
rulego-components-ai AI/大模型场景组件
rulego-components-iot IoT 场景组件
rulego-components-etl ETL 数据处理组件
rulego-components-ci CI/CD 场景组件
RuleGo-Editor 可视化规则链编辑器
RuleGo-Server 自动化工作流平台

性能表现

RuleGo 使用**有向无环图(DAG)**表示规则链,消息只需沿图中路径处理,无需匹配所有规则,极大提升了路由效率。

测试环境:树莓派 2(900MHz Cortex-A7 × 4,1GB RAM)

规则链:JS 过滤 → JS 转换 → HTTP 推送

并发数 内存占用
100 并发 ~19MB
500 并发 ~19MB

内存占用几乎不随并发增加而增长,非常适合资源受限的边缘设备。

适用场景判断

如果你的项目符合以下任一情况,RuleGo 是理想选择:

  • ✅ 业务逻辑复杂且频繁变动,不想每次都重新部署
  • ✅ 需要对接大量第三方系统或协议
  • ✅ 构建 IoT 边缘计算或物联网平台
  • ✅ 需要低代码/可视化的业务编排能力
  • ✅ 想在 Go 项目中实现业务热部署

总结

RuleGo 以组件化 + 规则链的设计理念,将复杂的业务逻辑拆解为可复用的积木,通过 JSON DSL 灵活编排,实现了业务逻辑与代码的彻底解耦。无论是边缘计算、IoT、低代码平台还是微服务编排,RuleGo 都能提供优雅的解决方案。


相关链接