66import com .launchdarkly .logging .LDLogger ;
77import com .launchdarkly .sdk .LDContext ;
88import com .launchdarkly .sdk .android .subsystems .Callback ;
9- import com .launchdarkly .sdk .android .DataModel ;
109import com .launchdarkly .sdk .fdv2 .ChangeSet ;
1110import com .launchdarkly .sdk .fdv2 .SourceResultType ;
1211import com .launchdarkly .sdk .android .subsystems .DataSourceState ;
@@ -52,15 +51,17 @@ public interface DataSourceFactory<T> {
5251 private final long recoveryTimeoutSeconds ;
5352 private final ScheduledExecutorService sharedExecutor ;
5453
55- private final AtomicBoolean started = new AtomicBoolean (false );
54+ private final AtomicBoolean startCalled = new AtomicBoolean (false );
5655 private final AtomicBoolean startCompleted = new AtomicBoolean (false );
57- private final AtomicBoolean stopped = new AtomicBoolean (false );
58- /** Result of the first start (null = not yet completed). Used so second start() gets the same result. */
5956 private volatile Boolean startResult = null ;
6057 private volatile Throwable startError = null ;
61- private final Object startResultLock = new Object ();
6258 private final List <Callback <Boolean >> pendingStartCallbacks = new ArrayList <>();
59+ private final AtomicBoolean stopCalled = new AtomicBoolean (false );
60+ private final AtomicBoolean stopCompleted = new AtomicBoolean (false );
61+ private final List <Callback <Void >> pendingStopCallbacks = new ArrayList <>();
6362
63+ // This future is set by either the worker thread terminating or stop() being called.
64+ private final LDAwaitFuture <Throwable > shutdownCause = new LDAwaitFuture <>();
6465 /**
6566 * Convenience constructor using default fallback and recovery timeouts.
6667 * See {@link #FDv2DataSource(LDContext, List, List, DataSourceFactory, DataSourceUpdateSinkV2,
@@ -131,7 +132,7 @@ public interface DataSourceFactory<T> {
131132
132133 @ Override
133134 public void start (@ NonNull Callback <Boolean > resultCallback ) {
134- synchronized (startResultLock ) {
135+ synchronized (pendingStartCallbacks ) {
135136 // Late caller: the first start already finished, so replay its result immediately.
136137 if (startResult != null ) {
137138 if (startResult ) {
@@ -147,7 +148,7 @@ public void start(@NonNull Callback<Boolean> resultCallback) {
147148 }
148149
149150 // Only the first caller spawns the background thread; subsequent callers just queued above.
150- if (!started .compareAndSet (false , true )) {
151+ if (!startCalled .compareAndSet (false , true )) {
151152 return ;
152153 }
153154 // Do not reset stopped here: it is initialized false and start() runs once. Resetting would
@@ -160,7 +161,7 @@ public void start(@NonNull Callback<Boolean> resultCallback) {
160161 logger .info ("No initializers or synchronizers; data source will not connect." );
161162 dataSourceUpdateSink .setStatus (DataSourceState .VALID , null );
162163 tryCompleteStart (true , null );
163- return ;
164+ return ; // this will go to the finally block and block until stop sets shutdownCause
164165 }
165166
166167 if (sourceManager .hasInitializers ()) {
@@ -169,34 +170,41 @@ public void start(@NonNull Callback<Boolean> resultCallback) {
169170
170171 if (!sourceManager .hasAvailableSynchronizers ()) {
171172 if (!startCompleted .get ()) {
172- LDFailure failure = maybeReportUnexpectedExhaustion ( "All initializers exhausted and there are no available synchronizers." );
173- tryCompleteStart ( false , failure );
173+ // try to claim this is the cause of the shutdown, but it might have already been set by an intentional stop().
174+ shutdownCause . set ( new LDFailure ( "All initializers exhausted and there are no available synchronizers." , LDFailure . FailureType . UNKNOWN_ERROR ) );
174175 }
175176 return ;
176177 }
177178
178179 runSynchronizers (context , dataSourceUpdateSink );
179- LDFailure failure = maybeReportUnexpectedExhaustion ( "All data source acquisition methods have been exhausted." );
180- tryCompleteStart ( false , failure );
180+ // try to claim this is the cause of the shutdown, but it might have already been set by an intentional stop().
181+ shutdownCause . set ( new LDFailure ( "All data source acquisition methods have been exhausted." , LDFailure . FailureType . UNKNOWN_ERROR ) );
181182 } catch (Throwable t ) {
182183 logger .warn ("FDv2DataSource error: {}" , t .toString ());
183- tryCompleteStart (false , t );
184+ shutdownCause .set (t );
185+ } finally {
186+
187+ // Here we grab the cause of shutdown to report with the OFF status. This is done to ensure that
188+ // all status callbacks are handled by the worker thread. This future may have been set
189+ // by this thread itself, but it may have also been set by the stop() call via another thread.
190+ //
191+ // This intentionally blocks on this future in certain configurations and that may seem
192+ // inefficient, but it simplifies the implementation. Such cases are rare in practice.
193+ Throwable cause ;
194+ try {
195+ cause = shutdownCause .get ();
196+ } catch (Exception e ) {
197+ cause = e ;
198+ }
199+
200+ boolean intentional = cause instanceof CancellationException ;
201+ dataSourceUpdateSink .setStatus (DataSourceState .OFF , intentional ? null : cause );
202+ tryCompleteStart (false , cause ); // must always provide cause with false success
203+ tryCompleteStop ();
184204 }
185205 });
186206 }
187207
188- /**
189- * If not stopped, reports OFF with the given message (e.g. exhaustion).
190- * Returns the failure so callers can forward it to {@link #tryCompleteStart}.
191- */
192- private LDFailure maybeReportUnexpectedExhaustion (String message ) {
193- LDFailure failure = new LDFailure (message , LDFailure .FailureType .UNKNOWN_ERROR );
194- if (!stopped .get ()) {
195- dataSourceUpdateSink .setStatus (DataSourceState .OFF , failure );
196- }
197- return failure ;
198- }
199-
200208 /**
201209 * Records the start result and notifies all callbacks (first and any subsequent start() callers).
202210 * No-op if start has already completed. If success is false, error is not null.
@@ -208,7 +216,7 @@ private void tryCompleteStart(boolean success, Throwable error) {
208216 return ;
209217 }
210218 List <Callback <Boolean >> toNotify ;
211- synchronized (startResultLock ) {
219+ synchronized (pendingStartCallbacks ) {
212220 startResult = success ;
213221 startError = error ;
214222 toNotify = new ArrayList <>(pendingStartCallbacks );
@@ -225,11 +233,50 @@ private void tryCompleteStart(boolean success, Throwable error) {
225233
226234 @ Override
227235 public void stop (@ NonNull Callback <Void > completionCallback ) {
228- stopped .set (true );
229- sourceManager .close ();
230- // Caller owns sharedExecutor; we do not shut it down.
231- dataSourceUpdateSink .setStatus (DataSourceState .OFF , null );
232- completionCallback .onSuccess (null );
236+ synchronized (pendingStopCallbacks ) {
237+ if (stopCompleted .get ()) {
238+ // we have already stopped
239+ completionCallback .onSuccess (null );
240+ return ;
241+ }
242+
243+ // stopping is still in progress; queue the callback to be fired by tryCompleteStop.
244+ pendingStopCallbacks .add (completionCallback );
245+ }
246+
247+ // Only the first call to stop does anything
248+ if (!stopCalled .compareAndSet (false , true )) {
249+ return ;
250+ }
251+
252+ shutdownCause .set (new CancellationException ("Data source was stopped intentionally." ));
253+ sourceManager .close (); // unblocks worker thread so it can shutdown
254+
255+ // If the data source had never started, we need to complete the stop here
256+ if (!startCalled .get ()) {
257+ tryCompleteStop ();
258+ }
259+ }
260+
261+ /**
262+ * Notifies all stop callbacks (if there are any).
263+ * No-op if stop has already completed.
264+ */
265+ private void tryCompleteStop () {
266+ // Idempotent: only the first call wins.
267+ if (!stopCompleted .compareAndSet (false , true )) {
268+ return ;
269+ }
270+
271+ List <Callback <Void >> toNotify ;
272+ synchronized (pendingStopCallbacks ) {
273+ toNotify = new ArrayList <>(pendingStopCallbacks );
274+ pendingStopCallbacks .clear ();
275+ }
276+
277+ for (Callback <Void > c : toNotify ) {
278+ c .onSuccess (null );
279+ }
233280 }
234281
235282 @ Override
@@ -246,9 +293,6 @@ private void runInitializers(
246293 boolean anyDataReceived = false ;
247294 Initializer initializer = sourceManager .getNextInitializerAndSetActive ();
248295 while (initializer != null ) {
249- if (stopped .get ()) {
250- return ;
251- }
252296 try {
253297 FDv2SourceResult result = initializer .run ().get ();
254298
@@ -345,9 +389,6 @@ private void runSynchronizers(
345389 try {
346390 Synchronizer synchronizer = sourceManager .getNextAvailableSynchronizerAndSetActive ();
347391 while (synchronizer != null ) {
348- if (stopped .get ()) {
349- return ;
350- }
351392 int synchronizerCount = sourceManager .getAvailableSynchronizerCount ();
352393 boolean isPrime = sourceManager .isPrimeSynchronizer ();
353394 try {
0 commit comments