Skip to content

Commit

Permalink
fix: fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
YarikRevich committed Jun 1, 2024
1 parent fa56d16 commit 11568d1
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,6 @@ public String toString() {
@JsonProperty("content")
public Content content;

/**
* Represents RepoAchiever API Server configuration used for database setup.
*/
@Getter
public static class Database {
@NotNull
@JsonProperty("re-init")
public Boolean reInit;
}

@Valid
@NotNull
@JsonProperty("database")
public Database database;

/**
* Represents RepoAchiever API Server configuration used for diagnostics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,19 @@ public ApiServerCommunicationResource() throws RemoteException {
@Override
public void performRawContentUpload(
String workspaceUnitKey, String location, String name, RemoteInputStream content) throws RemoteException {
telemetryService.increaseRawContentUploadAmount();

StateService.getCommunicationGuard().lock();

InputStream contentRaw;

try {
contentRaw = RemoteInputStreamClient.wrap(content);
} catch (IOException e) {
StateService.getCommunicationGuard().unlock();

telemetryService.decreaseRawContentUploadAmount();

throw new RuntimeException(e);
}

Expand All @@ -70,10 +76,14 @@ public void performRawContentUpload(
} catch (RawContentCreationFailureException e) {
StateService.getCommunicationGuard().unlock();

telemetryService.decreaseRawContentUploadAmount();

throw new RemoteException(e.getMessage());
}

StateService.getCommunicationGuard().unlock();

telemetryService.decreaseRawContentUploadAmount();
}

/**
Expand Down Expand Up @@ -105,10 +115,16 @@ public Boolean retrieveRawContentPresent(String workspaceUnitKey, String locatio
@Override
public void performAdditionalContentUpload(
String workspaceUnitKey, String location, String name, String data) throws RemoteException {
telemetryService.increaseAdditionalContentUploadAmount();

StateService.getCommunicationGuard().lock();

Map<String, String> rawData = JsonToAdditionalContentDataConverter.convert(data);
if (Objects.isNull(rawData)) {
StateService.getCommunicationGuard().unlock();

telemetryService.decreaseAdditionalContentUploadAmount();

throw new RemoteException();
}

Expand All @@ -124,10 +140,14 @@ public void performAdditionalContentUpload(
} catch (AdditionalContentCreationFailureException e) {
StateService.getCommunicationGuard().unlock();

telemetryService.decreaseAdditionalContentUploadAmount();

throw new RemoteException(e.getMessage());
}

StateService.getCommunicationGuard().unlock();

telemetryService.decreaseAdditionalContentUploadAmount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,18 +463,6 @@ public void removeContent(ContentCleanup contentCleanup) throws ClusterCleanupFa
}

if (Objects.nonNull(clusterAllocation)) {
logger.info(
String.format(
"Resetting RepoAchiever Cluster content retrieval: '%s'", clusterAllocation.getName()));

try {
clusterCommunicationResource.performRetrievalReset(clusterAllocation.getName());
} catch (ClusterOperationFailureException e) {
logger.fatal(new ClusterCleanupFailureException(e.getMessage()).getMessage());

return;
}

logger.info(
String.format(
"Setting RepoAchiever Cluster suspended allocation to serve state: '%s'",
Expand Down Expand Up @@ -542,18 +530,6 @@ public void removeAll(ContentCleanupAll contentCleanupAll) throws ClusterFullCle
}

for (ClusterAllocationDto suspended : suspends) {
logger.info(
String.format(
"Resetting RepoAchiever Cluster content retrieval: '%s'", suspended.getName()));

try {
clusterCommunicationResource.performRetrievalReset(suspended.getName());
} catch (ClusterOperationFailureException e) {
logger.fatal(new ClusterCleanupFailureException(e.getMessage()).getMessage());

return;
}

logger.info(
String.format(
"Setting RepoAchiever Cluster suspended allocation to serve state: '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,6 @@ public void performServe(String name) throws ClusterOperationFailureException {
}
}

/**
* Performs RepoAchiever Cluster content retrieval reset operation.
*
* @param name given name of RepoAchiever Cluster.
* @throws ClusterOperationFailureException if RepoAchiever Cluster operation fails.
*/
public void performRetrievalReset(String name) throws ClusterOperationFailureException {
IClusterCommunicationService allocation = retrieveAllocation(name);

try {
allocation.performRetrievalReset();
} catch (RemoteException e) {
throw new ClusterOperationFailureException(e.getMessage());
}
}

/**
* Retrieves health check status of the RepoAchiever Cluster with the given name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ public interface IClusterCommunicationService extends Remote {
*/
void performServe() throws RemoteException;

/**
* Performs RepoAchiever Cluster content retrieval reset operation.
*
* @throws RemoteException if remote request fails.
*/
void performRetrievalReset() throws RemoteException;

/**
* Retrieves latest RepoAchiever Cluster health check states.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ public class TelemetryService {

private final ConcurrentLinkedQueue<Runnable> clusterHealthCheckQueue = new ConcurrentLinkedQueue<>();

private final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final ConcurrentLinkedQueue<Runnable> rawContentUploadQueue = new ConcurrentLinkedQueue<>();

private final ConcurrentLinkedQueue<Runnable> additionalContentUploadQueue = new ConcurrentLinkedQueue<>();

private final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

/**
* Starts telemetries listener, which handles incoming telemetries to be processed in a sequential way.
Expand All @@ -56,6 +60,20 @@ private void configure() {
clusterStateQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
if (!rawContentUploadQueue.isEmpty()) {
System.out.println("RECEIVED");

rawContentUploadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
if (!additionalContentUploadQueue.isEmpty()) {
additionalContentUploadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -147,4 +165,42 @@ public void decreaseClusterHealthCheckAmount() {
clusterHealthCheckQueue.add(() -> telemetryBinding.getClusterHealthCheckAmount().set(telemetryBinding.getClusterHealthCheckAmount().get() - 1));
}
}

/**
* Increases raw content uploads for RepoAchiever Cluster allocations amount counter.
*/
public void increaseRawContentUploadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
System.out.println("GOT DATA");

rawContentUploadQueue.add(() -> telemetryBinding.getRawContentUploadAmount().set(telemetryBinding.getRawContentUploadAmount().get() + 1));
}
}

