Unverified Commit 6d867eac authored by NielsCharlier's avatar NielsCharlier Committed by GitHub
Browse files

Merge pull request #3366 from NielsCharlier/2.15.x

 GEOS-9096 jdbcconfig/hazelcast thread safety fixes  (backport)
parents 4ab52dc4 dfc32cc5
......@@ -96,7 +96,8 @@ public class ConfigChangeEvent extends Event {
public enum Type {
ADD,
REMOVE,
MODIFY
MODIFY,
POST_MODIFY
}
/** id of object */
......
......@@ -38,10 +38,12 @@ import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.catalog.event.impl.CatalogAddEventImpl;
import org.geoserver.catalog.event.impl.CatalogEventImpl;
import org.geoserver.catalog.event.impl.CatalogModifyEventImpl;
import org.geoserver.catalog.event.impl.CatalogPostModifyEventImpl;
import org.geoserver.catalog.event.impl.CatalogRemoveEventImpl;
import org.geoserver.cluster.ConfigChangeEvent;
......@@ -202,6 +204,13 @@ public class EventHzSynchronizer extends HzSynchronizer {
evt = new CatalogAddEventImpl();
break;
case MODIFY:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod =
CatalogListener.class.getMethod(
"handleModifyEvent", CatalogModifyEvent.class);
evt = new CatalogModifyEventImpl();
break;
case POST_MODIFY:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod =
CatalogListener.class.getMethod(
......@@ -245,7 +254,11 @@ public class EventHzSynchronizer extends HzSynchronizer {
for (CatalogListener l : ImmutableList.copyOf(cat.getListeners())) {
// Don't notify self otherwise the event bounces back out into the
// cluster.
if (l != this && isStarted()) {
if (l != this
&& isStarted()
&& // HACK-HACK-HACK -- prevent infinite loop with update sequence listener
!"org.geoserver.config.UpdateSequenceListener"
.equals(l.getClass().getCanonicalName())) {
notifyMethod.invoke(l, evt);
}
}
......@@ -293,7 +306,12 @@ public class EventHzSynchronizer extends HzSynchronizer {
for (ConfigurationListener l : gs.getListeners()) {
try {
if (l != this) notifyMethod.invoke(l, subj);
if (l != this
&& // HACK-HACK-HACK -- prevent infinite loop with update sequence listener
!"org.geoserver.config.UpdateSequenceListener"
.equals(l.getClass().getCanonicalName())) {
notifyMethod.invoke(l, subj);
}
} catch (Exception ex) {
LOGGER.log(
Level.WARNING, format("%s - Event dispatch failed: %s", nodeId(), ce), ex);
......
......@@ -29,6 +29,7 @@ import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogEvent;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.cluster.ConfigChangeEvent;
......@@ -184,10 +185,15 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
}
@Override
public void handlePostModifyEvent(CatalogPostModifyEvent event) throws CatalogException {
public void handleModifyEvent(CatalogModifyEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.MODIFY));
}
@Override
public void handlePostModifyEvent(CatalogPostModifyEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.POST_MODIFY));
}
@Override
public void handleRemoveEvent(CatalogRemoveEvent event) throws CatalogException {
dispatch(newChangeEvent(event, Type.REMOVE));
......@@ -207,10 +213,24 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
}
@Override
public void handlePostServiceChange(ServiceInfo service) {
public void handlePostGlobalChange(GeoServerInfo global) {
dispatch(newChangeEvent(global, Type.POST_MODIFY));
}
@Override
public void handleServiceChange(
ServiceInfo service,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
dispatch(newChangeEvent(service, Type.MODIFY));
}
@Override
public void handlePostServiceChange(ServiceInfo service) {
dispatch(newChangeEvent(service, Type.POST_MODIFY));
}
@Override
public void handleServiceRemove(ServiceInfo service) {
dispatch(newChangeEvent(service, Type.REMOVE));
......@@ -221,6 +241,19 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer
dispatch(newChangeEvent(settings, Type.ADD));
}
@Override
public void handleSettingsModified(
SettingsInfo settings,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// optimization for update sequence
if (propertyNames.size() == 1 && propertyNames.contains("updateSequence")) {
return;
}
dispatch(newChangeEvent(settings, Type.MODIFY));
}
@Override
public void handleSettingsPostModified(SettingsInfo settings) {
dispatch(newChangeEvent(settings, Type.MODIFY));
......
......@@ -28,6 +28,7 @@ import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogEvent;
import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.event.CatalogModifyEvent;
import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent;
import org.geoserver.catalog.impl.DataStoreInfoImpl;
......@@ -307,6 +308,9 @@ public class EventHzSynchronizerRecvTest extends HzSynchronizerRecvTest {
catListener.handleAddEvent((CatalogAddEvent) catEvent(info));
break;
case MODIFY:
catListener.handleModifyEvent((CatalogModifyEvent) catEvent(info));
break;
case POST_MODIFY:
catListener.handlePostModifyEvent((CatalogPostModifyEvent) catEvent(info));
break;
case REMOVE:
......
......@@ -61,6 +61,13 @@ public abstract class HzSynchronizerSendTest extends HzSynchronizerTest {
expectEvent(
localAddress, layerName, layerWorkspace, layerId, LayerInfo.class, Type.MODIFY);
expectEvent(
localAddress,
layerName,
layerWorkspace,
layerId,
LayerInfo.class,
Type.POST_MODIFY);
}
replay(info);
{
......@@ -156,6 +163,13 @@ public abstract class HzSynchronizerSendTest extends HzSynchronizerTest {
globalId,
GeoServerInfo.class,
Type.MODIFY);
expectEvent(
localAddress,
globalName,
globalWorkspace,
globalId,
GeoServerInfo.class,
Type.POST_MODIFY);
}
replay(info);
{
......
......@@ -17,6 +17,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.geoserver.catalog.Catalog;
......@@ -40,6 +42,7 @@ import org.geoserver.catalog.util.CloseableIterator;
import org.geoserver.jdbcconfig.internal.ConfigDatabase;
import org.geoserver.ows.util.OwsUtils;
import org.geotools.util.Utilities;
import org.geotools.util.logging.Logging;
import org.opengis.filter.Filter;
import org.opengis.filter.sort.SortBy;
import org.springframework.util.Assert;
......@@ -48,6 +51,8 @@ import org.springframework.util.Assert;
@ParametersAreNonnullByDefault
public class JDBCCatalogFacade implements CatalogFacade {
public static final Logger LOGGER = Logging.getLogger(JDBCCatalogFacade.class);
private final ConfigDatabase db;
public JDBCCatalogFacade(final ConfigDatabase db) {
......@@ -873,9 +878,12 @@ public class JDBCCatalogFacade implements CatalogFacade {
protected <T extends CatalogInfo> T commitProxy(T object) {
// get the real object
T real = db.save(object);
return real;
try {
return db.save(object);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed to save object " + object.getId(), e);
return null;
}
}
protected void afterSaved(
......
......@@ -46,7 +46,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
......@@ -55,22 +59,14 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.wicket.util.string.Strings;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.CatalogVisitorAdapter;
import org.geoserver.catalog.CoverageInfo;
import org.geoserver.catalog.CoverageStoreInfo;
import org.geoserver.catalog.DataStoreInfo;
import org.geoserver.catalog.FeatureTypeInfo;
import org.geoserver.catalog.Info;
import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.MetadataMap;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.Predicates;
import org.geoserver.catalog.PublishedInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WMSLayerInfo;
import org.geoserver.catalog.WMSStoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogListener;
......@@ -124,6 +120,8 @@ public class ConfigDatabase {
public static final Logger LOGGER = Logging.getLogger(ConfigDatabase.class);
private static final int LOCK_TIMEOUT_SECONDS = 60;
private Dialect dialect;
private DataSource dataSource;
......@@ -148,9 +146,10 @@ public class ConfigDatabase {
private InfoRowMapper<Info> configRowMapper;
private CatalogClearingListener catalogListener;
private ConfigClearingListener configListener;
private ConcurrentMap<String, Semaphore> locks;
/** Protected default constructor needed by spring-jdbc instrumentation */
protected ConfigDatabase() {
//
......@@ -180,6 +179,7 @@ public class ConfigDatabase {
cache = cacheProvider.getCache("catalog");
identityCache = cacheProvider.getCache("catalogNames");
serviceCache = cacheProvider.getCache("services");
locks = new ConcurrentHashMap<>();
}
private Dialect dialect() {
......@@ -315,7 +315,6 @@ public class ConfigDatabase {
final Filter simplifiedFilter =
(Filter) sqlBuilder.getSupportedFilter().accept(filterSimplifier, null);
if (simplifiedFilter instanceof PropertyIsEqualTo) {
String id = null;
PropertyIsEqualTo isEqualTo = (PropertyIsEqualTo) simplifiedFilter;
if (isEqualTo.getExpression1() instanceof PropertyName
&& isEqualTo.getExpression2() instanceof Literal
......@@ -535,17 +534,6 @@ public class ConfigDatabase {
}
addAttributes(info, key);
cache.put(id, info);
for (InfoIdentity identity : InfoIdentities.get().getIdentities(info)) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, id);
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
return getById(id, interf);
}
......@@ -761,12 +749,6 @@ public class ConfigDatabase {
return;
}
if (info instanceof ServiceInfo) {
disposeServiceCache();
}
identityCache.invalidateAll(InfoIdentities.get().getIdentities(info));
cache.invalidate(info.getId());
String deleteObject = "delete from object where id = :id";
String deleteRelatedProperties = "delete from object_property where related_oid = :oid";
......@@ -805,12 +787,6 @@ public class ConfigDatabase {
final Info oldObject = (Info) modificationProxy.getProxyObject();
if (info instanceof ServiceInfo) {
disposeServiceCache();
}
identityCache.invalidateAll(InfoIdentities.get().getIdentities(oldObject));
cache.invalidate(id);
// get changed properties before h.commit()s
final Iterable<Property> changedProperties = dbMappings.changedProperties(oldObject, info);
......@@ -849,7 +825,6 @@ public class ConfigDatabase {
updateQueryableProperties(oldObject, objectId, changedProperties);
cache.invalidate(id);
Class<T> clazz = ClassMappings.fromImpl(oldObject.getClass()).getInterface();
// / <HACK>
......@@ -878,15 +853,6 @@ public class ConfigDatabase {
}
// / </HACK>
for (InfoIdentity identity : InfoIdentities.get().getIdentities(info)) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, id);
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
return getById(id, clazz);
}
......@@ -1040,7 +1006,29 @@ public class ConfigDatabase {
valueLoader = new ConfigLoader(id);
}
info = cache.get(id, valueLoader);
Semaphore lock = locks.computeIfAbsent(id, x -> new Semaphore(1));
info = cache.getIfPresent(id);
if (info == null) {
// we try the write lock
if (lock.tryAcquire()) {
try {
info = cache.get(id, valueLoader);
} finally {
lock.release();
}
}
}
if (info == null) {
// if the write lock was locked, we fall back
// to a read-only method
try {
info = valueLoader.call();
} catch (Exception e) {
throw new ExecutionException(e);
}
}
} catch (CacheLoader.InvalidCacheLoadException notFound) {
return null;
......@@ -1477,7 +1465,7 @@ public class ConfigDatabase {
return !propertyTypes.isEmpty();
}
void clear(Info info) {
void clearCache(Info info) {
if (info instanceof ServiceInfo) {
// need to figure out how to remove only the relevant cache
// entries for the service info, like with InfoIdenties below,
......@@ -1488,6 +1476,27 @@ public class ConfigDatabase {
cache.invalidate(info.getId());
}
void clearCacheIfPresent(String id) {
Info info = cache.getIfPresent(id);
if (info != null) {
clearCache(info);
}
}
void updateCache(Info info) {
info = ModificationProxy.unwrap(info);
cache.put(info.getId(), info);
List<InfoIdentity> identities = InfoIdentities.get().getIdentities(info);
for (InfoIdentity identity : identities) {
if (identityCache.getIfPresent(identity) == null) {
identityCache.put(identity, info.getId());
} else {
// not a unique identity
identityCache.invalidate(identity);
}
}
}
public <T extends Info> T get(Class<T> type, Filter filter) throws IllegalArgumentException {
CloseableIterator<T> it =
......@@ -1526,110 +1535,144 @@ public class ConfigDatabase {
return result;
}
/** Listens to catalog events clearing cache entires when resources are modified. */
// Copied from org.geoserver.catalog.ResourcePool
public class CatalogClearingListener extends CatalogVisitorAdapter implements CatalogListener {
public void handleAddEvent(CatalogAddEvent event) {}
public void handleModifyEvent(CatalogModifyEvent event) {}
public void handlePostModifyEvent(CatalogPostModifyEvent event) {
event.getSource().accept(this);
}
private void acquireWriteLock(String id) {
Semaphore lock = locks.computeIfAbsent(id, x -> new Semaphore(1));
try {
if (!lock.tryAcquire(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOGGER.severe(
"Time-out waiting for lock on "
+ id
+ ", assuming it was abandoned and moving on. This shouldn't happen!");
}
} catch (InterruptedException e) {
public void handleRemoveEvent(CatalogRemoveEvent event) {
event.getSource().accept(this);
}
}
public void reloaded() {}
private void releaseWriteLock(String id) {
locks.get(id).release();
}
@Override
public void visit(DataStoreInfo dataStore) {
clear(dataStore);
}
/** Listens to catalog events clearing cache entires when resources are modified. */
// Copied from org.geoserver.catalog.ResourcePool
public class CatalogClearingListener implements CatalogListener {
@Override
public void visit(CoverageStoreInfo coverageStore) {
clear(coverageStore);
public void handleAddEvent(CatalogAddEvent event) {
updateCache(event.getSource());
}
@Override
public void visit(FeatureTypeInfo featureType) {
clear(featureType);
public void handleModifyEvent(CatalogModifyEvent event) {
// make sure that cache is not refilled before commit
if (event.getSource() instanceof ResourceInfo) {
String liId =
getIdByIdentity(LayerInfo.class, "resource.id", event.getSource().getId());
acquireWriteLock(liId);
clearCacheIfPresent(liId);
}
acquireWriteLock(event.getSource().getId());
clearCache(event.getSource());
}
@Override
public void visit(WMSStoreInfo wmsStore) {
clear(wmsStore);
public void handlePostModifyEvent(CatalogPostModifyEvent event) {
updateCache(event.getSource());
releaseWriteLock(event.getSource().getId());
if (event.getSource() instanceof ResourceInfo) {
String liId =
getIdByIdentity(LayerInfo.class, "resource.id", event.getSource().getId());
releaseWriteLock(liId);
}
}
@Override
public void visit(StyleInfo style) {
clear(style);
public void handleRemoveEvent(CatalogRemoveEvent event) {
clearCache(event.getSource());
}
@Override
public void visit(WorkspaceInfo workspace) {
clear(workspace);
}
public void reloaded() {}
}
/** Listens to configuration events clearing cache entires when resources are modified. */
public class ConfigClearingListener extends ConfigurationListenerAdapter {
@Override
public void visit(NamespaceInfo workspace) {
clear(workspace);
public void handleSettingsRemoved(SettingsInfo settings) {
clearCache(settings);
}
@Override
public void visit(CoverageInfo coverage) {
clear(coverage);
public void handleServiceRemove(ServiceInfo service) {
clearCache(service);
}
@Override
public void visit(LayerInfo layer) {
clear(layer);
public void handleGlobalChange(
GeoServerInfo global,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(global.getId());
clearCache(global);
}
@Override
public void visit(LayerGroupInfo layerGroup) {
clear(layerGroup);
public void handlePostGlobalChange(GeoServerInfo global) {
updateCache(global);
releaseWriteLock(global.getId());
}
@Override
public void visit(WMSLayerInfo wmsLayerInfoImpl) {
clear(wmsLayerInfoImpl);
public void handleSettingsModified(
SettingsInfo settings,
List<String> propertyNames,
List<Object> oldValues,
List<Object> newValues) {
// make sure that cache is not refilled before commit
acquireWriteLock(settings.getId());
clearCache(settings);