Skip to content

Commit

Permalink
[fix] [broker] Topics failed to delete after remove cluster from repl…
Browse files Browse the repository at this point in the history
…icated clusters set and caused OOM (apache#23360)

(cherry picked from commit d9bc7af)
(cherry picked from commit 187e6d4)
  • Loading branch information
poorbarcode authored and dlg99 committed Nov 11, 2024
1 parent 1b8f873 commit 9059a6c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> {
// If the system topic named "__change_events" has been deleted, it means all the data in the topic have
// been deleted, so we do not need to delete the message that we want to delete again.
if (!topicExistsInfo.isExists()) {
log.info("Skip delete topic-level policies because {} has been removed before", changeEvents);
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ public NamespaceEventsSystemTopicFactory(PulsarClient client) {
}

public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
TopicName topicName = getEventsTopicName(namespaceName);
log.info("Create topic policies system topic client {}", topicName.toString());
return new TopicPoliciesSystemTopicClient(client, topicName);
}

public static TopicName getEventsTopicName(NamespaceName namespaceName) {
return TopicName.get(TopicDomain.persistent.value(), namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
}

public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> createTransactionBufferSystemTopicClient(
TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -173,4 +181,42 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro
public void testReplicationCountMetrics() throws Exception {
super.testReplicationCountMetrics();
}

@Test
public void testRemoveCluster() throws Exception {
// Initialize.
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events";
admin1.namespaces().createNamespace(ns1);
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.topics().createNonPartitionedTopic(topic);

// Wait for loading topic up.
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
Awaitility.await().untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertTrue(tps.containsKey(topic));
assertTrue(tps.containsKey(topicChangeEvents));
});

// The topics under the namespace of the cluster-1 will be deleted.
// Verify the result.
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topic));
assertFalse(tps.containsKey(topicChangeEvents));
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists());
assertFalse(pulsar1.getNamespaceService()
.checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists());
});

// cleanup.
p.close();
admin2.topics().delete(topic);
admin2.namespaces().deleteNamespace(ns1);
}
}

0 comments on commit 9059a6c

Please sign in to comment.