Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.launchdarkly.sdk.internal.events;

import com.launchdarkly.sdk.LDContext;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.internal.events.EventSummarizer.EventSummary;

import java.util.Collections;
import java.util.List;

/**
* Aggregates events from all contexts into a single summary event.
* <p>
* This implementation combines all flag evaluations across all contexts into one
* summary event (without context information), which is the behavior for server-side SDKs.
* <p>
* Note that the methods of this class are deliberately not thread-safe, because they should
* always be called from EventProcessor's single message-processing thread.
*/
final class AggregatedEventSummarizer implements EventSummarizerInterface {
private final EventSummarizer summarizer;

AggregatedEventSummarizer() {
this.summarizer = new EventSummarizer();
}

@Override
public void summarizeEvent(
long timestamp,
String flagKey,
int flagVersion,
int variation,
LDValue value,
LDValue defaultValue,
LDContext context
) {
summarizer.summarizeEvent(timestamp, flagKey, flagVersion, variation, value, defaultValue, context);
}

@Override
public List<EventSummary> getSummariesAndReset() {
EventSummary summary = summarizer.getSummaryAndReset();
// Always return a list with exactly one summary for consistency with interface
return Collections.singletonList(summary);
}

@Override
public void restoreTo(List<EventSummary> previousSummaries) {
// In aggregated mode, we only restore the first summary (should only be one anyway)
if (!previousSummaries.isEmpty()) {
summarizer.restoreTo(previousSummaries.get(0));
}
}

@Override
public boolean isEmpty() {
return summarizer.isEmpty();
}

@Override
public void clear() {
summarizer.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public Thread newThread(Runnable r) {
// all the workers are busy.
final BlockingQueue<FlushPayload> payloadQueue = new ArrayBlockingQueue<>(1);

final EventBuffer outbox = new EventBuffer(eventsConfig.capacity, logger);
final EventBuffer outbox = new EventBuffer(eventsConfig.capacity, eventsConfig.perContextSummarization, logger);
this.contextDeduplicator = eventsConfig.contextDeduplicator;

Thread mainThread = threadFactory.newThread(new Thread() {
Expand Down Expand Up @@ -608,7 +608,13 @@ private void triggerFlush(EventBuffer outbox, BlockingQueue<FlushPayload> payloa
}
FlushPayload payload = outbox.getPayload();
if (diagnosticStore != null) {
int eventCount = payload.events.length + (payload.summary.isEmpty() ? 0 : 1);
int summaryCount = 0;
for (EventSummary summary : payload.summaries) {
if (!summary.isEmpty()) {
summaryCount++;
}
}
int eventCount = payload.events.length + summaryCount;
diagnosticStore.recordEventsInBatch(eventCount);
}
busyFlushWorkersCount.incrementAndGet();
Expand All @@ -618,7 +624,7 @@ private void triggerFlush(EventBuffer outbox, BlockingQueue<FlushPayload> payloa
} else {
logger.debug("Skipped flushing because all workers are busy");
// All the workers are busy so we can't flush now; keep the events in our state
outbox.summarizer.restoreTo(payload.summary);
outbox.summarizer.restoreTo(payload.summaries);
synchronized(busyFlushWorkersCount) {
busyFlushWorkersCount.decrementAndGet();
busyFlushWorkersCount.notify();
Expand Down Expand Up @@ -661,15 +667,18 @@ public void run() {

private static final class EventBuffer {
final List<Event> events = new ArrayList<>();
final EventSummarizer summarizer = new EventSummarizer();
final EventSummarizerInterface summarizer;
private final int capacity;
private final LDLogger logger;
private boolean capacityExceeded = false;
private long droppedEventCount = 0;

EventBuffer(int capacity, LDLogger logger) {
EventBuffer(int capacity, boolean perContextSummarization, LDLogger logger) {
this.capacity = capacity;
this.logger = logger;
this.summarizer = perContextSummarization
? new PerContextEventSummarizer()
: new AggregatedEventSummarizer();
}

void add(Event e) {
Expand All @@ -694,7 +703,7 @@ void addToSummary(Event.FeatureRequest e) {
e.getValue(),
e.getDefaultVal(),
e.getContext()
);
);
}

boolean isEmpty() {
Expand All @@ -709,8 +718,8 @@ long getAndClearDroppedCount() {

FlushPayload getPayload() {
Event[] eventsOut = events.toArray(new Event[events.size()]);
EventSummarizer.EventSummary summary = summarizer.getSummaryAndReset();
return new FlushPayload(eventsOut, summary);
List<EventSummarizer.EventSummary> summaries = summarizer.getSummariesAndReset();
return new FlushPayload(eventsOut, summaries);
}

void clear() {
Expand All @@ -721,11 +730,11 @@ void clear() {

private static final class FlushPayload {
final Event[] events;
final EventSummary summary;
final List<EventSummary> summaries;

FlushPayload(Event[] events, EventSummary summary) {
FlushPayload(Event[] events, List<EventSummary> summaries) {
this.events = events;
this.summary = summary;
this.summaries = summaries;
}
}

Expand Down Expand Up @@ -774,7 +783,7 @@ public void run() {
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream(INITIAL_OUTPUT_BUFFER_SIZE);
Writer writer = new BufferedWriter(new OutputStreamWriter(buffer, Charset.forName("UTF-8")), INITIAL_OUTPUT_BUFFER_SIZE);
int outputEventCount = formatter.writeOutputEvents(payload.events, payload.summary, writer);
int outputEventCount = formatter.writeOutputEvents(payload.events, payload.summaries, writer);
writer.flush();
EventSender.Result result = eventsConfig.eventSender.sendAnalyticsEvents(
buffer.toByteArray(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.Map;

import static com.launchdarkly.sdk.internal.GsonHelpers.gsonInstance;
Expand All @@ -33,7 +34,7 @@ final class EventOutputFormatter {
config.privateAttributes.toArray(new AttributeRef[config.privateAttributes.size()]));
}

int writeOutputEvents(Event[] events, EventSummarizer.EventSummary summary, Writer writer) throws IOException {
int writeOutputEvents(Event[] events, List<EventSummarizer.EventSummary> summaries, Writer writer) throws IOException {
int count = 0;
JsonWriter jsonWriter = new JsonWriter(writer);
jsonWriter.beginArray();
Expand All @@ -42,9 +43,11 @@ int writeOutputEvents(Event[] events, EventSummarizer.EventSummary summary, Writ
count++;
}
}
if (!summary.isEmpty()) {
writeSummaryEvent(summary, jsonWriter);
count++;
for (EventSummarizer.EventSummary summary : summaries) {
if (!summary.isEmpty()) {
writeSummaryEvent(summary, jsonWriter);
count++;
}
}
jsonWriter.endArray();
jsonWriter.flush();
Expand Down Expand Up @@ -234,6 +237,11 @@ private void writeSummaryEvent(EventSummarizer.EventSummary summary, JsonWriter
jw.name("endDate");
jw.value(summary.endDate);

// Include context if present (per-context summarization)
if (summary.context != null) {
writeContext(summary.context, jw, true); // redact anonymous attributes
}

jw.name("features");
jw.beginObject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
*/
final class EventSummarizer {
private EventSummary eventsState;

private final LDContext context; // nullable - only set for per-context summarization

EventSummarizer() {
this.eventsState = new EventSummary();
this(null);
}

EventSummarizer(LDContext context) {
this.context = context;
this.eventsState = new EventSummary(context);
}

/**
Expand Down Expand Up @@ -76,22 +82,29 @@ boolean isEmpty() {
}

void clear() {
eventsState = new EventSummary();
eventsState = new EventSummary(context);
}

static final class EventSummary {
final Map<String, FlagInfo> counters;
long startDate;
long endDate;

final LDContext context; // nullable for backward compatibility

EventSummary() {
counters = new HashMap<>();
this((LDContext) null);
}

EventSummary(LDContext context) {
this.counters = new HashMap<>();
this.context = context;
}

EventSummary(EventSummary from) {
counters = new HashMap<>(from.counters);
startDate = from.startDate;
endDate = from.endDate;
context = from.context;
}

boolean isEmpty() {
Expand Down Expand Up @@ -142,7 +155,8 @@ void noteTimestamp(long time) {
public boolean equals(Object other) {
if (other instanceof EventSummary) {
EventSummary o = (EventSummary)other;
return o.counters.equals(counters) && startDate == o.startDate && endDate == o.endDate;
return o.counters.equals(counters) && startDate == o.startDate && endDate == o.endDate &&
Objects.equals(context, o.context);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.launchdarkly.sdk.internal.events;

import com.launchdarkly.sdk.LDContext;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.internal.events.EventSummarizer.EventSummary;

import java.util.List;

/**
* Interface for event summarization strategies. Implementations can provide either
* single-summary (aggregated) or per-context summary behavior.
* <p>
* Note that implementations are deliberately not thread-safe, as they should always
* be called from EventProcessor's single message-processing thread.
*/
interface EventSummarizerInterface {
/**
* Adds information about an evaluation to the summary.
*
* @param timestamp the millisecond timestamp
* @param flagKey the flag key
* @param flagVersion the flag version, or -1 if the flag is unknown
* @param variation the result variation, or -1 if none
* @param value the result value
* @param defaultValue the application default value
* @param context the evaluation context
*/
void summarizeEvent(
long timestamp,
String flagKey,
int flagVersion,
int variation,
LDValue value,
LDValue defaultValue,
LDContext context
);

/**
* Gets all current summarized event data and resets the state to empty.
*
* @return list of summary states (may contain one or many summaries depending on implementation)
*/
List<EventSummary> getSummariesAndReset();

/**
* Restores the summarizer state from a previous snapshot. This is used when a flush
* operation fails, and we need to keep the summary data for the next attempt.
*
* @param previousSummaries the list of summaries to restore
*/
void restoreTo(List<EventSummary> previousSummaries);

/**
* Returns true if there is no summary data.
*
* @return true if the state is empty
*/
boolean isEmpty();

/**
* Clears all summary data.
*/
void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public final class EventsConfiguration {
final boolean initiallyInBackground;
final boolean initiallyOffline;
final List<AttributeRef> privateAttributes;
final boolean perContextSummarization;

/**
* Creates an instance.
*
*
* @param allAttributesPrivate true if all attributes are private
* @param capacity event buffer capacity (if zero or negative, a value of 1 is used to prevent errors)
* @param contextDeduplicator optional EventContextDeduplicator; null for client-side SDK
Expand Down Expand Up @@ -63,6 +64,45 @@ public EventsConfiguration(
boolean initiallyOffline,
Collection<AttributeRef> privateAttributes
) {
this(allAttributesPrivate, capacity, contextDeduplicator, diagnosticRecordingIntervalMillis,
diagnosticStore, eventSender, eventSendingThreadPoolSize, eventsUri, flushIntervalMillis,
initiallyInBackground, initiallyOffline, privateAttributes, false);
}

/**
* Creates an instance.
*
* @param allAttributesPrivate true if all attributes are private
* @param capacity event buffer capacity (if zero or negative, a value of 1 is used to prevent errors)
* @param contextDeduplicator optional EventContextDeduplicator; null for client-side SDK
* @param diagnosticRecordingIntervalMillis diagnostic recording interval
* @param diagnosticStore optional DiagnosticStore; null if diagnostics are disabled
* @param eventSender event delivery component; must not be null
* @param eventSendingThreadPoolSize number of worker threads for event delivery; zero to use the default
* @param eventsUri events base URI
* @param flushIntervalMillis event flush interval
* @param initiallyInBackground true if we should start out in background mode (see
* {@link DefaultEventProcessor#setInBackground(boolean)})
* @param initiallyOffline true if we should start out in offline mode (see
* {@link DefaultEventProcessor#setOffline(boolean)})
* @param privateAttributes list of private attribute references; may be null
* @param perContextSummarization true to generate separate summary events per context
*/
public EventsConfiguration(
boolean allAttributesPrivate,
int capacity,
EventContextDeduplicator contextDeduplicator,
long diagnosticRecordingIntervalMillis,
DiagnosticStore diagnosticStore,
EventSender eventSender,
int eventSendingThreadPoolSize,
URI eventsUri,
long flushIntervalMillis,
boolean initiallyInBackground,
boolean initiallyOffline,
Collection<AttributeRef> privateAttributes,
boolean perContextSummarization
) {
super();
this.allAttributesPrivate = allAttributesPrivate;
this.capacity = capacity >= 0 ? capacity : 1;
Expand All @@ -77,5 +117,6 @@ public EventsConfiguration(
this.initiallyInBackground = initiallyInBackground;
this.initiallyOffline = initiallyOffline;
this.privateAttributes = privateAttributes == null ? Collections.emptyList() : new ArrayList<>(privateAttributes);
this.perContextSummarization = perContextSummarization;
}
}
Loading