diff --git a/src/main/java/edu/snu/cay/services/et/driver/impl/MigrationManager.java b/src/main/java/edu/snu/cay/services/et/driver/impl/MigrationManager.java index 561b328d8..ad65dac0f 100644 --- a/src/main/java/edu/snu/cay/services/et/driver/impl/MigrationManager.java +++ b/src/main/java/edu/snu/cay/services/et/driver/impl/MigrationManager.java @@ -71,7 +71,7 @@ private MigrationManager(final MessageSender msgSender, * @param tableId a table id * @param executorId a executor id */ - void registerSubscription(final String tableId, final String executorId) { + synchronized void registerSubscription(final String tableId, final String executorId) { subscribersPerTable.compute(tableId, (tId, executorIdSet) -> { final Set value = executorIdSet == null ? Collections.synchronizedSet(new HashSet<>()) : executorIdSet; if (!value.add(executorId)) { @@ -87,7 +87,7 @@ void registerSubscription(final String tableId, final String executorId) { * @param tableId a table id * @param executorId a executor id */ - void unregisterSubscription(final String tableId, final String executorId) { + synchronized void unregisterSubscription(final String tableId, final String executorId) { subscribersPerTable.compute(tableId, (tId, executorIdSet) -> { if (executorIdSet == null) { throw new RuntimeException(String.format("Table %s does not exist", tId)); @@ -105,7 +105,7 @@ void unregisterSubscription(final String tableId, final String executorId) { * @param tableId a table id * @return a set of unregistered executor ids */ - Set unregisterSubscribers(final String tableId) { + synchronized Set unregisterSubscribers(final String tableId) { return subscribersPerTable.remove(tableId); } @@ -114,7 +114,8 @@ Set unregisterSubscribers(final String tableId) { * @return a set of executor ids that subscribe the table */ Set getSubscribers(final String tableId) { - return new HashSet<>(subscribersPerTable.get(tableId)); + final Set subscribers = subscribersPerTable.get(tableId); + return subscribers == null ? Collections.emptySet() : new HashSet<>(subscribers); } /**