Skip to content

Commit

Permalink
Some improvements to the operator and substrate (#206)
Browse files Browse the repository at this point in the history
* update CNI and karpenter version, update local path for cluster config

* Update etcd mig example config

* minor fix for logging format
  • Loading branch information
prateekgogia authored May 13, 2022
1 parent ede4336 commit 3611f9b
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 35 deletions.
59 changes: 56 additions & 3 deletions operator/docs/examples/etcd-with-mixed-instances.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
## Create hollow pods to pre-provision mixed instances for etcd pods
```bash
ZONES=("a" "b" "c")
SIZES=("m5.large" "t3.large" "c5.2xlarge")
CONTROL_PLANE=foo
SIZES=("t3.medium" "t3.medium" "t2.medium")
CONTROL_PLANE=foo
for i in {1..3}; do
cat <<EOF | kubectl apply -f -
apiVersion: v1
Expand Down Expand Up @@ -34,6 +34,59 @@ apiVersion: kit.k8s.sh/v1alpha1
kind: ControlPlane
metadata:
name: ${CONTROL_PLANE} # Desired Cluster name
namespace: guest
spec:
kubernetesVersion: "1.21"
master:
apiServer:
replicas: 2
spec:
containers:
- name: apiserver
args:
- --max-requests-inflight=400
- --max-mutating-requests-inflight=200
controllerManager:
spec:
containers:
- name: controller-manager
args:
- --controllers=*
- --kube-api-qps=300
- --kube-api-burst=400
scheduler:
spec:
containers:
- name: scheduler
args:
- --kube-api-qps=300
- --kube-api-burst=400
etcd:
spec:
containers:
- name: etcd
resources:
requests:
memory: 2Gi
EOF
```

# Create DataPlane nodes for the provisioned guest cluster
```bash
CONTROL_PLANE=foo
cat <<EOF | kubectl apply -f -
apiVersion: kit.k8s.sh/v1alpha1
kind: DataPlane
metadata:
name: ${CONTROL_PLANE}-nodes
spec:
clusterName: ${CONTROL_PLANE} # Desired Cluster Name
nodeCount: 10
subnetSelector:
kit.aws/substrate: ${MANAGEMENT_CLUSTER_NAME}
instanceTypes:
- c4.xlarge
- c5.xlarge
- c4.4xlarge
- c5.4xlarge
EOF
```
4 changes: 3 additions & 1 deletion operator/pkg/controllers/master/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func kubeAPIServerCertConfig(hostname string, nn types.NamespacedName) *secrets.
CommonName: "kube-apiserver",
AltNames: certutil.AltNames{
DNSNames: []string{hostname, "localhost", "kubernetes", "kubernetes.default",
"kubernetes.default.svc", "kubernetes.default.svc.cluster.local"},
"kubernetes.default.svc", "kubernetes.default.svc.cluster.local",
fmt.Sprintf("%s-cp.%s.svc.cluster.local", nn.Name, nn.Namespace),
},
IPs: []net.IP{net.IPv4(127, 0, 0, 1), apiServerVirtualIP()},
},
},
Expand Down
2 changes: 2 additions & 0 deletions operator/pkg/utils/imageprovider/imageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
"1.19": kubeVersion119Tag,
"1.20": kubeVersion120Tag,
"1.21": kubeVersion121Tag,
"1.22": kubeVersion122Tag,
}
)

