diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ClusterApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/ClusterApi.scala index a8d15663..29eb5442 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ClusterApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ClusterApi.scala @@ -29,6 +29,9 @@ trait ClusterApi extends ElasticClientHelpers { // Cache cluster name (avoids calling it every time) private val cachedClusterName = new AtomicReference[Option[String]](None) + // Cache cluster UUID (avoids calling it every time) + private val cachedClusterUuid = new AtomicReference[Option[String]](None) + /** Get Elasticsearch cluster name. * @return * the Elasticsearch cluster name @@ -50,9 +53,32 @@ trait ClusterApi extends ElasticClientHelpers { } } + /** Get Elasticsearch cluster UUID. This is a stable, unique identifier for the cluster. + * @return + * the Elasticsearch cluster UUID + */ + def clusterUuid: ElasticResult[String] = { + cachedClusterUuid.get match { + case Some(uuid) => + ElasticSuccess(uuid) + case None => + executeGetClusterUuid() match { + case ElasticSuccess(uuid) => + logger.info(s"✅ Elasticsearch cluster uuid: $uuid") + cachedClusterUuid.compareAndSet(None, Some(uuid)) + ElasticSuccess(cachedClusterUuid.get.getOrElse(uuid)) + case failure @ ElasticFailure(error) => + logger.error(s"❌ Failed to get Elasticsearch cluster UUID: ${error.message}") + failure + } + } + } + // ======================================================================== // METHODS TO IMPLEMENT // ======================================================================== private[client] def executeGetClusterName(): ElasticResult[String] + + private[client] def executeGetClusterUuid(): ElasticResult[String] } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala index 9e3c9088..2c7b299b 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala @@ -96,6 +96,9 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { override private[client] def executeGetClusterName(): ElasticResult[String] = delegate.executeGetClusterName() + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + delegate.executeGetClusterUuid() + // ==================== IndicesApi ==================== /** Create an index with the provided name and settings. diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala index f369b573..04c25816 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala @@ -31,11 +31,14 @@ trait ExtensionApi { self: ElasticClientApi => */ lazy val licenseRefreshStrategy: LicenseRefreshStrategy = { val ret = LicenseRefreshStrategyFactory.create(config, metrics) - clusterName match { - case ElasticSuccess(name) => + clusterUuid match { + case ElasticSuccess(uuid) => ret.telemetryCollector.setClusterInfo( - id = name, - name = Some(name), + id = uuid, + name = clusterName match { + case ElasticSuccess(n) => Some(n) + case _ => None + }, version = version match { case ElasticSuccess(v) => Some(v) case _ => None diff --git a/core/src/main/scala/app/softnetwork/elastic/client/NopeClientApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/NopeClientApi.scala index cba9869c..fa151763 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/NopeClientApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/NopeClientApi.scala @@ -266,6 +266,9 @@ trait NopeClientApi extends ElasticClientApi { override private[client] def executeGetClusterName(): ElasticResult[String] = ElasticResult.success("nope-cluster") + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + ElasticResult.success("nope-uuid") + override private[client] def executeGetIndex(index: String): ElasticResult[Option[String]] = ElasticResult.success(None) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsCollector.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsCollector.scala index 76ce8f92..58d1f317 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsCollector.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsCollector.scala @@ -17,14 +17,16 @@ package app.softnetwork.elastic.client.metrics import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import scala.jdk.CollectionConverters._ class MetricsCollector extends MetricsApi { - private val metrics = new ConcurrentHashMap[String, MetricAccumulator]() - private val indexMetrics = new ConcurrentHashMap[String, MetricAccumulator]() + private val metrics = + new AtomicReference(new ConcurrentHashMap[String, MetricAccumulator]()) + private val indexMetrics = + new AtomicReference(new ConcurrentHashMap[String, MetricAccumulator]()) private class MetricAccumulator { val totalOps = new AtomicLong(0) @@ -100,19 +102,20 @@ class MetricsCollector extends MetricsApi { index: Option[String] = None ): Unit = { // Record operation metrics - val accumulator = metrics.computeIfAbsent(operation, _ => new MetricAccumulator()) + val accumulator = metrics.get().computeIfAbsent(operation, _ => new MetricAccumulator()) accumulator.record(duration, success) // Record index metrics if provided index.foreach { idx => - val idxAccumulator = indexMetrics.computeIfAbsent(idx, _ => new MetricAccumulator()) + val idxAccumulator = indexMetrics.get().computeIfAbsent(idx, _ => new MetricAccumulator()) idxAccumulator.record(duration, success) } } - override def getMetrics: OperationMetrics = { - val allMetrics = metrics.asScala.values.toSeq - + private def computeGlobal( + metricsMap: ConcurrentHashMap[String, MetricAccumulator] + ): OperationMetrics = { + val allMetrics = metricsMap.asScala.values.toSeq if (allMetrics.isEmpty) { OperationMetrics("all", 0, 0, 0, 0, 0, 0, 0) } else { @@ -133,32 +136,47 @@ class MetricsCollector extends MetricsApi { } } - override def getMetricsByOperation(operation: String): Option[OperationMetrics] = { - Option(metrics.get(operation)).map(_.toMetrics(operation)) - } - - override def getMetricsByIndex(index: String): Option[OperationMetrics] = { - Option(indexMetrics.get(index)).map(_.toMetrics(index)) - } - - override def getAggregatedMetrics: AggregatedMetrics = { - val globalMetrics = getMetrics + private def buildAggregated( + opsMap: ConcurrentHashMap[String, MetricAccumulator], + idxMap: ConcurrentHashMap[String, MetricAccumulator] + ): AggregatedMetrics = { + val globalMetrics = computeGlobal(opsMap) AggregatedMetrics( totalOperations = globalMetrics.totalOperations, successCount = globalMetrics.successCount, failureCount = globalMetrics.failureCount, totalDuration = globalMetrics.totalDuration, - operationMetrics = metrics.asScala.map { case (op, acc) => + operationMetrics = opsMap.asScala.map { case (op, acc) => op -> acc.toMetrics(op) }.toMap, - indexMetrics = indexMetrics.asScala.map { case (idx, acc) => + indexMetrics = idxMap.asScala.map { case (idx, acc) => idx -> acc.toMetrics(idx) }.toMap ) } + override def getMetrics: OperationMetrics = computeGlobal(metrics.get()) + + override def getMetricsByOperation(operation: String): Option[OperationMetrics] = + Option(metrics.get().get(operation)).map(_.toMetrics(operation)) + + override def getMetricsByIndex(index: String): Option[OperationMetrics] = + Option(indexMetrics.get().get(index)).map(_.toMetrics(index)) + + override def getAggregatedMetrics: AggregatedMetrics = + buildAggregated(metrics.get(), indexMetrics.get()) + override def resetMetrics(): Unit = { - metrics.clear() - indexMetrics.clear() + metrics.set(new ConcurrentHashMap[String, MetricAccumulator]()) + indexMetrics.set(new ConcurrentHashMap[String, MetricAccumulator]()) + } + + /** Atomically swap both maps with fresh empty ones, then build the snapshot from the old maps. + * Operations recorded after the swap go into the new maps and are not lost. + */ + override def collectAndResetAggregatedMetrics: AggregatedMetrics = { + val oldMetrics = metrics.getAndSet(new ConcurrentHashMap[String, MetricAccumulator]()) + val oldIndexMetrics = indexMetrics.getAndSet(new ConcurrentHashMap[String, MetricAccumulator]()) + buildAggregated(oldMetrics, oldIndexMetrics) } } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index 9f647001..a2d6cb23 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -106,6 +106,11 @@ class MetricsElasticClient( delegate.clusterName } + override def clusterUuid: ElasticResult[String] = + measureResult("cluster_uuid") { + delegate.clusterUuid + } + // ==================== IndicesApi ==================== override def createIndex( diff --git a/es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClusterApi.scala b/es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClusterApi.scala index a8d0dd65..9faa0ba2 100644 --- a/es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClusterApi.scala +++ b/es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClusterApi.scala @@ -35,4 +35,17 @@ trait JestClusterApi extends ClusterApi with JestClientHelpers { val json = JsonMethods.parse(jsonString) (json \ "cluster_name").extract[String] } + + override private[client] def executeGetClusterUuid(): result.ElasticResult[String] = + executeJestAction( + "cluster_uuid", + retryable = true + )( + new GetClusterInfo.Builder().build() + ) { result => + val jsonString = result.getJsonString + implicit val formats: DefaultFormats.type = DefaultFormats + val json = JsonMethods.parse(jsonString) + (json \ "cluster_uuid").extract[String] + } } diff --git a/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestClusterApiSpec.scala b/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestClusterApiSpec.scala new file mode 100644 index 00000000..b505864d --- /dev/null +++ b/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestClusterApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JestClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class JestClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit { + override lazy val client: ClusterApi = new JestClientSpi().client(elasticConfig) +} diff --git a/es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala index def0752a..c8b2c42d 100644 --- a/es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ b/es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala @@ -199,6 +199,22 @@ trait RestHighLevelClientClusterApi extends ClusterApi with RestHighLevelClientH } ) + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + executeRestLowLevelAction[String]( + operation = "cluster_uuid", + index = None, + retryable = true + )( + request = new Request("GET", "/") + )( + transformer = resp => { + val jsonString = EntityUtils.toString(resp.getEntity) + implicit val formats: DefaultFormats.type = DefaultFormats + val json = JsonMethods.parse(jsonString) + (json \ "cluster_uuid").extract[String] + } + ) + } /** Indices management API for RestHighLevelClient diff --git a/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala new file mode 100644 index 00000000..8d024aa5 --- /dev/null +++ b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class RestHighLevelClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit { + override lazy val client: ClusterApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala index a44e844e..fff10fcd 100644 --- a/es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ b/es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala @@ -257,6 +257,22 @@ trait RestHighLevelClientClusterApi extends ClusterApi with RestHighLevelClientH } ) + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + executeRestLowLevelAction[String]( + operation = "cluster_uuid", + index = None, + retryable = true + )( + request = new Request("GET", "/") + )( + transformer = resp => { + val jsonString = EntityUtils.toString(resp.getEntity) + implicit val formats: DefaultFormats.type = DefaultFormats + val json = JsonMethods.parse(jsonString) + (json \ "cluster_uuid").extract[String] + } + ) + } /** Indices management API for RestHighLevelClient diff --git a/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala new file mode 100644 index 00000000..8d024aa5 --- /dev/null +++ b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClusterApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class RestHighLevelClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit { + override lazy val client: ClusterApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala b/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala index 86488176..884ecf5d 100644 --- a/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala +++ b/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala @@ -183,6 +183,17 @@ trait JavaClientClusterApi extends ClusterApi with JavaClientHelpers { ) { response => response.clusterName() } + + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + executeJavaAction( + operation = "cluster_uuid", + index = None, + retryable = true + )( + apply().info() + ) { response => + response.clusterUuid() + } } /** Elasticsearch client implementation of Indices API using the Java Client diff --git a/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala new file mode 100644 index 00000000..bb4419fd --- /dev/null +++ b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class JavaClientClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit { + override lazy val client: ClusterApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala b/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala index 7e94f2a2..86cb94ad 100644 --- a/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala +++ b/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala @@ -178,6 +178,17 @@ trait JavaClientClusterApi extends ClusterApi with JavaClientHelpers { ) { response => response.clusterName() } + + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + executeJavaAction( + operation = "cluster_uuid", + index = None, + retryable = true + )( + apply().info() + ) { response => + response.clusterUuid() + } } /** Elasticsearch client implementation of Indices API using the Java Client diff --git a/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala new file mode 100644 index 00000000..bb4419fd --- /dev/null +++ b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientClusterApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class JavaClientClusterApiSpec extends ClusterApiSpec with ElasticDockerTestKit { + override lazy val client: ClusterApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala index 1dc159aa..c03ead37 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala @@ -22,6 +22,7 @@ import com.typesafe.scalalogging.LazyLogging import java.util.ServiceLoader import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec import scala.jdk.CollectionConverters._ /** Factory for creating and caching LicenseRefreshStrategy instances. @@ -91,9 +92,11 @@ object LicenseRefreshStrategyFactory extends LazyLogging { else Some(LicenseMode.Driver) } - /** Resolve strategy via SPI, initialize it, and cache it. Synchronized to prevent concurrent - * creation of duplicate strategies (which would leak resources like Akka schedulers). + /** Resolve strategy via SPI, initialize it, and cache it via CAS. If two threads race, only one + * will successfully CAS; the loser shuts down its unused strategy and retries to pick up the + * winner's cached instance. */ + @tailrec private def resolveStrategy( config: Config, metrics: MetricsApi @@ -101,43 +104,30 @@ object LicenseRefreshStrategyFactory extends LazyLogging { _strategy.get() match { case Some(s) => s case None => - synchronized { - // Double-check after acquiring lock - _strategy.get() match { - case Some(s) => s - case None => - val mode = resolveMode(config) - val loader = ServiceLoader.load(classOf[LicenseManagerSpi]) - val spis = loader.iterator().asScala.toSeq.sortBy(_.priority) - val strategy = spis.headOption - .map { spi => - try { - val s = spi.createStrategy(config, mode, metrics) - s.initialize() - logger.info( - s"License strategy initialized: ${s.getClass.getSimpleName} " + - s"(mode=${mode.getOrElse("default")}, type=${s.licenseManager.licenseType})" - ) - s - } catch { - case e: Exception => - logger.error( - s"Failed to create license strategy from ${spi.getClass.getName}: ${e.getMessage}", - e - ) - val fallback = new NopRefreshStrategy() - fallback.initialize() - fallback - } - } - .getOrElse { - val fallback = new NopRefreshStrategy() - fallback.initialize() - fallback - } - _strategy.set(Some(strategy)) - strategy + val mode = resolveMode(config) + val loader = ServiceLoader.load(classOf[LicenseManagerSpi]) + val spis = loader.iterator().asScala.toSeq.sortBy(_.priority) + val strategy = spis.headOption + .map { spi => + val s = spi.createStrategy(config, mode, metrics) + s.initialize() + logger.info( + s"License strategy initialized: ${s.getClass.getSimpleName} " + + s"(mode=${mode.getOrElse("default")}, type=${s.licenseManager.licenseType})" + ) + s } + .getOrElse { + val fallback = new NopRefreshStrategy() + fallback.initialize() + fallback + } + if (_strategy.compareAndSet(None, Some(strategy))) { + strategy + } else { + // Another thread won the race — shut down ours, retry to pick up the winner's + strategy.shutdown() + resolveStrategy(config, metrics) } } } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala index 7bec992b..371529de 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala @@ -45,46 +45,68 @@ case class TelemetryData( * update counters via increment/set methods through * `LicenseRefreshStrategyFactory.create(config).telemetryCollector`. * - * Thread-safe: counters use `AtomicLong`, gauges use `@volatile`. + * Thread-safe: `queriesTotal` uses `AtomicLong` for high-frequency increments. Cluster info fields + * (`clusterId`, `clusterName`, `clusterVersion`) and gauges (`mvsActive`, `clustersConnected`) are + * guarded by `clusterInfoLock` to ensure atomic reads and writes. */ class TelemetryCollector { private val _queriesTotal = new AtomicLong(0L) - @volatile private var _mvsActive: Int = 0 - @volatile private var _clustersConnected: Int = 0 - @volatile private var _clusterId: Option[String] = None - @volatile private var _clusterName: Option[String] = None - @volatile private var _clusterVersion: Option[String] = None + private val clusterInfoLock = new AnyRef + private var _mvsActive: Int = 0 + private var _clustersConnected: Int = 0 + private var _clusterId: Option[String] = None + private var _clusterName: Option[String] = None + private var _clusterVersion: Option[String] = None // --- Write methods (called by extensions) --- def incrementQueries(): Unit = { val _ = _queriesTotal.incrementAndGet() } - def setMvsActive(count: Int): Unit = { _mvsActive = count } + def setMvsActive(count: Int): Unit = clusterInfoLock.synchronized { _mvsActive = count } - def setClustersConnected(count: Int): Unit = { _clustersConnected = count } + def setClustersConnected(count: Int): Unit = clusterInfoLock.synchronized { + _clustersConnected = count + } def setClusterInfo( id: String, name: Option[String], version: Option[String] - ): Unit = { + ): Unit = clusterInfoLock.synchronized { _clusterId = Some(id) _clusterName = name _clusterVersion = version } - // --- Read method (called by AutoRefreshStrategy.doScheduleRefresh) --- - - def collect(metrics: MetricsApi): TelemetryData = TelemetryData( - queriesTotal = _queriesTotal.get(), - mvsActive = _mvsActive, - clustersConnected = _clustersConnected, - clusterId = _clusterId, - clusterName = _clusterName, - clusterVersion = _clusterVersion, - aggregatedMetrics = metrics.getAggregatedMetrics - ) + // --- Read methods (called by AutoRefreshStrategy.doScheduleRefresh) --- + + def collect(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized { + TelemetryData( + queriesTotal = _queriesTotal.get(), + mvsActive = _mvsActive, + clustersConnected = _clustersConnected, + clusterId = _clusterId, + clusterName = _clusterName, + clusterVersion = _clusterVersion, + aggregatedMetrics = metrics.getAggregatedMetrics + ) + } + + /** Collect a snapshot and atomically reset the metrics, ensuring no operations recorded between + * snapshot and reset are lost. + */ + def collectAndReset(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized { + TelemetryData( + queriesTotal = _queriesTotal.get(), + mvsActive = _mvsActive, + clustersConnected = _clustersConnected, + clusterId = _clusterId, + clusterName = _clusterName, + clusterVersion = _clusterVersion, + aggregatedMetrics = metrics.collectAndResetAggregatedMetrics + ) + } } object TelemetryCollector { diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala index e57f6485..7687a6df 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala @@ -253,6 +253,19 @@ trait MetricsApi { */ //format:on def resetMetrics(): Unit + + /** Atomically collect aggregated metrics and reset them in one operation. This avoids the race + * window between `getAggregatedMetrics` and `resetMetrics()` where concurrent `recordOperation` + * calls could be lost. + * + * Default implementation is non-atomic (collect then reset). Concrete implementations should + * override with an atomic swap. + */ + def collectAndResetAggregatedMetrics: AggregatedMetrics = { + val result = getAggregatedMetrics + resetMetrics() + result + } } object MetricsApi { @@ -269,5 +282,6 @@ object MetricsApi { override def getMetricsByIndex(index: String): Option[OperationMetrics] = None override def getAggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty override def resetMetrics(): Unit = () + override def collectAndResetAggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty } } diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala index 7b5b5f73..b2fb5029 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala @@ -16,7 +16,7 @@ package app.softnetwork.elastic.licensing -import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi} +import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi, OperationMetrics} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -96,6 +96,42 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { data.aggregatedMetrics shouldBe AggregatedMetrics.empty } + // --- collectAndReset --- + + it should "atomically collect and reset aggregated metrics" in { + val collector = new TelemetryCollector + collector.incrementQueries() + collector.incrementQueries() + collector.setMvsActive(3) + collector.setClusterInfo("uuid-1", Some("cluster-1"), Some("8.0.0")) + + // Create a mock MetricsApi that tracks reset calls + var resetCalled = false + val mockMetrics = new MetricsApi { + override def recordOperation( + operation: String, + duration: Long, + success: Boolean, + index: Option[String] + ): Unit = () + override def getMetrics: OperationMetrics = OperationMetrics.empty + override def getMetricsByOperation(operation: String): Option[OperationMetrics] = None + override def getMetricsByIndex(index: String): Option[OperationMetrics] = None + override def getAggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty + override def resetMetrics(): Unit = { resetCalled = true } + override def collectAndResetAggregatedMetrics: AggregatedMetrics = { + resetCalled = true + AggregatedMetrics.empty + } + } + + val data = collector.collectAndReset(mockMetrics) + data.queriesTotal shouldBe 2 + data.mvsActive shouldBe 3 + data.clusterId shouldBe Some("uuid-1") + resetCalled shouldBe true + } + // --- concurrent access --- it should "handle concurrent increments safely" in { diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/ClusterApiSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/ClusterApiSpec.scala new file mode 100644 index 00000000..2cc19ff3 --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/ClusterApiSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticSuccess} +import app.softnetwork.elastic.scalatest.ElasticTestKit +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.slf4j.{Logger, LoggerFactory} + +trait ClusterApiSpec extends AnyFlatSpecLike with Matchers { + self: ElasticTestKit => + + lazy val log: Logger = LoggerFactory getLogger getClass.getName + + def client: ClusterApi + + "clusterName" should "return a non-empty cluster name" in { + client.clusterName match { + case ElasticSuccess(name) => + name should not be empty + case ElasticFailure(error) => + fail(s"Expected cluster name but got failure: ${error.message}") + } + } + + it should "return a cached result on subsequent calls" in { + val first = client.clusterName + val second = client.clusterName + first shouldBe second + } + + "clusterUuid" should "return a non-empty cluster UUID" in { + client.clusterUuid match { + case ElasticSuccess(uuid) => + uuid should not be empty + case ElasticFailure(error) => + fail(s"Expected cluster UUID but got failure: ${error.message}") + } + } + + it should "return a cached result on subsequent calls" in { + val first = client.clusterUuid + val second = client.clusterUuid + first shouldBe second + } + + it should "return a UUID different from the cluster name" in { + (client.clusterName, client.clusterUuid) match { + case (ElasticSuccess(name), ElasticSuccess(uuid)) => + uuid should not equal name + case _ => + // If either fails, skip this assertion (tested individually above) + } + } +} diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala index b13019b8..60213b9d 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala @@ -94,6 +94,9 @@ trait MockElasticClientApi extends NopeClientApi { override private[client] def executeGetClusterName(): ElasticResult[String] = ElasticResult.success("test-cluster") + override private[client] def executeGetClusterUuid(): ElasticResult[String] = + ElasticResult.success("test-cluster-uuid") + // ==================== IndicesApi ==================== override private[client] def executeCreateIndex(