参考资料:RuleGo 官方文档 | GitHub
什么是 RuleGo?
RuleGo 是一个基于 Go 语言的轻量级、高性能、嵌入式的新一代组件编排规则引擎。它通过将业务逻辑拆解为可复用的组件,以规则链(Rule Chain)的方式灵活编排,实现业务逻辑与代码的解耦,支持运行时动态调整,无需重启服务。
RuleGo 在 2024 年荣获 Gitee 最有价值开源项目(GVP) 和 G-Star 项目毕业认证,是 Go 生态中规则引擎领域的标杆项目。
核心特性
| 特性 |
说明 |
| 🪶 轻量级 |
无外部中间件依赖,可下沉到边缘设备 |
| ⚡ 高性能 |
协程池 + 对象池,有向无环图消息路由 |
| 🔀 双模式 |
嵌入式组件 或 独立中间件部署 |
| 🧩 组件化 |
所有业务逻辑皆为组件,灵活配置复用 |
| 🔄 动态编排 |
运行时热更新规则链,无需重启 |
| 🔌 扩展简单 |
实现接口即可自定义组件 |
| 🔁 规则链嵌套 |
支持子规则链,实现流程复用 |
| 🛡️ 上下文隔离 |
高并发下数据安全,无串流风险 |
| 🎯 AOP 机制 |
无侵入式增强规则链执行行为 |
| 📡 数据集成 |
动态配置 HTTP/MQTT/Kafka/TCP 等 Endpoint |
典型应用场景
- 边缘计算:在边缘服务器预处理 IoT 数据,动态调整处理规则,可替代 node-red
- 物联网:设备数据上报 → 规则判断 → 触发告警/联动
- 数据分发:按消息类型路由到 HTTP、MQTT、gRPC 等不同系统
- 应用集成:作为”胶水”连接 SSH、Webhook、Kafka、数据库、ChatGPT 等
- 微服务编排:动态编排和驱动微服务调用链
- 低代码平台:iPaaS、ETL、类 LangFlow 系统
- 自动化:CI/CD、营销自动化、量化交易系统
- 风控/积分:业务代码与业务逻辑彻底解耦
快速开始
安装
1
| go get github.com/rulego/rulego
|
三步上手
第一步:用 JSON 定义规则链
规则链是 RuleGo 的核心概念,描述了消息如何在各节点间流转:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| { "ruleChain": { "id": "rule01", "name": "温度告警规则链", "root": true }, "metadata": { "nodes": [ { "id": "s1", "type": "jsFilter", "name": "温度过滤", "debugMode": true, "configuration": { "jsScript": "return msg.temperature > 50;" } }, { "id": "s2", "type": "jsTransform", "name": "数据转换", "configuration": { "jsScript": "metadata['alert']='high_temp'; msg['level']='warning'; return {'msg':msg,'metadata':metadata,'msgType':msgType};" } }, { "id": "s3", "type": "restApiCall", "name": "推送告警", "configuration": { "restEndpointUrlPattern": "http://alert-service/api/notify", "requestMethod": "POST", "maxParallelRequestsCount": 200 } } ], "connections": [ { "fromId": "s1", "toId": "s2", "type": "True" }, { "fromId": "s2", "toId": "s3", "type": "Success" } ] } }
|
消息流向:温度过滤(>50°C)→ 数据转换 → HTTP 推送告警
第二步:创建规则引擎实例
1 2 3 4 5 6 7 8 9 10
| import "github.com/rulego/rulego"
ruleFile, _ := os.ReadFile("rule_chain.json")
ruleEngine, err := rulego.New("rule01", ruleFile) if err != nil { panic(err) }
|
第三步:投递消息
1 2 3 4 5 6 7 8 9 10 11
| metaData := types.NewMetadata() metaData.PutValue("deviceId", "device-001") metaData.PutValue("productType", "sensor")
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, `{"temperature": 65, "humidity": 80}`)
ruleEngine.OnMsg(msg)
|
规则链 DSL 详解
规则链 DSL 是一个 JSON 文件,完整结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| { "ruleChain": { "id": "rule01", "name": "示例规则链", "root": true, "debugMode": false, "configuration": { "vars": { "apiUrl": "http://api.example.com" }, "secrets": { "apiKey": "xxx" } } }, "metadata": { "firstNodeIndex": 0, "nodes": [], "connections": [], "endpoints": [] } }
|
节点配置字段
| 字段 |
类型 |
说明 |
id |
string |
节点唯一标识 |
type |
string |
节点类型(对应组件) |
name |
string |
节点名称 |
debugMode |
boolean |
是否开启调试回调 |
configuration |
object |
节点配置参数 |
additionalInfo |
object |
可视化坐标等扩展信息 |
连接关系
1 2 3 4 5
| { "fromId": "s1", "toId": "s2", "type": "True" }
|
内置标准组件
过滤器(filter)
jsFilter - JS 脚本过滤
根据 JS 脚本返回值(true/false)决定消息路由方向:
1 2 3 4 5 6
| { "type": "jsFilter", "configuration": { "jsScript": "return msg.temperature > 50;" } }
|
switch - 条件分支
使用 expr 表达式引擎进行多条件路由:
1 2 3 4 5 6 7 8 9
| { "type": "switch", "configuration": { "cases": [ { "case": "msg.temperature > 80", "then": "critical" }, { "case": "msg.temperature > 50", "then": "warning" } ] } }
|
内置变量:id、ts、data、msg、metadata、type、dataType
对消息内容进行灵活转换:
1 2 3 4 5 6
| { "type": "jsTransform", "configuration": { "jsScript": "metadata['name']='test01'; msg['addField']='value1'; return {'msg':msg,'metadata':metadata,'msgType':msgType};" } }
|
使用 Lua 5.1 语法进行转换(适合高性能场景):
1 2 3 4 5 6
| { "type": "x/luaTransform", "configuration": { "script": "msg.temperature = msg.temperature * 1.8 + 32\nmetadata.unit = 'F'\nreturn msg, metadata, msgType" } }
|
动作(action)
restApiCall - HTTP 客户端
调用外部 REST API,支持 SSE 流式响应(可对接大模型 API):
1 2 3 4 5 6 7 8 9 10
| { "type": "restApiCall", "configuration": { "restEndpointUrlPattern": "http://api.example.com/notify", "requestMethod": "POST", "headers": { "Authorization": "Bearer ${token}" }, "readTimeoutMs": 5000, "maxParallelRequestsCount": 200 } }
|
mqttClient - MQTT 推送
将消息发布到 MQTT Broker:
1 2 3 4 5 6 7 8
| { "type": "mqttClient", "configuration": { "server": "127.0.0.1:1883", "topic": "/device/msg/${deviceId}", "qos": 1 } }
|
log - 日志记录
1 2 3 4 5 6
| { "type": "log", "configuration": { "jsScript": "return 'msg: ' + JSON.stringify(msg) + ', metadata: ' + JSON.stringify(metadata);" } }
|
高级特性
动态更新规则链
无需重启,运行时热更新:
1 2 3 4 5 6 7 8
| err := ruleEngine.ReloadSelf(newRuleFile)
ruleEngine.ReloadChild("node01", newNodeFile)
dsl := ruleEngine.DSL()
|
规则引擎池管理
1 2 3 4 5 6 7 8
| rulego.Load("/rules", rulego.WithConfig(config))
ruleEngine, ok := rulego.Get("rule01")
rulego.Del("rule01")
|
全局配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| config := rulego.NewConfig()
config.OnDebug = func(chainId, flowType, nodeId string, msg types.RuleMsg, relationType string, err error) { log.Printf("[%s] node=%s, relation=%s, msg=%s", flowType, nodeId, relationType, msg.Data) }
config.ScriptMaxExecutionTime = 5 * time.Second
config.Properties.PutValue("apiUrl", "http://api.example.com")
ruleEngine, err := rulego.New("rule01", ruleFile, rulego.WithConfig(config))
|
注册自定义函数(UDF)
让 JS/Lua 脚本可以调用 Go 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| config.RegisterUdf("add", func(a, b int) int { return a + b })
type DBClient struct{} func (d *DBClient) Query(id string) string { return "result:" + id }
config.RegisterUdf("db", &DBClient{})
config.RegisterUdf("utils", types.Script{ Type: types.Js, Content: `var utils = { format: function(d) { return d.toString(); } }`, })
|
缓存机制
跨节点、跨规则链共享数据:
1 2 3 4 5 6 7 8
| let cache = $ctx.ChainCache();
cache.Set("key", "value"); cache.Set("key2", "value2", "10m"); let value = cache.Get("key"); cache.Delete("key");
|
AOP 切面
无侵入式增强规则链行为:
1 2 3 4 5 6 7 8 9
| config := rulego.NewConfig( types.WithAspects( &aspect.SkipFallbackAspect{ ErrorCountLimit: 3, LimitDuration: time.Second * 10, }, ), )
|
自定义组件开发
实现 types.Node 接口,三步完成自定义组件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| type UpperNode struct{}
func (n *UpperNode) Type() string { return "test/upper" } func (n *UpperNode) New() types.Node { return &UpperNode{} }
func (n *UpperNode) Init(config types.Config, cfg types.Configuration) error { return nil }
func (n *UpperNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) { msg.Data = strings.ToUpper(msg.Data) ctx.TellSuccess(msg) }
func (n *UpperNode) Destroy() {}
rulego.Registry.Register(&UpperNode{})
|
Endpoint 数据集成
通过 Endpoint DSL 动态配置数据接入端点,无需编写代码:
HTTP Endpoint
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| { "id": "e1", "type": "http", "configuration": { "server": ":9090" }, "routers": [{ "id": "r1", "params": ["post"], "from": { "path": "/api/v1/msg/:chainId" }, "to": { "path": "${chainId}", "wait": true, "processors": ["responseToBody"] } }] }
|
MQTT Endpoint
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "id": "e_mqtt", "type": "mqtt", "configuration": { "server": "127.0.0.1:1883", "username": "admin", "password": "admin" }, "routers": [{ "from": { "path": "#" }, "to": { "path": "default" } }] }
|
定时任务 Endpoint
1 2 3 4 5 6 7 8
| { "id": "schedule_e1", "type": "schedule", "routers": [{ "from": { "path": "*/1 * * * * *" }, "to": { "path": "default" } }] }
|
启动 Endpoint:
1 2 3 4 5 6 7
| endpointBuf, _ := os.ReadFile("endpoint.json") ep, err := endpoint.New("", endpointBuf, endpoint.DynamicEndpointOptions.WithConfig(config)) ep.Start()
ep.Reload([]byte(newDsl))
|
组件生态
性能表现
RuleGo 使用**有向无环图(DAG)**表示规则链,消息只需沿图中路径处理,无需匹配所有规则,极大提升了路由效率。
测试环境:树莓派 2(900MHz Cortex-A7 × 4,1GB RAM)
规则链:JS 过滤 → JS 转换 → HTTP 推送
| 并发数 |
内存占用 |
| 100 并发 |
~19MB |
| 500 并发 |
~19MB |
内存占用几乎不随并发增加而增长,非常适合资源受限的边缘设备。
适用场景判断
如果你的项目符合以下任一情况,RuleGo 是理想选择:
- ✅ 业务逻辑复杂且频繁变动,不想每次都重新部署
- ✅ 需要对接大量第三方系统或协议
- ✅ 构建 IoT 边缘计算或物联网平台
- ✅ 需要低代码/可视化的业务编排能力
- ✅ 想在 Go 项目中实现业务热部署
总结
RuleGo 以组件化 + 规则链的设计理念,将复杂的业务逻辑拆解为可复用的积木,通过 JSON DSL 灵活编排,实现了业务逻辑与代码的彻底解耦。无论是边缘计算、IoT、低代码平台还是微服务编排,RuleGo 都能提供优雅的解决方案。
相关链接