Skip to content

Commit

Permalink
Reduce schema pull request volume (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
andybradshaw authored Dec 4, 2024
1 parent 27f0c7c commit b6d02f0
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 12 deletions.
81 changes: 72 additions & 9 deletions src/java/org/apache/cassandra/service/MigrationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.palantir.logsafe.SafeArg;
import com.palantir.tracing.CloseableTracer;

import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -62,10 +64,13 @@ public class MigrationManager

private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

private static final ConcurrentHashMap<UUID, Set<InetAddress>> scheduledSchemaPulls = new ConcurrentHashMap<>();

public static final int MIGRATION_DELAY_IN_MS = 60000;
public static final int MAX_SCHEDULED_SCHEMA_PULL_REQUESTS = 3;

private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();

private MigrationManager() {}

public void register(MigrationListener listener)
Expand Down Expand Up @@ -103,7 +108,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA
return;
}

if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint, theirVersion))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
Expand All @@ -113,7 +118,7 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
logger.debug("Submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
submitMigrationTask(endpoint, theirVersion);
}

if (onChange && Boolean.getBoolean("palantir_cassandra.unsafe_schema_migration"))
Expand All @@ -136,6 +141,7 @@ public void run()
if (epState == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
removeEndpointFromSchemaPullVersion(theirVersion, endpoint);
return;
}
VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
Expand All @@ -149,25 +155,52 @@ public void run()
endpoint,
theirVersion,
currentVersion);
removeEndpointFromSchemaPullVersion(theirVersion, endpoint);
return;
}

if (Schema.instance.getVersion().equals(currentVersion))
{
logger.debug("not submitting migration task for {} because our versions match", endpoint);
removeEndpointFromSchemaPullVersion(theirVersion, endpoint);
return;
}
logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version",
endpoint,
currentVersion,
Schema.instance.getVersion());
submitMigrationTask(endpoint);
logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}",
SafeArg.of("endpoint", endpoint),
SafeArg.of("endpointVersion", currentVersion),
SafeArg.of("schemaVersion", Schema.instance.getVersion()));
submitMigrationTask(endpoint, currentVersion);
}
};
addEndpointToSchemaPullVersion(theirVersion, endpoint);
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}

public static void addEndpointToSchemaPullVersion(UUID version, InetAddress endpoint) {
scheduledSchemaPulls.compute(version, (v, s) -> {
if (s == null) {
s = new HashSet<>();
}
s.add(endpoint);
return s;
});
}

public static void removeEndpointFromSchemaPullVersion(UUID version, InetAddress endpoint) {
scheduledSchemaPulls.computeIfPresent(version, (v, s) -> {
logger.debug("Removing endpoint from scheduled schema pulls",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v),
SafeArg.of("scheduledPulls", s));
s.remove(endpoint);
if (!s.isEmpty()) {
return s;
}
return null;
});
}

private static Future<?> submitMigrationTask(InetAddress endpoint)
{
/*
Expand All @@ -177,17 +210,47 @@ private static Future<?> submitMigrationTask(InetAddress endpoint)
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
}

/**
*
*/
private static Future<?> submitMigrationTask(InetAddress endpoint, UUID theirVersion)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint, theirVersion));
}

public static boolean shouldPullSchemaFrom(InetAddress endpoint)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema)
* Don't request schema from fat clients
*/
return MessagingService.instance().knowsVersion(endpoint)
&& MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isGossipOnlyMember(endpoint);
}

public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion)
{
/*
* Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema)
* Don't request schema from fat clients
* Don't request schema from bootstrapping nodes (?)
* Don't request schema if we have scheduled a pull request for that schema version
*/
Set<InetAddress> currentlyScheduledRequests = scheduledSchemaPulls.getOrDefault(theirVersion, Collections.emptySet());
boolean noScheduledRequests = currentlyScheduledRequests.size() < MAX_SCHEDULED_SCHEMA_PULL_REQUESTS
&& !currentlyScheduledRequests.contains(endpoint);
return MessagingService.instance().knowsVersion(endpoint)
&& MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isGossipOnlyMember(endpoint)
&& !Schema.emptyVersion.equals(theirVersion)
&& noScheduledRequests;
}

