Skip to content

Commit 411c858

Browse files
committed
FME-15373-impressions: introduce ImpressionsManagerConfig and ImpressionsTelemetryRecorder; decouple ImpressionsManagerImpl and ProcessImpressionOptimized
- Add ImpressionsManagerConfig (replaces SplitClientConfig reads) - Add ImpressionsTelemetryRecorder interface + NoopImpressionsTelemetryRecorder - git mv ImpressionsManagerImpl: accepts ImpressionsManagerConfig + ImpressionsTelemetryRecorder, removes SplitClientConfig/TelemetryRuntimeProducer/SplitExecutorFactory deps, inlines scheduler with Executors.newScheduledThreadPool, removes @VisibleForTesting/checkNotNull - git mv ProcessImpressionOptimized: uses ImpressionsTelemetryRecorder instead of TelemetryRuntimeProducer - git mv ImpressionsManagerImplTest: updated all 25 tests to use new abstractions AI-Session-Id: 52375eb8-af89-45b8-bbad-1698b6636202 AI-Tool: claude-code AI-Model: unknown
1 parent 585508f commit 411c858

6 files changed

Lines changed: 263 additions & 302 deletions

File tree

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.split.client.impressions;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
5+
public final class ImpressionsManagerConfig {
6+
7+
private final ImpressionsManager.Mode _mode;
8+
private final int _impressionsRefreshRateSeconds;
9+
private final ThreadFactory _threadFactory;
10+
private final boolean _debugEnabled;
11+
12+
private ImpressionsManagerConfig(Builder builder) {
13+
_mode = builder._mode;
14+
_impressionsRefreshRateSeconds = builder._impressionsRefreshRateSeconds;
15+
_threadFactory = builder._threadFactory;
16+
_debugEnabled = builder._debugEnabled;
17+
}
18+
19+
public ImpressionsManager.Mode mode() { return _mode; }
20+
public int impressionsRefreshRateSeconds() { return _impressionsRefreshRateSeconds; }
21+
public ThreadFactory threadFactory() { return _threadFactory; }
22+
public boolean debugEnabled() { return _debugEnabled; }
23+
24+
public static Builder builder() { return new Builder(); }
25+
26+
public static final class Builder {
27+
private ImpressionsManager.Mode _mode = ImpressionsManager.Mode.OPTIMIZED;
28+
private int _impressionsRefreshRateSeconds = 60;
29+
private ThreadFactory _threadFactory = null;
30+
private boolean _debugEnabled = false;
31+
32+
public Builder mode(ImpressionsManager.Mode mode) { _mode = mode; return this; }
33+
public Builder impressionsRefreshRateSeconds(int rate) { _impressionsRefreshRateSeconds = rate; return this; }
34+
public Builder threadFactory(ThreadFactory tf) { _threadFactory = tf; return this; }
35+
public Builder debugEnabled(boolean debug) { _debugEnabled = debug; return this; }
36+
37+
public ImpressionsManagerConfig build() { return new ImpressionsManagerConfig(this); }
38+
}
39+
}

client/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java renamed to impressions/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java

Lines changed: 69 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,114 @@
11
package io.split.client.impressions;
22

3-
import com.google.common.annotations.VisibleForTesting;
4-
import io.split.client.SplitClientConfig;
53
import io.split.client.dtos.DecoratedImpression;
64
import io.split.client.dtos.KeyImpression;
75
import io.split.client.dtos.TestImpressions;
86
import io.split.client.impressions.strategy.ProcessImpressionNone;
97
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
10-
import io.split.client.utils.SplitExecutorFactory;
11-
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
12-
import io.split.telemetry.storage.TelemetryRuntimeProducer;
138
import org.slf4j.Logger;
149
import org.slf4j.LoggerFactory;
1510

