Skip to content

Commit

Permalink
Update ValidateEndpoints in endpoint calculator
Browse files Browse the repository at this point in the history
* Rename DupCount from CalculateEndpoints to
  endpointsExcludedInCalculation, since we need to take endpoints from
  non-default subnet into account as well.
* We don't need to track these counts separately outside of
  CalculateEndpoints() since we only need them in ValidateEndpoints() to
  understand the total number of endpoints we excluded during
  calculations. The actual numbers only matter when we emit metrics,
  which happens within CalculateEndpoints() in toZoneNetworkEndpointMap().
  • Loading branch information
sawsa307 authored and 08volt committed Jun 3, 2024
1 parent c2cbab3 commit a50c93a
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 15 deletions.
14 changes: 7 additions & 7 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(eds []typ
return subsetMap, podMap, err
}

func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error {
// this should be a no-op for now
return nil
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(eps []t
return subsetMap, podMap, err
}

func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error {
// this should be a no-op for now
return nil
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _
if err == nil { // If current calculation ends up in error, we trigger and emit metrics in degraded mode.
l.syncMetricsCollector.UpdateSyncerEPMetrics(l.syncerKey, result.EPCount, result.EPSCount)
}
return result.NetworkEndpointSet, result.EndpointPodMap, result.EPCount[negtypes.Duplicate], err
return result.NetworkEndpointSet, result.EndpointPodMap, result.EPCount[negtypes.Duplicate] + result.EPCount[negtypes.NodeInNonDefaultSubnet], err
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
Expand All @@ -266,12 +266,12 @@ func nodeMapToString(nodeMap map[string][]*v1.Node) string {
//
// For L7 Endpoint Calculator, it returns error if one of the two checks fails:
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// endpointPodMap does not contain the endpoints excluded during calculation (duplicates and endpoints on nodes in non-default subnets), and endpointsExcludedInCalculation stores the number of endpoints that were excluded
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
countFromPodMap := len(endpointPodMap) + endpointsExcludedInCalculation
if countFromPodMap == 0 {
l.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return fmt.Errorf("%w: Detect endpoint count goes to zero", types.ErrEPCalculationCountZero)
Expand All @@ -287,7 +287,7 @@ func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.Endpoints
}

if countFromEndpointData != countFromPodMap {
l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "endpointsExcludedInCalculation", endpointsExcludedInCalculation)
return fmt.Errorf("%w: Detect endpoint mismatch, count from endpoint slice=%d, count after calculation=%d", types.ErrEPCountsDiffer, countFromEndpointData, countFromPodMap)
}
return nil
Expand Down
228 changes: 224 additions & 4 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ func TestValidateEndpoints(t *testing.T) {
instance1 := negtypes.TestInstance1
instance2 := negtypes.TestInstance2
duplicatePodName := "pod2-duplicate"
noPodCIDRInstance := negtypes.TestNoPodCIDRInstance
noPodCIDRPod := negtypes.TestNoPodCIDRPod
nonDefaultSubnetInstance := negtypes.TestNonDefaultSubnetInstance
nonDefaultSubnetPod := negtypes.TestNonDefaultSubnetPod
svcPort := negtypes.NegSyncerKey{
Namespace: testServiceNamespace,
Name: testServiceName,
Expand Down Expand Up @@ -338,11 +342,16 @@ func TestValidateEndpoints(t *testing.T) {
},
},
})

nodeLister := testContext.NodeInformer.GetIndexer()
serviceLister := testContext.ServiceInformer.GetIndexer()
zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, false)
zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, true)
zoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, false)
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics())

zoneGetterMSC := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, true)
L7EndpointsCalculatorMSC := NewL7EndpointsCalculator(zoneGetterMSC, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics())
L7EndpointsCalculatorMSC.enableMultiSubnetCluster = true
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace), klog.TODO(), &network.NetworkInfo{})
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace), klog.TODO(), &network.NetworkInfo{})

