作者介绍
刘宇,翼支付云原生存储领域资深专家,深耕有状态服务云原生化全链路实践,聚焦分布式数据库核心技术攻坚与开发运维一体化体系的构建。
在 OceanBase(OB)数据库规模化应用进程中 ,数据库工单平台是降低业务接入门槛、加速 OB 落地的核心载体。在使用业内常用的开源审核工具 goInception 审核 sql 因无法适配 OB-MySQL 租户的分布式执行计划格式,导致 SQL 工单审核时预估行数(est_rows)提取失败 ,大表 DML、全表扫描等高危操作难以拦截,增加了运维风险,成为了 DBA 的关键阻碍。
本文将完整拆解从 goInception 核心魔改到 Python 集成调用的全流程 ,提供 “ 问题定位 - 技术改造 - 落地验证” 的全链路方案 ,帮助 DBA 团队快速落地 OB-MySQL 租户的自动化 SQL 审核能力 ,扫清技术障碍。
一、业务痛点:OB-MySQL 租户的审核困境
OB-MySQL 租户为适配分布式架构 ,其 EXPLAIN 执行计划输出与原生 MySQL 存在本质差异:
这—差异导致原生 go-inception 完全失效:
二、核心思路:魔改 goInception 适配 OB 执行计划
本文改造的版本为 goInception V1.3.0 版本。
改造核心逻辑可概括为 “扩展字段 + 多源解析 + 兼容原有”:
goInception 的 sql 执行计划 explain 处理在 session/session_inception.go 文件中。
1、扩展 ExplainInfo 结构体:存储 OB 专属执行计划
首先在 session/common.go 中扩展执行计划存储结构体 ,新增 ObPlan 字段专门承接 OB 的 Query Plan 文本:
// ExplainInfo 执行计划信息
type ExplainInfo struct {
// gorm.Model
SelectType string `gorm:"Column:select_type"`
Table string `gorm:"Column:table"`
Partitions string `gorm:"Column:partitions"`
Type string `gorm:"Column:type"`
PossibleKeys string `gorm:"Column:possible_keys"`
Key string `gorm:"Column:key"`
KeyLen string `gorm:"Column:key_len"`
Ref string `gorm:"Column:ref"`
Rows int64 `gorm:"Column:rows"`
Filtered float32 `gorm:"Column:filtered"`
Extra string `gorm:"Column:Extra"`
// TiDB的Explain预估行数存储在Count中
Count string `gorm:"Column:count"`
// TiDB (v4.0及之后)的Explain预估行数存储在Count中
EstRows string `gorm:"Column:estRows"`
// ob_plan,用来存Ob的执行计划文本
ObPlan sql.NullString `gorm:"Column:Query Plan"`
}
采用 sql.NullString 类型可兼容 Query Plan 为 NULL 的场景 ,避免空指针异常 ,避免影响原有mysql ,tidb审核逻辑。
2、优化 getExplainInfo 函数:调度多源解析逻辑
在 session/session_inception.go 中优化核心解析函数 ,实现 “MySQL/TiDB 原有逻辑 + OB专属逻辑” 的分支处理:
func (s *session) getExplainInfo(sql string, sqlId string) {
if s.hasError() {
return
}
var newRecord *Record
if s.inc.EnableFingerprint && sqlId != "" {
newRecord = &Record{
Buf: new(bytes.Buffer),
}
}
r := s.myRecord
var rows []ExplainInfo
if err := s.rawScan(sql, &rows); err != nil {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMessage(myErr.Message)
if newRecord != nil {
newRecord.appendErrorMessage(myErr.Message)
}
} else {
s.appendErrorMessage(err.Error())
if newRecord != nil {
newRecord.appendErrorMessage(err.Error())
}
}
}
if len(rows) > 0 {
if s.inc.ExplainRule == "max" {
r.AffectedRows = 0
for _, row := range rows {
if row.Rows == 0 {
if row.Count != "" {
if f, err := strconv.ParseFloat(row.Count, 64); err == nil {
row.Rows = int64(f)
}
} else if row.EstRows != "" {
if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil {
row.Rows = int64(v)
}
//改造点
} else if row.ObPlan.Valid {
row.Rows = ObRowAffect(row.ObPlan)
}
}
r.AffectedRows = Max64(r.AffectedRows, row.Rows)
}
} else {
row := rows[0]
if row.Rows == 0 {
if row.Count != "" {
if f, err := strconv.ParseFloat(row.Count, 64); err == nil {
row.Rows = int64(f)
}
} else if row.EstRows != "" {
if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil {
row.Rows = int64(v)
}
//改造点
} else if row.ObPlan.Valid {
row.Rows = ObRowAffect(row.ObPlan)
}
}
r.AffectedRows = row.Rows
}
if newRecord != nil {
newRecord.AffectedRows = r.AffectedRows
}
}
if s.inc.MaxUpdateRows > 0 && r.AffectedRows >
int64(s.inc.MaxUpdateRows) {
switch r.Type.(type) {
case *ast.DeleteStmt, *ast.UpdateStmt:
s.appendErrorNo(ER_UDPATE_TOO_MUCH_ROWS,
r.AffectedRows, s.inc.MaxUpdateRows)
if newRecord != nil {
newRecord.appendErrorNo(s.inc.Lang, ER_UDPATE_TOO_MUCH_ROWS,
r.AffectedRows, s.inc.MaxUpdateRows)
}
}
}
if newRecord != nil {
s.sqlFingerprint[sqlId] = newRecord
}
}
改造点在增加两次if row.ObPlan.Valid { row.Rows = ObRowAffect(row.ObPlan)}。
3、核心解析:ObRowAffect 函数提取 OB 预估行数
这也是魔改的关键 ,也在 session/session_inception.go 添加如下函数 ,专门解析 OB 文本表格格式的 Query Plan ,提取最大预估行数:
func ObRowAffect(plan sql.NullString) int64 {
if !plan.Valid {
return 0
}
r := strings.NewReader(plan.String)
br := bufio.NewReader(r)
estrows := make([]string, 0)
for {
l, e := br.ReadString('\n')
if e != nil && len(l) == 0 {
break
}
if strings.HasPrefix(l, "|") {
r := strings.Split(l, "|")
estrows = append(estrows, strings.TrimSpace(r[4]))
}
}
var estrowMax int
for i := 1; i < len(estrows); i++ {
estrow, err := strconv.Atoi(estrows[i])
if err != nil {
continue
}
estrowMax = max(estrow, estrowMax)
}
return int64(estrowMax)
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
三、 Python 集成:封装标准化接口
魔改这两个文件后后 ,编译后就可以直接使用了 ,然后数据库工单系统(python开发)中需通 Python封装统—访问GoInception类。
1、集成代码实现
from app.common.utils.db_conn.mysql_conn import OpenMysqlDb
class GoInception:
def __init__(self) -> None:
self.go_inception_host = "localhost"
self.go_inception_user = "root"
self.go_inception_password = ""
self.go_inception_port = 4000
self.go_inception_db_name = ""
self.commit = False
def check_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str):
sql = f"""/*--host='{host}';--port={port};--user={user};--password='{password}';--check=1;max_insert_rows=10;*/
inception_magic_start;
use `{database}`;
{sqls};
inception_magic_commit;
"""
with OpenMysqlDb(
host=self.go_inception_host,
user=self.go_inception_user,
port=self.go_inception_port,
password=self.go_inception_password,
db_name=self.go_inception_db_name,
commit=self.commit,
) as conn:
conn.ping()
return conn.db_query(sql=sql)
def execute_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str, backup=0, ignore_warnings=0, fingerprint=0):
sql = f"""/*--host='{host}';--port={port};--user='{user}';--password='{password}';--execute=1;backup={backup};ignore_warnings={ignore_warnings};fingerprint={fingerprint};*/
inception_magic_start;
use `{database}`;
{sqls};
inception_magic_commit;
"""
with OpenMysqlDb(
host=self.go_inception_host,
user=self.go_inception_user,
port=self.go_inception_port,
password=self.go_inception_password,
db_name=self.go_inception_db_name,
commit=self.commit,
) as conn:
conn.ping()
r = conn.db_query(sql=sql)
return r
四、落地效果验证
1、完整流程链路
工单提交 → 识别 OB-MySQL 租户 → 调用 Python 接口→ 传递参数 → go-inception 解析Query Plan → 提取 est\_rows → 阈值校验 → 返回审核结果 → 工单系统展示/驳回
2、典型场景验证
3、核心指标
五、总结与扩展
本次落地实现了 “底层魔改 + 上层封装” 的完整闭环 ,核心优势在于无侵入兼容原有流程、零感知集成、强容错设计。
通过这套方案 ,OB 工单系统可实现 OB-MySQL 租户 SQL 的自动化精准审核 ,有效降低分布式数据库运维风险 ,让审核工具与 OB 生态深度适配。