Skip to content

Commit

Permalink
improve IT & fix bugs in SubscriptionExecutorServiceManager.java
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Jun 12, 2024
1 parent 668030d commit 4fa1b11
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {

public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
public static final long AWAITILITY_AT_MOST_SECOND = 240L;
public static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,30 @@

import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

abstract class AbstractSubscriptionDualIT {

protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;

@Rule public TestName testName = new TestName();

@Before
public void setUp() {
// set thread name
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));

// set thread pools core size
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);

MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ private void pollMessagesAndCheck(
LOGGER.info("consumer {} exiting...", consumers.get(index));
}
},
consumers.get(index).toString());
String.format("%s - %s", testName.getMethodName(), consumers.get(index).toString()));
t.start();
threads.add(t);
}
Expand All @@ -1016,6 +1016,7 @@ private void pollMessagesAndCheck(
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SubscriptionExecutorServiceManager {
public final class SubscriptionExecutorServiceManager {

private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);

private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;

private static final String CONTROL_FLOW_EXECUTOR_NAME = "SubscriptionControlFlowExecutor";
private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
Expand Down Expand Up @@ -172,9 +172,9 @@ boolean isShutdown() {
}

void setCorePoolSize(final int corePoolSize) {
if (!isShutdown()) {
if (isShutdown()) {
synchronized (this) {
if (!isShutdown()) {
if (isShutdown()) {
this.corePoolSize = corePoolSize;
return;
}
Expand Down

0 comments on commit 4fa1b11

Please sign in to comment.