Expand Down Expand Up @@ -390,6 +399,7 @@ func TestValidateEndpoints(t *testing.T) {
testCases := []struct {
desc string
ec negtypes.NetworkEndpointsCalculator
ecMSC negtypes.NetworkEndpointsCalculator
testEndpointSlices []*discovery.EndpointSlice
currentMap map[string]negtypes.NetworkEndpointSet
// Use mutation to inject errors that we cannot trigger currently.
Expand All @@ -399,15 +409,16 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L4 local endpoint calculator", // we are adding this test to make sure the test is updated when the functionality is added
ec: L4LocalEndpointCalculator,
ecMSC: L4LocalEndpointCalculator,
testEndpointSlices: nil, // for now it is a no-op
mutation: noopMutation,
currentMap: nil,
expect: nil,
},
{

desc: "ValidateEndpoints for L4 cluster endpoint calculator", // we are adding this test to make sure the test is updated when the functionality is added
ec: L4ClusterEndpointCalculator,
ecMSC: L4LocalEndpointCalculator,
testEndpointSlices: nil, // for now it is a no-op
mutation: noopMutation,
currentMap: nil,
Expand All @@ -416,14 +427,16 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has no duplicated endpoints",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: l7TestEPS,
mutation: noopMutation,
currentMap: nil,
expect: nil,
},
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has one duplicated endpoint",
ec: L7EndpointsCalculator,
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has one duplicated endpoint",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -476,6 +489,7 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts not equal",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: l7TestEPS,
mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) {
// Add one additional endpoint in endpointData
Expand All @@ -488,6 +502,7 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. EndpointData has zero endpoint",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: l7TestEPS,
mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) {
for i := range endpointData {
Expand All @@ -501,6 +516,7 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. EndpointPodMap has zero endpoint",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: l7TestEPS,
mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) {
endpointPodMap = negtypes.EndpointPodMap{}
Expand All @@ -512,6 +528,7 @@ func TestValidateEndpoints(t *testing.T) {
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. EndpointData and endpointPodMap both have zero endpoint",
ec: L7EndpointsCalculator,
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: l7TestEPS,
mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) {
for i := range endpointData {
Expand All @@ -536,6 +553,209 @@ func TestValidateEndpoints(t *testing.T) {
if got := tc.ec.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) {
t.Errorf("ValidateEndpoints() = %v, expected %v", got, tc.expect)
}

// Run tests with multi-subnet cluster enabled.
endpointData = negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices)
_, endpointPodMap, endpointsExcludedInCalculation, err = tc.ecMSC.CalculateEndpoints(endpointData, tc.currentMap)
if err != nil {
t.Errorf("With multi-subnet cluster enabled, received error when calculating endpoint: %v", err)
}
endpointData, endpointPodMap = tc.mutation(endpointData, endpointPodMap)
if got := tc.ecMSC.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) {
t.Errorf("With multi-subnet cluster enabled, ValidateEndpoints() = %v, expected %v", got, tc.expect)
}
})
}

// Add noPodCIDRPod that corresponds to noPodCIDRInstance.
podLister.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: noPodCIDRPod,
Labels: map[string]string{
discovery.LabelServiceName: testServiceName,
discovery.LabelManagedBy: managedByEPSControllerValue,
},
},
Spec: v1.PodSpec{
NodeName: noPodCIDRInstance,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.101.3.1",
PodIPs: []v1.PodIP{
{IP: "10.101.3.1"},
},
},
})
podLister.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: nonDefaultSubnetPod,
Labels: map[string]string{
discovery.LabelServiceName: testServiceName,
discovery.LabelManagedBy: managedByEPSControllerValue,
},
},
Spec: v1.PodSpec{
NodeName: nonDefaultSubnetInstance,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.200.1.1",
PodIPs: []v1.PodIP{
{IP: "10.200.1.1"},
},
},
})

podLister.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: nonDefaultSubnetPod,
Labels: map[string]string{
discovery.LabelServiceName: testServiceName,
discovery.LabelManagedBy: managedByEPSControllerValue,
},
},
Spec: v1.PodSpec{
NodeName: nonDefaultSubnetInstance,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.200.1.1",
PodIPs: []v1.PodIP{
{IP: "10.200.1.1"},
},
},
})