1611
import java.io.Closeable;
17-
import java.net.URISyntaxException;
1812
import java.util.ArrayList;
1913
import java.util.List;
2014
import java.util.Objects;
15+
import java.util.concurrent.Executors;
2116
import java.util.concurrent.ScheduledExecutorService;
17+
import java.util.concurrent.ThreadFactory;
2218
import java.util.concurrent.TimeUnit;
2319
import java.util.stream.Collectors;
2420
import java.util.stream.Stream;
2521

26-
import static com.google.common.base.Preconditions.checkNotNull;
27-
28-
/**
29-
* Created by patricioe on 6/17/16.
30-
*/
3122
public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
3223

3324
private static final Logger _log = LoggerFactory.getLogger(ImpressionsManagerImpl.class);
3425

3526
private static final long BULK_INITIAL_DELAY_SECONDS = 10L;
3627
private static final long COUNT_INITIAL_DELAY_SECONDS = 100L;
3728
private static final long COUNT_REFRESH_RATE_SECONDS = 30 * 60;
38-
private final SplitClientConfig _config;
29+
30+
private final ImpressionsManagerConfig _config;
3931
private final ImpressionsStorageProducer _impressionsStorageProducer;
4032
private final ImpressionsStorageConsumer _impressionsStorageConsumer;
4133
private final ScheduledExecutorService _scheduler;
4234
private final ImpressionsSender _impressionsSender;
4335
private final ImpressionListener _listener;
4436
private final ImpressionsManager.Mode _impressionsMode;
45-
private TelemetryRuntimeProducer _telemetryRuntimeProducer;
37+
private final ImpressionsTelemetryRecorder _telemetryRecorder;
4638
private ImpressionCounter _counter;
4739
private ProcessImpressionStrategy _processImpressionStrategy;
4840
private ProcessImpressionNone _processImpressionNone;
4941