Expand All @@ -31,6 +32,7 @@ const (
kubeVersion119Tag = "v1.19.13-eks-1-19-9"
kubeVersion120Tag = "v1.20.7-eks-1-20-6"
kubeVersion121Tag = "v1.21.2-eks-1-21-4"
kubeVersion122Tag = "v1.22.6-eks-1-22-5"
repositoryName = "public.ecr.aws/eks-distro/"
busyBoxImage = "public.ecr.aws/docker/library/busybox:stable"
)
Expand Down
3 changes: 3 additions & 0 deletions substrate/cmd/kitctl/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"fmt"
"io"
"math/rand"
"os"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand All @@ -36,6 +38,7 @@ func main() {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rand.Seed(time.Now().UnixNano())
logger := zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(zapcore.EncoderConfig{MessageKey: "message"}),
customLogWriteTo(ctx, os.Stdout), zap.LevelEnablerFunc(func(level zapcore.Level) bool {
return level >= logLevel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a *AWSVPCCNI) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
Namespace: "kube-system",
Name: "aws-vpc-cni",
Repository: "https://aws.github.io/eks-charts",
Version: "1.1.13",
Version: "1.1.16",
}); err != nil {
return reconcile.Result{}, fmt.Errorf("applying chart, %w", err)
}
Expand Down
10 changes: 4 additions & 6 deletions substrate/pkg/controller/substrate/cluster/addons/karpenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (k *Karpenter) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
Namespace: "karpenter",
Name: "karpenter",
Repository: "https://charts.karpenter.sh",
Version: "0.7.3",
Version: "0.9.0",
CreateNamespace: true,
Values: map[string]interface{}{
"clusterName": substrate.Name,
Expand All @@ -90,13 +90,11 @@ func (k *Karpenter) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
if err != nil {
return reconcile.Result{}, fmt.Errorf("initializing client, %w", err)
}
subnets := append(substrate.Status.Infrastructure.PublicSubnetIDs, substrate.Status.Infrastructure.PrivateSubnetIDs...)
// Tag EC2 Resources
if _, err := k.EC2.CreateTagsWithContext(ctx, &ec2.CreateTagsInput{
Resources: aws.StringSlice(append(
substrate.Status.Infrastructure.PublicSubnetIDs,
aws.StringValue(substrate.Status.Infrastructure.SecurityGroupID),
)),
Tags: []*ec2.Tag{{Key: aws.String("karpenter.sh/discovery"), Value: aws.String(substrate.Name)}},
Resources: aws.StringSlice(append(subnets, aws.StringValue(substrate.Status.Infrastructure.SecurityGroupID))),
Tags: []*ec2.Tag{{Key: aws.String("karpenter.sh/discovery"), Value: aws.String(substrate.Name)}},
}); err != nil {
return reconcile.Result{}, fmt.Errorf("tagging resources, %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ rules:
resources: ["controlplanes", "dataplanes"]
verbs: ["get", "list", "watch", "create", "update", "delete", "patch"]
- apiGroups: [""]
resources: ["serviceaccounts", "secrets", "namespaces", "nodes"]
resources: ["serviceaccounts", "secrets", "namespaces", "nodes", "persistentvolumeclaims"]
verbs: ["get", "list", "watch", "create", "update", "delete", "patch"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
Expand Down
49 changes: 35 additions & 14 deletions substrate/pkg/controller/substrate/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
)

const (
ClusterCertsBasePath = "/tmp/"
kubeconfigPath = "/etc/kubernetes"
kubeconfigFile = "etc/kubernetes/admin.conf"
certPKIPath = "/etc/kubernetes/pki"
Expand All @@ -66,15 +65,21 @@ const (
)

type Config struct {
S3 *s3.S3
STS *sts.STS
S3Uploader *s3manager.Uploader
S3 *s3.S3
STS *sts.STS
S3Uploader *s3manager.Uploader
clusterConfigPath string
}

func (c *Config) Create(ctx context.Context, substrate *v1alpha1.Substrate) (reconcile.Result, error) {
if substrate.Status.Cluster.APIServerAddress == nil {
return reconcile.Result{Requeue: true}, nil
}
if c.clusterConfigPath == "" {
if err := c.ensureKitEnvDir(); err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring kit env dir, %w", err)
}
}
// ensure S3 bucket
if err := c.ensureBucket(ctx, substrate); err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring S3 bucket, %w", err)
Expand Down Expand Up @@ -102,11 +107,11 @@ func (c *Config) Create(ctx context.Context, substrate *v1alpha1.Substrate) (rec
}
// upload to s3 bucket
if err := c.S3Uploader.UploadWithIterator(ctx, NewDirectoryIterator(
aws.StringValue(discovery.Name(substrate)), path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate))))); err != nil {
aws.StringValue(discovery.Name(substrate)), path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate))))); err != nil {
return reconcile.Result{}, fmt.Errorf("uploading to S3 %w", err)
}
logging.FromContext(ctx).Debugf("Uploaded cluster configuration to s3://%s", aws.StringValue(discovery.Name(substrate)))
substrate.Status.Cluster.KubeConfig = ptr.String(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeconfigFile))
substrate.Status.Cluster.KubeConfig = ptr.String(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeconfigFile))
return reconcile.Result{}, nil
}

