Skip to content

Commit

Permalink
WIP2
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Feb 13, 2025
1 parent b8cc377 commit c71be03
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 12 deletions.
3 changes: 0 additions & 3 deletions internal/pkg/service/common/etcdop/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package etcdop

import (
"context"
"fmt"
"time"

"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -158,7 +157,6 @@ func (v Prefix) WatchWithoutRestart(ctx context.Context, client etcd.Watcher, op
// The rawCh channel is closed by the context, so the context does not have to be checked here again.
rawCh := client.Watch(ctx, v.Prefix(), append([]etcd.OpOption{etcd.WithPrefix(), etcd.WithCreatedNotify()}, opts...)...)
for rawResp := range rawCh {
fmt.Println("rawResp from WatchWithoutRestart", rawResp.Header.Revision)
header := rawResp.Header
resp := WatchResponseRaw{}
resp.Header = &header
Expand Down Expand Up @@ -197,7 +195,6 @@ func (v Prefix) WatchWithoutRestart(ctx context.Context, client etcd.Watcher, op

// Map event type
for _, rawEvent := range rawResp.Events {
fmt.Println("rawEvent from WatchWithoutRestart", string(rawEvent.Kv.Key), rawEvent.Type)
var typ EventType
switch {
case rawEvent.IsCreate():
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/service/common/etcdop/watch_mirror_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package etcdop

import (
"context"
"fmt"
"maps"
"sync"

Expand Down Expand Up @@ -89,7 +88,6 @@ func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGr

// Atomically process all events
for _, event := range events {
fmt.Println("event from StartMirroring", event.Key, event.Type)
if m.filter != nil && !m.filter(event) {
continue
}
Expand Down Expand Up @@ -152,7 +150,6 @@ func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGr
m.updated = make(chan struct{})
m.updatedLock.Unlock()

fmt.Println("calling callbacks from mirror map", update, "restart", restart)
// Call callbacks
for _, fn := range m.onUpdate {
go fn(update)
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/service/stream/sink/router/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (r *pipelineRef) ensureOpened(c recordctx.Context) error {
}

// Try open, if needed, and there is no retry backoff delay active

r.lock.Lock()
defer r.lock.Unlock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ func (p *SinkPipeline) ReopenOnSinkModification() bool {
}

func (p *SinkPipeline) WriteRecord(c recordctx.Context) (pipelinePkg.WriteResult, error) {

// Make a local copy of pipelines while holding the read lock
p.writeLock.RLock()
defer p.writeLock.RUnlock()
return p.balancer.WriteRecord(c, p.pipelines)
pipelines := make([]balancer.SlicePipeline, len(p.pipelines))
copy(pipelines, p.pipelines)
p.writeLock.RUnlock()

return p.balancer.WriteRecord(c, pipelines)
}

// UpdateSlicePipelines reacts on slices changes - closes old pipelines and open new pipelines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ type SliceData struct {
LocalStorage localModel.Slice
}

func NewSlicePipeline(ctx context.Context, logger log.Logger, telemetry telemetry.Telemetry, connections *connection.Manager, encoding *encoding.Manager, ready *readyNotifier, slice *SliceData, onClose func(ctx context.Context, cause string)) *SlicePipeline {
func NewSlicePipeline(
ctx context.Context,
logger log.Logger,
telemetry telemetry.Telemetry,
connections *connection.Manager,
encoding *encoding.Manager,
ready *readyNotifier,
slice *SliceData,
onClose func(ctx context.Context, cause string),
) *SlicePipeline {
p := &SlicePipeline{
logger: logger.With(slice.SliceKey.Telemetry()...),
telemetry: telemetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func (r *Router) onSlicesModification(ctx context.Context, modifiedSinks []key.S
wg.Add(1)
go func() {
defer wg.Done()
if err := p.UpdateSlicePipelines(ctx, r.assignSinkSlices(sinkKey)); err != nil {
slices := r.assignSinkSlices(sinkKey)
if err := p.UpdateSlicePipelines(ctx, slices); err != nil {
r.logger.Errorf(ctx, `cannot update sink slices pipelines: %s, sink %q`, err, sinkKey)
}
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,15 @@ func OpenNetworkFile(
ctx := context.Background()
// It is expected to receive only one message, `io.EOF` or `message` that the termination is done
if _, err := termStream.Recv(); err != nil {
if errors.Is(err, io.EOF) {
fmt.Println("err", err, "unwrap", errors.Unwrap(err))

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt
if strings.HasSuffix(err.Error(), io.EOF.Error()) {

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings
onServerTermination(ctx, "remote server shutdown")
return
}

if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
if s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) {
fmt.Println("err", s.Err(), "unwrap", errors.Unwrap(s.Err()))

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt
if s.Code() == codes.Unavailable && strings.HasSuffix(s.Err().Error(), io.EOF.Error()) {

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings (typecheck)

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings) (typecheck)

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings
onServerTermination(ctx, "remote server shutdown")
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (m *Manager) OpenPipeline(
closeFunc,
)
if err != nil {
m.removePipeline(sliceKey)
return nil, err
}

Expand Down

0 comments on commit c71be03

Please sign in to comment.