/**
* Decreases raw content uploads for RepoAchiever Cluster allocations amount counter.
*/
public void decreaseRawContentUploadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
rawContentUploadQueue.add(() -> telemetryBinding.getRawContentUploadAmount().set(telemetryBinding.getRawContentUploadAmount().get() - 1));
}
}

/**
* Increases additional content uploads for RepoAchiever Cluster allocations amount counter.
*/
public void increaseAdditionalContentUploadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
additionalContentUploadQueue.add(() -> telemetryBinding.getAdditionalContentUploadAmount().set(telemetryBinding.getAdditionalContentUploadAmount().get() + 1));
}
}

/**
* Decreases additional content uploads for RepoAchiever Cluster allocations amount counter.
*/
public void decreaseAdditionalContentUploadAmount() {
if (configService.getConfig().getDiagnostics().getEnabled()) {
additionalContentUploadQueue.add(() -> telemetryBinding.getAdditionalContentUploadAmount().set(telemetryBinding.getAdditionalContentUploadAmount().get() - 1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public class TelemetryBinding implements MeterBinder {
@Getter
private final AtomicInteger clusterHealthCheckAmount = new AtomicInteger();

@Getter
private final AtomicInteger rawContentUploadAmount = new AtomicInteger();

@Getter
private final AtomicInteger additionalContentUploadAmount = new AtomicInteger();

/**
* @see MeterBinder
*/
Expand All @@ -54,5 +60,13 @@ public void bindTo(@NotNull MeterRegistry meterRegistry) {
Gauge.builder("general.cluster_health_check_amount", clusterHealthCheckAmount, AtomicInteger::get)
.description("Represents amount of performed health check requests for RepoAchiever Cluster allocations")
.register(meterRegistry);

Gauge.builder("general.raw_content_upload_amount", rawContentUploadAmount, AtomicInteger::get)
.description("Represents amount of performed raw content upload requests for RepoAchiever Cluster allocations")
.register(meterRegistry);

Gauge.builder("general.additional_content_upload_amount", additionalContentUploadAmount, AtomicInteger::get)
.description("Represents amount of performed additional content upload requests for RepoAchiever Cluster allocations")
.register(meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,6 @@ public void performServe() throws RemoteException {
StateService.getSuspendGuard().unlock();
}

/**
* @see IClusterCommunicationService
*/
@Override
public void performRetrievalReset() throws RemoteException {
// StateService.resetContentUpdatesHeadCounter();
}

/**
* @see IClusterCommunicationService
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ public interface IClusterCommunicationService extends Remote {
*/
void performServe() throws RemoteException;

/**
* Performs RepoAchiever Cluster content retrieval reset operation.
*
* @throws RemoteException if remote request fails.
*/
void performRetrievalReset() throws RemoteException;

/**
* Retrieves latest RepoAchiever Cluster health check states.
*
Expand Down
Loading

0 comments on commit 11568d1

Please sign in to comment.