Commit 41a454df authored by Niels Charlier's avatar Niels Charlier
Browse files

GEOS-9096 jdbcconfig/hazelcast thread safety fixes

parent b58deb6b
......@@ -96,7 +96,8 @@ public class ConfigChangeEvent extends Event {
public enum Type {
ADD,
REMOVE,
MODIFY
MODIFY,
POST_MODIFY
}
/** id of object */
......
......@@ -38,10 +38,12 @@ import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.catalog.event.impl.CatalogAddEventImpl;
import org.geoserver.catalog.event.impl.CatalogEventImpl;
import org.geoserver.catalog.event.impl.CatalogModifyEventImpl;
import org.geoserver.catalog.event.impl.CatalogPostModifyEventImpl;
import org.geoserver.catalog.event.impl.CatalogRemoveEventImpl;
import org.geoserver.cluster.ConfigChangeEvent;
......@@ -202,6 +204,13 @@ public class EventHzSynchronizer extends HzSynchronizer {
evt = new CatalogAddEventImpl();
break;
case MODIFY:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod =
CatalogListener.class.getMethod(
"handleModifyEvent", CatalogModifyEvent.class);
evt = new CatalogModifyEventImpl();
break;
case POST_MODIFY:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod =
CatalogListener.class.getMethod(
......@@ -245,7 +254,11 @@ public class EventHzSynchronizer extends HzSynchronizer {
for (CatalogListener l : ImmutableList.copyOf(cat.getListeners())) {
// Don't notify self otherwise the event bounces back out into the
// cluster.
if (l != this && isStarted()) {
if (l != this
&& isStarted()
&& // HACK-HACK-HACK -- prevent infinite loop with update sequence listener
!"org.geoserver.config.UpdateSequenceListener"
.equals(l.getClass().getCanonicalName())) {
notifyMethod.invoke(l, evt);
}
}
......@@ -293,7 +306,12 @@ public class EventHzSynchronizer extends HzSynchronizer {
for (ConfigurationListener l : gs.getListeners()) {
try {
if (l != this) notifyMethod.invoke(l, subj);
if (l != this
&& // HACK-HACK-HACK -- prevent infinite loop with update sequence listener
!"org.geoserver.config.UpdateSequenceListener"
.equals(l.getClass().getCanonicalName())) {
notifyMethod.invoke(l, subj);
}
} catch (Exception ex) {
LOGGER.log(
Level.WARNING, format("%s - Event dispatch failed: %s", nodeId(), ce), ex);
......
......@@ -29,6 +29,7 @@ import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogEvent;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.cluster.ConfigChangeEvent;
......@@ -184,10 +185,15 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
}
@Override
public void handlePostModifyEvent(CatalogPostModifyEvent event) throws CatalogException {
public void handleModifyEvent(CatalogModifyEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.MODIFY));
}
@Override
public void handlePostModifyEvent(CatalogPostModifyEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.POST_MODIFY));
}
@Override
public void handleRemoveEvent(CatalogRemoveEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.REMOVE));
......@@ -207,10 +213,24 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
}
@Override
public void handlePostServiceChange(ServiceInfo service) {
public void handlePostGlobalChange(GeoServerInfo global) {
dispatch(newChangeEvent(global, Type.POST_MODIFY));
}
@Override
public void handleServiceChange(
ServiceInfo service,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
dispatch(newChangeEvent(service, Type.MODIFY));
}
@Override
public void handlePostServiceChange(ServiceInfo service) {
dispatch(newChangeEvent(service, Type.POST_MODIFY));
}
@Override
public void handleServiceRemove(ServiceInfo service) {
dispatch(newChangeEvent(service, Type.REMOVE));
......@@ -221,6 +241,19 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
dispatch(newChangeEvent(settings, Type.ADD));
}
@Override
public void handleSettingsModified(
SettingsInfo settings,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// optimization for update sequence
if (propertyNames.size() == 1 && propertyNames.contains("updateSequence")) {
return;
}
dispatch(newChangeEvent(settings, Type.MODIFY));
}
@Override
public void handleSettingsPostModified(SettingsInfo settings) {
dispatch(newChangeEvent(settings, Type.MODIFY));
......
......@@ -28,6 +28,7 @@ import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogEvent;
import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.catalog.impl.DataStoreInfoImpl;
......@@ -307,6 +308,9 @@ public class EventHzSynchronizerRecvTest extends HzSynchronizerRecvTest {
catListener.handleAddEvent((CatalogAddEvent) catEvent(info));
break;
case MODIFY:
catListener.handleModifyEvent((CatalogModifyEvent) catEvent(info));
break;
case POST_MODIFY:
catListener.handlePostModifyEvent((CatalogPostModifyEvent) catEvent(info));
break;
case REMOVE:
......
......@@ -61,6 +61,13 @@ public abstract class HzSynchronizerSendTest extends HzSynchronizerTest {
expectEvent(
localAddress, layerName, layerWorkspace, layerId, LayerInfo.class, Type.MODIFY);
expectEvent(
localAddress,
layerName,
layerWorkspace,
layerId,
LayerInfo.class,
Type.POST_MODIFY);
}
replay(info);
{
......@@ -156,6 +163,13 @@ public abstract class HzSynchronizerSendTest extends HzSynchronizerTest {
globalId,
GeoServerInfo.class,
Type.MODIFY);
expectEvent(
localAddress,
globalName,
globalWorkspace,
globalId,
GeoServerInfo.class,
Type.POST_MODIFY);
}
replay(info);
{
......
......@@ -46,7 +46,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
......@@ -55,22 +58,14 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.wicket.util.string.Strings;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.CatalogVisitorAdapter;
import org.geoserver.catalog.CoverageInfo;
import org.geoserver.catalog.CoverageStoreInfo;
import org.geoserver.catalog.DataStoreInfo;
import org.geoserver.catalog.FeatureTypeInfo;
import org.geoserver.catalog.Info;
import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.MetadataMap;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.Predicates;
import org.geoserver.catalog.PublishedInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WMSLayerInfo;
import org.geoserver.catalog.WMSStoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogListener;
......@@ -148,9 +143,10 @@ public class ConfigDatabase {
private InfoRowMapper<Info> configRowMapper;
private CatalogClearingListener catalogListener;
private ConfigClearingListener configListener;
private ConcurrentMap<String, ReentrantLock> locks;
/** Protected default constructor needed by spring-jdbc instrumentation */
protected ConfigDatabase() {
//
......@@ -180,6 +176,7 @@ public class ConfigDatabase {
cache = cacheProvider.getCache("catalog");
identityCache = cacheProvider.getCache("catalogNames");
serviceCache = cacheProvider.getCache("services");
locks = new ConcurrentHashMap<>();
}
private Dialect dialect() {
......@@ -315,7 +312,6 @@ public class ConfigDatabase {
final Filter simplifiedFilter =
(Filter) sqlBuilder.getSupportedFilter().accept(filterSimplifier, null);
if (simplifiedFilter instanceof PropertyIsEqualTo) {
String id = null;
PropertyIsEqualTo isEqualTo = (PropertyIsEqualTo) simplifiedFilter;
if (isEqualTo.getExpression1() instanceof PropertyName
&& isEqualTo.getExpression2() instanceof Literal
......@@ -535,17 +531,6 @@ public class ConfigDatabase {
}
addAttributes(info, key);
cache.put(id, info);
for (InfoIdentity identity : InfoIdentities.get().getIdentities(info)) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, id);
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
return getById(id, interf);
}
......@@ -761,12 +746,6 @@ public class ConfigDatabase {
return;
}
if (info instanceof ServiceInfo) {
disposeServiceCache();
}
identityCache.invalidateAll(InfoIdentities.get().getIdentities(info));
cache.invalidate(info.getId());
String deleteObject = "delete from object where id = :id";
String deleteRelatedProperties = "delete from object_property where related_oid = :oid";
......@@ -805,12 +784,6 @@ public class ConfigDatabase {
final Info oldObject = (Info) modificationProxy.getProxyObject();
if (info instanceof ServiceInfo) {
disposeServiceCache();
}
identityCache.invalidateAll(InfoIdentities.get().getIdentities(oldObject));
cache.invalidate(id);
// get changed properties before h.commit()s
final Iterable<Property> changedProperties = dbMappings.changedProperties(oldObject, info);
......@@ -849,7 +822,6 @@ public class ConfigDatabase {
updateQueryableProperties(oldObject, objectId, changedProperties);
cache.invalidate(id);
Class<T> clazz = ClassMappings.fromImpl(oldObject.getClass()).getInterface();
// / <HACK>
......@@ -878,15 +850,6 @@ public class ConfigDatabase {
}
// / </HACK>
for (InfoIdentity identity : InfoIdentities.get().getIdentities(info)) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, id);
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
return getById(id, clazz);
}
......@@ -1040,7 +1003,33 @@ public class ConfigDatabase {
valueLoader = new ConfigLoader(id);
}
info = cache.get(id, valueLoader);
ReentrantLock lock = locks.get(id);
if (lock == null) {
lock = new ReentrantLock();
locks.put(id, lock);
}
info = cache.getIfPresent(id);
if (info == null) {
// we try the write lock
if (!lock.isHeldByCurrentThread() && lock.tryLock()) {
try {
info = cache.get(id, valueLoader);
} finally {
lock.unlock();
}
}
}
if (info == null) {
// if the write lock was locked, we fall back
// to a read-only method
try {
info = valueLoader.call();
} catch (Exception e) {
throw new ExecutionException(e);
}
}
} catch (CacheLoader.InvalidCacheLoadException notFound) {
return null;
......@@ -1477,7 +1466,7 @@ public class ConfigDatabase {
return !propertyTypes.isEmpty();
}
void clear(Info info) {
void clearCache(Info info) {
if (info instanceof ServiceInfo) {
// need to figure out how to remove only the relevant cache
// entries for the service info, like with InfoIdenties below,
......@@ -1488,6 +1477,27 @@ public class ConfigDatabase {
cache.invalidate(info.getId());
}
void clearCacheIfPresent(String id) {
Info info = cache.getIfPresent(id);
if (info != null) {
clearCache(info);
}
}
void updateCache(Info info) {
info = ModificationProxy.unwrap(info);
cache.put(info.getId(), info);
List<InfoIdentity> identities = InfoIdentities.get().getIdentities(info);
for (InfoIdentity identity : identities) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, info.getId());
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
}
public <T extends Info> T get(Class<T> type, Filter filter) throws IllegalArgumentException {
CloseableIterator<T> it =
......@@ -1526,110 +1536,139 @@ public class ConfigDatabase {
return result;
}
/** Listens to catalog events clearing cache entires when resources are modified. */
// Copied from org.geoserver.catalog.ResourcePool
public class CatalogClearingListener extends CatalogVisitorAdapter implements CatalogListener {
public void handleAddEvent(CatalogAddEvent event) {}
public void handleModifyEvent(CatalogModifyEvent event) {}
public void handlePostModifyEvent(CatalogPostModifyEvent event) {
event.getSource().accept(this);
}
public void handleRemoveEvent(CatalogRemoveEvent event) {
event.getSource().accept(this);
private void acquireWriteLock(String id) {
ReentrantLock lock = locks.get(id);
if (lock == null) {
lock = new ReentrantLock();
locks.put(id, lock);
}
lock.lock();
}
public void reloaded() {}
private void releaseWriteLock(String id) {
locks.get(id).unlock();
}
@Override
public void visit(DataStoreInfo dataStore) {
clear(dataStore);
}
/** Listens to catalog events clearing cache entires when resources are modified. */
// Copied from org.geoserver.catalog.ResourcePool
public class CatalogClearingListener implements CatalogListener {
@Override
public void visit(CoverageStoreInfo coverageStore) {
clear(coverageStore);
public void handleAddEvent(CatalogAddEvent event) {
updateCache(event.getSource());
}
@Override
public void visit(FeatureTypeInfo featureType) {
clear(featureType);
public void handleModifyEvent(CatalogModifyEvent event) {
// make sure that cache is not refilled before commit
if (event.getSource() instanceof ResourceInfo) {
String liId =
getIdByIdentity(LayerInfo.class, "resource.id", event.getSource().getId());
acquireWriteLock(liId);
clearCacheIfPresent(liId);
}
acquireWriteLock(event.getSource().getId());
clearCache(event.getSource());
}
@Override
public void visit(WMSStoreInfo wmsStore) {
clear(wmsStore);
public void handlePostModifyEvent(CatalogPostModifyEvent event) {
updateCache(event.getSource());
releaseWriteLock(event.getSource().getId());
if (event.getSource() instanceof ResourceInfo) {
String liId =
getIdByIdentity(LayerInfo.class, "resource.id", event.getSource().getId());
releaseWriteLock(liId);
}
}
@Override
public void visit(StyleInfo style) {
clear(style);
public void handleRemoveEvent(CatalogRemoveEvent event) {
clearCache(event.getSource());
}
@Override
public void visit(WorkspaceInfo workspace) {
clear(workspace);
}
public void reloaded() {}
}
/** Listens to configuration events clearing cache entires when resources are modified. */
public class ConfigClearingListener extends ConfigurationListenerAdapter {
@Override
public void visit(NamespaceInfo workspace) {
clear(workspace);
public void handleSettingsRemoved(SettingsInfo settings) {
clearCache(settings);
}
@Override
public void visit(CoverageInfo coverage) {
clear(coverage);
public void handleServiceRemove(ServiceInfo service) {
clearCache(service);
}
@Override
public void visit(LayerInfo layer) {
clear(layer);
public void handleGlobalChange(
GeoServerInfo global,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(global.getId());
clearCache(global);
}
@Override
public void visit(LayerGroupInfo layerGroup) {
clear(layerGroup);
public void handlePostGlobalChange(GeoServerInfo global) {
updateCache(global);
releaseWriteLock(global.getId());
}
@Override
public void visit(WMSLayerInfo wmsLayerInfoImpl) {
clear(wmsLayerInfoImpl);
public void handleSettingsModified(
SettingsInfo settings,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(settings.getId());
clearCache(settings);
}
}
/** Listens to configuration events clearing cache entires when resources are modified. */
public class ConfigClearingListener extends ConfigurationListenerAdapter {
@Override
public void handlePostGlobalChange(GeoServerInfo global) {
clear(global);
public void handleSettingsPostModified(SettingsInfo settings) {
updateCache(settings);
releaseWriteLock(settings.getId());
}
@Override
public void handleSettingsPostModified(SettingsInfo settings) {
clear(settings);
public void handleLoggingChange(
LoggingInfo logging,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(logging.getId());
clearCache(logging);
}
@Override
public void handleSettingsRemoved(SettingsInfo settings) {
clear(settings);
public void handlePostLoggingChange(LoggingInfo logging) {
updateCache(logging);
releaseWriteLock(logging.getId());
}
@Override
public void handlePostLoggingChange(LoggingInfo logging) {
clear(logging);
public void handleServiceChange(
ServiceInfo service,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(service.getId());
clearCache(service);
}
@Override
public void handlePostServiceChange(ServiceInfo service) {
clear(service);