50-
private final int _impressionsRefreshRate;
51-
52-
public static ImpressionsManagerImpl instance(SplitClientConfig config,
53-
TelemetryRuntimeProducer telemetryRuntimeProducer,
54-
ImpressionsStorageConsumer impressionsStorageConsumer,
55-
ImpressionsStorageProducer impressionsStorageProducer,
56-
ImpressionsSender impressionsSender,
57-
ProcessImpressionNone processImpressionNone,
58-
ProcessImpressionStrategy processImpressionStrategy,
59-
ImpressionCounter counter,
60-
ImpressionListener listener) throws URISyntaxException {
61-
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
42+
public static ImpressionsManagerImpl instance(ImpressionsManagerConfig config,
43+
ImpressionsTelemetryRecorder telemetryRecorder,
44+
ImpressionsStorageConsumer impressionsStorageConsumer,
45+
ImpressionsStorageProducer impressionsStorageProducer,
46+
ImpressionsSender impressionsSender,
47+
ProcessImpressionNone processImpressionNone,
48+
ProcessImpressionStrategy processImpressionStrategy,
49+
ImpressionCounter counter,
50+
ImpressionListener listener) {
51+
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRecorder, impressionsStorageConsumer,
6252
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
6353
}
6454

65-
public static ImpressionsManagerImpl instanceForTest(SplitClientConfig config,
66-
ImpressionsSender impressionsSender,
67-
TelemetryRuntimeProducer telemetryRuntimeProducer,
68-
ImpressionsStorageConsumer impressionsStorageConsumer,
69-
ImpressionsStorageProducer impressionsStorageProducer,
70-
ProcessImpressionNone processImpressionNone,
71-
ProcessImpressionStrategy processImpressionStrategy,
72-
ImpressionCounter counter,
73-
ImpressionListener listener) {
74-
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
55+
public static ImpressionsManagerImpl instanceForTest(ImpressionsManagerConfig config,
56+
ImpressionsSender impressionsSender,
57+
ImpressionsTelemetryRecorder telemetryRecorder,
58+
ImpressionsStorageConsumer impressionsStorageConsumer,
59+
ImpressionsStorageProducer impressionsStorageProducer,
60+
ProcessImpressionNone processImpressionNone,
61+
ProcessImpressionStrategy processImpressionStrategy,
62+
ImpressionCounter counter,
63+
ImpressionListener listener) {
64+
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRecorder, impressionsStorageConsumer,
7565
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
7666
}
7767

78-
private ImpressionsManagerImpl(SplitClientConfig config,
79-
ImpressionsSender impressionsSender,
80-
TelemetryRuntimeProducer telemetryRuntimeProducer,
81-
ImpressionsStorageConsumer impressionsStorageConsumer,
82-
ImpressionsStorageProducer impressionsStorageProducer,
83-
ProcessImpressionNone processImpressionNone,
84-
ProcessImpressionStrategy processImpressionStrategy,
85-
ImpressionCounter impressionCounter,
86-
ImpressionListener impressionListener) {
87-
88-
89-
_config = checkNotNull(config);
90-
_impressionsMode = checkNotNull(config.impressionsMode());
91-
_impressionsStorageConsumer = checkNotNull(impressionsStorageConsumer);
92-
_impressionsStorageProducer = checkNotNull(impressionsStorageProducer);
93-
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
94-
_processImpressionNone = checkNotNull(processImpressionNone);
95-
_processImpressionStrategy = checkNotNull(processImpressionStrategy);
68+
private ImpressionsManagerImpl(ImpressionsManagerConfig config,
69+
ImpressionsSender impressionsSender,
70+
ImpressionsTelemetryRecorder telemetryRecorder,
71+
ImpressionsStorageConsumer impressionsStorageConsumer,
72+
ImpressionsStorageProducer impressionsStorageProducer,
73+
ProcessImpressionNone processImpressionNone,
74+
ProcessImpressionStrategy processImpressionStrategy,
75+
ImpressionCounter impressionCounter,
76+
ImpressionListener impressionListener) {
77+
_config = Objects.requireNonNull(config);
78+
_impressionsMode = Objects.requireNonNull(config.mode());
79+
_impressionsStorageConsumer = Objects.requireNonNull(impressionsStorageConsumer);
80+
_impressionsStorageProducer = Objects.requireNonNull(impressionsStorageProducer);
81+
_telemetryRecorder = Objects.requireNonNull(telemetryRecorder);
82+
_processImpressionNone = Objects.requireNonNull(processImpressionNone);
83+
_processImpressionStrategy = Objects.requireNonNull(processImpressionStrategy);
9684
_impressionsSender = impressionsSender;
9785
_counter = impressionCounter;
9886

99-
_scheduler = SplitExecutorFactory.buildScheduledExecutorService(config.getThreadFactory(), "Split-ImpressionsManager-%d", 2);
87+
ThreadFactory tf = config.threadFactory();
88+
_scheduler = tf != null
89+
? Executors.newScheduledThreadPool(2, tf)
90+
: Executors.newScheduledThreadPool(2);
10091
_listener = impressionListener;
101-
102-
_impressionsRefreshRate = config.impressionsRefreshRate();
10392
}
10493

10594
@Override
106-
public void start(){
107-
switch (_impressionsMode){
95+
public void start() {
96+
switch (_impressionsMode) {
10897
case OPTIMIZED:
109-
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
110-
TimeUnit.SECONDS);
111-
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS, _impressionsRefreshRate, TimeUnit.SECONDS);
98+
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS,
99+
COUNT_REFRESH_RATE_SECONDS, TimeUnit.SECONDS);
100+
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS,
101+
_config.impressionsRefreshRateSeconds(), TimeUnit.SECONDS);
112102
break;
113103
case DEBUG:
114-
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS, _impressionsRefreshRate, TimeUnit.SECONDS);
115-
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
116-
TimeUnit.SECONDS);
104+
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS,
105+
_config.impressionsRefreshRateSeconds(), TimeUnit.SECONDS);
106+
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS,
107+
COUNT_REFRESH_RATE_SECONDS, TimeUnit.SECONDS);
117108
break;
118109
case NONE:
119-
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
120-
TimeUnit.SECONDS);
110+
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS,
111+
COUNT_REFRESH_RATE_SECONDS, TimeUnit.SECONDS);
121112
break;
122113
}
123114
}
@@ -142,40 +133,40 @@ public void track(List<DecoratedImpression> decoratedImpressions) {
142133
if (!Objects.isNull(impressionsResult.getImpressionsToQueue())) {
143134
impressionsForLogs.addAll(impressionsResult.getImpressionsToQueue());
144135
}
145-
if (!Objects.isNull(impressionsResult.getImpressionsToListener()))
136+
if (!Objects.isNull(impressionsResult.getImpressionsToListener())) {
146137
impressionsToListener.addAll(impressionsResult.getImpressionsToListener());
138+
}
147139
}
148140
int totalImpressions = impressionsForLogs.size();
149-
long queued = _impressionsStorageProducer.put(impressionsForLogs.stream().map(KeyImpression::fromImpression).collect(Collectors.toList()));
141+
long queued = _impressionsStorageProducer.put(
142+
impressionsForLogs.stream().map(KeyImpression::fromImpression).collect(Collectors.toList()));
150143
if (queued < totalImpressions) {
151-
_telemetryRuntimeProducer.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, totalImpressions-queued);
144+
_telemetryRecorder.recordImpressionsDropped(totalImpressions - queued);
152145
}
153-
_telemetryRuntimeProducer.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, queued);
146+
_telemetryRecorder.recordImpressionsQueued(queued);
154147

