Skip to content

obgnail/mysql-river

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

26 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Mysql River

worktop

introduction

解析 mysql binlog,提供简单易用的同步方案。

内置三个电池:

  • TraceLog:将 binlog 实时翻译为 sql 语句。
  • ElasticSearchSync:将 binlog 的数据同步到 es 中。
  • KafkaBroker:将 binlog 的数据同步到 Kafka 中,和 MySQL 彻底解耦。

feather

mysql-river 内置 auto position saver 和 auto health checker 两个功能:

  • auto position saver:自动记录 river 的处理进展,将其保存为 master.info 文件。当 river 挂掉重启后依旧可以恢复进展,不必担心数据丢失。
  • auto health checker:提供健康检测接口。当 river 的进展和 mysql binlog 的进展差值超过阈值时,触发对应函数。

health check rule

对比 master.info(file-pos) 和 canal.GetMasterPos()(db-pos) 的 position 信息,当触发规则时,调用对应函数。可以对接自动告警功能。

  • 当获取 db-pos 失败时, 健康状态为 red
  • 当 db-pos 跟 file-pos 相差在阈值内, 健康状态为 green
  • 当 db-pos 跟 file-pos 相关在阈值外时, 健康状态为 yellow
  • 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 没有变化时,且 db-pos 跟 file-pos 相等时 健康状态为 green
  • 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 没有变化时,且 db-pos 大于 file-pos 时 健康状态为 red
  • 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 有变化时, 健康状态为 green
  • 当 db-pos 跟 上次记录的 db-pos 有变化时, 且 file-pos 跟 上次记录的 file-pos 没有变化时, 健康状态为 red
  • 当 db-pos 跟 上次记录的 db-pos 有变化时, 且 file-pos 跟 上次记录的 file-pos 有变化时, 健康状态为 green

Usage

只需实现 Handler 接口:

  • OnEvent:核心函数。river 会自动解析 mysql binlog 文件,将 20+ 种 event 归纳为 insert、update、delete、ddl、gtid、xid、rotate、table_changed 几种。
  • OnAlert:auto health check 不通过时自动调用此函数,可以对接自动告警功能。
  • OnClose:river 发生不可恢复错误时,自动调用此函数,可以用此关闭 handler 或对接自动告警功能。
type Handler interface {
	String() string
	OnEvent(event *EventData) error
	OnAlert(msg *StatusMsg) error
	OnClose(river *River) // OnEvent、OnAlert抛出的error会触发OnClose
}
type EventData struct {
	// insert、update、delete、ddl、gtid、xid、rotate、table_changed
	EventType string                 `json:"event_type"`
	ServerID  uint32                 `json:"server_id"`
	LogName   string                 `json:"log_name"`
	LogPos    uint32                 `json:"log_pos"`
	Db        string                 `json:"db"`
	Table     string                 `json:"table"`
	SQL       string                 `json:"sql"` // 仅当EventType为ddl有值
	GTIDSet   string                 `json:"gtid_set"`
	Primary   []string               `json:"primary"`   // 主键字段;EventType为insert、update、delete时有值
	Before    map[string]interface{} `json:"before"`    // 变更前数据, insert 类型的 before 为空
	After     map[string]interface{} `json:"after"`     // 变更后数据, delete 类型的 after 为空
	Timestamp uint32                 `json:"timestamp"` // 事件时间
}
type StatusMsg struct {
	Status        HealthStatus
	LastStatus    HealthStatus
	Reason        []string // 发生告警时的消息(可能有多条不通过)
	FilePos       *mysql.Position
	DBPos         *mysql.Position
	CheckInterval time.Duration
	PosThreshold  int
}

example

package main

import (
	"fmt"
	"github.com/obgnail/mysql-river/river"
	"time"
)

var config = &river.Config{
	MySQLConfig: &river.MySQLConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "root",
	},
	PosAutoSaverConfig: &river.PosAutoSaverConfig{
		SaveDir:      "./",
		SaveInterval: 3 * time.Second,
	},
	HealthCheckerConfig: &river.HealthCheckerConfig{
		CheckPosThreshold: 3000,
		CheckInterval:     5 * time.Second,
	},
}

func main() {
	err := river.New(config).
		SetHandler(river.NopCloserAlerter(func(event *river.EventData) error {
			fmt.Println(event.EventType, event.LogName, event.LogPos, event.Before, event.After)
			return nil
		})).
		Sync(river.FromFile) // 从 master.info 文件开始解析
	PanicIfError(err)
}

Built-in battery

trace log

image-20230205212745428

var config = &river.Config{
	MySQLConfig: &river.MySQLConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "root",
	},
	PosAutoSaverConfig: &river.PosAutoSaverConfig{
		SaveDir:      "./",
		SaveInterval: 3 * time.Second,
	},
	HealthCheckerConfig: &river.HealthCheckerConfig{
		CheckPosThreshold: 3000,
		CheckInterval:     5 * time.Second,
	},
}

func main() {
	traceConfig := &trace_log.Config{
		DBs:          []string{"testdb01"},
		EntireFields: false,
		ShowTxMsg:    true,
		Highlight:    true,
	}
	handler := trace_log.New(traceConfig)
	err := river.New(config).SetHandler(handler).Sync(river.FromDB) // 从最新位置开始解析
	PanicIfError(err)
}

elastic search sync

var config = &river.Config{
	MySQLConfig: &river.MySQLConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "root",
	},
	PosAutoSaverConfig: &river.PosAutoSaverConfig{
		SaveDir:      "./",
		SaveInterval: 3 * time.Second,
	},
	HealthCheckerConfig: &river.HealthCheckerConfig{
		CheckPosThreshold: 3000,
		CheckInterval:     5 * time.Second,
	},
}

func main() {
	handlerConfig := &elasticsearch.EsHandlerConfig{
		Host:          "127.0.0.1",
		Port:          9200,
		User:          "",
		Password:      "",
		BulkSize:      128,
		FlushInterval: time.Second,
		SkipNoPkTable: true,
		Rules: []*elasticsearch.Rule{
			elasticsearch.NewDefaultRule("testdb01", "user"),
		},
	}
	handler := elasticsearch.New(handlerConfig)
	err := river.New(config).SetHandler(handler).Sync(river.FromDB)
	PanicIfError(err)
}

kafka broker

因为引入了 kafka 这个组件,谁也不能保证 kafka 不会挂掉,进而引入了bbolt,系统会自动在指定位置生成 kafka_offset.bolt。该文件会自动记录所有 partition 的 offset,并且在下次启动 river 的时候自动加载在此文件并自动进行偏移处理。

故,此机制是透明的。

var config = &river.Config{
	MySQLConfig: &river.MySQLConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "root",
	},
	PosAutoSaverConfig: &river.PosAutoSaverConfig{
		SaveDir:      "./",
		SaveInterval: 3 * time.Second,
	},
	HealthCheckerConfig: &river.HealthCheckerConfig{
		CheckPosThreshold: 3000,
		CheckInterval:     5 * time.Second,
	},
}

func main() {
	kafkaConfig := &kafka.Config{
		Addrs:           []string{"127.0.0.1:9092"},
		Topic:           "binlog",
		OffsetStoreDir:  "./",
		Offset:          nil,
		UseOldestOffset: false,
	}
	handler, err := kafka.New(kafkaConfig)
	PanicIfError(err)
	go handler.Consume(func(msg *sarama.ConsumerMessage) error {
		fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n",
			msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		return nil
	})
	err = river.New(config).SetHandler(handler).Sync(river.FromFile)
	PanicIfError(err)
}

About

监控 binlog,执行自定义的 handler

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages