Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into vschema-ddl-table-esc…
Browse files Browse the repository at this point in the history
…aping

Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Mar 28, 2023
2 parents 7e23ac0 + c96c30d commit 16fa7ec
Show file tree
Hide file tree
Showing 106 changed files with 60,509 additions and 55,713 deletions.
122 changes: 122 additions & 0 deletions go/cmd/vtctldclient/command/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ package command

import (
"fmt"
"sort"
"strings"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/topo/topoproto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand All @@ -35,6 +41,59 @@ var (
Args: cobra.ExactArgs(1),
RunE: commandGetWorkflows,
}

// Workflow is a parent command for Workflow* sub commands.
Workflow = &cobra.Command{
Use: "workflow",
Short: "Administer VReplication workflows (Reshard, MoveTables, etc) in the given keyspace",
DisableFlagsInUseLine: true,
Aliases: []string{"Workflow"},
Args: cobra.ExactArgs(1),
RunE: commandGetWorkflows,
}

// WorkflowUpdate makes a WorkflowUpdate gRPC call to a vtctld.
WorkflowUpdate = &cobra.Command{
Use: "update",
Short: "Update the configuration parameters for a VReplication workflow",
Example: `vtctldclient --server=localhost:15999 workflow --keyspace=customer update --workflow=commerce2customer --cells "zone1" --cells "zone2" -c "zone3,zone4" -c "zone5"`,
DisableFlagsInUseLine: true,
Aliases: []string{"Update"},
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
changes := false
if cmd.Flags().Lookup("cells").Changed { // Validate the provided value(s)
changes = true
for i, cell := range workflowUpdateOptions.Cells { // Which only means trimming whitespace
workflowUpdateOptions.Cells[i] = strings.TrimSpace(cell)
}
} else {
workflowUpdateOptions.Cells = textutil.SimulatedNullStringSlice
}
if cmd.Flags().Lookup("tablet-types").Changed { // Validate the provided value(s)
changes = true
for i, tabletType := range workflowUpdateOptions.TabletTypes {
workflowUpdateOptions.TabletTypes[i] = strings.ToUpper(strings.TrimSpace(tabletType))
if _, err := topoproto.ParseTabletType(workflowUpdateOptions.TabletTypes[i]); err != nil {
return err
}
}
} else {
workflowUpdateOptions.TabletTypes = textutil.SimulatedNullStringSlice
}
if cmd.Flags().Lookup("on-ddl").Changed { // Validate the provided value
changes = true
if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; !ok {
return fmt.Errorf("invalid on-ddl value: %s", workflowUpdateOptions.OnDDL)
}
} // Simulated NULL will need to be handled in command
if !changes {
return fmt.Errorf("no configuration options specified to update")
}
return nil
},
RunE: commandWorkflowUpdate,
}
)

var getWorkflowsOptions = struct {
Expand Down Expand Up @@ -65,7 +124,70 @@ func commandGetWorkflows(cmd *cobra.Command, args []string) error {
return nil
}

var (
workflowOptions = struct {
Keyspace string
}{}
workflowUpdateOptions = struct {
Workflow string
Cells []string
TabletTypes []string
OnDDL string
}{}
)

func commandWorkflowUpdate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
onddl := int32(textutil.SimulatedNullInt) // Simulated NULL when no value provided
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; ok {
onddl = val
}

req := &vtctldatapb.WorkflowUpdateRequest{
Keyspace: workflowOptions.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVRWorkflowRequest{
Workflow: workflowUpdateOptions.Workflow,
Cells: workflowUpdateOptions.Cells,
TabletTypes: workflowUpdateOptions.TabletTypes,
OnDdl: binlogdatapb.OnDDLAction(onddl),
},
}

resp, err := client.WorkflowUpdate(commandCtx, req)
if err != nil {
return err
}

// Sort the inner TabletInfo slice for deterministic output.
sort.Slice(resp.Details, func(i, j int) bool {
return resp.Details[i].Tablet < resp.Details[j].Tablet
})

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)

return nil
}