mscTestCases := []struct {
desc string
ecMSC negtypes.NetworkEndpointsCalculator
testEndpointSlices []*discovery.EndpointSlice
currentMap map[string]negtypes.NetworkEndpointSet
// Use mutation to inject errors that we cannot trigger currently.
mutation func([]negtypes.EndpointsData, negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap)
expect error
}{
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has an endpoint corresponds to node without PodCIDR",
ecMSC: L7EndpointsCalculator,
testEndpointSlices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
Namespace: testServiceNamespace,
Labels: map[string]string{
discovery.LabelServiceName: testServiceName,
},
},
AddressType: "IPv4",
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.100.1.1"},
NodeName: &instance1,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: "pod1",
},
},
{
Addresses: []string{"10.100.1.2"},
NodeName: &instance1,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: "pod2",
},
},
{
Addresses: []string{"10.101.1.3"},
NodeName: &noPodCIDRInstance,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: noPodCIDRPod,
},
},
},
Ports: []discovery.EndpointPort{
{
Name: &emptyNamedPort,
Port: &port80,
Protocol: &protocolTCP,
},
},
},
},
mutation: noopMutation,
currentMap: nil,
expect: nil,
},
{
desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has non-default subnet endpoint",
ecMSC: L7EndpointsCalculatorMSC,
testEndpointSlices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
Namespace: testServiceNamespace,
Labels: map[string]string{
discovery.LabelServiceName: testServiceName,
},
},
AddressType: "IPv4",
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"10.100.1.1"},
NodeName: &instance1,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: "pod1",
},
},
{
Addresses: []string{"10.100.1.2"},
NodeName: &instance1,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: "pod2",
},
},
{
Addresses: []string{"10.200.1.1"},
NodeName: &nonDefaultSubnetInstance,
TargetRef: &v1.ObjectReference{
Namespace: testServiceNamespace,
Name: nonDefaultSubnetPod,
},
},
},
Ports: []discovery.EndpointPort{
{
Name: &emptyNamedPort,
Port: &port80,
Protocol: &protocolTCP,
},
},
},
},
mutation: noopMutation,
currentMap: nil,
expect: nil,
},
}

for _, tc := range mscTestCases {
t.Run(tc.desc, func(t *testing.T) {
endpointData := negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices)
_, endpointPodMap, endpointsExcludedInCalculation, err := tc.ecMSC.CalculateEndpoints(endpointData, tc.currentMap)
if err != nil {
t.Errorf("With multi-subnet cluster enabled, received error when calculating endpoint: %v", err)
}
endpointData, endpointPodMap = tc.mutation(endpointData, endpointPodMap)
if got := tc.ecMSC.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) {
t.Errorf("With multi-subnet cluster enabled, ValidateEndpoints() = %v, expected %v", got, tc.expect)
}
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,12 @@ func (s *transactionSyncer) getEndpointsCalculation(
endpointsData []negtypes.EndpointsData,
currentMap map[string]negtypes.NetworkEndpointSet,
) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
targetMap, endpointPodMap, dupCount, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
targetMap, endpointPodMap, endpointsExcludedInCalculation, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return nil, nil, err
}
if s.enableDegradedMode {
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount)
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, endpointsExcludedInCalculation)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2325,7 +2325,7 @@ func TestValidateEndpointFields(t *testing.T) {
}
}

func TestValidateEndpointFieldsWithMultipleSubnets(t *testing.T) {
func TestValidateEndpointFieldsMultipleSubnets(t *testing.T) {
t.Parallel()

emptyNamedPort := ""
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ type NetworkEndpointsCalculator interface {
// Mode indicates the mode that the EndpointsCalculator is operating in.
Mode() EndpointsCalculatorMode
// ValidateEndpoints validates the NEG endpoint information is correct
ValidateEndpoints(endpointData []EndpointsData, endpointPodMap EndpointPodMap, dupCount int) error
ValidateEndpoints(endpointData []EndpointsData, endpointPodMap EndpointPodMap, endpointsExcludedInCalculation int) error
}

0 comments on commit a50c93a

Please sign in to comment.