155-
if (_listener!=null){
148+
if (_listener != null) {
156149
impressionsToListener.forEach(imp -> _listener.log(imp));
157150
}
158151
}
159152

160153
@Override
161154
public void close() {
162155
try {
163-
if(_listener!= null){
156+
if (_listener != null) {
164157
_listener.close();
165158
_log.info("Successful shutdown of ImpressionListener");
166159
}
167160
_scheduler.shutdown();
168161
sendImpressions();
169-
if(_counter != null) {
162+
if (_counter != null) {
170163
sendImpressionCounters();
171164
}
172165
} catch (Exception e) {
173166
_log.warn("Unable to close ImpressionsManager properly", e);
174167
}
175-
176168
}
177169

178-
@VisibleForTesting
179170
/* package private */ void sendImpressions() {
180171
if (_impressionsStorageConsumer.isFull()) {
181172
_log.warn("Split SDK impressions queue is full. Impressions may have been dropped. Consider increasing capacity.");
@@ -184,7 +175,7 @@ public void close() {
184175
long start = System.currentTimeMillis();
185176
List<KeyImpression> impressions = _impressionsStorageConsumer.pop();
186177
if (impressions.isEmpty()) {
187-
return; // Nothing to send
178+
return;
188179
}
189180

190181
_impressionsSender.postImpressionsBulk(TestImpressions.fromKeyImpressions(impressions));
@@ -194,15 +185,13 @@ public void close() {
194185
}
195186
}
196187

197-
@VisibleForTesting
198-
/* package private */ void sendImpressionCounters() {
188+
/* package private */ void sendImpressionCounters() {
199189
if (!_counter.isEmpty()) {
200190
_impressionsSender.postCounters(_counter.popAll());
201191
}
202192
}
203193

204-
@VisibleForTesting
205194
/* package private */ ImpressionCounter getCounter() {
206195
return _counter;
207196
}
208-
}
197+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.split.client.impressions;
2+
3+
public interface ImpressionsTelemetryRecorder {
4+
void recordImpressionsDropped(long count);
5+
void recordImpressionsQueued(long count);
6+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.split.client.impressions;
2+
3+
public class NoopImpressionsTelemetryRecorder implements ImpressionsTelemetryRecorder {
4+
@Override
5+
public void recordImpressionsDropped(long count) {}
6+
7+
@Override
8+
public void recordImpressionsQueued(long count) {}
9+
}

0 commit comments

Comments
 (0)