Skip to content

Commit

Permalink
feature: improved diagnostics metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
YarikRevich committed Jun 1, 2024
1 parent 11568d1 commit 11fb163
Show file tree
Hide file tree
Showing 15 changed files with 449 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.repoachiever.model.LocationsUnit;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

Expand All @@ -17,6 +18,12 @@ public class ClusterAllocationDto {
*/
private String name;

/**
* Represents lock state of RepoAchiever Cluster allocation.
*/
@Setter
private Boolean locked;

/**
* Represents workspace unit key used to target RepoAchiever Cluster results.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,37 @@ public void performLogsTransfer(String name, String message) throws RemoteExcept
logger.info(String.format("Transferred logs(instance: %s): %s", name, message));
}

/**
* @see IApiServerCommunicationService
*/
@Override
public void performDownloadTelemetryIncrease() throws RemoteException {
telemetryService.increaseClusterDownloadAmount();
}

/**
* @see IApiServerCommunicationService
*/
@Override
public void performDownloadTelemetryDecrease() throws RemoteException {
telemetryService.decreaseClusterDownloadAmount();
}

/**
* @see IApiServerCommunicationService
*/
@Override
public void performLockClusterAllocation(String name) throws RemoteException {
}

/**
* @see IApiServerCommunicationService
*/
@Override
public Boolean retrieveClusterAllocationLocked(String name) throws RemoteException {
return false;
}

/**
* @see IApiServerCommunicationService
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void apply(ContentApplication contentApplication) throws ClusterApplicati
}
}

candidates.add(ClusterAllocationDto.of(name, workspaceUnitKey, locations, pid, context));
candidates.add(ClusterAllocationDto.of(name, false, workspaceUnitKey, locations, pid, context));
}

for (ClusterAllocationDto candidate : candidates) {
Expand Down Expand Up @@ -628,6 +628,8 @@ public List<TopologyInfoUnit> retrieveTopology(TopologyInfoApplication topologyI
* @throws ClusterUnhealthyReapplicationFailureException if RepoAchiever Cluster unhealthy allocation reapplication fails.
*/
public void reApplyUnhealthy() throws ClusterUnhealthyReapplicationFailureException {
telemetryService.increaseClusterHealthCheckAmount();

StateService.getTopologyStateGuard().lock();

List<ClusterAllocationDto> removables = new ArrayList<>();
Expand Down Expand Up @@ -687,6 +689,8 @@ public void reApplyUnhealthy() throws ClusterUnhealthyReapplicationFailureExcept
if (removables.isEmpty()) {
StateService.getTopologyStateGuard().unlock();

telemetryService.decreaseClusterHealthCheckAmount();

return;
}

Expand All @@ -701,11 +705,14 @@ public void reApplyUnhealthy() throws ClusterUnhealthyReapplicationFailureExcept
} catch (ClusterDeploymentFailureException e) {
StateService.getTopologyStateGuard().unlock();

telemetryService.decreaseClusterHealthCheckAmount();

throw new ClusterUnhealthyReapplicationFailureException(e.getMessage());
}

candidates.add(ClusterAllocationDto.of(
removable.getName(),
false,
removable.getWorkspaceUnitKey(),
removable.getLocations(),
pid,
Expand All @@ -723,6 +730,8 @@ public void reApplyUnhealthy() throws ClusterUnhealthyReapplicationFailureExcept
} catch (ClusterOperationFailureException e1) {
StateService.getTopologyStateGuard().unlock();

telemetryService.decreaseClusterHealthCheckAmount();

throw new ClusterUnhealthyReapplicationFailureException(e1.getMessage());
}

Expand All @@ -735,5 +744,7 @@ public void reApplyUnhealthy() throws ClusterUnhealthyReapplicationFailureExcept
StateService.addClusterAllocations(candidates);

StateService.getTopologyStateGuard().unlock();

telemetryService.decreaseClusterHealthCheckAmount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,36 @@ Boolean retrieveAdditionalContentPresent(String workspaceUnitKey, String locatio
*/
void performLogsTransfer(String name, String message) throws RemoteException;

// Boolean retrieveRegistration(String name) throws RemoteException;
/**
* Handles incoming download telemetry amount increase.
*
* @throws RemoteException if remote request fails.
*/
void performDownloadTelemetryIncrease() throws RemoteException;

/**
* Handles incoming download telemetry amount decrease.
*
* @throws RemoteException if remote request fails.
*/
void performDownloadTelemetryDecrease() throws RemoteException;

// TODO: add 'retrieveRegistration' and 'performLock' 'retrieveLocked' methods
/**
* Handles RepoAchiever Cluster allocation lock.
*
* @param name given RepoAchiever Cluster allocation name.
* @throws RemoteException if remote request fails.
*/
void performLockClusterAllocation(String name) throws RemoteException;

/**
* Retrieves RepoAchiever Cluster allocation lock state.
*
* @param name given RepoAchiever Cluster allocation name.
* @return RepoAchiever Cluster allocation lock state.
* @throws RemoteException if remote request fails.
*/
Boolean retrieveClusterAllocationLocked(String name) throws RemoteException;

/**
* Retrieves latest RepoAchiever API Server health check states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ public class StateService {
@Getter
private final static List<ClusterAllocationDto> clusterAllocations = new ArrayList<>();

/**
* Retrieves RepoAchiever Cluster allocations with the given name.
*
* @param name given name.
* @return filtered RepoAchiever Cluster allocations according to the given name.
*/
public static ClusterAllocationDto getClusterAllocationsByName(String name) {
return clusterAllocations.
stream()
.filter(element -> Objects.equals(element.getName(), name))
.toList()
.getFirst();
}

/**
* Retrieves RepoAchiever Cluster allocations with the given workspace unit key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ public class TelemetryService {

private final ConcurrentLinkedQueue<Runnable> clusterHealthCheckQueue = new ConcurrentLinkedQueue<>();

private final ConcurrentLinkedQueue<Runnable> clusterDownloadQueue = new ConcurrentLinkedQueue<>();

private final ConcurrentLinkedQueue<Runnable> rawContentUploadQueue = new ConcurrentLinkedQueue<>();

private final ConcurrentLinkedQueue<Runnable> additionalContentUploadQueue = new ConcurrentLinkedQueue<>();

private final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(6);

/**
* Starts telemetries listener, which handles incoming telemetries to be processed in a sequential way.
Expand All @@ -62,9 +64,13 @@ private void configure() {
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
if (!rawContentUploadQueue.isEmpty()) {
System.out.println("RECEIVED");
if (!clusterDownloadQueue.isEmpty()) {
clusterDownloadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
if (!rawContentUploadQueue.isEmpty()) {
rawContentUploadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);
Expand All @@ -76,24 +82,6 @@ private void configure() {
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);
}

/**
* Increases allocated workers amount counter.
*/
public void increaseWorkersAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
clusterStateQueue.add(() -> telemetryBinding.getWorkerAmount().set(telemetryBinding.getWorkerAmount().get() + 1));
}
}

/**
* Decreases allocated workers amount counter.
*/
public void decreaseWorkersAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
clusterStateQueue.add(() -> telemetryBinding.getWorkerAmount().set(telemetryBinding.getWorkerAmount().get() - 1));
}
}

