diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index a81beb91ba..f0b9956fcb 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -25,6 +25,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.*; + +import com.palantir.logsafe.SafeArg; import com.palantir.tracing.CloseableTracer; import java.lang.management.ManagementFactory; @@ -62,10 +64,13 @@ public class MigrationManager private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + private static final ConcurrentHashMap> scheduledSchemaPulls = new ConcurrentHashMap<>(); + public static final int MIGRATION_DELAY_IN_MS = 60000; + public static final int MAX_SCHEDULED_SCHEMA_PULL_REQUESTS = 3; private final List listeners = new CopyOnWriteArrayList<>(); - + private MigrationManager() {} public void register(MigrationListener listener) @@ -103,7 +108,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA return; } - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) + if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint, theirVersion)) { logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); return; @@ -113,7 +118,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA { // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately logger.debug("Submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); + submitMigrationTask(endpoint, theirVersion); } if (onChange && Boolean.getBoolean("palantir_cassandra.unsafe_schema_migration")) @@ -136,6 +141,7 @@ public void run() if (epState == null) { logger.debug("epState vanished for {}, not submitting migration task", endpoint); + removeEndpointFromSchemaPullVersion(theirVersion, endpoint); return; } VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); @@ -149,25 +155,52 @@ public void run() endpoint, theirVersion, currentVersion); + removeEndpointFromSchemaPullVersion(theirVersion, endpoint); return; } if (Schema.instance.getVersion().equals(currentVersion)) { logger.debug("not submitting migration task for {} because our versions match", endpoint); + removeEndpointFromSchemaPullVersion(theirVersion, endpoint); return; } - logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version", - endpoint, - currentVersion, - Schema.instance.getVersion()); - submitMigrationTask(endpoint); + logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}", + SafeArg.of("endpoint", endpoint), + SafeArg.of("endpointVersion", currentVersion), + SafeArg.of("schemaVersion", Schema.instance.getVersion())); + submitMigrationTask(endpoint, currentVersion); } }; + addEndpointToSchemaPullVersion(theirVersion, endpoint); ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); } } + public static void addEndpointToSchemaPullVersion(UUID version, InetAddress endpoint) { + scheduledSchemaPulls.compute(version, (v, s) -> { + if (s == null) { + s = new HashSet<>(); + } + s.add(endpoint); + return s; + }); + } + + public static void removeEndpointFromSchemaPullVersion(UUID version, InetAddress endpoint) { + scheduledSchemaPulls.computeIfPresent(version, (v, s) -> { + logger.debug("Removing endpoint from scheduled schema pulls", + SafeArg.of("endpoint", endpoint), + SafeArg.of("schemaVersion", v), + SafeArg.of("scheduledPulls", s)); + s.remove(endpoint); + if (!s.isEmpty()) { + return s; + } + return null; + }); + } + private static Future submitMigrationTask(InetAddress endpoint) { /* @@ -177,10 +210,22 @@ private static Future submitMigrationTask(InetAddress endpoint) return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); } + /** + * + */ + private static Future submitMigrationTask(InetAddress endpoint, UUID theirVersion) + { + /* + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. + */ + return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint, theirVersion)); + } + public static boolean shouldPullSchemaFrom(InetAddress endpoint) { /* - * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) + * Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema) * Don't request schema from fat clients */ return MessagingService.instance().knowsVersion(endpoint) @@ -188,6 +233,24 @@ public static boolean shouldPullSchemaFrom(InetAddress endpoint) && !Gossiper.instance.isGossipOnlyMember(endpoint); } + public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion) + { + /* + * Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema) + * Don't request schema from fat clients + * Don't request schema from bootstrapping nodes (?) + * Don't request schema if we have scheduled a pull request for that schema version + */ + Set currentlyScheduledRequests = scheduledSchemaPulls.getOrDefault(theirVersion, Collections.emptySet()); + boolean noScheduledRequests = currentlyScheduledRequests.size() < MAX_SCHEDULED_SCHEMA_PULL_REQUESTS + && !currentlyScheduledRequests.contains(endpoint); + return MessagingService.instance().knowsVersion(endpoint) + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version + && !Gossiper.instance.isGossipOnlyMember(endpoint) + && !Schema.emptyVersion.equals(theirVersion) + && noScheduledRequests; + } + public static boolean isReadyForBootstrap() { return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index b065d9093e..74685829b6 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -20,12 +20,16 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Collection; +import java.util.Optional; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.palantir.logsafe.SafeArg; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallback; @@ -40,10 +44,18 @@ class MigrationTask extends WrappedRunnable private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); private final InetAddress endpoint; + private final Optional version; MigrationTask(InetAddress endpoint) { this.endpoint = endpoint; + this.version = Optional.empty(); + } + + MigrationTask(InetAddress endpoint, UUID version) + { + this.endpoint = endpoint; + this.version = Optional.of(version); } public void runMayThrow() throws Exception @@ -65,13 +77,14 @@ public void runMayThrow() throws Exception MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); - IAsyncCallback> cb = new IAsyncCallback>() + IAsyncCallbackWithFailure> cb = new IAsyncCallbackWithFailure>() { @Override public void response(MessageIn> message) { try { + logger.debug("Processing response to schema pull from endpoint", SafeArg.of("endpoint", endpoint)); LegacySchemaTables.mergeSchema(message.payload); } catch (IOException e) @@ -82,6 +95,28 @@ public void response(MessageIn> message) { logger.error("Configuration exception merging remote schema", e); } + finally + { + // always attempt to clean up our outstanding schema pull request if created with a version + version.ifPresent(v -> { + logger.debug("Successfully processed response to schema pull", + SafeArg.of("endpoint", endpoint), + SafeArg.of("schemaVersion", v)); + MigrationManager.removeEndpointFromSchemaPullVersion(v, endpoint); + }); + } + } + + @Override + public void onFailure(InetAddress from) + { + // always attempt to clean up our outstanding schema pull request if created with a version + version.ifPresent(v -> { + logger.debug("Timed out waiting for response to schema pull", + SafeArg.of("endpoint", endpoint), + SafeArg.of("schemaVersion", v)); + MigrationManager.removeEndpointFromSchemaPullVersion(v, endpoint); + }); } public boolean isLatencyForSnitch() @@ -89,6 +124,6 @@ public boolean isLatencyForSnitch() return false; } }; - MessagingService.instance().sendRR(message, endpoint, cb); - } + MessagingService.instance().sendRRWithFailure(message, endpoint, cb); + } } diff --git a/test/unit/org/apache/cassandra/service/MigrationManagerTest.java b/test/unit/org/apache/cassandra/service/MigrationManagerTest.java new file mode 100644 index 0000000000..50ecf962a7 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/MigrationManagerTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.net.MessagingService; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MigrationManagerTest +{ + private static final UUID UUID_1 = UUID.randomUUID(); + private static final UUID UUID_2 = UUID.randomUUID(); + + private static InetAddress HOST_1; + private static InetAddress HOST_2; + private static InetAddress HOST_3; + private static InetAddress HOST_4; + + @BeforeClass + public static void setup() throws UnknownHostException + { + HOST_1 = InetAddress.getByName("10.0.0.1"); + HOST_2 = InetAddress.getByName("10.0.0.2"); + HOST_3 = InetAddress.getByName("10.0.0.3"); + HOST_4 = InetAddress.getByName("10.0.0.4"); + MessagingService.instance().setVersion(HOST_1, MessagingService.VERSION_22); + MessagingService.instance().setVersion(HOST_2, MessagingService.VERSION_22); + MessagingService.instance().setVersion(HOST_3, MessagingService.VERSION_22); + MessagingService.instance().setVersion(HOST_4, MessagingService.VERSION_22); + } + + @Test + public void shouldPullSchemaIfNoOutstandingRequests() + { + assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1)); + } + + @Test + public void onlyRequestOncePerEndpointVersion() { + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1); + assertFalse(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1)); + } + + @Test + public void removalAllowsForPull() { + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1); + MigrationManager.removeEndpointFromSchemaPullVersion(UUID_1, HOST_1); + assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1)); + } + + @Test + public void multipleSchemaVersionsDontInteract() { + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1); + assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_2)); + } + + @Test + public void noRequestOverMaxOutstanding() { + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1); + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_2); + MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_3); + assertFalse(MigrationManager.shouldPullSchemaFrom(HOST_4, UUID_1)); + } +}