Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ trait ClusterApi extends ElasticClientHelpers {
// Cache cluster name (avoids calling it every time)
private val cachedClusterName = new AtomicReference[Option[String]](None)

// Cache cluster UUID (avoids calling it every time)
private val cachedClusterUuid = new AtomicReference[Option[String]](None)

/** Get Elasticsearch cluster name.
* @return
* the Elasticsearch cluster name
Expand All @@ -50,9 +53,32 @@ trait ClusterApi extends ElasticClientHelpers {
}
}

/** Get Elasticsearch cluster UUID. This is a stable, unique identifier for the cluster.
* @return
* the Elasticsearch cluster UUID
*/
def clusterUuid: ElasticResult[String] = {
cachedClusterUuid.get match {
case Some(uuid) =>
ElasticSuccess(uuid)
case None =>
executeGetClusterUuid() match {
case ElasticSuccess(uuid) =>
logger.info(s"✅ Elasticsearch cluster uuid: $uuid")
cachedClusterUuid.compareAndSet(None, Some(uuid))
ElasticSuccess(cachedClusterUuid.get.getOrElse(uuid))
case failure @ ElasticFailure(error) =>
logger.error(s"❌ Failed to get Elasticsearch cluster UUID: ${error.message}")
failure
}
}
}

// ========================================================================
// METHODS TO IMPLEMENT
// ========================================================================

private[client] def executeGetClusterName(): ElasticResult[String]

private[client] def executeGetClusterUuid(): ElasticResult[String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
override private[client] def executeGetClusterName(): ElasticResult[String] =
delegate.executeGetClusterName()

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
delegate.executeGetClusterUuid()

// ==================== IndicesApi ====================

/** Create an index with the provided name and settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ trait ExtensionApi { self: ElasticClientApi =>
*/
lazy val licenseRefreshStrategy: LicenseRefreshStrategy = {
val ret = LicenseRefreshStrategyFactory.create(config, metrics)
clusterName match {
case ElasticSuccess(name) =>
clusterUuid match {
case ElasticSuccess(uuid) =>
ret.telemetryCollector.setClusterInfo(
id = name,
name = Some(name),
id = uuid,
name = clusterName match {
case ElasticSuccess(n) => Some(n)
case _ => None
},
version = version match {
case ElasticSuccess(v) => Some(v)
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ trait NopeClientApi extends ElasticClientApi {
override private[client] def executeGetClusterName(): ElasticResult[String] =
ElasticResult.success("nope-cluster")

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
ElasticResult.success("nope-uuid")

override private[client] def executeGetIndex(index: String): ElasticResult[Option[String]] =
ElasticResult.success(None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
package app.softnetwork.elastic.client.metrics

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import scala.jdk.CollectionConverters._

class MetricsCollector extends MetricsApi {

private val metrics = new ConcurrentHashMap[String, MetricAccumulator]()
private val indexMetrics = new ConcurrentHashMap[String, MetricAccumulator]()
private val metrics =
new AtomicReference(new ConcurrentHashMap[String, MetricAccumulator]())
private val indexMetrics =
new AtomicReference(new ConcurrentHashMap[String, MetricAccumulator]())

private class MetricAccumulator {
val totalOps = new AtomicLong(0)
Expand Down Expand Up @@ -100,19 +102,20 @@ class MetricsCollector extends MetricsApi {
index: Option[String] = None
): Unit = {
// Record operation metrics
val accumulator = metrics.computeIfAbsent(operation, _ => new MetricAccumulator())
val accumulator = metrics.get().computeIfAbsent(operation, _ => new MetricAccumulator())
accumulator.record(duration, success)

// Record index metrics if provided
index.foreach { idx =>
val idxAccumulator = indexMetrics.computeIfAbsent(idx, _ => new MetricAccumulator())
val idxAccumulator = indexMetrics.get().computeIfAbsent(idx, _ => new MetricAccumulator())
idxAccumulator.record(duration, success)
}
}

override def getMetrics: OperationMetrics = {
val allMetrics = metrics.asScala.values.toSeq

private def computeGlobal(
metricsMap: ConcurrentHashMap[String, MetricAccumulator]
): OperationMetrics = {
val allMetrics = metricsMap.asScala.values.toSeq
if (allMetrics.isEmpty) {
OperationMetrics("all", 0, 0, 0, 0, 0, 0, 0)
} else {
Expand All @@ -133,32 +136,47 @@ class MetricsCollector extends MetricsApi {
}
}

override def getMetricsByOperation(operation: String): Option[OperationMetrics] = {
Option(metrics.get(operation)).map(_.toMetrics(operation))
}

override def getMetricsByIndex(index: String): Option[OperationMetrics] = {
Option(indexMetrics.get(index)).map(_.toMetrics(index))
}

override def getAggregatedMetrics: AggregatedMetrics = {
val globalMetrics = getMetrics
private def buildAggregated(
opsMap: ConcurrentHashMap[String, MetricAccumulator],
idxMap: ConcurrentHashMap[String, MetricAccumulator]
): AggregatedMetrics = {
val globalMetrics = computeGlobal(opsMap)
AggregatedMetrics(
totalOperations = globalMetrics.totalOperations,
successCount = globalMetrics.successCount,
failureCount = globalMetrics.failureCount,
totalDuration = globalMetrics.totalDuration,
operationMetrics = metrics.asScala.map { case (op, acc) =>
operationMetrics = opsMap.asScala.map { case (op, acc) =>
op -> acc.toMetrics(op)
}.toMap,
indexMetrics = indexMetrics.asScala.map { case (idx, acc) =>
indexMetrics = idxMap.asScala.map { case (idx, acc) =>
idx -> acc.toMetrics(idx)
}.toMap
)
}

override def getMetrics: OperationMetrics = computeGlobal(metrics.get())

override def getMetricsByOperation(operation: String): Option[OperationMetrics] =
Option(metrics.get().get(operation)).map(_.toMetrics(operation))

override def getMetricsByIndex(index: String): Option[OperationMetrics] =
Option(indexMetrics.get().get(index)).map(_.toMetrics(index))

override def getAggregatedMetrics: AggregatedMetrics =
buildAggregated(metrics.get(), indexMetrics.get())

override def resetMetrics(): Unit = {
metrics.clear()
indexMetrics.clear()
metrics.set(new ConcurrentHashMap[String, MetricAccumulator]())
indexMetrics.set(new ConcurrentHashMap[String, MetricAccumulator]())
}

/** Atomically swap both maps with fresh empty ones, then build the snapshot from the old maps.
* Operations recorded after the swap go into the new maps and are not lost.
*/
override def collectAndResetAggregatedMetrics: AggregatedMetrics = {
val oldMetrics = metrics.getAndSet(new ConcurrentHashMap[String, MetricAccumulator]())
val oldIndexMetrics = indexMetrics.getAndSet(new ConcurrentHashMap[String, MetricAccumulator]())
buildAggregated(oldMetrics, oldIndexMetrics)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ class MetricsElasticClient(
delegate.clusterName
}

override def clusterUuid: ElasticResult[String] =
measureResult("cluster_uuid") {
delegate.clusterUuid
}

// ==================== IndicesApi ====================

override def createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,17 @@ trait JestClusterApi extends ClusterApi with JestClientHelpers {
val json = JsonMethods.parse(jsonString)
(json \ "cluster_name").extract[String]
}

override private[client] def executeGetClusterUuid(): result.ElasticResult[String] =
executeJestAction(
"cluster_uuid",
retryable = true
)(
new GetClusterInfo.Builder().build()
) { result =>
val jsonString = result.getJsonString
implicit val formats: DefaultFormats.type = DefaultFormats
val json = JsonMethods.parse(jsonString)
(json \ "cluster_uuid").extract[String]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.spi.JestClientSpi
import app.softnetwork.elastic.scalatest.ElasticDockerTestKit

class JestClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit {
override lazy val client: ClusterApi = new JestClientSpi().client(elasticConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ trait RestHighLevelClientClusterApi extends ClusterApi with RestHighLevelClientH
}
)

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
executeRestLowLevelAction[String](
operation = "cluster_uuid",
index = None,
retryable = true
)(
request = new Request("GET", "/")
)(
transformer = resp => {
val jsonString = EntityUtils.toString(resp.getEntity)
implicit val formats: DefaultFormats.type = DefaultFormats
val json = JsonMethods.parse(jsonString)
(json \ "cluster_uuid").extract[String]
}
)

}

/** Indices management API for RestHighLevelClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi
import app.softnetwork.elastic.scalatest.ElasticDockerTestKit

class RestHighLevelClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit {
override lazy val client: ClusterApi = new RestHighLevelClientSpi().client(elasticConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,22 @@ trait RestHighLevelClientClusterApi extends ClusterApi with RestHighLevelClientH
}
)

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
executeRestLowLevelAction[String](
operation = "cluster_uuid",
index = None,
retryable = true
)(
request = new Request("GET", "/")
)(
transformer = resp => {
val jsonString = EntityUtils.toString(resp.getEntity)
implicit val formats: DefaultFormats.type = DefaultFormats
val json = JsonMethods.parse(jsonString)
(json \ "cluster_uuid").extract[String]
}
)

}

/** Indices management API for RestHighLevelClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi
import app.softnetwork.elastic.scalatest.ElasticDockerTestKit

class RestHighLevelClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit {
override lazy val client: ClusterApi = new RestHighLevelClientSpi().client(elasticConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ trait JavaClientClusterApi extends ClusterApi with JavaClientHelpers {
) { response =>
response.clusterName()
}

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
executeJavaAction(
operation = "cluster_uuid",
index = None,
retryable = true
)(
apply().info()
) { response =>
response.clusterUuid()
}
}

/** Elasticsearch client implementation of Indices API using the Java Client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.spi.JavaClientSpi
import app.softnetwork.elastic.scalatest.ElasticDockerTestKit

class JavaClientClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit {
override lazy val client: ClusterApi = new JavaClientSpi().client(elasticConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ trait JavaClientClusterApi extends ClusterApi with JavaClientHelpers {
) { response =>
response.clusterName()
}

override private[client] def executeGetClusterUuid(): ElasticResult[String] =
executeJavaAction(
operation = "cluster_uuid",
index = None,
retryable = true
)(
apply().info()
) { response =>
response.clusterUuid()
}
}

/** Elasticsearch client implementation of Indices API using the Java Client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.spi.JavaClientSpi
import app.softnetwork.elastic.scalatest.ElasticDockerTestKit

class JavaClientClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit {
override lazy val client: ClusterApi = new JavaClientSpi().client(elasticConfig)
}
Loading
Loading