// AsyncLoadingCache.java
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package uk.co.spudsoft.jwtvalidatorvertx.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Class backed by a Guava Cache that returns a Future for all elements whilst
 * still ensuring that the loader is only called once at a time per element.
 *
 * <p>Successful results are served from the cache until the expiry time reported by the
 * {@code getExpiry} function has passed. Failures are never cached: the first request
 * after a failed load calls the loader again.
 *
 * <p>Per-key state is only read or written while holding a single internal lock. The loader,
 * the expiry function and the completion of waiting promises all run outside that lock.
 *
 * @author njt
 * @param <K> The key type for the cache.
 * @param <V> The value type stored in the cache.
 */
public class AsyncLoadingCache<K, V> {

  @SuppressWarnings("constantname")
  private static final Logger logger = LoggerFactory.getLogger(AsyncLoadingCache.class);

  /**
   * Data class for items stored in the backing cache.
   *
   * All fields must only be accessed while holding {@link AsyncLoadingCache#lock}
   * (or before the instance has been published to the backing cache).
   */
  private class Data {

    /** Promises handed out to callers that are waiting for the current load to finish. */
    private List<Promise<V>> initialPromises;
    /** True once the most recent load (or put) has finished, successfully or not. */
    private boolean completed;
    /** Only meaningful when completed is true: whether the most recent load succeeded. */
    private boolean succeeded;
    /** Expiry in ms since epoch; Long.MAX_VALUE until a successful result has been recorded. */
    private long expiry;
    /** The cached value; only set by a successful update. */
    private V result;

    Data() {
      this.initialPromises = new ArrayList<>();
      this.expiry = Long.MAX_VALUE;
    }

    /**
     * Record the outcome of a load.
     *
     * The list of waiting promises is replaced with a fresh one, so the caller must take a
     * reference to the old list first if it needs to complete those promises.
     *
     * @param succeeded whether the load succeeded.
     * @param value the loaded value (ignored if the load failed).
     * @param expiry the expiry of the value in ms since epoch (ignored if the load failed).
     */
    void update(boolean succeeded, V value, long expiry) {
      this.initialPromises = new ArrayList<>();
      this.completed = true;
      this.succeeded = succeeded;
      if (succeeded) {
        this.expiry = expiry;
        this.result = value;
      }
    }
  }

  private final Object lock = new Object();
  private final Cache<K, Data> backing = CacheBuilder.newBuilder().build();
  private final Function<V, Long> getExpiry;

  /**
   * Constructor.
   * @param getExpiry Function to extract the expiry time (in ms since epoch) from a V type.
   */
  public AsyncLoadingCache(Function<V, Long> getExpiry) {
    this.getExpiry = getExpiry;
  }

  /**
   * Return true if the backing cache has an entry for the provided key.
   *
   * Note that this is a plain key lookup: it also returns true for entries whose load is still
   * in progress, whose last load failed, or whose value has expired but not yet been replaced.
   *
   * @param key the key to check.
   * @return true if the backing cache has an entry for the provided key.
   */
  public boolean containsKey(K key) {
    return backing.asMap().containsKey(key);
  }

  /**
   * Associates {@code value} with {@code key} in this cache.
   *
   * If the cache previously contained a value associated with {@code key}, the old value is replaced by {@code value}.
   * Callers already waiting on an in-progress load for {@code key} still receive the result of that load.
   *
   * <p>Prefer {@link #get(Object, Callable)} when using the conventional "if cached, return; otherwise create, cache and return" pattern.
   *
   * @param key the key to set.
   * @param value the value to set.
   */
  public void put(K key, V value) {
    // Evaluate the caller-supplied expiry function before taking the lock; any exception it
    // throws propagates to the caller and leaves the cache untouched.
    long expiry = getExpiry.apply(value);
    Data data = new Data();
    data.update(true, value, expiry);
    synchronized (lock) {
      // Store the populated entry. Storing an empty Data here would look like an in-flight
      // load and leave every subsequent get() waiting on a promise that is never completed.
      backing.put(key, data);
    }
  }