Expand All @@ -124,7 +129,7 @@ func (c *Config) Delete(ctx context.Context, substrate *v1alpha1.Substrate) (rec
} else {
logging.FromContext(ctx).Infof("Deleted S3 bucket %s", aws.StringValue(discovery.Name(substrate)))
}
return reconcile.Result{}, os.RemoveAll(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate))))
return reconcile.Result{}, os.RemoveAll(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate))))
}

func ErrNoSuchBucket(err error) bool {
Expand All @@ -137,7 +142,7 @@ func ErrNoSuchBucket(err error) bool {
}

func (c *Config) generateCerts(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
cfg.CertificatesDir = path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), certPKIPath)
cfg.CertificatesDir = path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), certPKIPath)
certTree, err := certs.GetDefaultCertList().AsMap().CertTree()
if err != nil {
return err
Expand All @@ -151,7 +156,7 @@ func (c *Config) generateCerts(cfg *kubeadm.InitConfiguration, substrate *v1alph

func (c *Config) kubeConfigs(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
// Generate Kube config files for master components
kubeConfigDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeconfigPath)
kubeConfigDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeconfigPath)
for _, kubeConfigFileName := range []string{
kubeadmconstants.AdminKubeConfigFileName,
kubeadmconstants.KubeletKubeConfigFileName,
Expand All @@ -165,7 +170,7 @@ func (c *Config) kubeConfigs(cfg *kubeadm.InitConfiguration, substrate *v1alpha1
}

func (c *Config) generateStaticPodManifests(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
manifestDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath)
manifestDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath)
// etcd phase adds cfg.CertificatesDir to static pod yaml for pods to read the certs from
cfg.CertificatesDir = certPKIPath
if err := etcd.CreateLocalEtcdStaticPodManifestFile(
Expand All @@ -176,7 +181,7 @@ func (c *Config) generateStaticPodManifests(cfg *kubeadm.InitConfiguration, subs
kubeadmconstants.KubeAPIServer,
kubeadmconstants.KubeControllerManager,
kubeadmconstants.KubeScheduler} {
err := controlplane.CreateStaticPodFiles(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath), "",
err := controlplane.CreateStaticPodFiles(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath), "",
&cfg.ClusterConfiguration, &cfg.LocalAPIEndpoint, false, componentName)
if err != nil {
return fmt.Errorf("creating static pod file for %v, %w", componentName, err)
Expand All @@ -200,7 +205,7 @@ func (c *Config) ensureBucket(ctx context.Context, substrate *v1alpha1.Substrate
}

func (c *Config) kubeletSystemService(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
localDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeletSystemdPath)
localDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeletSystemdPath)
if _, err := os.Stat(localDir); err != nil {
if !os.IsNotExist(err) {
return err
Expand Down Expand Up @@ -295,7 +300,7 @@ func (c *Config) ensureAuthenticatorConfig(ctx context.Context, substrate *v1alp
return fmt.Errorf("creating authenticator config, %w", err)
}
logging.FromContext(ctx).Debugf("Created config map for authenticator")
configDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), authenticatorConfigDir)
configDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), authenticatorConfigDir)
if err := os.MkdirAll(configDir, 0700); err != nil {
return fmt.Errorf("failed to create directory, %w", err)
}
Expand All @@ -318,17 +323,30 @@ func (c *Config) staticPodSpecForAuthenticator(ctx context.Context, substrate *v
if err != nil {
return fmt.Errorf("failed to marshal config map manifest, %w", err)
}
if err := ioutil.WriteFile(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)),
if err := ioutil.WriteFile(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)),
clusterManifestPath, "aws-iam-authenticator.yaml"), serialized, 0644); err != nil {
return fmt.Errorf("writing authenticator pod yaml, %w", err)
}
return nil
}

