Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package app.softnetwork.elastic.client

import app.softnetwork.common.ClientCompanion
import app.softnetwork.elastic.licensing.metrics.MetricsApi
import com.typesafe.config.{Config, ConfigFactory}
import org.slf4j.Logger

Expand Down Expand Up @@ -54,6 +55,8 @@ trait ElasticClientApi

protected def logger: Logger

def metrics: MetricsApi = MetricsApi.Noop

def config: Config = ConfigFactory.load()

final lazy val elasticConfig: ElasticConfig = ElasticConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,40 @@

package app.softnetwork.elastic.client

import app.softnetwork.elastic.licensing.{LicenseManager, LicenseManagerFactory}
import app.softnetwork.elastic.client.result.ElasticSuccess
import app.softnetwork.elastic.licensing.{
LicenseManager,
LicenseRefreshStrategy,
LicenseRefreshStrategyFactory
}

trait ExtensionApi { self: ElasticClientApi =>

/** License manager resolved via LicenseManagerFactory. The factory uses SPI discovery and derives
* LicenseMode from config (refreshEnabled=true -> LongRunning, else -> Driver).
/** License refresh strategy resolved via LicenseRefreshStrategyFactory. The factory uses SPI
* discovery and derives LicenseMode from config (refreshEnabled=true -> LongRunning, else ->
* Driver). Cluster info is set once during initialization.
*/
lazy val licenseManager: LicenseManager = LicenseManagerFactory.create(config)
lazy val licenseRefreshStrategy: LicenseRefreshStrategy = {
val ret = LicenseRefreshStrategyFactory.create(config, metrics)
clusterName match {
case ElasticSuccess(name) =>
ret.telemetryCollector.setClusterInfo(
id = name,
name = Some(name),
version = version match {
case ElasticSuccess(v) => Some(v)
case _ => None
}
)
case _ =>
}
ret
}

/** License manager resolved from the refresh strategy. */
lazy val licenseManager: LicenseManager = licenseRefreshStrategy.licenseManager

/** Extension registry (lazy loaded) */
lazy val extensionRegistry: ExtensionRegistry =
new ExtensionRegistry(config, licenseManager)
new ExtensionRegistry(config, licenseRefreshStrategy)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package app.softnetwork.elastic.client

import app.softnetwork.elastic.licensing.LicenseManager
import app.softnetwork.elastic.licensing.LicenseRefreshStrategy
import app.softnetwork.elastic.sql.query.Statement
import com.typesafe.config.Config

/** Registry for managing loaded extensions.
*/
class ExtensionRegistry(
config: Config,
licenseManager: LicenseManager
licenseRefreshStrategy: LicenseRefreshStrategy
) {

import scala.jdk.CollectionConverters._
Expand All @@ -42,7 +42,7 @@ class ExtensionRegistry(
s"🔌 Discovered extension: ${ext.extensionName} v${ext.version} (priority: ${ext.priority})"
)

ext.initialize(config, licenseManager) match {
ext.initialize(config, licenseRefreshStrategy) match {
case Right(_) =>
logger.info(s"✅ Extension ${ext.extensionName} initialized successfully")
Some(ext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package app.softnetwork.elastic.client

import akka.actor.ActorSystem
import app.softnetwork.elastic.client.result._
import app.softnetwork.elastic.licensing.LicenseManager
import app.softnetwork.elastic.licensing.LicenseRefreshStrategy
import app.softnetwork.elastic.sql.query.Statement
import com.typesafe.config.Config

Expand Down Expand Up @@ -49,18 +49,18 @@ trait ExtensionSpi {
*/
def priority: Int = 100

/** Initialize the extension with configuration and license manager.
/** Initialize the extension with configuration and license refresh strategy.
*
* @param config
* Extension-specific configuration
* @param licenseManager
* License manager for checking quotas
* @param licenseRefreshStrategy
* License refresh strategy providing licenseManager, telemetryCollector, and metrics
* @return
* Success or error message
*/
def initialize(
config: Config,
licenseManager: LicenseManager
licenseRefreshStrategy: LicenseRefreshStrategy
): Either[String, Unit]

/** Check if this extension can handle a given SQL statement.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,7 @@
package app.softnetwork.elastic.client

import akka.actor.ActorSystem
import app.softnetwork.elastic.licensing.{
CommunityLicenseManager,
Feature,
GraceStatus,
InvalidLicense,
LicenseError,
LicenseKey,
LicenseManager,
LicenseManagerFactory,
LicenseRefreshStrategy,
LicenseType,
NopRefreshStrategy,
Quota
}
import app.softnetwork.elastic.licensing.{GraceStatus, LicenseRefreshStrategy}
import app.softnetwork.elastic.client.result.{
DdlResult,
DmlResult,
Expand Down Expand Up @@ -1744,11 +1731,6 @@ class DdlRouterExecutor(
trait GatewayApi extends IndicesApi with ElasticClientHelpers {
self: ElasticClientApi =>

/** License refresh strategy. Resolved via LicenseManagerFactory (SPI + config-driven mode).
* Override in concrete implementations if custom strategy wiring is needed.
*/
def licenseRefreshStrategy: LicenseRefreshStrategy = LicenseManagerFactory.currentStrategy

lazy val searchExecutor = new SearchExecutor(
api = this,
logger = logger
Expand Down Expand Up @@ -1884,7 +1866,13 @@ trait GatewayApi extends IndicesApi with ElasticClientHelpers {
statement match {
case dql: DqlStatement =>
logger.debug("🔧 Executing DQL with base executor")
dqlExecutor.execute(dql)
val result = dqlExecutor.execute(dql)
result.foreach {
case ElasticSuccess(_) =>
licenseRefreshStrategy.telemetryCollector.incrementQueries()
case _ =>
}(system.dispatcher)
result

case dml: DmlStatement =>
logger.debug("🔧 Executing DML with base executor")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class CoreDdlExtension extends ExtensionSpi {

override def initialize(
config: Config,
manager: LicenseManager
licenseRefreshStrategy: LicenseRefreshStrategy
): Either[String, Unit] = {
logger.info("🔌 Initializing Core DDL extension")
licenseManager = Some(manager)
licenseManager = Some(licenseRefreshStrategy.licenseManager)
Right(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class CoreDqlExtension extends ExtensionSpi {

override def initialize(
config: Config,
manager: LicenseManager
licenseRefreshStrategy: LicenseRefreshStrategy
): Either[String, Unit] = {
logger.info("🔌 Initializing Core DQL extension")
licenseManager = Some(manager)
licenseManager = Some(licenseRefreshStrategy.licenseManager)
Right(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class MetricsElasticClient(
) extends ElasticClientDelegator
with MetricsApi {

override def metrics: MetricsApi = this

// Helper for measuring operations
private def measureAsync[T](operation: String, index: Option[String] = None)(
block: => Future[T]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2025 SOFTNETWORK
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package app.softnetwork.elastic.client

package object metrics {
type OperationMetrics = app.softnetwork.elastic.licensing.metrics.OperationMetrics
val OperationMetrics = app.softnetwork.elastic.licensing.metrics.OperationMetrics

type AggregatedMetrics = app.softnetwork.elastic.licensing.metrics.AggregatedMetrics
val AggregatedMetrics = app.softnetwork.elastic.licensing.metrics.AggregatedMetrics

type MetricsApi = app.softnetwork.elastic.licensing.metrics.MetricsApi
val MetricsApi = app.softnetwork.elastic.licensing.metrics.MetricsApi
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package app.softnetwork.elastic.licensing

import app.softnetwork.elastic.licensing.metrics.MetricsApi
import com.typesafe.config.Config

/** Fallback SPI that provides Community-tier licensing with no external dependencies. Priority
Expand All @@ -26,6 +27,7 @@ class CommunityLicenseManagerSpi extends LicenseManagerSpi {
override def priority: Int = Int.MaxValue
override protected def buildStrategy(
config: Config,
mode: Option[LicenseMode]
mode: Option[LicenseMode],
metrics: MetricsApi
): LicenseRefreshStrategy = new NopRefreshStrategy()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,128 +16,20 @@

package app.softnetwork.elastic.licensing

import app.softnetwork.elastic.licensing.metrics.MetricsApi
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging

import java.util.ServiceLoader
import java.util.concurrent.atomic.AtomicReference
import scala.jdk.CollectionConverters._

/** Factory for creating and caching LicenseRefreshStrategy instances.
*
* Owns the full strategy lifecycle: SPI discovery -> build -> initialize -> cache. The LicenseMode
* is derived from configuration (`refreshEnabled=true` -> LongRunning, else -> Driver).
/** Factory for creating LicenseManager instances.
*
* Thread-safe: all AtomicReference mutations use compareAndSet.
* Delegates to `LicenseRefreshStrategyFactory` for strategy lifecycle management and returns the
* strategy's `licenseManager`.
*/
object LicenseManagerFactory extends LazyLogging {

private val _strategy: AtomicReference[Option[LicenseRefreshStrategy]] =
new AtomicReference(None)

/** Create a LicenseManager from config. Derives LicenseMode from config. Resolves the best SPI,
* builds the strategy, initializes it, and caches it.
*/
def create(config: Config): LicenseManager =
resolveStrategy(config).licenseManager

/** Get the cached strategy (resolved during create()). Falls back to NopRefreshStrategy. */
def currentStrategy: LicenseRefreshStrategy =
_strategy.get().getOrElse(new NopRefreshStrategy())

/** Replace the cached strategy. Initializes the new strategy before caching. Use for license
* upgrade/downgrade at runtime. Shuts down the old strategy before replacing.
*/
def setStrategy(strategy: LicenseRefreshStrategy): Unit = {
strategy.initialize()
var old: Option[LicenseRefreshStrategy] = None
var updated = false
while (!updated) {
val current = _strategy.get()
if (_strategy.compareAndSet(current, Some(strategy))) {
old = current
updated = true
}
}
old.foreach(_.shutdown())
}

/** Shutdown the cached strategy's background resources (if any) and clear the cache. Called
* during process shutdown to stop the refresh scheduler.
*/
def shutdown(): Unit = {
var old: Option[LicenseRefreshStrategy] = None
var updated = false
while (!updated) {
val current = _strategy.get()
if (_strategy.compareAndSet(current, None)) {
old = current
updated = true
}
}
old.foreach(_.shutdown())
}

/** Reset cached strategy (for testing). Uses CAS to ensure atomic clear. */
def reset(): Unit = {
var updated = false
while (!updated) {
val current = _strategy.get()
updated = _strategy.compareAndSet(current, None)
}
}

/** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. */
private def resolveMode(config: Config): Option[LicenseMode] = {
val licenseConfig = LicenseConfig.load(config)
if (licenseConfig.refreshEnabled) Some(LicenseMode.LongRunning)
else Some(LicenseMode.Driver)
}

/** Resolve strategy via SPI, initialize it, and cache it. Synchronized to prevent concurrent
* creation of duplicate strategies (which would leak resources like Akka schedulers).
* builds the strategy, initializes it, caches it, and returns the strategy's licenseManager.
*/
private def resolveStrategy(config: Config): LicenseRefreshStrategy =
_strategy.get() match {
case Some(s) => s
case None =>
synchronized {
// Double-check after acquiring lock
_strategy.get() match {
case Some(s) => s
case None =>
val mode = resolveMode(config)
val loader = ServiceLoader.load(classOf[LicenseManagerSpi])
val spis = loader.iterator().asScala.toSeq.sortBy(_.priority)
val strategy = spis.headOption
.map { spi =>
try {
val s = spi.createStrategy(config, mode)
s.initialize()
logger.info(
s"License strategy initialized: ${s.getClass.getSimpleName} " +
s"(mode=${mode.getOrElse("default")}, type=${s.licenseManager.licenseType})"
)
s
} catch {
case e: Exception =>
logger.error(
s"Failed to create license strategy from ${spi.getClass.getName}: ${e.getMessage}",
e
)
val fallback = new NopRefreshStrategy()
fallback.initialize()
fallback
}
}
.getOrElse {
val fallback = new NopRefreshStrategy()
fallback.initialize()
fallback
}
_strategy.set(Some(strategy))
strategy
}
}
}
def create(config: Config, metrics: MetricsApi = MetricsApi.Noop): LicenseManager =
LicenseRefreshStrategyFactory.create(config, metrics).licenseManager
}
Loading
Loading