[b/376629469] Cloudera CPU timeseries API #653

@@ -0,0 +1,155 @@
/*
* Copyright 2022-2024 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The task collects cluster-wide CPU usage metrics from the Cloudera Manager <a
* href="https://cldr2-aw-dl-gateway.cldr2-cd.svye-dcxb.a5.cloudera.site/static/apidocs/resource_TimeSeriesResource.html">TimeSeries
* API</a>. The corresponding chart for the whole cluster is available on the {@code /cmf/home/} and
* {@code /cmf/clusters/{clusterId}/status} pages of the Cloudera Manager UI. The chart query is
* written in the <a
* href="https://docs.cloudera.com/documentation/enterprise/latest/topics/cm_dg_tsquery.html">tsquery</a>
* language.
*/
public class ClouderaClusterCPUChartTask extends AbstractClouderaManagerTask {
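/** Rollup granularities supported by the TimeSeries API; passed as the {@code desiredRollup} request parameter. */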
enum TimeSeriesAggregation {
RAW,
TEN_MINUTELY,
HOURLY,
SIX_HOURLY,
DAILY,
WEEKLY,
}

private static final Logger LOG = LoggerFactory.getLogger(ClouderaClusterCPUChartTask.class);
/*
SELECT cpu_percent_across_hosts WHERE entityName = "1546336862" AND category = CLUSTER
*/
private static final String TS_CPU_QUERY_TEMPLATE =
"SELECT cpu_percent_across_hosts WHERE entityName = \"%s\" AND category = CLUSTER";

private final ObjectMapper objectMapper = new ObjectMapper();

private final int includedLastDays;
private final TimeSeriesAggregation tsAggregation;

public ClouderaClusterCPUChartTask() {
this(1, TimeSeriesAggregation.RAW);
}

public ClouderaClusterCPUChartTask(int includedLastDays, TimeSeriesAggregation tsAggregation) {
super(
String.format(
"cmf-cluster-cpu-%s-%s.jsonl",
includedLastDays, tsAggregation.toString().toLowerCase()));
this.includedLastDays = includedLastDays;
this.tsAggregation = tsAggregation;
}

@Override
protected Void doRun(
TaskRunContext context, @Nonnull ByteSink sink, @Nonnull ClouderaManagerHandle handle)
throws Exception {
CloseableHttpClient httpClient = handle.getHttpClient();
List<ClouderaClusterDTO> clusters = getClustersFromHandle(handle);

final String timeSeriesAPIUrl = buildTimeSeriesUrl(handle.getApiURI().toString());
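// Each cluster's chart response is written as a single line of the JSONL output.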
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
for (ClouderaClusterDTO cluster : clusters) {
String cpuPerClusterQuery = buildQueryToFetchCPUTimeSeriesOnCluster(cluster.getId());
LOG.debug(
"Executing charts query [{}] with rollup [{}] for the cluster [{}].",
cpuPerClusterQuery,
this.tsAggregation,
cluster.getName());

URIBuilder uriBuilder = new URIBuilder(timeSeriesAPIUrl);
uriBuilder.addParameter("query", cpuPerClusterQuery);
uriBuilder.addParameter("desiredRollup", this.tsAggregation.toString());
uriBuilder.addParameter("mustUseDesiredRollup", "true");
uriBuilder.addParameter("from", buildISODateTime(this.includedLastDays));

try (CloseableHttpResponse chart = httpClient.execute(new HttpGet(uriBuilder.build()))) {
JsonNode jsonNode = objectMapper.readTree(chart.getEntity().getContent());
writer.write(jsonNode.toString());
writer.write('\n');
}
}
}

return null;
}

private List<ClouderaClusterDTO> getClustersFromHandle(@Nonnull ClouderaManagerHandle handle) {
List<ClouderaClusterDTO> clusters = handle.getClusters();
if (clusters == null) {
throw new MetadataDumperUsageException(
"Cloudera clusters must be initialized before dumping CPU charts.");
}
List<ClouderaClusterDTO> cpuClusters = new ArrayList<>();
for (ClouderaClusterDTO cluster : clusters) {
if (cluster.getId() == null) {
LOG.warn(
"Cloudera cluster id is null for the cluster [{}]. Skipping CPU metrics for this cluster.",
cluster.getName());
// TODO: this might be critical data for the TCO calculation, so we may need to fail the dump
// process instead. Discuss with product.
} else {
cpuClusters.add(cluster);
}
}
return cpuClusters;
}

private String buildTimeSeriesUrl(String apiUri) {
return apiUri + "/timeseries";
}

private String buildQueryToFetchCPUTimeSeriesOnCluster(String clusterId) {
return String.format(TS_CPU_QUERY_TEMPLATE, clusterId);
}

private String buildISODateTime(int deltaInDays) {
// Take "now" in UTC so the formatted timestamp matches its 'Z' (UTC) suffix.
ZonedDateTime dt = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(deltaInDays);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
return dt.format(formatter);
}
}
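For illustration, a minimal standalone sketch (not part of this change) of the timeseries request that doRun() assembles. The API base URL and rollup below are hypothetical placeholders; the entityName value is the example from the query comment above.

import java.net.URI;
import org.apache.hc.core5.net.URIBuilder;

public class TimeSeriesRequestSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Cloudera Manager API base URL; the task reads the real one from ClouderaManagerHandle.
    String apiUri = "https://cm-host:7180/api/v49";
    URIBuilder uriBuilder = new URIBuilder(apiUri + "/timeseries");
    uriBuilder.addParameter(
        "query",
        "SELECT cpu_percent_across_hosts WHERE entityName = \"1546336862\" AND category = CLUSTER");
    uriBuilder.addParameter("desiredRollup", "HOURLY");
    uriBuilder.addParameter("mustUseDesiredRollup", "true");
    // ISO-8601 UTC timestamp, in the same format that buildISODateTime() produces.
    uriBuilder.addParameter("from", "2024-01-01T00:00:00.000Z");
    URI request = uriBuilder.build();
    System.out.println(request);
  }
}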
@@ -22,6 +22,7 @@
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
import com.google.edwmigration.dumper.application.dumper.connector.AbstractConnector;
import com.google.edwmigration.dumper.application.dumper.connector.Connector;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaClusterCPUChartTask.TimeSeriesAggregation;
import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
import com.google.edwmigration.dumper.application.dumper.task.Task;
@@ -82,6 +83,10 @@ public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArg
out.add(new ClouderaCMFHostsTask());
out.add(new ClouderaAPIHostsTask());
out.add(new ClouderaServicesTask());
out.add(new ClouderaClusterCPUChartTask(1, TimeSeriesAggregation.HOURLY));
out.add(new ClouderaClusterCPUChartTask(7, TimeSeriesAggregation.DAILY));
out.add(new ClouderaClusterCPUChartTask(30, TimeSeriesAggregation.DAILY));
out.add(new ClouderaClusterCPUChartTask(90, TimeSeriesAggregation.DAILY));
}

@Nonnull
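With these defaults, each connector run produces four CPU dump files, named by the ClouderaClusterCPUChartTask constructor: cmf-cluster-cpu-1-hourly.jsonl, cmf-cluster-cpu-7-daily.jsonl, cmf-cluster-cpu-30-daily.jsonl, and cmf-cluster-cpu-90-daily.jsonl.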
@@ -0,0 +1,140 @@
/*
* Copyright 2022-2024 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyChar;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSink;
import com.google.common.io.CharSink;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ClouderaClusterCPUChartTaskTest {
private final ClouderaClusterCPUChartTask task = new ClouderaClusterCPUChartTask();
private ClouderaManagerHandle handle;
private String servicesJson;

@Mock private TaskRunContext context;
@Mock private ByteSink sink;
@Mock private Writer writer;
@Mock private CharSink charSink;
@Mock private CloseableHttpClient httpClient;
private URI uri;

@Before
public void setUp() throws Exception {
servicesJson = readFileAsString("/cloudera/manager/cluster-cpu-status.json");
uri = URI.create("http://localhost/api");
handle = new ClouderaManagerHandle(uri, httpClient);

when(sink.asCharSink(eq(StandardCharsets.UTF_8))).thenReturn(charSink);
when(charSink.openBufferedStream()).thenReturn(writer);
}

@Test
public void noClusterId_skipWrites() throws Exception {
// GIVEN: There is no cluster with a valid cluster ID
handle.initClusters(ImmutableList.of(ClouderaClusterDTO.create(null, "single cluster")));

// WHEN
task.doRun(context, sink, handle);

// THEN: Task for such clusters should be skipped
verify(httpClient, never()).execute(any());
verifyNoWrites();
}

@Test
public void validCluster_doWrites() throws Exception {
// GIVEN: Valid cluster
handle.initClusters(ImmutableList.of(ClouderaClusterDTO.create("id1", "first-cluster")));

CloseableHttpResponse firstResponse = mock(CloseableHttpResponse.class);
HttpEntity firstEntity = mock(HttpEntity.class);
when(firstResponse.getEntity()).thenReturn(firstEntity);
when(firstEntity.getContent()).thenReturn(new ByteArrayInputStream(servicesJson.getBytes()));
when(httpClient.execute(argThat(get -> get != null))).thenReturn(firstResponse);

// WHEN
task.doRun(context, sink, handle);

// THEN: the output should be dumped in the JSONL format
Set<String> fileLines = new HashSet<>();
verify(writer, times(1))
.write(
(String)
argThat(
content -> {
String str = (String) content;
assertFalse(str.contains("\n"));
assertFalse(str.contains("\r"));
fileLines.add(str);
return true;
}));
assertEquals(ImmutableSet.of(tojsonl(servicesJson)), fileLines);
}

private void verifyNoWrites() throws IOException {
verify(writer, never()).write(anyChar());
verify(writer, never()).write(anyString());
verify(writer, never()).write(anyString(), anyInt(), anyInt());
verify(writer, never()).write(any(char[].class));
verify(writer, never()).write(any(char[].class), anyInt(), anyInt());
}

private String readFileAsString(String fileName) throws IOException, URISyntaxException {
return new String(Files.readAllBytes(Paths.get(this.getClass().getResource(fileName).toURI())));
}

private String tojsonl(String json) throws Exception {
return new ObjectMapper().readTree(json).toString();
}
}