/**
* Increases serving RepoAchiever Cluster allocations amount counter.
*/
Expand Down Expand Up @@ -166,13 +154,29 @@ public void decreaseClusterHealthCheckAmount() {
}
}

/**
* Increases downloads for RepoAchiever Cluster allocations amount counter.
*/
public void increaseClusterDownloadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
clusterDownloadQueue.add(() -> telemetryBinding.getClusterDownloadAmount().set(telemetryBinding.getClusterDownloadAmount().get() + 1));
}
}

/**
* Decreases downloads for RepoAchiever Cluster allocations amount counter.
*/
public void decreaseClusterDownloadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
clusterDownloadQueue.add(() -> telemetryBinding.getClusterDownloadAmount().set(telemetryBinding.getClusterDownloadAmount().get() - 1));
}
}

/**
* Increases raw content uploads for RepoAchiever Cluster allocations amount counter.
*/
public void increaseRawContentUploadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
System.out.println("GOT DATA");

rawContentUploadQueue.add(() -> telemetryBinding.getRawContentUploadAmount().set(telemetryBinding.getRawContentUploadAmount().get() + 1));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,28 @@
/**
* Service used to create custom telemetry bindings used to distribute application metrics.
*/
@Getter
@ApplicationScoped
public class TelemetryBinding implements MeterBinder {
@Getter
private final AtomicInteger workerAmount = new AtomicInteger();

@Getter
private final AtomicInteger servingClusterAmount = new AtomicInteger();

@Getter
private final AtomicInteger suspendedClusterAmount = new AtomicInteger();

@Getter
private final AtomicInteger apiServerHealthCheckAmount = new AtomicInteger();

@Getter
private final AtomicInteger clusterHealthCheckAmount = new AtomicInteger();

@Getter
private final AtomicInteger clusterDownloadAmount = new AtomicInteger();

private final AtomicInteger rawContentUploadAmount = new AtomicInteger();

@Getter
private final AtomicInteger additionalContentUploadAmount = new AtomicInteger();

/**
* @see MeterBinder
*/
@Override
public void bindTo(@NotNull MeterRegistry meterRegistry) {
Gauge.builder("general.worker_amount", workerAmount, AtomicInteger::get)
.description("Represents amount of allocated workers")
.register(meterRegistry);

Gauge.builder("general.serving_cluster_amount", servingClusterAmount, AtomicInteger::get)
.description("Represents amount of serving RepoAchiever Cluster allocations")
.register(meterRegistry);
Expand All @@ -61,6 +51,10 @@ public void bindTo(@NotNull MeterRegistry meterRegistry) {
.description("Represents amount of performed health check requests for RepoAchiever Cluster allocations")
.register(meterRegistry);

Gauge.builder("general.cluster_download_amount", clusterDownloadAmount, AtomicInteger::get)
.description("Represents amount of performed download requests for RepoAchiever Cluster allocations")
.register(meterRegistry);

Gauge.builder("general.raw_content_upload_amount", rawContentUploadAmount, AtomicInteger::get)
.description("Represents amount of performed raw content upload requests for RepoAchiever Cluster allocations")
.register(meterRegistry);
Expand Down
2 changes: 2 additions & 0 deletions cluster/src/main/java/com/repoachiever/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.repoachiever.service.apiserver.resource.ApiServerCommunicationResource;
import com.repoachiever.service.config.ConfigService;
import com.repoachiever.service.integration.communication.apiserver.healthcheck.ApiServerHealthCheckCommunicationService;
import com.repoachiever.service.integration.communication.apiserver.lock.ApiServerLockCommunicationService;
import com.repoachiever.service.integration.communication.cluster.ClusterCommunicationConfigService;
import com.repoachiever.service.integration.logging.state.LoggingStateService;
import com.repoachiever.service.integration.logging.transfer.LoggingTransferService;
Expand Down Expand Up @@ -32,6 +33,7 @@
VendorConfigService.class,
SchedulerConfigService.class,
ApiServerHealthCheckCommunicationService.class,
ApiServerLockCommunicationService.class,
ClusterCommunicationResource.class,
ClusterCommunicationConfigService.class,
ApiServerCommunicationResource.class,
Expand Down
Loading

0 comments on commit 11fb163

Please sign in to comment.