Skip to content

Commit

Permalink
reconcile existing CRDs on controller startup (#61)
Browse files Browse the repository at this point in the history
* reconcile existing CRDs on controller startup

* remove testing changes to ctx

* added some comments to help with debugging

---------

Co-authored-by: Shubham Rai <[email protected]>
  • Loading branch information
Maanas-23 and shubhamrai1993 authored Jan 7, 2025
1 parent 81b0006 commit 1b55f30
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 6 deletions.
30 changes: 25 additions & 5 deletions operator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
Expand Down Expand Up @@ -175,12 +176,13 @@ func mainWithError() error {
informerManager.Start()

// Set up the ElastiService controller
if err = (&controller.ElastiServiceReconciler{
reconciler := &controller.ElastiServiceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Logger: zapLogger,
InformerManager: informerManager,
}).SetupWithManager(mgr); err != nil {
}
if err = reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ElastiService")
sentry.CaptureException(err)
return fmt.Errorf("main: %w", err)
Expand Down Expand Up @@ -222,9 +224,27 @@ func mainWithError() error {
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
sentry.CaptureException(err)
mgrErrChan := make(chan error, 1)
// we are using a goroutine to start the manager because we don't want to block the main thread
go func() {
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
mgrErrChan <- fmt.Errorf("manager: %w", err)
}
}()

// Wait for cache to sync
if !mgr.GetCache().WaitForCacheSync(context.Background()) {
return fmt.Errorf("failed to sync cache")
}

if err = reconciler.Initialize(context.Background()); err != nil {
setupLog.Error(err, "unable to initialize controller")
return fmt.Errorf("main: %w", err)
}
setupLog.Info("initialized controller")

if err := <-mgrErrChan; err != nil {
return fmt.Errorf("main: %w", err)
}

Expand Down
51 changes: 51 additions & 0 deletions operator/internal/controller/elastiservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/getsentry/sentry-go"
"k8s.io/apimachinery/pkg/types"

"truefoundry/elasti/operator/internal/crddirectory"
"truefoundry/elasti/operator/internal/informer"
Expand Down Expand Up @@ -137,3 +138,53 @@ func (r *ElastiServiceReconciler) getMutexForReconcile(key string) *sync.Mutex {
l, _ := r.ReconcileLocks.LoadOrStore(key, &sync.Mutex{})
return l.(*sync.Mutex)
}

func (r *ElastiServiceReconciler) Initialize(ctx context.Context) error {
if err := r.reconcileExistingCRDs(ctx); err != nil {
return fmt.Errorf("failed to reconcile existing CRDs: %w", err)
}
return nil
}

func (r *ElastiServiceReconciler) reconcileExistingCRDs(ctx context.Context) error {
crdList := &v1alpha1.ElastiServiceList{}
if err := r.List(ctx, crdList); err != nil {
return fmt.Errorf("failed to list ElastiServices: %w", err)
}
count := 0

for _, es := range crdList.Items {
// Skip if being deleted
if !es.ObjectMeta.DeletionTimestamp.IsZero() {
r.Logger.Debug("Skipping ElastiService because it is being deleted", zap.String("name", es.Name), zap.String("namespace", es.Namespace))
continue
}

req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: es.Name,
Namespace: es.Namespace,
},
}

if _, err := r.Reconcile(ctx, req); err != nil {
r.Logger.Error(
"Failed to reconcile existing ElastiService",
zap.String("name", es.Name),
zap.String("namespace", es.Namespace),
zap.Error(err),
)
continue
}
count++
r.Logger.Info(
"Reconciled existing ElastiService",
zap.String("name", es.Name),
zap.String("namespace", es.Namespace),
)
}

r.Logger.Info("Successfully reconciled all existing ElastiServices", zap.Int("count", count))

return nil
}
2 changes: 1 addition & 1 deletion operator/internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (m *Manager) enableInformer(req *RequestWatch) error {
})

// Wait for the cache to syncß
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
if !cache.WaitForCacheSync(informerStop, informer.HasSynced) {
m.logger.Error("Failed to sync informer", zap.String("key", key))
return errors.New("failed to sync informer")
}
Expand Down

0 comments on commit 1b55f30

Please sign in to comment.