diff --git a/src/java/com/palantir/cassandra/utils/SchemaAgreementCheck.java b/src/java/com/palantir/cassandra/utils/SchemaAgreementCheck.java new file mode 100644 index 0000000000..2050a2a240 --- /dev/null +++ b/src/java/com/palantir/cassandra/utils/SchemaAgreementCheck.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.cassandra.utils; + +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; + +public class SchemaAgreementCheck +{ + private final Supplier localSchemaVersionSupplier; + private final Supplier>> endpointStatesSupplier; + + public SchemaAgreementCheck() + { + this(Schema.instance::getVersion, Gossiper.instance::getEndpointStates); + } + + @VisibleForTesting + SchemaAgreementCheck(Supplier localSchemaVersionSupplier, Supplier>> endpointStatesSupplier) + { + this.localSchemaVersionSupplier = localSchemaVersionSupplier; + this.endpointStatesSupplier = endpointStatesSupplier; + } + + public boolean isSchemaInAgreement() + { + UUID localSchemaVersion = localSchemaVersionSupplier.get(); + return endpointStatesSupplier.get().stream() + .map(Map.Entry::getValue) + .filter(value -> !isLeft(value)) + .allMatch(value -> schemaIsEqualToLocalVersion(localSchemaVersion, value)); + } + + private boolean isLeft(EndpointState endpointState) + { + VersionedValue status = endpointState.getApplicationState(ApplicationState.STATUS); + return status == null || status.value.startsWith(VersionedValue.STATUS_LEFT); + } + + private boolean schemaIsEqualToLocalVersion(UUID localSchemaVersion, EndpointState endpointState) + { + VersionedValue schema = endpointState.getApplicationState(ApplicationState.SCHEMA); + return schema != null && localSchemaVersion.toString().equals(schema.value); + } +} diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 608f853abd..baf1c779c8 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -42,6 +42,7 @@ import com.google.common.util.concurrent.*; import com.palantir.cassandra.db.BootstrappingSafetyException; import com.palantir.cassandra.settings.LocalQuorumReadForSerialCasSetting; +import com.palantir.cassandra.utils.SchemaAgreementCheck; import com.palantir.logsafe.Safe; import com.palantir.logsafe.SafeArg; import org.apache.cassandra.schema.LegacySchemaTables; @@ -949,7 +950,8 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection // if our schema hasn't matched yet, keep sleeping until it does // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) - while (!MigrationManager.isReadyForBootstrap() || !isSchemaConsistent()) + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(); + while (!MigrationManager.isReadyForBootstrap() || !schemaAgreementCheck.isSchemaInAgreement()) { setMode(Mode.JOINING, "waiting for schema information to complete", true); logger.info( @@ -1116,14 +1118,6 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection } } - private static boolean isSchemaConsistent() - { - String localSchemaVersion = Schema.instance.getVersion().toString(); - return Gossiper.instance.getEndpointStates().stream() - .map(entry -> entry.getValue().getApplicationState(ApplicationState.SCHEMA).value) - .allMatch(localSchemaVersion::equals); - } - private void joinTokenRing(int delay) throws ConfigurationException { joinTokenRing(delay, DatabaseDescriptor.isAutoBootstrap(), DatabaseDescriptor.getInitialTokens()); diff --git a/test/unit/com/palantir/cassandra/utils/SchemaAgreementCheckTest.java b/test/unit/com/palantir/cassandra/utils/SchemaAgreementCheckTest.java new file mode 100644 index 0000000000..63fe86dad7 --- /dev/null +++ b/test/unit/com/palantir/cassandra/utils/SchemaAgreementCheckTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.cassandra.utils; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.collect.ImmutableMap; +import com.google.common.net.InetAddresses; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.EndpointStateFactory; +import org.apache.cassandra.gms.VersionedValue; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SchemaAgreementCheckTest +{ + private static final VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + @Test + public void checkSchemaAgreement_passes() + { + UUID schema = UUID.randomUUID(); + EndpointState state = createNormal(schema); + + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema, + () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state, + InetAddresses.forString("127.0.0.2"), state, + InetAddresses.forString("127.0.0.3"), state).entrySet()); + assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue(); + } + + @Test + public void checkSchemaAgreement_failsOnNullSchema() + { + UUID schema = UUID.randomUUID(); + EndpointState state1 = createNormal(schema); + + EndpointState state2 = EndpointStateFactory.create(); + List tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()); + state2.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens)); + state2.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema, + () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1, + InetAddresses.forString("127.0.0.2"), state1, + InetAddresses.forString("127.0.0.3"), state2).entrySet()); + assertThat(schemaAgreementCheck.isSchemaInAgreement()).isFalse(); + } + + @Test + public void checkSchemaAgreement_failsOnIncorrectSchema() + { + UUID schema1 = UUID.randomUUID(); + EndpointState state1 = createNormal(schema1); + EndpointState state2 = createNormal(UUID.randomUUID()); + + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1, + () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1, + InetAddresses.forString("127.0.0.2"), state1, + InetAddresses.forString("127.0.0.3"), state2).entrySet()); + assertThat(schemaAgreementCheck.isSchemaInAgreement()).isFalse(); + } + + @Test + public void checkSchemaAgreement_ignoresLeftNode() + { + UUID schema1 = UUID.randomUUID(); + UUID schema2 = UUID.randomUUID(); + EndpointState state1 = createNormal(schema1); + EndpointState state2 = createLeft(schema2); + + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1, + () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1, + InetAddresses.forString("127.0.0.2"), state1, + InetAddresses.forString("127.0.0.3"), state2).entrySet()); + assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue(); + } + + @Test + public void checkSchemaAgreement_ignoresNullStatus() + { + UUID schema1 = UUID.randomUUID(); + UUID schema2 = UUID.randomUUID(); + EndpointState state1 = createNormal(schema1); + + EndpointState state2 = EndpointStateFactory.create(); + List tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()); + state2.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema2)); + state2.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + + SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1, + () -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1, + InetAddresses.forString("127.0.0.2"), state1, + InetAddresses.forString("127.0.0.3"), state2).entrySet()); + assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue(); + } + + private static EndpointState createNormal(UUID schema) + { + EndpointState state = EndpointStateFactory.create(); + List tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()); + state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens)); + state.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema)); + state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + return state; + } + + private static EndpointState createLeft(UUID schema) + { + EndpointState state = EndpointStateFactory.create(); + List tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()); + state.addApplicationState(ApplicationState.STATUS, valueFactory.left(tokens, 1000)); + state.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema)); + state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + return state; + } +} diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateFactory.java b/test/unit/org/apache/cassandra/gms/EndpointStateFactory.java new file mode 100644 index 0000000000..907826fcaf --- /dev/null +++ b/test/unit/org/apache/cassandra/gms/EndpointStateFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +public class EndpointStateFactory +{ + public static EndpointState create() + { + return new EndpointState(new HeartBeatState(0)); + } +}