Skip to content

Commit

Permalink
refactor schema agreement check and test
Browse files Browse the repository at this point in the history
  • Loading branch information
rhuffy committed Nov 27, 2024
1 parent 14a9998 commit 56b2b86
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 9 deletions.
72 changes: 72 additions & 0 deletions src/java/com/palantir/cassandra/utils/SchemaAgreementCheck.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.Schema;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;

public class SchemaAgreementCheck
{
private final Supplier<UUID> localSchemaVersionSupplier;
private final Supplier<Set<Map.Entry<InetAddress, EndpointState>>> endpointStatesSupplier;

public SchemaAgreementCheck()
{
this(Schema.instance::getVersion, Gossiper.instance::getEndpointStates);
}

@VisibleForTesting
SchemaAgreementCheck(Supplier<UUID> localSchemaVersionSupplier, Supplier<Set<Map.Entry<InetAddress, EndpointState>>> endpointStatesSupplier)
{
this.localSchemaVersionSupplier = localSchemaVersionSupplier;
this.endpointStatesSupplier = endpointStatesSupplier;
}

public boolean isSchemaInAgreement()
{
UUID localSchemaVersion = localSchemaVersionSupplier.get();
return endpointStatesSupplier.get().stream()
.map(Map.Entry::getValue)
.filter(value -> !isLeft(value))
.allMatch(value -> schemaIsEqualToLocalVersion(localSchemaVersion, value));
}

private boolean isLeft(EndpointState endpointState)
{
VersionedValue status = endpointState.getApplicationState(ApplicationState.STATUS);
return status == null || status.value.startsWith(VersionedValue.STATUS_LEFT);
}

private boolean schemaIsEqualToLocalVersion(UUID localSchemaVersion, EndpointState endpointState)
{
VersionedValue schema = endpointState.getApplicationState(ApplicationState.SCHEMA);
return schema != null && localSchemaVersion.toString().equals(schema.value);
}
}
12 changes: 3 additions & 9 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.*;
import com.palantir.cassandra.db.BootstrappingSafetyException;
import com.palantir.cassandra.settings.LocalQuorumReadForSerialCasSetting;
import com.palantir.cassandra.utils.SchemaAgreementCheck;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.schema.LegacySchemaTables;
Expand Down Expand Up @@ -949,7 +950,8 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>

// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
while (!MigrationManager.isReadyForBootstrap() || !isSchemaConsistent())
SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck();
while (!MigrationManager.isReadyForBootstrap() || !schemaAgreementCheck.isSchemaInAgreement())
{
setMode(Mode.JOINING, "waiting for schema information to complete", true);
logger.info(
Expand Down Expand Up @@ -1116,14 +1118,6 @@ private void joinTokenRing(int delay, boolean autoBootstrap, Collection<String>
}
}

private static boolean isSchemaConsistent()
{
String localSchemaVersion = Schema.instance.getVersion().toString();
return Gossiper.instance.getEndpointStates().stream()
.map(entry -> entry.getValue().getApplicationState(ApplicationState.SCHEMA).value)
.allMatch(localSchemaVersion::equals);
}

private void joinTokenRing(int delay) throws ConfigurationException
{
joinTokenRing(delay, DatabaseDescriptor.isAutoBootstrap(), DatabaseDescriptor.getInitialTokens());
Expand Down
140 changes: 140 additions & 0 deletions test/unit/com/palantir/cassandra/utils/SchemaAgreementCheckTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.EndpointStateFactory;
import org.apache.cassandra.gms.VersionedValue;

import static org.assertj.core.api.Assertions.assertThat;

public class SchemaAgreementCheckTest
{
private static final VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());

@Test
public void checkSchemaAgreement_passes()
{
UUID schema = UUID.randomUUID();
EndpointState state = createNormal(schema);

SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema,
() -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state,
InetAddresses.forString("127.0.0.2"), state,
InetAddresses.forString("127.0.0.3"), state).entrySet());
assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue();
}

@Test
public void checkSchemaAgreement_failsOnNullSchema()
{
UUID schema = UUID.randomUUID();
EndpointState state1 = createNormal(schema);

EndpointState state2 = EndpointStateFactory.create();
List<Token> tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken());
state2.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
state2.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));

SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema,
() -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1,
InetAddresses.forString("127.0.0.2"), state1,
InetAddresses.forString("127.0.0.3"), state2).entrySet());
assertThat(schemaAgreementCheck.isSchemaInAgreement()).isFalse();
}

@Test
public void checkSchemaAgreement_failsOnIncorrectSchema()
{
UUID schema1 = UUID.randomUUID();
EndpointState state1 = createNormal(schema1);
EndpointState state2 = createNormal(UUID.randomUUID());

SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1,
() -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1,
InetAddresses.forString("127.0.0.2"), state1,
InetAddresses.forString("127.0.0.3"), state2).entrySet());
assertThat(schemaAgreementCheck.isSchemaInAgreement()).isFalse();
}

@Test
public void checkSchemaAgreement_ignoresLeftNode()
{
UUID schema1 = UUID.randomUUID();
UUID schema2 = UUID.randomUUID();
EndpointState state1 = createNormal(schema1);
EndpointState state2 = createLeft(schema2);

SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1,
() -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1,
InetAddresses.forString("127.0.0.2"), state1,
InetAddresses.forString("127.0.0.3"), state2).entrySet());
assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue();
}

@Test
public void checkSchemaAgreement_ignoresNullStatus()
{
UUID schema1 = UUID.randomUUID();
UUID schema2 = UUID.randomUUID();
EndpointState state1 = createNormal(schema1);

EndpointState state2 = EndpointStateFactory.create();
List<Token> tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken());
state2.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema2));
state2.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));

SchemaAgreementCheck schemaAgreementCheck = new SchemaAgreementCheck(() -> schema1,
() -> ImmutableMap.of(InetAddresses.forString("127.0.0.1"), state1,
InetAddresses.forString("127.0.0.2"), state1,
InetAddresses.forString("127.0.0.3"), state2).entrySet());
assertThat(schemaAgreementCheck.isSchemaInAgreement()).isTrue();
}

private static EndpointState createNormal(UUID schema)
{
EndpointState state = EndpointStateFactory.create();
List<Token> tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken());
state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
state.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema));
state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
return state;
}

private static EndpointState createLeft(UUID schema)
{
EndpointState state = EndpointStateFactory.create();
List<Token> tokens = Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken());
state.addApplicationState(ApplicationState.STATUS, valueFactory.left(tokens, 1000));
state.addApplicationState(ApplicationState.SCHEMA, valueFactory.schema(schema));
state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
return state;
}
}
27 changes: 27 additions & 0 deletions test/unit/org/apache/cassandra/gms/EndpointStateFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.gms;

public class EndpointStateFactory
{
public static EndpointState create()
{
return new EndpointState(new HeartBeatState(0));
}
}

0 comments on commit 56b2b86

Please sign in to comment.