Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#12038
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wk989898 authored and ti-chi-bot committed Feb 8, 2025
1 parent 54a2ec5 commit 192e395
Show file tree
Hide file tree
Showing 11 changed files with 2,197 additions and 9 deletions.
6 changes: 3 additions & 3 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1627,13 +1627,13 @@ func TestNewDMRowChange(t *testing.T) {
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
{
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1,
},
{
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1,
},
{
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2,
},
}
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
Expand Down
1 change: 0 additions & 1 deletion cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func columnFromV1(c *codecv1.Column) *model.Column {
Charset: c.Charset,
Flag: c.Flag,
Value: c.Value,
Default: c.Default,
ApproximateBytes: c.ApproximateBytes,
}
}
4 changes: 3 additions & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/table/tables"
datumTypes "github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -390,6 +389,7 @@ func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
}
return result
}
<<<<<<< HEAD

// GetSchemaName returns the schema name of the table
func (ti *TableInfo) GetSchemaName() string {
Expand Down Expand Up @@ -456,3 +456,5 @@ func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
defaultDatum := datumTypes.NewDatum(defaultValue)
return defaultDatum.GetValue()
}
=======
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))
165 changes: 165 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,57 @@ func (r *RowChangedEvent) IsUpdate() bool {
return len(r.PreColumns) != 0 && len(r.Columns) != 0
}

<<<<<<< HEAD
=======
func columnData2Column(col *ColumnData, tableInfo *TableInfo) *Column {
colID := col.ColumnID
offset, ok := tableInfo.columnsOffset[colID]
if !ok {
log.Panic("invalid column id",
zap.Int64("columnID", colID),
zap.Any("tableInfo", tableInfo))
}
colInfo := tableInfo.Columns[offset]
return &Column{
Name: colInfo.Name.O,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Collation: colInfo.GetCollate(),
Flag: *tableInfo.ColumnsFlag[colID],
Value: col.Value,
}
}

func columnDatas2Columns(cols []*ColumnData, tableInfo *TableInfo) []*Column {
if cols == nil {
return nil
}
columns := make([]*Column, len(cols))
nilColumnNum := 0
for i, colData := range cols {
if colData == nil {
nilColumnNum++
continue
}
columns[i] = columnData2Column(colData, tableInfo)
}
log.Debug("meet nil column data",
zap.Any("nilColumnNum", nilColumnNum),
zap.Any("tableInfo", tableInfo))
return columns
}

// GetColumns returns the columns of the event
func (r *RowChangedEvent) GetColumns() []*Column {
return columnDatas2Columns(r.Columns, r.TableInfo)
}

// GetPreColumns returns the pre columns of the event
func (r *RowChangedEvent) GetPreColumns() []*Column {
return columnDatas2Columns(r.PreColumns, r.TableInfo)
}

>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))
// PrimaryKeyColumnNames return all primary key's name
func (r *RowChangedEvent) PrimaryKeyColumnNames() []string {
var result []string
Expand Down Expand Up @@ -526,13 +577,34 @@ func (r *RowChangedEvent) ApproximateBytes() int {

// Column represents a column value in row changed event
type Column struct {
<<<<<<< HEAD
Name string `json:"name" msg:"name"`
Type byte `json:"type" msg:"type"`
Charset string `json:"charset" msg:"charset"`
Collation string `json:"collation" msg:"collation"`
Flag ColumnFlagType `json:"flag" msg:"-"`
Value interface{} `json:"value" msg:"-"`
Default interface{} `json:"default" msg:"-"`
=======
Name string `msg:"name"`
Type byte `msg:"type"`
Charset string `msg:"charset"`
Collation string `msg:"collation"`
Flag ColumnFlagType `msg:"-"`
Value interface{} `msg:"-"`

// ApproximateBytes is approximate bytes consumed by the column.
ApproximateBytes int `msg:"-"`
}

// ColumnData represents a column value in row changed event
type ColumnData struct {
// ColumnID may be just a mock id, because we don't store it in redo log.
// So after restore from redo log, we need to give every a column a mock id.
// The only guarantee is that the column id is unique in a RowChangedEvent
ColumnID int64 `json:"column_id" msg:"column_id"`
Value interface{} `json:"value" msg:"-"`
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))

// ApproximateBytes is approximate bytes consumed by the column.
ApproximateBytes int `json:"-" msg:"-"`
Expand Down Expand Up @@ -957,3 +1029,96 @@ type TopicPartitionKey struct {
PartitionKey string
TotalPartition int32
}
<<<<<<< HEAD
=======

// ColumnDataX is like ColumnData, but contains more informations.
//
//msgp:ignore RowChangedEvent
type ColumnDataX struct {
*ColumnData
flag *ColumnFlagType
info *model.ColumnInfo
}

// GetColumnDataX encapsures ColumnData to ColumnDataX.
func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX {
x := ColumnDataX{ColumnData: col}
if x.ColumnData != nil {
x.flag = tb.ColumnsFlag[col.ColumnID]
x.info = tb.Columns[tb.columnsOffset[col.ColumnID]]
}
return x
}

// GetName returns name.
func (x ColumnDataX) GetName() string {
return x.info.Name.O
}

// GetType returns type.
func (x ColumnDataX) GetType() byte {
return x.info.GetType()
}

// GetCharset returns charset.
func (x ColumnDataX) GetCharset() string {
return x.info.GetCharset()
}

// GetCollation returns collation.
func (x ColumnDataX) GetCollation() string {
return x.info.GetCollate()
}

// GetFlag returns flag.
func (x ColumnDataX) GetFlag() ColumnFlagType {
return *x.flag
}

// GetDefaultValue return default value.
func (x ColumnDataX) GetDefaultValue() interface{} {
return x.info.GetDefaultValue()
}

// GetColumnInfo returns column info.
func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo {
return x.info
}

// Columns2ColumnDataForTest is for tests.
func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) {
info := &TableInfo{
TableInfo: &model.TableInfo{
Columns: make([]*model.ColumnInfo, len(columns)),
},
ColumnsFlag: make(map[int64]*ColumnFlagType, len(columns)),
columnsOffset: make(map[int64]int),
}
colDatas := make([]*ColumnData, 0, len(columns))

for i, column := range columns {
var columnID int64 = int64(i)
info.columnsOffset[columnID] = i

info.Columns[i] = &model.ColumnInfo{}
info.Columns[i].Name.O = column.Name
info.Columns[i].SetType(column.Type)
info.Columns[i].SetCharset(column.Charset)
info.Columns[i].SetCollate(column.Collation)

info.ColumnsFlag[columnID] = new(ColumnFlagType)
*info.ColumnsFlag[columnID] = column.Flag

colDatas = append(colDatas, &ColumnData{ColumnID: columnID, Value: column.Value})
}

return colDatas, info
}

// Column2ColumnDataXForTest is for tests.
func Column2ColumnDataXForTest(column *Column) ColumnDataX {
datas, info := Columns2ColumnDataForTest([]*Column{column})
return GetColumnDataX(datas[0], info)
}
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))
4 changes: 4 additions & 0 deletions pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo, outputColumnID bool
if mysql.HasNotNullFlag(col.GetFlag()) {
t.Nullable = "false"
}
<<<<<<< HEAD
t.Default = entry.GetDDLDefaultDefinition(col)
=======
t.Default = col.GetDefaultValue()
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))

switch col.GetType() {
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:
Expand Down
Loading

0 comments on commit 192e395

Please sign in to comment.