public static boolean isReadyForBootstrap()
{
return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
Expand Down
41 changes: 38 additions & 3 deletions src/java/org/apache/cassandra/service/MigrationTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
Expand All @@ -40,10 +44,18 @@ class MigrationTask extends WrappedRunnable
private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);

private final InetAddress endpoint;
private final Optional<UUID> version;

MigrationTask(InetAddress endpoint)
{
this.endpoint = endpoint;
this.version = Optional.empty();
}

MigrationTask(InetAddress endpoint, UUID version)
{
this.endpoint = endpoint;
this.version = Optional.of(version);
}

public void runMayThrow() throws Exception
Expand All @@ -65,13 +77,14 @@ public void runMayThrow() throws Exception

MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);

IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
IAsyncCallbackWithFailure<Collection<Mutation>> cb = new IAsyncCallbackWithFailure<Collection<Mutation>>()
{
@Override
public void response(MessageIn<Collection<Mutation>> message)
{
try
{
logger.debug("Processing response to schema pull from endpoint", SafeArg.of("endpoint", endpoint));
LegacySchemaTables.mergeSchema(message.payload);
}
catch (IOException e)
Expand All @@ -82,13 +95,35 @@ public void response(MessageIn<Collection<Mutation>> message)
{
logger.error("Configuration exception merging remote schema", e);
}
finally
{
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Successfully processed response to schema pull",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v));
MigrationManager.removeEndpointFromSchemaPullVersion(v, endpoint);
});
}
}

@Override
public void onFailure(InetAddress from)
{
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Timed out waiting for response to schema pull",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v));
MigrationManager.removeEndpointFromSchemaPullVersion(v, endpoint);
});
}

public boolean isLatencyForSnitch()
{
return false;
}
};
MessagingService.instance().sendRR(message, endpoint, cb);
}
MessagingService.instance().sendRRWithFailure(message, endpoint, cb);
}
}
88 changes: 88 additions & 0 deletions test/unit/org/apache/cassandra/service/MigrationManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.net.MessagingService;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MigrationManagerTest
{
private static final UUID UUID_1 = UUID.randomUUID();
private static final UUID UUID_2 = UUID.randomUUID();

private static InetAddress HOST_1;
private static InetAddress HOST_2;
private static InetAddress HOST_3;
private static InetAddress HOST_4;

@BeforeClass
public static void setup() throws UnknownHostException
{
HOST_1 = InetAddress.getByName("10.0.0.1");
HOST_2 = InetAddress.getByName("10.0.0.2");
HOST_3 = InetAddress.getByName("10.0.0.3");
HOST_4 = InetAddress.getByName("10.0.0.4");
MessagingService.instance().setVersion(HOST_1, MessagingService.VERSION_22);
MessagingService.instance().setVersion(HOST_2, MessagingService.VERSION_22);
MessagingService.instance().setVersion(HOST_3, MessagingService.VERSION_22);
MessagingService.instance().setVersion(HOST_4, MessagingService.VERSION_22);
}

@Test
public void shouldPullSchemaIfNoOutstandingRequests()
{
assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1));
}

@Test
public void onlyRequestOncePerEndpointVersion() {
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1);
assertFalse(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1));
}

@Test
public void removalAllowsForPull() {
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1);
MigrationManager.removeEndpointFromSchemaPullVersion(UUID_1, HOST_1);
assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_1));
}

@Test
public void multipleSchemaVersionsDontInteract() {
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1);
assertTrue(MigrationManager.shouldPullSchemaFrom(HOST_1, UUID_2));
}

@Test
public void noRequestOverMaxOutstanding() {
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_1);
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_2);
MigrationManager.addEndpointToSchemaPullVersion(UUID_1, HOST_3);
assertFalse(MigrationManager.shouldPullSchemaFrom(HOST_4, UUID_1));
}
}

0 comments on commit b6d02f0

Please sign in to comment.