func init() {
GetWorkflows.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows.")
Root.AddCommand(GetWorkflows)

Workflow.PersistentFlags().StringVarP(&workflowOptions.Keyspace, "keyspace", "k", "", "Keyspace context for the workflow (required)")
Workflow.MarkPersistentFlagRequired("keyspace")
Root.AddCommand(Workflow)
WorkflowUpdate.Flags().StringVarP(&workflowUpdateOptions.Workflow, "workflow", "w", "", "The workflow you want to update (required)")
WorkflowUpdate.MarkFlagRequired("workflow")
WorkflowUpdate.Flags().StringSliceVarP(&workflowUpdateOptions.Cells, "cells", "c", nil, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from")
WorkflowUpdate.Flags().StringSliceVarP(&workflowUpdateOptions.TabletTypes, "tablet-types", "t", nil, "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY)")
WorkflowUpdate.Flags().StringVar(&workflowUpdateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE")
Workflow.AddCommand(WorkflowUpdate)
}
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Available Commands:
ValidateVersionShard Validates that the version on the primary matches all of the replicas.
completion Generate the autocompletion script for the specified shell
help Help about any command
workflow Administer VReplication workflows (Reshard, MoveTables, etc) in the given keyspace

Flags:
--action_timeout duration timeout for the total command (default 1h0m0s)
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,8 @@ func (c *Conn) handleComPrepare(handler Handler, data []byte) (kontinue bool) {
paramsCount := uint16(0)
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
switch node := node.(type) {
case sqlparser.Argument:
if strings.HasPrefix(string(node), "v") {
case *sqlparser.Argument:
if strings.HasPrefix(node.Name, "v") {
paramsCount++
}
}
Expand Down
27 changes: 27 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,27 @@ func tstWorkflowComplete(t *testing.T) error {
return tstWorkflowAction(t, workflowActionComplete, "", "")
}

// testWorkflowUpdate is a very simple test of the workflow update
// vtctlclient/vtctldclient command.
// It performs a non-behavior impacting update, setting tablet-types
// to primary,replica,rdonly (the only applicable types in these tests).
func testWorkflowUpdate(t *testing.T) {
tabletTypes := "primary,replica,rdonly"
// Test vtctlclient first
_, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, "noexist.noexist", "update")
require.Error(t, err, err)
resp, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, ksWorkflow, "update")
require.NoError(t, err)
require.NotEmpty(t, resp)

// Test vtctldclient last
_, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", "noexist", "update", "--workflow", "noexist", "--tablet-types", tabletTypes)
require.Error(t, err)
resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", tabletTypes)
require.NoError(t, err, err)
require.NotEmpty(t, resp)
}

func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}
Expand Down Expand Up @@ -391,6 +412,9 @@ func testReshardV2Workflow(t *testing.T) {
verifyNoInternalTables(t, vtgateConn, targetKs+"/-40")
verifyNoInternalTables(t, vtgateConn, targetKs+"/c0-")

// Confirm that updating Reshard workflows works.
testWorkflowUpdate(t)

testRestOfWorkflow(t)
}

Expand All @@ -414,6 +438,9 @@ func testMoveTablesV2Workflow(t *testing.T) {

testReplicatingWithPKEnumCols(t)

// Confirm that updating MoveTable workflows works.
testWorkflowUpdate(t)

testRestOfWorkflow(t)

listAllArgs := []string{"workflow", "customer", "listall"}
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
query := "select cid from customer"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", query, query))
insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')"
matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)"
matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1 /* INT64 */, :vtg2 /* VARCHAR */)"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))

// confirm that the backticking of table names in the routing rules works
Expand Down Expand Up @@ -791,7 +791,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
ksShards := []string{"product/0", "customer/-80", "customer/80-"}
printShardPositions(vc, ksShards)
insertQuery2 := "insert into customer(name, cid) values('tempCustomer2', 100)"
matchInsertQuery2 := "insert into customer(`name`, cid) values (:vtg1, :_cid0)"
matchInsertQuery2 := "insert into customer(`name`, cid) values (:vtg1 /* VARCHAR */, :_cid0)"
require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))

insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" // ID 101, hence due to reverse_bits in shard 80-
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vtgate/queries/normalize/normalize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNormalizeAllFields(t *testing.T) {
defer conn.Close()

insertQuery := `insert into t1 values (1, "chars", "variable chars", x'73757265', 0x676F, 0.33, 9.99, 1, "1976-06-08", "small", "b", "{\"key\":\"value\"}", point(1,5), b'011', 0b0101)`
normalizedInsertQuery := `insert into t1 values (:vtg1, :vtg2, :vtg3, :vtg4, :vtg5, :vtg6, :vtg7, :vtg8, :vtg9, :vtg10, :vtg11, :vtg12, point(:vtg13, :vtg14), :vtg15, :vtg16)`
normalizedInsertQuery := `insert into t1 values (:vtg1 /* INT64 */, :vtg2 /* VARCHAR */, :vtg3 /* VARCHAR */, :vtg4 /* HEXVAL */, :vtg5 /* HEXNUM */, :vtg6 /* DECIMAL */, :vtg7 /* DECIMAL */, :vtg8 /* INT64 */, :vtg9 /* VARCHAR */, :vtg10 /* VARCHAR */, :vtg11 /* VARCHAR */, :vtg12 /* VARCHAR */, point(:vtg13 /* INT64 */, :vtg14 /* INT64 */), :vtg15 /* HEXNUM */, :vtg16 /* HEXNUM */)`
selectQuery := "select * from t1"
utils.Exec(t, conn, insertQuery)
qr := utils.Exec(t, conn, selectQuery)
Expand All @@ -56,7 +56,7 @@ func TestNormalizeAllFields(t *testing.T) {
break
}
}
assert.True(t, found, "correctly normalized record not found in planner cache")
assert.Truef(t, found, "correctly normalized record not found in planner cache %v", results)
}

func getPlanCache(vtgateHostPort string) ([]map[string]any, error) {
Expand Down
32 changes: 31 additions & 1 deletion go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import (
"net/url"
"regexp"
"strings"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/binlogdata"
)

var (
delimitedListRegexp = regexp.MustCompile(`[ ,;]+`)
delimitedListRegexp = regexp.MustCompile(`[ ,;]+`)
SimulatedNullString = sqltypes.NULL.String()
SimulatedNullStringSlice = []string{sqltypes.NULL.String()}
SimulatedNullInt = -1
)

// SplitDelimitedList splits a given string by comma, semi-colon or space, and returns non-empty strings
Expand Down Expand Up @@ -73,3 +79,27 @@ func SingleWordCamel(w string) string {
}
return strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
}

// ValueIsSimulatedNull returns true if the value represents
// a NULL or unknown/unspecified value. This is used to
// distinguish between a zero value / default and a user
// provided value that is equivalent (e.g. an empty string
// or slice).
func ValueIsSimulatedNull(val any) bool {
switch cval := val.(type) {
case string:
return cval == SimulatedNullString
case []string:
return len(cval) == 1 && cval[0] == sqltypes.NULL.String()
case binlogdata.OnDDLAction:
return int32(cval) == int32(SimulatedNullInt)
case int:
return cval == SimulatedNullInt
case int32:
return int32(cval) == int32(SimulatedNullInt)
case int64:
return int64(cval) == int64(SimulatedNullInt)
default:
return false
}
}
11 changes: 7 additions & 4 deletions go/tools/codegen/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ func CheckErrors(loaded []*packages.Package, skip func(fileName string) bool) er
var errors []string
for _, l := range loaded {
for _, e := range l.Errors {
idx := strings.Index(e.Pos, ":")
filePath := e.Pos[:idx]
_, fileName := path.Split(filePath)
if !skip(fileName) {
if idx := strings.Index(e.Pos, ":"); idx >= 0 {
filePath := e.Pos[:idx]
_, fileName := path.Split(filePath)
if !skip(fileName) {
errors = append(errors, e.Error())
}
} else {
errors = append(errors, e.Error())
}
}
Expand Down
Loading

0 comments on commit 16fa7ec

Please sign in to comment.