// ensureKitEnvDir resolves the directory used for generated cluster
// configuration (".kit/env" under the current user's home directory), creates
// it if it is missing, and records it in c.clusterConfigPath.
//
// c.clusterConfigPath is assigned only after the directory is known to exist.
// Create calls this method only while the field is empty, so assigning it
// before MkdirAll succeeded would make a failed attempt look complete and the
// directory creation would never be retried on the next reconcile.
//
// Errors are wrapped with %w so callers can inspect the underlying cause with
// errors.Is / errors.As.
func (c *Config) ensureKitEnvDir() error {
	home, err := os.UserHomeDir()
	if err != nil {
		return fmt.Errorf("finding HOME dir %w", err)
	}
	dir := filepath.Join(home, ".kit", "env")
	// NOTE(review): this tree later holds PKI material and kubeconfigs; other
	// directories in this file are created with 0700 — confirm 0755 is intended.
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("creating .kit/env dir %w", err)
	}
	c.clusterConfigPath = dir
	return nil
}

// DirectoryIterator represents an iterator of a specified directory
type DirectoryIterator struct {
filePaths []string
bucket string
localDir string
next struct {
path string
f *os.File
Expand All @@ -351,6 +369,7 @@ func NewDirectoryIterator(bucket, dir string) s3manager.BatchUploadIterator {
return &DirectoryIterator{
filePaths: paths,
bucket: bucket,
localDir: dir,
}
}

Expand All @@ -373,6 +392,8 @@ func (d *DirectoryIterator) Err() error {

// UploadObject uploads a file
func (d *DirectoryIterator) UploadObject() s3manager.BatchUploadObject {
// trim the local path before uploading to S3
d.next.path = strings.TrimPrefix(d.next.path, d.localDir)
return s3manager.BatchUploadObject{
Object: &s3manager.UploadInput{Bucket: &d.bucket, Key: &d.next.path, Body: d.next.f},
After: d.next.f.Close,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ while [ true ]; do
echo "\$(date) Syncing S3 files for \$dir"
mkdir -p \$dir
existing_checksum=\$(ls -alR \$dir | md5sum)
aws s3 sync s3://%[2]s/tmp/%[2]s\$dir "\$dir"
aws s3 sync s3://%[2]s\$dir "\$dir"
new_checksum=\$(ls -alR \$dir | md5sum)
if [ "\$new_checksum" != "\$existing_checksum" ]; then
echo "Successfully synced from S3 \$dir"
Expand Down
16 changes: 12 additions & 4 deletions substrate/pkg/controller/substrate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package substrate
import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -39,6 +41,7 @@ import (
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -101,17 +104,22 @@ func (c *Controller) Reconcile(ctx context.Context, substrate *v1alpha1.Substrat
}
result, err := f(ctx, mutable)
if err != nil {
errs[i] = fmt.Errorf("reconciling %s, %w", reflect.ValueOf(resource).Elem().Type(), err)
cancel()
return
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "RequestLimitExceeded" {
logging.FromContext(ctx).Debugf("RequestLimitExceeded while reconciling %s, err %w", reflect.ValueOf(resource).Elem().Type(), err)
} else {
logging.FromContext(ctx).Errorf("reconciling %s, err %v", reflect.ValueOf(resource).Elem().Type(), err)
errs[i] = fmt.Errorf("reconciling %s, %w", reflect.ValueOf(resource).Elem().Type(), err)
cancel()
return
}
}
c.Lock()
runtime.Must(mergo.Merge(substrate, mutable))
c.Unlock()
if !result.Requeue && result.RequeueAfter == 0 {
return
}
time.Sleep(result.RequeueAfter + time.Second*1)
time.Sleep(result.RequeueAfter + time.Duration(rand.Intn(3000))*time.Millisecond)
}
})
return multierr.Combine(errs...)
Expand Down
8 changes: 4 additions & 4 deletions tests/images/clusterloader2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ FROM golang:1.16.4 AS builder
WORKDIR /go/src/k8s.io
RUN git clone https://github.com/kubernetes/perf-tests
WORKDIR /go/src/k8s.io/perf-tests/clusterloader2
RUN GOPROXY=direct go build -o ./clusterloader ./cmd
RUN GOPROXY=direct GOOS=linux CGO_ENABLED=0 go build -o ./clusterloader ./cmd

FROM amazon/aws-cli
FROM alpine:3.15.4
WORKDIR /
COPY --from=builder /go/src/k8s.io/perf-tests/clusterloader2/clusterloader .
ENTRYPOINT ["bash"]
COPY --from=builder /go/src/k8s.io/perf-tests/clusterloader2/clusterloader /clusterloader
ENTRYPOINT ["/clusterloader"]

0 comments on commit 3611f9b

Please sign in to comment.