forked from getsentry/sentry-kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrons.go
283 lines (233 loc) · 7.65 KB
/
crons.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/getsentry/sentry-go"
"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// EventHandlerType labels which informer event handler (add, update,
// delete) produced a callback; used when dispatching checkin logic.
type EventHandlerType string

// String values attached to the three informer event handler kinds.
const (
	EventHandlerAdd    EventHandlerType = "ADD"
	EventHandlerUpdate EventHandlerType = "UPDATE"
	EventHandlerDelete EventHandlerType = "DELETE"
)

// Package-level informers shared between startup and event handling.
var cronjobInformer cache.SharedIndexInformer
var jobInformer cache.SharedIndexInformer
// Starts the crons informer which has event handlers
// adds to the crons monitor data struct used for sending
// checkin events to Sentry
func startCronsInformers(ctx context.Context, namespace string) error {
clientset, err := getClientsetFromContext(ctx)
if err != nil {
return errors.New("failed to get clientset")
}
// Create factory that will produce both the cronjob informer and job informer
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
5*time.Second,
informers.WithNamespace(namespace),
)
// Create the cronjob informer
cronjobInformer, err = createCronjobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Create the job informer
jobInformer, err = createJobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Channel to tell the factory to stop the informers
doneChan := make(chan struct{})
factory.Start(doneChan)
// Sync the cronjob informer cache
if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok {
return errors.New("cronjob informer failed to sync")
}
// Sync the job informer cache
if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok {
return errors.New("job informer failed to sync")
}
// Wait for the channel to be closed
<-doneChan
return nil
}
// runSentryCronsCheckin captures a Sentry crons checkin event if
// appropriate, by inspecting the job status to determine whether the job
// just created its pod (job starting) or has exited (job finished).
func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {
	hub := sentry.GetHubFromContext(ctx)
	if hub == nil {
		return errors.New("cannot get hub from context")
	}
	// Clone the hub so concurrent handlers don't share scope state.
	hub = hub.Clone()

	// Find the CronJob owning this job in order to look up the
	// crons monitor data registered for it.
	if len(job.OwnerReferences) == 0 {
		return errors.New("job does not have cronjob reference")
	}
	cronjobRef := job.OwnerReferences[0]
	// Controller is a *bool and may be nil — guard before dereferencing.
	if cronjobRef.Controller == nil || !*cronjobRef.Controller || cronjobRef.Kind != "CronJob" {
		return errors.New("job does not have cronjob reference")
	}
	cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name)
	if !ok {
		return errors.New("cannot find cronJob data")
	}

	hub.WithScope(func(scope *sentry.Scope) {
		// If a DSN annotation is provided, bind a new client with that DSN.
		client, ok := dsnClientMapping.GetClientFromObject(ctx, &job.ObjectMeta, hub.Client().Options())
		if ok {
			hub.BindClient(client)
		}
		// Pass the cloned hub down via the context.
		ctx = sentry.SetHubOnContext(ctx, hub)
		if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
			// The job just began: check in with "in progress".
			checkinJobStarting(ctx, job, cronsMonitorData)
		} else if job.Status.Active > 0 {
			// Still running: nothing to report.
			return
		} else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
			// Finished: check in with the terminal status.
			checkinJobEnding(ctx, job, cronsMonitorData)
			return
		}
	})
	return nil
}
// checkinJobStarting sends an "in progress" checkin event to Sentry
// crons when a job starts, and records the checkin ID so the matching
// terminal checkin can reference it later.
func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
	logger := zerolog.Ctx(ctx)
	hub := sentry.GetHubFromContext(ctx)
	if hub == nil {
		return errors.New("cannot get hub from context")
	}

	// Skip if this job was already added to the job data map.
	if _, ok := cronsMonitorData.JobDatas[job.Name]; ok {
		return nil
	}
	logger.Debug().Msgf("Checking in at start of job: %s\n", job.Name)

	// Capture the "in progress" checkin for this monitor.
	checkinId := hub.CaptureCheckIn(
		&sentry.CheckIn{
			MonitorSlug: cronsMonitorData.MonitorSlug,
			Status:      sentry.CheckInStatusInProgress,
		},
		cronsMonitorData.monitorConfig,
	)
	// CaptureCheckIn returns nil when the event is dropped (e.g. no
	// client bound) — guard before dereferencing.
	if checkinId == nil {
		return errors.New("failed to capture checkin event")
	}
	cronsMonitorData.addJob(job, *checkinId)
	return nil
}
// checkinJobEnding sends the terminal checkin event to Sentry crons when
// a job ends, reporting OK on completion and error on failure.
func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
	hub := sentry.GetHubFromContext(ctx)
	if hub == nil {
		return errors.New("cannot get hub from context")
	}
	logger := zerolog.Ctx(ctx)

	// Determine the terminal status from the job's conditions. Scan all
	// conditions rather than only the first, since the API server does
	// not guarantee their order.
	var jobStatus sentry.CheckInStatus
	terminal := false
	for _, cond := range job.Status.Conditions {
		switch cond.Type {
		case "Complete":
			jobStatus = sentry.CheckInStatusOK
			terminal = true
		case "Failed":
			jobStatus = sentry.CheckInStatusError
			terminal = true
		}
		if terminal {
			break
		}
	}
	if !terminal {
		// Job is not in a terminal state yet; nothing to check in.
		return nil
	}

	// Retrieve the checkin ID recorded when the job started.
	jobData, ok := cronsMonitorData.JobDatas[job.Name]
	if !ok {
		return nil
	}
	logger.Trace().Msgf("checking in at end of job: %s\n", job.Name)
	hub.CaptureCheckIn(
		&sentry.CheckIn{
			ID:          jobData.getCheckinId(),
			MonitorSlug: cronsMonitorData.MonitorSlug,
			Status:      jobStatus,
		},
		cronsMonitorData.monitorConfig,
	)
	return nil
}
// runCronsDataHandler enriches a Sentry event when its pod belongs to a
// cronjob: it attaches the monitor slug, the cronjob name tag and
// fingerprint entries, a creation-time breadcrumb, and the cronjob's
// k8s metadata. Returns true when the event was enriched.
func runCronsDataHandler(ctx context.Context, scope *sentry.Scope, pod *v1.Pod, sentryEvent *sentry.Event) (bool, error) {
	// Resolve the cronjob (if any) that transitively owns this pod.
	cronJob, err := getOwningCronJob(ctx, pod)
	if err != nil {
		return false, err
	}
	if cronJob == nil {
		// Pod is not part of a cronjob; nothing to add.
		return false, nil
	}

	scope.SetContext("Monitor", sentry.Context{
		"Slug": cronJob.Name,
	})
	sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, cronJob.Kind, cronJob.Name)
	setTagIfNotEmpty(scope, "cronjob_name", cronJob.Name)

	// Breadcrumb stamped with the cronjob's creation time.
	scope.AddBreadcrumb(&sentry.Breadcrumb{
		Message:   fmt.Sprintf("Created cronjob %s", cronJob.Name),
		Level:     sentry.LevelInfo,
		Timestamp: cronJob.CreationTimestamp.Time,
	}, breadcrumbLimit)

	// Attach the cronjob's k8s metadata as pretty-printed JSON.
	metadataJson, err := prettyJson(cronJob.ObjectMeta)
	if err != nil {
		return false, err
	}
	scope.SetContext("Cronjob", sentry.Context{
		"Metadata": metadataJson,
	})
	return true, nil
}
// getOwningCronJob returns the CronJob that is the grandparent of the
// given pod (pod -> Job -> CronJob) if one exists, or nil when no
// cronjob is found. Lookup errors on individual owners are skipped.
func getOwningCronJob(ctx context.Context, pod *v1.Pod) (*batchv1.CronJob, error) {
	clientset, err := getClientsetFromContext(ctx)
	if err != nil {
		return nil, err
	}
	namespace := pod.Namespace

	var owningCronJob *batchv1.CronJob

	// Check whether the pod is controlled by a Job.
	for _, podRef := range pod.ObjectMeta.OwnerReferences {
		// Controller is a *bool and may be nil — guard before dereferencing.
		if podRef.Controller == nil || !*podRef.Controller || podRef.Kind != "Job" {
			continue
		}
		// Use the caller's ctx (not context.Background()) so API calls
		// are cancelled together with the caller.
		owningJob, err := clientset.BatchV1().Jobs(namespace).Get(ctx, podRef.Name, metav1.GetOptions{})
		if err != nil {
			continue
		}
		// Check whether the owning job is itself controlled by a CronJob.
		for _, jobRef := range owningJob.ObjectMeta.OwnerReferences {
			if jobRef.Controller == nil || !*jobRef.Controller || jobRef.Kind != "CronJob" {
				continue
			}
			cronJob, err := clientset.BatchV1().CronJobs(namespace).Get(ctx, jobRef.Name, metav1.GetOptions{})
			if err != nil {
				continue
			}
			owningCronJob = cronJob
		}
	}
	return owningCronJob, nil
}