Skip to content

Commit

Permalink
Fix leader doesn't update cache for request forwarded by follower (#1279
Browse files Browse the repository at this point in the history
)

* also add snapshot operation in security check white list
* clear cache when truncate or clear backend

Change-Id: Ibb6b1e0966d1df3a77b96aa8f48c30cd29c1132a
  • Loading branch information
Linary authored Nov 25, 2020
1 parent 9279bd7 commit 9bc985c
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public abstract class AbstractCache<K, V> implements Cache<K, V> {
public static final int DEFAULT_SIZE = 1 * MB;
public static final int MAX_INIT_CAP = 100 * MB;

public static final String ACTION_INVALID = "invalid";
public static final String ACTION_CLEAR = "clear";

protected static final Logger LOG = Log.logger(Cache.class);

private volatile long hits = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.baidu.hugegraph.backend.cache;

import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -138,7 +141,7 @@ private void listenChanges() {
this.graph(), event);
event.checkArgs(String.class, HugeType.class, Id.class);
Object[] args = event.args();
if ("invalid".equals(args[0])) {
if (ACTION_INVALID.equals(args[0])) {
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
if (type.isVertex()) {
Expand All @@ -153,7 +156,7 @@ private void listenChanges() {
this.edgesCache.clear();
}
return true;
} else if ("clear".equals(args[0])) {
} else if (ACTION_CLEAR.equals(args[0])) {
this.verticesCache.clear();
this.edgesCache.clear();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.baidu.hugegraph.backend.cache;

import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -104,7 +107,7 @@ private void listenChanges() {
this.graph(), event);
event.checkArgs(String.class, HugeType.class, Id.class);
Object[] args = event.args();
if ("invalid".equals(args[0])) {
if (ACTION_INVALID.equals(args[0])) {
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);
Expand All @@ -122,7 +125,7 @@ private void listenChanges() {
this.nameCache.invalidate(prefixedName);
}
return true;
} else if ("clear".equals(args[0])) {
} else if (ACTION_CLEAR.equals(args[0])) {
this.clearCache();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public Number queryNumber(Query query) {

@Override
public void beginTx() {
// Don't write raft log, commitTx(in statemachine) will call beginTx
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.baidu.hugegraph.backend.store.raft;

import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
Expand Down Expand Up @@ -255,7 +257,13 @@ public NodeOptions nodeOptions() throws IOException {
return nodeOptions;
}

public void notifyCache(HugeType type, Id id) {
public void clearCache() {
// Just choose two representatives used to represent schema and graph
this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null);
this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null);
}

public void notifyCache(String action, HugeType type, Id id) {
EventHub eventHub;
if (type.isGraph()) {
eventHub = this.params.graphEventHub();
Expand All @@ -266,7 +274,7 @@ public void notifyCache(HugeType type, Id id) {
}
try {
// How to avoid update cache from server info
eventHub.notify(Events.CACHE, "invalid", type, id);
eventHub.notify(Events.CACHE, action, type, id);
} catch (RejectedExecutionException e) {
LOG.warn("Can't update cache due to EventHub is too busy");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ public class StoreCommand {
private final StoreType type;
private final StoreAction action;
private final byte[] data;
private final boolean forwarded;

public StoreCommand(StoreType type, StoreAction action, byte[] data) {
this(type, action, data, false);
}

public StoreCommand(StoreType type, StoreAction action,
byte[] data, boolean forwarded) {
this.type = type;
this.action = action;
if (data == null) {
Expand All @@ -42,6 +48,7 @@ public StoreCommand(StoreType type, StoreAction action, byte[] data) {
}
this.data[0] = (byte) this.type.getNumber();
this.data[1] = (byte) this.action.getNumber();
this.forwarded = forwarded;
}

public StoreType type() {
Expand All @@ -56,6 +63,10 @@ public byte[] data() {
return this.data;
}

public boolean forwarded() {
return this.forwarded;
}

public static void writeHeader(BytesBuffer buffer) {
buffer.write((byte) 0);
buffer.write((byte) 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package com.baidu.hugegraph.backend.store.raft;

import java.util.EnumMap;
import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

import org.slf4j.Logger;

Expand Down Expand Up @@ -56,13 +55,10 @@ public class StoreStateMachine extends StateMachineAdapter {

private final RaftSharedContext context;
private final StoreSnapshotFile snapshotFile;
private final Map<StoreAction, BiConsumer<BackendStore, BytesBuffer>> funcs;

public StoreStateMachine(RaftSharedContext context) {
this.context = context;
this.snapshotFile = new StoreSnapshotFile(context.stores());
this.funcs = new EnumMap<>(StoreAction.class);
this.registerCommands();
}

private BackendStore store(StoreType type) {
Expand All @@ -73,45 +69,18 @@ private RaftNode node() {
return this.context.node();
}

private void registerCommands() {
// clear
this.register(StoreAction.CLEAR, (store, buffer) -> {
boolean clearSpace = buffer.read() > 0;
store.clear(clearSpace);
});
this.register(StoreAction.TRUNCATE, (store, buffer) -> {
store.truncate();
});
this.register(StoreAction.SNAPSHOT, (store, buffer) -> {
assert store == null;
this.node().snapshot();
});
this.register(StoreAction.BEGIN_TX, (store, buffer) -> store.beginTx());
this.register(StoreAction.COMMIT_TX, (store, buffer) -> {
List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
store.beginTx();
for (BackendMutation mutation : ms) {
store.mutate(mutation);
// update cache on follower when graph run in general mode
if (this.context.graphMode() == GraphMode.NONE) {
this.updateCacheIfNeeded(mutation);
}
}
store.commitTx();
});
this.register(StoreAction.ROLLBACK_TX, (store, buffer) -> {
store.rollbackTx();
});
// increase counter
this.register(StoreAction.INCR_COUNTER, (store, buffer) -> {
IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
store.increaseCounter(counter.type(), counter.increment());
});
}

private void updateCacheIfNeeded(BackendMutation mutation) {
// Only follower need to update cache from store to tx
if (this.node().selfIsLeader()) {
private void updateCacheIfNeeded(BackendMutation mutation,
boolean forwarded) {
// Update cache only when graph run in general mode
if (this.context.graphMode() != GraphMode.NONE) {
return;
}
/*
* 1. Follower need to update cache from store to tx
* 2. If request come from leader, cache will be updated by upper layer
* 3. If request is forwarded by follower, need to update cache
*/
if (!forwarded && this.node().selfIsLeader()) {
return;
}
for (HugeType type : mutation.types()) {
Expand All @@ -121,16 +90,11 @@ private void updateCacheIfNeeded(BackendMutation mutation) {
for (java.util.Iterator<BackendAction> it = mutation.mutation(type);
it.hasNext();) {
BackendEntry entry = it.next().entry();
this.context.notifyCache(type, entry.originId());
this.context.notifyCache(ACTION_INVALID, type, entry.originId());
}
}
}

private void register(StoreAction action,
BiConsumer<BackendStore, BytesBuffer> func) {
this.funcs.put(action, func);
}

@Override
public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
Expand All @@ -141,13 +105,16 @@ public void onApply(Iterator iter) {
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
BytesBuffer buffer = BytesBuffer.wrap(closure.command().data());
StoreCommand command = closure.command();
BytesBuffer buffer = BytesBuffer.wrap(command.data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
boolean forwarded = command.forwarded();
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
return this.applyCommand(type, action, buffer);
this.applyCommand(type, action, buffer, forwarded);
return null;
});
} else {
// Follower need readMutation data
Expand All @@ -161,7 +128,7 @@ public void onApply(Iterator iter) {
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
this.applyCommand(type, action, buffer);
this.applyCommand(type, action, buffer, false);
} catch (Throwable e) {
LOG.error("Failed to execute backend command: {}",
action, e);
Expand All @@ -184,12 +151,47 @@ public void onApply(Iterator iter) {
}
}

private Object applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer) {
private void applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
BackendStore store = type != StoreType.ALL ? this.store(type) : null;
BiConsumer<BackendStore, BytesBuffer> func = this.funcs.get(action);
func.accept(store, buffer);
return null;
switch (action) {
case CLEAR:
boolean clearSpace = buffer.read() > 0;
store.clear(clearSpace);
this.context.clearCache();
break;
case TRUNCATE:
store.truncate();
this.context.clearCache();
break;
case SNAPSHOT:
assert store == null;
this.node().snapshot();
break;
case BEGIN_TX:
store.beginTx();
break;
case COMMIT_TX:
List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
// RaftBackendStore doesn't write raft log for beginTx
store.beginTx();
for (BackendMutation mutation : ms) {
store.mutate(mutation);
this.updateCacheIfNeeded(mutation, forwarded);
}
store.commitTx();
break;
case ROLLBACK_TX:
store.rollbackTx();
break;
// increase counter
case INCR_COUNTER:
IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
store.increaseCounter(counter.type(), counter.increment());
break;
default:
throw new IllegalArgumentException("Invalid action " + action);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ private StoreCommand parseStoreCommand(StoreCommandRequest request) {
StoreType type = request.getType();
StoreAction action = request.getAction();
byte[] data = request.getData().toByteArray();
return new StoreCommand(type, action, data);
return new StoreCommand(type, action, data, true);
}
}
Loading

0 comments on commit 9bc985c

Please sign in to comment.