Skip to content

Commit

Permalink
add send delta metric
Browse files Browse the repository at this point in the history
  • Loading branch information
KSmigielski committed Feb 3, 2025
1 parent 7ba3f47 commit 815c897
Show file tree
Hide file tree
Showing 53 changed files with 4,379 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/deployment/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ and then run the `bin/envoy-control-runner` created from `distZip` task.

```xml

<logger name="io.envoyproxy.controlplane.cache.SimpleCache" level="WARN"/>
<logger name="io.envoyproxy.controlplane.io.envoyproxy.controlplane.cache.SimpleCache" level="WARN"/>
<logger name="io.envoyproxy.controlplane.cache.DiscoveryServer" level="WARN"/>
```

Expand Down
2 changes: 1 addition & 1 deletion envoy-control-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {

implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j

api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane
api group: 'io.envoyproxy.controlplane', name: 'api', version: versions.java_controlplane

implementation group: 'io.grpc', name: 'grpc-netty', version: versions.grpc

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.envoyproxy.controlplane.cache;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

public abstract class AbstractWatch<V, T> {

private static final AtomicIntegerFieldUpdater<AbstractWatch> isCancelledUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled");
private final V request;
private final Consumer<T> responseConsumer;
private volatile int isCancelled = 0;
private Runnable stop;

/**
* Construct a watch.
*
* @param request the original request for the watch
* @param responseConsumer handler for outgoing response messages
*/
public AbstractWatch(V request, Consumer<T> responseConsumer) {
this.request = request;
this.responseConsumer = responseConsumer;
}

/**
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
* may be called multiple times, with each subsequent call being a no-op.
*/
public void cancel() {
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
if (stop != null) {
stop.run();
}
}
}

/**
* Returns boolean indicating whether or not the watch has been cancelled.
*/
public boolean isCancelled() {
return isCancelledUpdater.get(this) == 1;
}

/**
* Returns the original request for the watch.
*/
public V request() {
return request;
}

/**
* Sends the given response to the watch's response handler.
*
* @param response the response to be handled
* @throws io.envoyproxy.controlplane.cache.WatchCancelledException if the watch has already been cancelled
*/
public void respond(T response) throws io.envoyproxy.controlplane.cache.WatchCancelledException {
if (isCancelled()) {
throw new WatchCancelledException();
}

responseConsumer.accept(response);
}

/**
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
* ensures that this stop callback is only executed once.
*/
public void setStop(Runnable stop) {
this.stop = stop;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.envoyproxy.controlplane.cache;

import io.envoyproxy.controlplane.cache.ConfigWatcher;
import io.envoyproxy.controlplane.cache.StatusInfo;

import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection;

/**
* {@code Cache} is a generic config cache with support for watchers.
*/
@ThreadSafe
public interface Cache<T> extends ConfigWatcher {

/**
* Returns all known groups.
*
*/
Collection<T> groups();

/**
* Returns the current {@link StatusInfo} for the given group.
*
* @param group the node group whose status is being fetched
*/
StatusInfo<T> statusInfo(T group);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.envoyproxy.controlplane.cache;

import io.envoyproxy.controlplane.cache.Cache;
import io.envoyproxy.controlplane.cache.MutableStatusInfo;
import io.envoyproxy.controlplane.cache.StatusInfo;
import io.envoyproxy.controlplane.cache.Watch;

import javax.annotation.concurrent.ThreadSafe;

/**
* {@code CacheStatusInfo} provides a default implementation of {@link StatusInfo} for use in {@link Cache}
* implementations.
*/
@ThreadSafe
public class CacheStatusInfo<T> extends MutableStatusInfo<T, Watch> {
public CacheStatusInfo(T nodeGroup) {
super(nodeGroup);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.envoyproxy.controlplane.cache;

import io.envoyproxy.controlplane.cache.CacheStatusInfo;
import io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo;
import io.envoyproxy.controlplane.cache.Resources;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CacheStatusInfoAggregator<T> {
private final ConcurrentMap<T, ConcurrentMap<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.CacheStatusInfo<T>>> statuses =
new ConcurrentHashMap<>();
private final ConcurrentMap<T, ConcurrentMap<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo<T>>> deltaStatuses =
new ConcurrentHashMap<>();

public Collection<T> groups() {
return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet());
}

public void remove(T group) {
statuses.remove(group);
deltaStatuses.remove(group);
}

/**
* Returns map of delta status infos for group identifier.
*
* @param group group identifier.
*/
public Map<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo<T>> getDeltaStatus(T group) {
return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>());
}

/**
* Returns map of status infos for group identifier.
*
* @param group group identifier.
*/
public Map<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.CacheStatusInfo<T>> getStatus(T group) {
return statuses.getOrDefault(group, new ConcurrentHashMap<>());
}

/**
* Check if statuses for specific group have any watcher.
*
* @param group group identifier.
* @return true if statuses for specific group have any watcher.
*/
public boolean hasStatuses(T group) {
Map<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.CacheStatusInfo<T>> status = getStatus(group);
Map<io.envoyproxy.controlplane.cache.Resources.ResourceType, io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo<T>> deltaStatus = getDeltaStatus(group);
return status.values().stream().mapToLong(io.envoyproxy.controlplane.cache.CacheStatusInfo::numWatches).sum()
+ deltaStatus.values().stream().mapToLong(io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo::numWatches).sum() > 0;
}

/**
* Returns delta status info for group identifier and creates new one if it doesn't exist.
*
* @param group group identifier.
* @param resourceType resource type.
*/
public io.envoyproxy.controlplane.cache.DeltaCacheStatusInfo<T> getOrAddDeltaStatusInfo(T group, io.envoyproxy.controlplane.cache.Resources.ResourceType resourceType) {
return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group));
}

/**
* Returns status info for group identifier and creates new one if it doesn't exist.
*
* @param group group identifier.
* @param resourceType resource type.
*/
public io.envoyproxy.controlplane.cache.CacheStatusInfo<T> getOrAddStatusInfo(T group, Resources.ResourceType resourceType) {
return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.envoyproxy.controlplane.cache;

import javax.annotation.concurrent.ThreadSafe;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/**
* {@code ConfigWatcher} requests watches for configuration resources by type, node, last applied version identifier,
* and resource names hint. The watch should send the responses when they are ready. The watch can be cancelled by the
* consumer, in effect terminating the watch for the request. ConfigWatcher implementations must be thread-safe.
*/
@ThreadSafe
public interface ConfigWatcher {

/**
* Returns a new configuration resource {@link Watch} for the given discovery request.
*
* @param ads is the watch for an ADS request?
* @param request the discovery request (node, names, etc.) to use to generate the watch
* @param knownResourceNames resources that are already known to the caller
* @param responseConsumer the response handler, used to process outgoing response messages
* @param hasClusterChanged Indicates if EDS should be sent immediately, even if version has not been changed.
* Supported in ADS mode.
*
* @param allowDefaultEmptyEdsUpdate indicates if default empty EDS response should be sent when some clusters
* in request are missing in snapshot. Supported in ADS mode.
*
*/
Watch createWatch(
boolean ads,
XdsRequest request,
Set<String> knownResourceNames,
Consumer<Response> responseConsumer,
boolean hasClusterChanged,
boolean allowDefaultEmptyEdsUpdate);

/**
* Returns a new configuration resource {@link Watch} for the given discovery request.
*
* @param request the discovery request (node, names, etc.) to use to generate the watch
* @param requesterVersion the last version applied by the requester
* @param resourceVersions resources that are already known to the requester
* @param pendingResources resources that the caller is waiting for
* @param isWildcard indicates if the stream is in wildcard mode
* @param responseConsumer the response handler, used to process outgoing response messages
* @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed.
* Supported in ADS mode.
*/
DeltaWatch createDeltaWatch(
DeltaXdsRequest request,
String requesterVersion,
Map<String, String> resourceVersions,
Set<String> pendingResources,
boolean isWildcard,
Consumer<DeltaResponse> responseConsumer,
boolean hasClusterChanged);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.envoyproxy.controlplane.cache;

import io.envoyproxy.controlplane.cache.DeltaWatch;
import io.envoyproxy.controlplane.cache.MutableStatusInfo;

public class DeltaCacheStatusInfo<T> extends MutableStatusInfo<T, DeltaWatch> {

public DeltaCacheStatusInfo(T nodeGroup) {
super(nodeGroup);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.envoyproxy.controlplane.cache;

import com.google.auto.value.AutoValue;
import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;

/**
* {@code Response} is a data class that contains the response for an assumed configuration type.
*/
@AutoValue
public abstract class DeltaResponse {

public static DeltaResponse create(DeltaXdsRequest request,
Map<String, io.envoyproxy.controlplane.cache.VersionedResource<?>> resources,
List<String> removedResources,
String version) {
return new AutoValue_DeltaResponse(request, resources, removedResources, version);
}

/**
* Returns the original request associated with the response.
*/
public abstract DeltaXdsRequest request();

/**
* Returns the resources to include in the response.
*/
public abstract Map<String, VersionedResource<? extends Message>> resources();

/**
* Returns the removed resources to include in the response.
*/
public abstract List<String> removedResources();

/**
* Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version
* as an acknowledgement.
*/
public abstract String version();
}
Loading

0 comments on commit 815c897

Please sign in to comment.