Skip to content

Commit

Permalink
sink(ticdc): fix incorrect default field (#12038) (#12053)
Browse files Browse the repository at this point in the history
close #12037
  • Loading branch information
ti-chi-bot authored Feb 12, 2025
1 parent df8c488 commit 1e1482b
Show file tree
Hide file tree
Showing 11 changed files with 13 additions and 46 deletions.
6 changes: 3 additions & 3 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,13 +1629,13 @@ func TestNewDMRowChange(t *testing.T) {
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
{
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1,
},
{
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1,
},
{
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2,
},
}
recoveredTI := model.BuildTiDBTableInfo(cdcTableInfo.TableName.Table, cols, cdcTableInfo.IndexColumnsOffset)
Expand Down
1 change: 0 additions & 1 deletion cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func columnFromV1(c *codecv1.Column) *model.Column {
Charset: c.Charset,
Flag: c.Flag,
Value: c.Value,
Default: c.Default,
ApproximateBytes: c.ApproximateBytes,
}
}
11 changes: 0 additions & 11 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/table/tables"
datumTypes "github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -497,13 +496,3 @@ func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
}
return result
}

// GetColumnDefaultValue returns the default definition of a column.
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
if defaultValue == nil {
defaultValue = col.GetOriginDefaultValue()
}
defaultDatum := datumTypes.NewDatum(defaultValue)
return defaultDatum.GetValue()
}
5 changes: 1 addition & 4 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ func columnData2Column(col *ColumnData, tableInfo *TableInfo) *Column {
Collation: colInfo.GetCollate(),
Flag: *tableInfo.ColumnsFlag[colID],
Value: col.Value,
Default: GetColumnDefaultValue(colInfo),
}
}

Expand Down Expand Up @@ -652,7 +651,6 @@ type Column struct {
Collation string `msg:"collation"`
Flag ColumnFlagType `msg:"-"`
Value interface{} `msg:"-"`
Default interface{} `msg:"-"`

// ApproximateBytes is approximate bytes consumed by the column.
ApproximateBytes int `msg:"-"`
Expand Down Expand Up @@ -1360,7 +1358,7 @@ func (x ColumnDataX) GetFlag() ColumnFlagType {

// GetDefaultValue return default value.
func (x ColumnDataX) GetDefaultValue() interface{} {
return GetColumnDefaultValue(x.info)
return x.info.GetDefaultValue()
}

// GetColumnInfo returns column info.
Expand Down Expand Up @@ -1388,7 +1386,6 @@ func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) {
info.Columns[i].SetType(column.Type)
info.Columns[i].SetCharset(column.Charset)
info.Columns[i].SetCollate(column.Collation)
info.Columns[i].DefaultValue = column.Default

info.ColumnsFlag[columnID] = new(ColumnFlagType)
*info.ColumnsFlag[columnID] = column.Flag
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo, outputColumnID bool
if mysql.HasNotNullFlag(col.GetFlag()) {
t.Nullable = "false"
}
t.Default = model.GetColumnDefaultValue(col)
t.Default = col.GetDefaultValue()

switch col.GetType() {
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/codec/debezium/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,10 @@ func (c *dbzCodec) writeDebeziumFieldValue(
col model.ColumnDataX,
ft *types.FieldType,
) error {
value := getValue(col)
value := col.Value
if value == nil {
value = col.GetDefaultValue()
}
if value == nil {
writer.WriteNullField(col.GetName())
return nil
Expand Down
8 changes: 0 additions & 8 deletions pkg/sink/codec/debezium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"encoding/binary"
"fmt"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

Expand All @@ -43,13 +42,6 @@ func getBitFromUint64(n int, v uint64) []byte {
return buf[:numBytes]
}

func getValue(col model.ColumnDataX) any {
if col.Value == nil {
return col.GetDefaultValue()
}
return col.Value
}

func getSchemaTopicName(namespace string, schema string, table string) string {
return fmt.Sprintf("%s.%s.%s",
common.SanitizeName(namespace),
Expand Down
13 changes: 0 additions & 13 deletions pkg/sink/codec/debezium/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,9 @@ package debezium
import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestGetValue(t *testing.T) {
column := &model.Column{
Default: 1,
}
data := model.Column2ColumnDataXForTest(column)
v := getValue(data)
require.Equal(t, v, int64(1))
data.Value = 2
v = getValue(data)
require.Equal(t, v, 2)
}

func TestGetSchemaTopicName(t *testing.T) {
namespace := "default"
schema := "1A.B"
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
"nullable": !mysql.HasNotNullFlag(col.GetFlag()),
"default": nil,
}
defaultValue := model.GetColumnDefaultValue(col)
defaultValue := col.GetDefaultValue()
if defaultValue != nil {
// according to TiDB source code, the default value is converted to string if not nil.
column["default"] = map[string]interface{}{
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,15 +1487,15 @@ func TestEncodeLargeEventsNormal(t *testing.T) {

obtainedDefaultValues := make(map[string]interface{}, len(obtainedDDL.TableInfo.Columns))
for _, col := range obtainedDDL.TableInfo.Columns {
obtainedDefaultValues[col.Name.O] = model.GetColumnDefaultValue(col)
obtainedDefaultValues[col.Name.O] = col.GetDefaultValue()
switch col.GetType() {
case mysql.TypeFloat, mysql.TypeDouble:
require.Equal(t, 0, col.GetDecimal())
default:
}
}
for _, col := range ddlEvent.TableInfo.Columns {
expected := model.GetColumnDefaultValue(col)
expected := col.GetDefaultValue()
obtained := obtainedDefaultValues[col.Name.O]
require.Equal(t, expected, obtained)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func newColumnSchema(col *timodel.ColumnInfo) *columnSchema {
tp.Decimal = col.GetDecimal()
}

defaultValue := model.GetColumnDefaultValue(col)
defaultValue := col.GetDefaultValue()
if defaultValue != nil && col.GetType() == mysql.TypeBit {
defaultValue = common.MustBinaryLiteralToInt([]byte(defaultValue.(string)))
}
Expand Down

0 comments on commit 1e1482b

Please sign in to comment.