  /**
   * Get an item from the cache, returning a Future in case the item is not already there.
   *
   * <ul>
   * <li>If an unexpired successful value is cached, it is returned immediately and the loader is not called.</li>
   * <li>If a load for the key is already in progress, the returned Future completes with the
   * outcome of that load and the loader is not called.</li>
   * <li>Otherwise (no entry, expired entry, or the previous load failed) the loader is called.</li>
   * </ul>
   *
   * If the loader throws, or returns null, the failure is logged and delivered to this caller
   * and to every caller waiting on the same load; the next request will call the loader again.
   *
   * @param key The key for the item in the cache.
   * @param loader Callable that actually gets the value.
   * @return The value returned either by this Callable or some previous instance of it.
   */
  public Future<V> get(K key, Callable<Future<V>> loader) {
    Promise<V> promise;
    Data data;
    synchronized (lock) {
      data = backing.getIfPresent(key);
      if (data != null && (data.expiry > System.currentTimeMillis())) {
        if (data.completed) {
          if (data.succeeded) {
            return Future.succeededFuture(data.result);
          }
          // Previous request failed, so try again (don't cache failures)
          promise = createAndAddInitialPromise(data.initialPromises);
          data.completed = false;
        } else {
          // A load is already in flight: just wait for its outcome.
          return createAndAddInitialPromise(data.initialPromises).future();
        }
      } else {
        // No entry, or the cached value has expired: start afresh.
        data = new Data();
        promise = createAndAddInitialPromise(data.initialPromises);
        backing.put(key, data);
      }
    }

    // This caller owns the load; the loader is deliberately invoked outside the lock.
    Data finalData = data;
    Future<V> loaded;
    try {
      loaded = loader.call();
      if (loaded == null) {
        throw new NullPointerException("Loader returned a null Future");
      }
    } catch (Throwable ex) {
      logger.error("Failed to call loader: ", ex);
      // Route the failure through the normal completion path so that the entry is marked as
      // failed and every waiting promise is failed, rather than being left pending forever.
      loaded = Future.failedFuture(ex);
    }
    loaded.onComplete(ar -> handleAfterLoaderCall(ar, finalData));
    return promise.future();
  }

  /**
   * Get an immutable snapshot of the keys currently in the backing map.
   * @return an immutable copy of the keys currently in the backing map.
   */
  public Set<K> keySet() {
    return ImmutableSet.copyOf(backing.asMap().keySet());
  }

  /**
   * Create a new promise and register it in the given list of waiting promises.
   * Must be called while holding the lock that guards the list.
   */
  private Promise<V> createAndAddInitialPromise(List<Promise<V>> initialPromises) {
    Promise<V> promise = Promise.promise();
    initialPromises.add(promise);
    return promise;
  }

  /**
   * Record the outcome of a load and pass it on to every caller waiting for it.
   *
   * If the load succeeded but the expiry cannot be determined (the expiry function throws or
   * returns null) the load is treated as failed, so the value is not cached and waiting callers
   * are failed with that exception.
   *
   * @param asyncResult the outcome of the loader's Future.
   * @param data the cache entry that the load was started for.
   */
  private void handleAfterLoaderCall(AsyncResult<V> asyncResult, Data data) {
    boolean succeeded = asyncResult.succeeded();
    V result = asyncResult.result();
    Throwable cause = asyncResult.cause();
    long expiry = Long.MAX_VALUE;
    if (succeeded) {
      // Caller-supplied code: run it before touching any state and outside the lock.
      try {
        expiry = getExpiry.apply(result);
      } catch (Throwable ex) {
        logger.error("Failed to get expiry: ", ex);
        succeeded = false;
        result = null;
        cause = ex;
      }
    }

    List<Promise<V>> waiting;
    synchronized (lock) {
      // Take ownership of the current waiters; update() installs a fresh list for later callers.
      waiting = data.initialPromises;
      data.update(succeeded, result, expiry);
    }

    // Complete the waiters outside the lock, because their handlers are arbitrary caller code.
    if (succeeded) {
      for (Promise<V> waiter : waiting) {
        waiter.complete(result);
      }
    } else {
      for (Promise<V> waiter : waiting) {
        waiter.fail(cause);
      }
    }
  }
}