Skip to content

Commit

Permalink
aws: Support external pod connectivity for AWS via pod VM network
Browse files Browse the repository at this point in the history
Add basic support for external pod connectivity via pod VM network.
This requires pod VM instance to have two NICs.
When using public IP with multi-NIC instance, we need to use elastic IP
and attach it to the specific network interface. We can't use
AssociatePublicIPAddress as it's meant to be used only with single NIC
instance.  So what we do is create a new network interface, attach it to
the instance and associate an Elastic IP with this network interface

Signed-off-by: Pradipta Banerjee <[email protected]>
  • Loading branch information
bpradipt committed Feb 4, 2025
1 parent 0b9d96a commit 9585fb1
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 8 deletions.
15 changes: 8 additions & 7 deletions src/cloud-api-adaptor/pkg/adaptor/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,25 +226,26 @@ func (s *cloudService) CreateVM(ctx context.Context, req *pb.CreateVMRequest) (r
// Get Pod VM image from annotations
image := util.GetImageFromAnnotation(req.Annotations)

netNSPath := req.NetworkNamespacePath

podNetworkConfig, err := s.workerNode.Inspect(netNSPath)
if err != nil {
return nil, fmt.Errorf("failed to inspect netns %s: %w", netNSPath, err)
}

// Pod VM spec
vmSpec := provider.InstanceTypeSpec{
InstanceType: instanceType,
VCPUs: vcpus,
Memory: memory,
GPUs: gpus,
Image: image,
MultiNic: podNetworkConfig.ExternalNetViaPodVM,
}

// TODO: server name is also generated in each cloud provider, and possibly inconsistent
serverName := putil.GenerateInstanceName(pod, string(sid), 63)

netNSPath := req.NetworkNamespacePath

podNetworkConfig, err := s.workerNode.Inspect(netNSPath)
if err != nil {
return nil, fmt.Errorf("failed to inspect netns %s: %w", netNSPath, err)
}

podDir := filepath.Join(s.serverConfig.PodsDir, string(sid))
if err := os.MkdirAll(podDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating a pod directory: %s, %w", podDir, err)
Expand Down
236 changes: 235 additions & 1 deletion src/cloud-providers/aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@ type ec2Client interface {
DescribeImages(ctx context.Context,
params *ec2.DescribeImagesInput,
optFns ...func(*ec2.Options)) (*ec2.DescribeImagesOutput, error)
// Add AllocateAddress method
AllocateAddress(ctx context.Context,
params *ec2.AllocateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.AllocateAddressOutput, error)
AssociateAddress(ctx context.Context,
params *ec2.AssociateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.AssociateAddressOutput, error)
DescribeAddresses(ctx context.Context,
params *ec2.DescribeAddressesInput,
optFns ...func(*ec2.Options)) (*ec2.DescribeAddressesOutput, error)
ReleaseAddress(ctx context.Context,
params *ec2.ReleaseAddressInput,
optFns ...func(*ec2.Options)) (*ec2.ReleaseAddressOutput, error)
DisassociateAddress(ctx context.Context,
params *ec2.DisassociateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.DisassociateAddressOutput, error)
CreateNetworkInterface(ctx context.Context,
params *ec2.CreateNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.CreateNetworkInterfaceOutput, error)
AttachNetworkInterface(ctx context.Context,
params *ec2.AttachNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.AttachNetworkInterfaceOutput, error)
DeleteNetworkInterface(ctx context.Context,
params *ec2.DeleteNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.DeleteNetworkInterfaceOutput, error)
ModifyNetworkInterfaceAttribute(ctx context.Context,
params *ec2.ModifyNetworkInterfaceAttributeInput,
optFns ...func(*ec2.Options)) (*ec2.ModifyNetworkInterfaceAttributeOutput, error)
}

// Make instanceRunningWaiter as an interface
Expand Down Expand Up @@ -224,7 +252,6 @@ func (p *awsProvider) CreateInstance(ctx context.Context, podName, sandboxID str
input.KeyName = aws.String(p.serviceConfig.KeyName)
}

// Auto assign public IP address if UsePublicIP is set
if p.serviceConfig.UsePublicIP {
// Auto-assign public IP
input.NetworkInterfaces = []types.InstanceNetworkInterfaceSpecification{
Expand Down Expand Up @@ -303,6 +330,20 @@ func (p *awsProvider) CreateInstance(ctx context.Context, podName, sandboxID str

}

if spec.MultiNic {
nIfaceId, err := p.createAddonNICforInstance(ctx, instanceID)
if err != nil {
return nil, err
}
// If public IP is set, then create an ElasticIP and associate it with this secondary interface
if p.serviceConfig.UsePublicIP {
err = p.createElasticIPforInstance(ctx, instanceID, nIfaceId)
if err != nil {
return nil, err
}
}
}

instance := &provider.Instance{
ID: instanceID,
Name: instanceName,
Expand All @@ -313,6 +354,12 @@ func (p *awsProvider) CreateInstance(ctx context.Context, podName, sandboxID str
}

func (p *awsProvider) DeleteInstance(ctx context.Context, instanceID string) error {

err := p.deleteElasticIPforInstance(ctx, instanceID)
if err != nil {
logger.Printf("failed to deallocate the Elastic IP address: %v", err)
}

terminateInput := &ec2.TerminateInstancesInput{
InstanceIds: []string{
instanceID,
Expand Down Expand Up @@ -464,6 +511,193 @@ func (p *awsProvider) getPublicIP(ctx context.Context, instanceID string) (netip
return publicIPAddr, nil
}

// Create a NIC and attach it to the instance
func (p *awsProvider) createAddonNICforInstance(ctx context.Context, instanceID string) (nIfaceId *string, err error) {
// Create network interface
// Add create network interface input
nicName := fmt.Sprintf("nic-%s", instanceID)
createNetworkInterfaceInput := &ec2.CreateNetworkInterfaceInput{
SubnetId: aws.String(p.serviceConfig.SubnetId),
Groups: p.serviceConfig.SecurityGroupIds,

TagSpecifications: []types.TagSpecification{
{
ResourceType: types.ResourceTypeNetworkInterface,
Tags: []types.Tag{
{
Key: aws.String("Name"),
Value: aws.String(nicName),
},
},
},
},
}

nic, err := p.ec2Client.CreateNetworkInterface(ctx, createNetworkInterfaceInput)
if err != nil {
return nil, fmt.Errorf("failed to create a network interface: %v", err)
}

nIfaceId = nic.NetworkInterface.NetworkInterfaceId

// Wait for instance to be ready before attaching the network interface
describeInstanceInput := &ec2.DescribeInstancesInput{
InstanceIds: []string{instanceID},
}
err = p.waiter.Wait(ctx, describeInstanceInput, maxWaitTime)
if err != nil {
logger.Printf("failed to wait for the instance to be ready : %v ", err)
return nil, err

}

// Attach network interface
attachNetworkInterfaceInput := &ec2.AttachNetworkInterfaceInput{
InstanceId: aws.String(instanceID),
NetworkInterfaceId: nIfaceId,
DeviceIndex: aws.Int32(1),
}

nicAttachOp, err := p.ec2Client.AttachNetworkInterface(ctx, attachNetworkInterfaceInput)
if err != nil {
_, nicDelErr := p.ec2Client.DeleteNetworkInterface(ctx, &ec2.DeleteNetworkInterfaceInput{
NetworkInterfaceId: nIfaceId,
})
if nicDelErr != nil {
logger.Printf("failed to delete the network interface: %v", nicDelErr)
}

return nil, fmt.Errorf("failed to attach a network interface: %v", err)
}

if nicAttachOp.AttachmentId == nil {
logger.Printf("AttachmentId is nil. This will prevent deletion of the network interface")
return nil, fmt.Errorf("AttachmentId is nil")
}

// Set Delete on termination to true
_, err = p.ec2Client.ModifyNetworkInterfaceAttribute(ctx, &ec2.ModifyNetworkInterfaceAttributeInput{
Attachment: &types.NetworkInterfaceAttachmentChanges{
AttachmentId: nicAttachOp.AttachmentId,
DeleteOnTermination: aws.Bool(true),
},
NetworkInterfaceId: nIfaceId,
})
if err != nil {
return nil, fmt.Errorf("failed to modify the network interface attribute: %v", err)
}

logger.Printf("created a network interface %s and attached it to the instance %s", *nIfaceId, instanceID)

return nIfaceId, nil
}

// Create Elastic IP and attach it to the interface
func (p *awsProvider) createElasticIPforInstance(ctx context.Context, instanceID string, nIfaceId *string) error {
eipName := fmt.Sprintf("eip-%s", instanceID)

// Create Elastic IP. Allocate from AWS pool
allocateAddressInput := &ec2.AllocateAddressInput{
Domain: types.DomainTypeVpc,
TagSpecifications: []types.TagSpecification{
{
ResourceType: types.ResourceTypeElasticIp,
Tags: []types.Tag{
{
Key: aws.String("Name"),
Value: aws.String(eipName),
},
},
},
},
}

eip, err := p.ec2Client.AllocateAddress(ctx, allocateAddressInput)
if err != nil {
return fmt.Errorf("failed to allocate an Elastic IP address: %v", err)
}

// Wait for instance to be ready before associating the Elastic IP address
describeInstanceInput := &ec2.DescribeInstancesInput{
InstanceIds: []string{instanceID},
}
err = p.waiter.Wait(ctx, describeInstanceInput, maxWaitTime)
if err != nil {
logger.Printf("failed to wait for the instance to be ready : %v ", err)
return err
}

// Associate the Elastic IP with the instance
_, err = p.ec2Client.AssociateAddress(ctx, &ec2.AssociateAddressInput{
AllocationId: eip.AllocationId,
AllowReassociation: aws.Bool(true),
NetworkInterfaceId: nIfaceId,
})
if err != nil {
// Release the Elastic IP address
_, relErr := p.ec2Client.ReleaseAddress(ctx, &ec2.ReleaseAddressInput{
AllocationId: eip.AllocationId,
})
if relErr != nil {
logger.Printf("failed to release the Elastic IP address: %v", relErr)
}
return fmt.Errorf("failed to associate an Elastic IP address with the instance: %v", err)
}

logger.Printf("associated the Elastic IP address: %s with the instance: %s", *eip.PublicIp, instanceID)

return nil
}

func (p *awsProvider) deleteElasticIPforInstance(ctx context.Context, instanceID string) error {

describeAddressInput := &ec2.DescribeAddressesInput{
Filters: []types.Filter{
{
Name: aws.String("instance-id"),
Values: []string{instanceID},
},
},
}

// Describe addresses to find the Elastic IP
describeAddressesOutput, err := p.ec2Client.DescribeAddresses(ctx, describeAddressInput)
if err != nil {
return fmt.Errorf("failed to describe the Elastic IP addresses: %v for instance: %s", err, instanceID)
}

if len(describeAddressesOutput.Addresses) == 0 {
logger.Printf("No Elastic IP addresses found for instance: %s", instanceID)
return nil
}

// Find the Elastic IP associated with the given network interface and delete it
for _, addr := range describeAddressesOutput.Addresses {
//if addr.NetworkInterfaceId != nil && *addr.NetworkInterfaceId == *nIfaceId {
if addr.InstanceId != nil && *addr.InstanceId == instanceID {

// Disassociate the Elastic IP address
_, err = p.ec2Client.DisassociateAddress(ctx, &ec2.DisassociateAddressInput{
AssociationId: addr.AssociationId,
})
if err != nil {
return fmt.Errorf("failed to disassociate the Elastic IP address: %v", err)
}

// Release the Elastic IP address
_, err = p.ec2Client.ReleaseAddress(ctx, &ec2.ReleaseAddressInput{
AllocationId: addr.AllocationId,
})
if err != nil {
return fmt.Errorf("failed to release the Elastic IP address: %v", err)
}
logger.Printf("released the Elastic IP address: %s for instance: %s", *addr.PublicIp, instanceID)
}
}

return nil
}

func (p *awsProvider) getDeviceNameAndSize(imageID string) (string, int32, error) {
// Add describe images input
describeImagesInput := &ec2.DescribeImagesInput{
Expand Down
72 changes: 72 additions & 0 deletions src/cloud-providers/aws/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,78 @@ func (m mockEC2Client) DescribeInstances(ctx context.Context,
}, nil
}

// Create a mock for EC2 AllocateAddress method
func (m mockEC2Client) AllocateAddress(ctx context.Context,
params *ec2.AllocateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.AllocateAddressOutput, error) {

return nil, nil
}

// Create a mock for EC2 AssociateAddress method
func (m mockEC2Client) AssociateAddress(ctx context.Context,
params *ec2.AssociateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.AssociateAddressOutput, error) {

return nil, nil
}

// Create a mock for EC2 DescribeAddresses method
func (m mockEC2Client) DescribeAddresses(ctx context.Context,
params *ec2.DescribeAddressesInput,
optFns ...func(*ec2.Options)) (*ec2.DescribeAddressesOutput, error) {

return nil, nil
}

// Create a mock for EC2 DisassociateAddress method
func (m mockEC2Client) DisassociateAddress(ctx context.Context,
params *ec2.DisassociateAddressInput,
optFns ...func(*ec2.Options)) (*ec2.DisassociateAddressOutput, error) {

return nil, nil
}

// Create a mock for EC2 ReleaseAddress method
func (m mockEC2Client) ReleaseAddress(ctx context.Context,
params *ec2.ReleaseAddressInput,
optFns ...func(*ec2.Options)) (*ec2.ReleaseAddressOutput, error) {

return nil, nil
}

// Create a mock for EC2 CreateNetworkInterface method
func (m mockEC2Client) CreateNetworkInterface(ctx context.Context,
params *ec2.CreateNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.CreateNetworkInterfaceOutput, error) {

return nil, nil
}

// Create a mock for EC2 AttachNetworkInterface method
func (m mockEC2Client) AttachNetworkInterface(ctx context.Context,
params *ec2.AttachNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.AttachNetworkInterfaceOutput, error) {

return nil, nil
}

// Create a mock for EC2 DeleteNetworkInterface method
func (m mockEC2Client) DeleteNetworkInterface(ctx context.Context,
params *ec2.DeleteNetworkInterfaceInput,
optFns ...func(*ec2.Options)) (*ec2.DeleteNetworkInterfaceOutput, error) {

return nil, nil
}

// Create a mock for EC2 ModifyNetworkInterfaceAttribute method
func (m mockEC2Client) ModifyNetworkInterfaceAttribute(ctx context.Context,
params *ec2.ModifyNetworkInterfaceAttributeInput,
optFns ...func(*ec2.Options)) (*ec2.ModifyNetworkInterfaceAttributeOutput, error) {

return nil, nil
}

// Mock instanceRunningWaiter
type MockAWSInstanceWaiter struct{}

Expand Down
1 change: 1 addition & 0 deletions src/cloud-providers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ type InstanceTypeSpec struct {
Arch string
GPUs int64
Image string
MultiNic bool
}

0 comments on commit 9585fb1

Please sign in to comment.