diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index f53f4e67..b37c58e0 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -17,6 +17,7 @@ package app.softnetwork.elastic.client import app.softnetwork.common.ClientCompanion +import app.softnetwork.elastic.licensing.metrics.MetricsApi import com.typesafe.config.{Config, ConfigFactory} import org.slf4j.Logger @@ -54,6 +55,8 @@ trait ElasticClientApi protected def logger: Logger + def metrics: MetricsApi = MetricsApi.Noop + def config: Config = ConfigFactory.load() final lazy val elasticConfig: ElasticConfig = ElasticConfig(config) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala index d69bc9fb..f369b573 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionApi.scala @@ -16,16 +16,40 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.licensing.{LicenseManager, LicenseManagerFactory} +import app.softnetwork.elastic.client.result.ElasticSuccess +import app.softnetwork.elastic.licensing.{ + LicenseManager, + LicenseRefreshStrategy, + LicenseRefreshStrategyFactory +} trait ExtensionApi { self: ElasticClientApi => - /** License manager resolved via LicenseManagerFactory. The factory uses SPI discovery and derives - * LicenseMode from config (refreshEnabled=true -> LongRunning, else -> Driver). + /** License refresh strategy resolved via LicenseRefreshStrategyFactory. The factory uses SPI + * discovery and derives LicenseMode from config (refreshEnabled=true -> LongRunning, else -> + * Driver). Cluster info is set once during initialization. */ - lazy val licenseManager: LicenseManager = LicenseManagerFactory.create(config) + lazy val licenseRefreshStrategy: LicenseRefreshStrategy = { + val ret = LicenseRefreshStrategyFactory.create(config, metrics) + clusterName match { + case ElasticSuccess(name) => + ret.telemetryCollector.setClusterInfo( + id = name, + name = Some(name), + version = version match { + case ElasticSuccess(v) => Some(v) + case _ => None + } + ) + case _ => + } + ret + } + + /** License manager resolved from the refresh strategy. */ + lazy val licenseManager: LicenseManager = licenseRefreshStrategy.licenseManager /** Extension registry (lazy loaded) */ lazy val extensionRegistry: ExtensionRegistry = - new ExtensionRegistry(config, licenseManager) + new ExtensionRegistry(config, licenseRefreshStrategy) } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionRegistry.scala b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionRegistry.scala index d10c17ea..2ccdf903 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionRegistry.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionRegistry.scala @@ -16,7 +16,7 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.licensing.LicenseManager +import app.softnetwork.elastic.licensing.LicenseRefreshStrategy import app.softnetwork.elastic.sql.query.Statement import com.typesafe.config.Config @@ -24,7 +24,7 @@ import com.typesafe.config.Config */ class ExtensionRegistry( config: Config, - licenseManager: LicenseManager + licenseRefreshStrategy: LicenseRefreshStrategy ) { import scala.jdk.CollectionConverters._ @@ -42,7 +42,7 @@ class ExtensionRegistry( s"🔌 Discovered extension: ${ext.extensionName} v${ext.version} (priority: ${ext.priority})" ) - ext.initialize(config, licenseManager) match { + ext.initialize(config, licenseRefreshStrategy) match { case Right(_) => logger.info(s"✅ Extension ${ext.extensionName} initialized successfully") Some(ext) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionSpi.scala b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionSpi.scala index 52576194..337db874 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ExtensionSpi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ExtensionSpi.scala @@ -18,7 +18,7 @@ package app.softnetwork.elastic.client import akka.actor.ActorSystem import app.softnetwork.elastic.client.result._ -import app.softnetwork.elastic.licensing.LicenseManager +import app.softnetwork.elastic.licensing.LicenseRefreshStrategy import app.softnetwork.elastic.sql.query.Statement import com.typesafe.config.Config @@ -49,18 +49,18 @@ trait ExtensionSpi { */ def priority: Int = 100 - /** Initialize the extension with configuration and license manager. + /** Initialize the extension with configuration and license refresh strategy. * * @param config * Extension-specific configuration - * @param licenseManager - * License manager for checking quotas + * @param licenseRefreshStrategy + * License refresh strategy providing licenseManager, telemetryCollector, and metrics * @return * Success or error message */ def initialize( config: Config, - licenseManager: LicenseManager + licenseRefreshStrategy: LicenseRefreshStrategy ): Either[String, Unit] /** Check if this extension can handle a given SQL statement. diff --git a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala index de04f7d3..13157fb7 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala @@ -17,20 +17,7 @@ package app.softnetwork.elastic.client import akka.actor.ActorSystem -import app.softnetwork.elastic.licensing.{ - CommunityLicenseManager, - Feature, - GraceStatus, - InvalidLicense, - LicenseError, - LicenseKey, - LicenseManager, - LicenseManagerFactory, - LicenseRefreshStrategy, - LicenseType, - NopRefreshStrategy, - Quota -} +import app.softnetwork.elastic.licensing.{GraceStatus, LicenseRefreshStrategy} import app.softnetwork.elastic.client.result.{ DdlResult, DmlResult, @@ -1744,11 +1731,6 @@ class DdlRouterExecutor( trait GatewayApi extends IndicesApi with ElasticClientHelpers { self: ElasticClientApi => - /** License refresh strategy. Resolved via LicenseManagerFactory (SPI + config-driven mode). - * Override in concrete implementations if custom strategy wiring is needed. - */ - def licenseRefreshStrategy: LicenseRefreshStrategy = LicenseManagerFactory.currentStrategy - lazy val searchExecutor = new SearchExecutor( api = this, logger = logger @@ -1884,7 +1866,13 @@ trait GatewayApi extends IndicesApi with ElasticClientHelpers { statement match { case dql: DqlStatement => logger.debug("🔧 Executing DQL with base executor") - dqlExecutor.execute(dql) + val result = dqlExecutor.execute(dql) + result.foreach { + case ElasticSuccess(_) => + licenseRefreshStrategy.telemetryCollector.incrementQueries() + case _ => + }(system.dispatcher) + result case dml: DmlStatement => logger.debug("🔧 Executing DML with base executor") diff --git a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDdlExtension.scala b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDdlExtension.scala index 2241f999..d4ebedcb 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDdlExtension.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDdlExtension.scala @@ -46,10 +46,10 @@ class CoreDdlExtension extends ExtensionSpi { override def initialize( config: Config, - manager: LicenseManager + licenseRefreshStrategy: LicenseRefreshStrategy ): Either[String, Unit] = { logger.info("🔌 Initializing Core DDL extension") - licenseManager = Some(manager) + licenseManager = Some(licenseRefreshStrategy.licenseManager) Right(()) } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala index ff4b0b4a..ee450094 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala @@ -46,10 +46,10 @@ class CoreDqlExtension extends ExtensionSpi { override def initialize( config: Config, - manager: LicenseManager + licenseRefreshStrategy: LicenseRefreshStrategy ): Either[String, Unit] = { logger.info("🔌 Initializing Core DQL extension") - licenseManager = Some(manager) + licenseManager = Some(licenseRefreshStrategy.licenseManager) Right(()) } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index e9344964..9f647001 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -63,6 +63,8 @@ class MetricsElasticClient( ) extends ElasticClientDelegator with MetricsApi { + override def metrics: MetricsApi = this + // Helper for measuring operations private def measureAsync[T](operation: String, index: Option[String] = None)( block: => Future[T] diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/package.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/package.scala new file mode 100644 index 00000000..e811e3d6 --- /dev/null +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/package.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.client + +package object metrics { + type OperationMetrics = app.softnetwork.elastic.licensing.metrics.OperationMetrics + val OperationMetrics = app.softnetwork.elastic.licensing.metrics.OperationMetrics + + type AggregatedMetrics = app.softnetwork.elastic.licensing.metrics.AggregatedMetrics + val AggregatedMetrics = app.softnetwork.elastic.licensing.metrics.AggregatedMetrics + + type MetricsApi = app.softnetwork.elastic.licensing.metrics.MetricsApi + val MetricsApi = app.softnetwork.elastic.licensing.metrics.MetricsApi +} diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/CommunityLicenseManagerSpi.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/CommunityLicenseManagerSpi.scala index a9062304..8da4ef00 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/CommunityLicenseManagerSpi.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/CommunityLicenseManagerSpi.scala @@ -16,6 +16,7 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.MetricsApi import com.typesafe.config.Config /** Fallback SPI that provides Community-tier licensing with no external dependencies. Priority @@ -26,6 +27,7 @@ class CommunityLicenseManagerSpi extends LicenseManagerSpi { override def priority: Int = Int.MaxValue override protected def buildStrategy( config: Config, - mode: Option[LicenseMode] + mode: Option[LicenseMode], + metrics: MetricsApi ): LicenseRefreshStrategy = new NopRefreshStrategy() } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerFactory.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerFactory.scala index 9516c78a..3d19199a 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerFactory.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerFactory.scala @@ -16,128 +16,20 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.MetricsApi import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging -import java.util.ServiceLoader -import java.util.concurrent.atomic.AtomicReference -import scala.jdk.CollectionConverters._ - -/** Factory for creating and caching LicenseRefreshStrategy instances. - * - * Owns the full strategy lifecycle: SPI discovery -> build -> initialize -> cache. The LicenseMode - * is derived from configuration (`refreshEnabled=true` -> LongRunning, else -> Driver). +/** Factory for creating LicenseManager instances. * - * Thread-safe: all AtomicReference mutations use compareAndSet. + * Delegates to `LicenseRefreshStrategyFactory` for strategy lifecycle management and returns the + * strategy's `licenseManager`. */ object LicenseManagerFactory extends LazyLogging { - private val _strategy: AtomicReference[Option[LicenseRefreshStrategy]] = - new AtomicReference(None) - /** Create a LicenseManager from config. Derives LicenseMode from config. Resolves the best SPI, - * builds the strategy, initializes it, and caches it. - */ - def create(config: Config): LicenseManager = - resolveStrategy(config).licenseManager - - /** Get the cached strategy (resolved during create()). Falls back to NopRefreshStrategy. */ - def currentStrategy: LicenseRefreshStrategy = - _strategy.get().getOrElse(new NopRefreshStrategy()) - - /** Replace the cached strategy. Initializes the new strategy before caching. Use for license - * upgrade/downgrade at runtime. Shuts down the old strategy before replacing. - */ - def setStrategy(strategy: LicenseRefreshStrategy): Unit = { - strategy.initialize() - var old: Option[LicenseRefreshStrategy] = None - var updated = false - while (!updated) { - val current = _strategy.get() - if (_strategy.compareAndSet(current, Some(strategy))) { - old = current - updated = true - } - } - old.foreach(_.shutdown()) - } - - /** Shutdown the cached strategy's background resources (if any) and clear the cache. Called - * during process shutdown to stop the refresh scheduler. - */ - def shutdown(): Unit = { - var old: Option[LicenseRefreshStrategy] = None - var updated = false - while (!updated) { - val current = _strategy.get() - if (_strategy.compareAndSet(current, None)) { - old = current - updated = true - } - } - old.foreach(_.shutdown()) - } - - /** Reset cached strategy (for testing). Uses CAS to ensure atomic clear. */ - def reset(): Unit = { - var updated = false - while (!updated) { - val current = _strategy.get() - updated = _strategy.compareAndSet(current, None) - } - } - - /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. */ - private def resolveMode(config: Config): Option[LicenseMode] = { - val licenseConfig = LicenseConfig.load(config) - if (licenseConfig.refreshEnabled) Some(LicenseMode.LongRunning) - else Some(LicenseMode.Driver) - } - - /** Resolve strategy via SPI, initialize it, and cache it. Synchronized to prevent concurrent - * creation of duplicate strategies (which would leak resources like Akka schedulers). + * builds the strategy, initializes it, caches it, and returns the strategy's licenseManager. */ - private def resolveStrategy(config: Config): LicenseRefreshStrategy = - _strategy.get() match { - case Some(s) => s - case None => - synchronized { - // Double-check after acquiring lock - _strategy.get() match { - case Some(s) => s - case None => - val mode = resolveMode(config) - val loader = ServiceLoader.load(classOf[LicenseManagerSpi]) - val spis = loader.iterator().asScala.toSeq.sortBy(_.priority) - val strategy = spis.headOption - .map { spi => - try { - val s = spi.createStrategy(config, mode) - s.initialize() - logger.info( - s"License strategy initialized: ${s.getClass.getSimpleName} " + - s"(mode=${mode.getOrElse("default")}, type=${s.licenseManager.licenseType})" - ) - s - } catch { - case e: Exception => - logger.error( - s"Failed to create license strategy from ${spi.getClass.getName}: ${e.getMessage}", - e - ) - val fallback = new NopRefreshStrategy() - fallback.initialize() - fallback - } - } - .getOrElse { - val fallback = new NopRefreshStrategy() - fallback.initialize() - fallback - } - _strategy.set(Some(strategy)) - strategy - } - } - } + def create(config: Config, metrics: MetricsApi = MetricsApi.Noop): LicenseManager = + LicenseRefreshStrategyFactory.create(config, metrics).licenseManager } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerSpi.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerSpi.scala index eea1e4f1..9cd4f732 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerSpi.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseManagerSpi.scala @@ -16,6 +16,7 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.MetricsApi import com.typesafe.config.Config /** Runtime context that determines licensing behavior. @@ -72,24 +73,33 @@ trait LicenseManagerSpi { * @return * A freshly constructed (not initialized) LicenseRefreshStrategy */ - protected def buildStrategy(config: Config, mode: Option[LicenseMode]): LicenseRefreshStrategy + protected def buildStrategy( + config: Config, + mode: Option[LicenseMode], + metrics: MetricsApi = MetricsApi.Noop + ): LicenseRefreshStrategy /** Create a LicenseManager by building a strategy and returning its licenseManager. * * Note: the strategy is NOT initialized here — callers who need full lifecycle management should * use `LicenseManagerFactory.create()` instead. */ - def create(config: Config, mode: Option[LicenseMode] = None): LicenseManager = - buildStrategy(config, mode).licenseManager + def create( + config: Config, + mode: Option[LicenseMode] = None, + metrics: MetricsApi = MetricsApi.Noop + ): LicenseManager = + buildStrategy(config, mode, metrics).licenseManager /** Create a LicenseRefreshStrategy directly (not initialized, not cached). * - * Returns the full strategy object, enabling `LicenseManagerFactory` to manage its lifecycle - * (initialize, cache, replace). + * Returns the full strategy object, enabling `LicenseRefreshStrategyFactory` to manage its + * lifecycle (initialize, cache, replace). */ def createStrategy( config: Config, - mode: Option[LicenseMode] = None + mode: Option[LicenseMode] = None, + metrics: MetricsApi = MetricsApi.Noop ): LicenseRefreshStrategy = - buildStrategy(config, mode) + buildStrategy(config, mode, metrics) } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategy.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategy.scala index 53fe1731..000f6831 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategy.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategy.scala @@ -16,6 +16,8 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.MetricsApi + /** Strategy for license lifecycle management (initialization and refresh). * * Concrete implementations: @@ -42,6 +44,9 @@ trait LicenseRefreshStrategy { */ def telemetryCollector: TelemetryCollector = TelemetryCollector.Noop + /** Metrics API for operation-level performance tracking. Default returns `MetricsApi.Noop`. */ + def metrics: MetricsApi = MetricsApi.Noop + /** Shutdown background resources (scheduler, etc.). Default is no-op. */ def shutdown(): Unit = () } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala new file mode 100644 index 00000000..1dc159aa --- /dev/null +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.licensing + +import app.softnetwork.elastic.licensing.metrics.MetricsApi +import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging + +import java.util.ServiceLoader +import java.util.concurrent.atomic.AtomicReference +import scala.jdk.CollectionConverters._ + +/** Factory for creating and caching LicenseRefreshStrategy instances. + * + * Owns the full strategy lifecycle: SPI discovery -> build -> initialize -> cache. The LicenseMode + * is derived from configuration (`refreshEnabled=true` -> LongRunning, else -> Driver). + * + * Thread-safe: all AtomicReference mutations use compareAndSet. + */ +object LicenseRefreshStrategyFactory extends LazyLogging { + + private val _strategy: AtomicReference[Option[LicenseRefreshStrategy]] = + new AtomicReference(None) + + /** Create a LicenseRefreshStrategy from config. Derives LicenseMode from config. Resolves the + * best SPI, builds the strategy, initializes it, and caches it. + */ + def create(config: Config, metrics: MetricsApi = MetricsApi.Noop): LicenseRefreshStrategy = + resolveStrategy(config, metrics) + + /** Replace the cached strategy. Initializes the new strategy before caching. Use for license + * upgrade/downgrade at runtime. Shuts down the old strategy before replacing. + */ + def setStrategy(strategy: LicenseRefreshStrategy): Unit = { + strategy.initialize() + var old: Option[LicenseRefreshStrategy] = None + var updated = false + while (!updated) { + val current = _strategy.get() + if (_strategy.compareAndSet(current, Some(strategy))) { + old = current + updated = true + } + } + old.foreach(_.shutdown()) + } + + /** Shutdown the cached strategy's background resources (if any) and clear the cache. Called + * during process shutdown to stop the refresh scheduler. + */ + def shutdown(): Unit = { + var old: Option[LicenseRefreshStrategy] = None + var updated = false + while (!updated) { + val current = _strategy.get() + if (_strategy.compareAndSet(current, None)) { + old = current + updated = true + } + } + old.foreach(_.shutdown()) + } + + /** Reset cached strategy (for testing). Uses CAS to ensure atomic clear. */ + def reset(): Unit = { + var updated = false + while (!updated) { + val current = _strategy.get() + updated = _strategy.compareAndSet(current, None) + } + } + + /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. */ + private def resolveMode(config: Config): Option[LicenseMode] = { + val licenseConfig = LicenseConfig.load(config) + if (licenseConfig.refreshEnabled) Some(LicenseMode.LongRunning) + else Some(LicenseMode.Driver) + } + + /** Resolve strategy via SPI, initialize it, and cache it. Synchronized to prevent concurrent + * creation of duplicate strategies (which would leak resources like Akka schedulers). + */ + private def resolveStrategy( + config: Config, + metrics: MetricsApi + ): LicenseRefreshStrategy = + _strategy.get() match { + case Some(s) => s + case None => + synchronized { + // Double-check after acquiring lock + _strategy.get() match { + case Some(s) => s + case None => + val mode = resolveMode(config) + val loader = ServiceLoader.load(classOf[LicenseManagerSpi]) + val spis = loader.iterator().asScala.toSeq.sortBy(_.priority) + val strategy = spis.headOption + .map { spi => + try { + val s = spi.createStrategy(config, mode, metrics) + s.initialize() + logger.info( + s"License strategy initialized: ${s.getClass.getSimpleName} " + + s"(mode=${mode.getOrElse("default")}, type=${s.licenseManager.licenseType})" + ) + s + } catch { + case e: Exception => + logger.error( + s"Failed to create license strategy from ${spi.getClass.getName}: ${e.getMessage}", + e + ) + val fallback = new NopRefreshStrategy() + fallback.initialize() + fallback + } + } + .getOrElse { + val fallback = new NopRefreshStrategy() + fallback.initialize() + fallback + } + _strategy.set(Some(strategy)) + strategy + } + } + } +} diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala index 0b265b5d..7bec992b 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala @@ -16,6 +16,8 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi} + import java.util.concurrent.atomic.AtomicLong /** Snapshot of runtime telemetry for inclusion in refresh requests. All counters are cumulative @@ -28,43 +30,60 @@ import java.util.concurrent.atomic.AtomicLong */ case class TelemetryData( queriesTotal: Long = 0, - joinsTotal: Long = 0, mvsActive: Int = 0, - clustersConnected: Int = 0 + clustersConnected: Int = 0, + clusterId: Option[String] = None, + clusterName: Option[String] = None, + clusterVersion: Option[String] = None, + aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty ) /** Mutable telemetry collector with atomic counters. * - * Accessible (read-only snapshot) via `LicenseRefreshStrategy.telemetryCollector.collect()`. - * Extensions (e.g. CoreDqlExtension) update counters via increment/set methods through - * `LicenseManagerFactory.currentStrategy.telemetryCollector`. + * Accessible (read-only snapshot) via + * `LicenseRefreshStrategy.telemetryCollector.collect(metrics)`. Extensions (e.g. CoreDqlExtension) + * update counters via increment/set methods through + * `LicenseRefreshStrategyFactory.create(config).telemetryCollector`. * * Thread-safe: counters use `AtomicLong`, gauges use `@volatile`. */ class TelemetryCollector { private val _queriesTotal = new AtomicLong(0L) - private val _joinsTotal = new AtomicLong(0L) @volatile private var _mvsActive: Int = 0 @volatile private var _clustersConnected: Int = 0 + @volatile private var _clusterId: Option[String] = None + @volatile private var _clusterName: Option[String] = None + @volatile private var _clusterVersion: Option[String] = None // --- Write methods (called by extensions) --- def incrementQueries(): Unit = { val _ = _queriesTotal.incrementAndGet() } - def incrementJoins(): Unit = { val _ = _joinsTotal.incrementAndGet() } - def setMvsActive(count: Int): Unit = { _mvsActive = count } def setClustersConnected(count: Int): Unit = { _clustersConnected = count } + def setClusterInfo( + id: String, + name: Option[String], + version: Option[String] + ): Unit = { + _clusterId = Some(id) + _clusterName = name + _clusterVersion = version + } + // --- Read method (called by AutoRefreshStrategy.doScheduleRefresh) --- - def collect(): TelemetryData = TelemetryData( + def collect(metrics: MetricsApi): TelemetryData = TelemetryData( queriesTotal = _queriesTotal.get(), - joinsTotal = _joinsTotal.get(), mvsActive = _mvsActive, - clustersConnected = _clustersConnected + clustersConnected = _clustersConnected, + clusterId = _clusterId, + clusterName = _clusterName, + clusterVersion = _clusterVersion, + aggregatedMetrics = metrics.getAggregatedMetrics ) } @@ -76,8 +95,12 @@ object TelemetryCollector { */ val Noop: TelemetryCollector = new TelemetryCollector { override def incrementQueries(): Unit = () - override def incrementJoins(): Unit = () override def setMvsActive(count: Int): Unit = () override def setClustersConnected(count: Int): Unit = () + override def setClusterInfo( + id: String, + name: Option[String], + version: Option[String] + ): Unit = () } } diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/AggregatedMetrics.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/AggregatedMetrics.scala similarity index 78% rename from core/src/main/scala/app/softnetwork/elastic/client/metrics/AggregatedMetrics.scala rename to licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/AggregatedMetrics.scala index a0f52302..388ceb39 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/AggregatedMetrics.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/AggregatedMetrics.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package app.softnetwork.elastic.client.metrics +package app.softnetwork.elastic.licensing.metrics case class AggregatedMetrics( totalOperations: Long, @@ -30,3 +30,14 @@ case class AggregatedMetrics( def successRate: Double = if (totalOperations > 0) (successCount.toDouble / totalOperations) * 100 else 0.0 } + +object AggregatedMetrics { + val empty: AggregatedMetrics = AggregatedMetrics( + totalOperations = 0, + successCount = 0, + failureCount = 0, + totalDuration = 0, + operationMetrics = Map.empty, + indexMetrics = Map.empty + ) +} diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsApi.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala similarity index 92% rename from core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsApi.scala rename to licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala index 520703ac..e57f6485 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsApi.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/MetricsApi.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package app.softnetwork.elastic.client.metrics +package app.softnetwork.elastic.licensing.metrics import scala.language.implicitConversions @@ -254,3 +254,20 @@ trait MetricsApi { //format:on def resetMetrics(): Unit } + +object MetricsApi { + + val Noop: MetricsApi = new MetricsApi { + override def recordOperation( + operation: String, + duration: Long, + success: Boolean, + index: Option[String] + ): Unit = () + override def getMetrics: OperationMetrics = OperationMetrics.empty + override def getMetricsByOperation(operation: String): Option[OperationMetrics] = None + override def getMetricsByIndex(index: String): Option[OperationMetrics] = None + override def getAggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty + override def resetMetrics(): Unit = () + } +} diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/OperationMetrics.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/OperationMetrics.scala similarity index 88% rename from core/src/main/scala/app/softnetwork/elastic/client/metrics/OperationMetrics.scala rename to licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/OperationMetrics.scala index ca0611eb..7b52b6e9 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/OperationMetrics.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/metrics/OperationMetrics.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package app.softnetwork.elastic.client.metrics +package app.softnetwork.elastic.licensing.metrics case class OperationMetrics( operation: String, @@ -34,3 +34,7 @@ case class OperationMetrics( def failureRate: Double = if (totalOperations > 0) 100.0 - successRate else 0.0 } + +object OperationMetrics { + val empty: OperationMetrics = OperationMetrics("", 0, 0, 0, 0, 0, 0, 0) +} diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseManagerSpiSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseManagerSpiSpec.scala index 0371c846..31238905 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseManagerSpiSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseManagerSpiSpec.scala @@ -20,6 +20,7 @@ import java.util.ServiceLoader import scala.jdk.CollectionConverters._ +import app.softnetwork.elastic.licensing.{metrics => lm} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -75,7 +76,8 @@ class LicenseManagerSpiSpec extends AnyFlatSpec with Matchers { override def priority: Int = 10 override protected def buildStrategy( config: Config, - mode: Option[LicenseMode] + mode: Option[LicenseMode], + metrics: lm.MetricsApi ): LicenseRefreshStrategy = new NopRefreshStrategy() { override def licenseManager: LicenseManager = new CommunityLicenseManager() { override def licenseType: LicenseType = LicenseType.Pro @@ -99,7 +101,8 @@ class LicenseManagerSpiSpec extends AnyFlatSpec with Matchers { override def priority: Int = 1 override protected def buildStrategy( config: Config, - mode: Option[LicenseMode] + mode: Option[LicenseMode], + metrics: lm.MetricsApi ): LicenseRefreshStrategy = throw new RuntimeException("boom") } @@ -114,7 +117,8 @@ class LicenseManagerSpiSpec extends AnyFlatSpec with Matchers { override def priority: Int = 1 override protected def buildStrategy( config: Config, - mode: Option[LicenseMode] + mode: Option[LicenseMode], + metrics: lm.MetricsApi ): LicenseRefreshStrategy = { receivedMode = mode new NopRefreshStrategy() diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala index b7106636..7b5b5f73 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala @@ -16,16 +16,25 @@ package app.softnetwork.elastic.licensing +import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { + private val noop = MetricsApi.Noop + // --- Noop collector --- "TelemetryCollector.Noop" should "return zero-valued TelemetryData" in { - val data = TelemetryCollector.Noop.collect() - data shouldBe TelemetryData(0, 0, 0, 0) + val data = TelemetryCollector.Noop.collect(noop) + data.queriesTotal shouldBe 0 + data.mvsActive shouldBe 0 + data.clustersConnected shouldBe 0 + data.clusterId shouldBe None + data.clusterName shouldBe None + data.clusterVersion shouldBe None + data.aggregatedMetrics shouldBe AggregatedMetrics.empty } // --- incrementQueries --- @@ -35,16 +44,7 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { collector.incrementQueries() collector.incrementQueries() collector.incrementQueries() - collector.collect().queriesTotal shouldBe 3 - } - - // --- incrementJoins --- - - it should "increment joins counter atomically" in { - val collector = new TelemetryCollector - collector.incrementJoins() - collector.incrementJoins() - collector.collect().joinsTotal shouldBe 2 + collector.collect(noop).queriesTotal shouldBe 3 } // --- setMvsActive --- @@ -52,9 +52,9 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { it should "set MVs active gauge" in { val collector = new TelemetryCollector collector.setMvsActive(5) - collector.collect().mvsActive shouldBe 5 + collector.collect(noop).mvsActive shouldBe 5 collector.setMvsActive(3) - collector.collect().mvsActive shouldBe 3 + collector.collect(noop).mvsActive shouldBe 3 } // --- setClustersConnected --- @@ -62,7 +62,18 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { it should "set clusters connected gauge" in { val collector = new TelemetryCollector collector.setClustersConnected(2) - collector.collect().clustersConnected shouldBe 2 + collector.collect(noop).clustersConnected shouldBe 2 + } + + // --- setClusterInfo --- + + it should "set cluster info" in { + val collector = new TelemetryCollector + collector.setClusterInfo("my-cluster", Some("prod-cluster-1"), Some("8.18.3")) + val data = collector.collect(noop) + data.clusterId shouldBe Some("my-cluster") + data.clusterName shouldBe Some("prod-cluster-1") + data.clusterVersion shouldBe Some("8.18.3") } // --- collect returns consistent snapshot --- @@ -71,15 +82,18 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { val collector = new TelemetryCollector collector.incrementQueries() collector.incrementQueries() - collector.incrementJoins() collector.setMvsActive(4) collector.setClustersConnected(1) + collector.setClusterInfo("test-cluster", Some("test"), Some("9.0.0")) - val data = collector.collect() + val data = collector.collect(noop) data.queriesTotal shouldBe 2 - data.joinsTotal shouldBe 1 data.mvsActive shouldBe 4 data.clustersConnected shouldBe 1 + data.clusterId shouldBe Some("test-cluster") + data.clusterName shouldBe Some("test") + data.clusterVersion shouldBe Some("9.0.0") + data.aggregatedMetrics shouldBe AggregatedMetrics.empty } // --- concurrent access --- @@ -90,15 +104,13 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { new Thread(() => { (1 to 1000).foreach { _ => collector.incrementQueries() - collector.incrementJoins() } }) } threads.foreach(_.start()) threads.foreach(_.join()) - val data = collector.collect() + val data = collector.collect(noop) data.queriesTotal shouldBe 10000 - data.joinsTotal shouldBe 10000 } }