Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: add session configuration in toml #847

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ type DataSource struct {
Router *router.Table
RouteTargetSet map[string]struct{} `json:"-"`

Conn *sql.DB
Conn *sql.DB
SessionConfig SessionConfig `toml:"session" json:"session"`
}

// IsAutoSnapshot returns true if the tidb_snapshot is expected to automatically
Expand Down Expand Up @@ -192,6 +193,14 @@ func (d *DataSource) ToDriverConfig() *mysql.Config {
cfg.TLSConfig = d.Security.TLSName
}

for param, value := range d.SessionConfig {
switch v := value.(type) {
case string:
cfg.Params[param] = "\"" + v + "\""
default:
cfg.Params[param] = fmt.Sprintf("%v", v)
}
}
return cfg
}

Expand Down Expand Up @@ -357,6 +366,9 @@ func (t *TaskConfig) ComputeConfigHash() (string, error) {
return fmt.Sprintf("%x", sha256.Sum256(hash)), nil
}

// SessionConfig the the session level configuration for data source.
type SessionConfig map[string]any

// Config is the configuration.
type Config struct {
*flag.FlagSet `json:"-"`
Expand Down
13 changes: 13 additions & 0 deletions sync_diff_inspector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ check-struct-only = false
port = 3306
user = "root"
password = ""
session-config = "mysql_config"
# MySQL doesn't has snapshot config

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
session-config = "tidb_config"

# Support tls connection
# security.ca-path = "..."
Expand Down Expand Up @@ -71,3 +73,14 @@ index-fields = [""]
ignore-columns = ["",""]
chunk-size = 0
collation = ""

######################### Session config #########################
# Optional
[session-configs.mysql_config]
max_execution_time = 3600
optimizer_switch = "index_merge=on,batched_key_access=off"
net_read_timeout = 60

[session-configs.tidb_config]
tidb_opt_prefer_range_scan = "ON"
max_execution_time = 86400
8 changes: 4 additions & 4 deletions sync_diff_inspector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestParseConfig(t *testing.T) {

require.Nil(t, cfg.Parse([]string{"--config", "config.toml"}))
require.Nil(t, cfg.Init())
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs))
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs, cfg.SessionConfigs))

require.Nil(t, cfg.Parse([]string{"--config", "config_sharding.toml"}))
// we change the config from config.toml to config_sharding.toml
Expand All @@ -44,16 +44,16 @@ func TestParseConfig(t *testing.T) {
require.Nil(t, cfg.Parse([]string{"--config", "config_sharding.toml"}))
// this time will be ok, because we remove the last outputDir.
require.Nil(t, cfg.Init())
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs))
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs, cfg.SessionConfigs))

require.True(t, cfg.CheckConfig())

// we might not use the same config to run this test. e.g. MYSQL_PORT can be 4000
require.JSONEq(t, cfg.String(),
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"session-configs\":{\"mysql_config\":{\"max_execution_time\":3600,\"net_read_timeout\":60,\"optimizer_switch\":\"index_merge=on,batched_key_access=off\"},\"tidb_config\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"session-config\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
hash, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)
require.Equal(t, hash, "c080f9894ec24aadb4aaec1109cd1951454f09a1233f2034bc3b06e0903cb289")
require.Equal(t, hash, "ae60c7b787ff067e19c3d8f84b24eb19af095e05707d622890e5cd012c5442f5")

require.True(t, cfg.TableConfigs["config1"].Valid())

Expand Down
9 changes: 8 additions & 1 deletion sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package source
import (
"context"
"database/sql"
"fmt"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -271,12 +272,15 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
// so the connection count need to be cfg.SplitThreadCount + cfg.CheckThreadCount + cfg.CheckThreadCount.
targetConn, err := common.ConnectMySQL(cfg.Task.TargetInstance.ToDriverConfig(), cfg.SplitThreadCount+2*cfg.CheckThreadCount)
if err != nil {
log.Error(fmt.Sprintf("failed to configure session for data source '%s'", cfg.Task.Target),
zap.String("error", err.Error()),
)
return errors.Trace(err)
}

cfg.Task.TargetInstance.Conn = targetConn

for _, source := range cfg.Task.SourceInstances {
for sourceIdx, source := range cfg.Task.SourceInstances {
// If it is still set to AUTO it means it was not set on the target.
// We require it to be set to AUTO on both.
if source.IsAutoSnapshot() {
Expand All @@ -285,6 +289,9 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
// connect source db with target db time_zone
conn, err := common.ConnectMySQL(source.ToDriverConfig(), cfg.SplitThreadCount+2*cfg.CheckThreadCount)
if err != nil {
log.Error(fmt.Sprintf("failed to configure session for data source '%s'", cfg.Task.Source[sourceIdx]),
zap.String("error", err.Error()),
)
return errors.Trace(err)
}
source.Conn = conn
Expand Down